In [155]:
import pandas as pd
import numpy as np
import os
from datetime import datetime,date,timedelta,timezone
import json

import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns

from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud.exceptions import NotFound
from google.api_core.exceptions import BadRequest


# Init parameter

In [156]:
# Results collected per model: model_id -> (summary DataFrame, pred-vs-actual DataFrame)
dictCollectPerf = dict()

In [157]:
# Model whose weekly forecasts are collected in this run.
# Other model ids that were used here before (edit the assignment below to switch):
#   'qqq-ema1-30t5-ds0115t0523'
#   'spy-ema1-60t10-ds0115t0523'
model_id="spy-signal-60t10-ds0115t0523"

no_days=5 # length of the collection window in days (one working week)
date_col='date' # name given to the date column in every query result



# Create Start to End Date By Getting Last Date of Week

In [158]:
# startX and endX are prediction dates; both must exist in fin_movement_forecast.
# Each prediction date is the day from which the model looks back over its input
# window (e.g. the past 60 days) to forecast the following days (e.g. the next 10).

log_date=datetime(2023,8,27)

# The collection job is only allowed to run on Sunday (weekday() == 6).
if log_date.weekday()!=6:
    raise Exception("Allow Collection forcasting result only Sunday")

# Derived from log_date (01:00:00 on the run date) so the two values cannot drift apart.
log_timestamp=log_date.replace(hour=1,minute=0,second=0)
# log_timestamp=datetime.now(timezone.utc)

# Last prediction date of the week to read from fin_movement_forecast.
# NOTE(review): endX is set by hand and is not checked against log_date.
endX='2023-08-25'
#endX='2023-06-09' # the first week to predict data    '2023-05-31 - 2023-06-02'

# First prediction date of the window: no_days calendar days ending at endX (inclusive).
startX=datetime.strptime(endX,'%Y-%m-%d')-timedelta(days=no_days-1)
startX=startX.strftime('%Y-%m-%d')

print(f"Date to collect data on {log_date.strftime('%A %d-%m-%Y')}  at {log_timestamp}")
print(f"Collection data from {startX} - {endX}")


Date to collect data on Sunday 27-08-2023  at 2023-08-27 01:00:00
Collection data from 2023-08-21 - 2023-08-25


# BigQuery Setting

In [159]:
# BigQuery project and dataset that hold every table used by this notebook.
projectId='pongthorn'
dataset_id='FinAssetForecast'

# Fully qualified table names: <project>.<dataset>.<table>
table_data_id=".".join([projectId,dataset_id,"fin_data"])                    # actual values
table_id=".".join([projectId,dataset_id,"fin_movement_forecast"])            # forecast inputs and outputs
table_model_id=".".join([projectId,dataset_id,"model_ts_metadata"])          # model metadata
table_perf_id=".".join([projectId,dataset_id,"model_forecast_performance"])  # ingestion target

for full_table_name in (table_id,table_data_id,table_model_id,table_perf_id):
    print(full_table_name)

# Shared client used by every query and by the load job.
client = bigquery.Client(project=projectId)

def load_data_bq(sql:str):
    """Run ``sql`` through the module-level BigQuery ``client``.

    Returns the query result converted to a DataFrame.
    """
    return client.query(sql).to_dataframe()

pongthorn.FinAssetForecast.fin_movement_forecast
pongthorn.FinAssetForecast.fin_data
pongthorn.FinAssetForecast.model_ts_metadata
pongthorn.FinAssetForecast.model_forecast_performance


# Start Loop

# Get Model Meta

In [160]:
def get_model_metadata(model_id):
    """Query the model metadata table for the rows of ``model_id``.

    The SQL text is printed before it is executed. Returns a DataFrame,
    which is empty when the table has no row for that model id.
    """
    metadata_query=f"""
    SELECT * FROM `{table_model_id}`  where model_id='{model_id}'
    """
    print(metadata_query)
    return load_data_bq(metadata_query)

dfModelMeta=get_model_metadata(model_id)

# Stop right away when the metadata table has no row for this model.
if dfModelMeta.empty:
    raise Exception(f"Not found model id  {model_id}")

# Only the first matching row is used.
modelMeta=dfModelMeta.iloc[0]
print(modelMeta)
asset_name=modelMeta['asset']            # used later to filter the actual values by Symbol
prediction=modelMeta['prediction']       # name of the predicted feature/column


    SELECT * FROM `pongthorn.FinAssetForecast.model_ts_metadata`  where model_id='spy-signal-60t10-ds0115t0523'
    
model_id                                       spy-signal-60t10-ds0115t0523
asset                                                                   SPY
prediction                                                           SIGNAL
input_sequence_length                                                    60
output_sequence_length                                                   10
gs_model_path              gs://demo-ts-forecast-pongthorn/model_spy_signal
local_model_path                                     model/model_spy_signal
model_file                       SIGNAL_60To10_SPY_E150S20-Y2015-2023_ma.h5
scaler_file                  scaler_SIGNAL_60To10_SPY_E150S20-Y2015-2023.gz
scaler_pred_file          scaler_pred_SIGNAL_60To10_SPY_E150S20-Y2015-20...
Name: 0, dtype: object


# Keep only the trading days in the given period and check the data in the tables

In [161]:
from pandas.tseries.holiday import USFederalHolidayCalendar
from pandas.tseries.offsets import CustomBusinessDay

# Business-day offset that skips weekends and US federal holidays.
# NOTE(review): federal holidays stand in for market holidays here - confirm they
# match the trading calendar of the assets being forecast.
us_holiday_calendar = USFederalHolidayCalendar()
us_bd = CustomBusinessDay(calendar=us_holiday_calendar)

# Every such business day between startX and endX.
dateRange = pd.date_range(startX, endX, freq=us_bd)

print(dateRange)

# TODO: check that every prediction date exists in the forecast table
# TODO: check that none of these prediction dates is already in the performance table

DatetimeIndex(['2023-08-21', '2023-08-22', '2023-08-23', '2023-08-24',
               '2023-08-25'],
              dtype='datetime64[ns]', freq='C')


# Retrieve forecasting result data into a dictionary

In [162]:
def _load_frame_indexed_by_date(sql,dedup_cols):
    """Print and run ``sql``, drop duplicate rows and index the result by ``date_col``.

    The calling queries sort by their write timestamp first, so ``keep='last'``
    retains the most recently written row for each ``dedup_cols`` key.
    """
    print(sql)
    frame=load_data_bq(sql).drop_duplicates(subset=dedup_cols,keep='last')
    # assign() builds a new frame instead of writing into the de-duplicated result
    frame=frame.assign(**{date_col:pd.to_datetime(frame[date_col],format='%Y-%m-%d')})
    return frame.set_index(date_col)


def get_forecasting_result_data(request):
    """Load the model input, the model output and the actual values of one prediction date.

    Parameters
    ----------
    request : dict
        Must provide the keys ``start_date`` (prediction date as 'YYYY-MM-DD'),
        ``prediction_name`` (column holding the predicted feature),
        ``asset_name`` (symbol used to filter the actual data) and ``model_id``.

    Returns
    -------
    dict
        ``{'actual_price': DataFrame, 'input': DataFrame, 'output': DataFrame}``,
        each frame indexed by ``date_col``.

    Raises
    ------
    Exception
        If ``request`` is None.
    ValueError
        If the forecast table has no input or output rows for the prediction date.
    """
    if request is None:
        raise Exception("No request parameters such as start_date,prediction_name,asset_name")

    start_date=request["start_date"]
    prediction_name=request["prediction_name"]
    asset_name=request["asset_name"]
    model_id=request["model_id"]

    forecast_key_cols=[date_col,'asset_name','prediction_name']
    separator="================================================================================================"

    print("1.How far back in time does model want to apply as input to make prediction")

    sqlInput=f"""
    select t.pred_timestamp,t.asset_name,t.prediction_name,
    t_feature.input_date as {date_col},t_feature.input_feature as {prediction_name}
    from  `{table_id}` t
    cross join unnest(t.feature_for_prediction) t_feature
    where t.prediction_date='{start_date}' and t.model_id='{model_id}'
    order by  t.pred_timestamp,t_feature.input_date
    """
    dfInput=_load_frame_indexed_by_date(sqlInput,forecast_key_cols)

    input_sequence_length=len(dfInput)
    print(f"input_sequence_length={input_sequence_length}")

    print(dfInput.info())
    print(dfInput[['asset_name','prediction_name' ,prediction_name]])
    print(separator)

    print("2.How far in advance does model want to  make prediction")

    # Same forecast table as the input query (referenced through table_id, not hard-coded).
    sqlOutput=f"""
    select t.pred_timestamp,t.asset_name,t.prediction_name,
    t_pred.output_date as {date_col},t_pred.output_value as {prediction_name}
    from  `{table_id}` t
    cross join unnest(t.prediction_result) t_pred
    where t.prediction_date='{start_date}'  and t.model_id='{model_id}'
    order by  t.pred_timestamp,t_pred.output_date
    """
    dfOutput=_load_frame_indexed_by_date(sqlOutput,forecast_key_cols)

    output_sequence_length=len(dfOutput)
    print(f"output_sequence_length={output_sequence_length}")

    print(dfOutput.info())
    print(dfOutput[['asset_name','prediction_name' ,prediction_name]])
    print(separator)
    print("3.Get Real Data  to compare to prediction")

    # Without forecast rows the date bounds below would be NaT; fail with a clear message instead.
    if dfInput.empty or dfOutput.empty:
        raise ValueError(
            f"No forecast rows in {table_id} for prediction_date={start_date} and model_id={model_id}"
        )

    # Actual data from the first input day up to the last output day (as far as it is available).
    startFinData=dfInput.index.min().strftime('%Y-%m-%d')
    endFindData=dfOutput.index.max().strftime('%Y-%m-%d')

    sqlData=f"""
    select Date as {date_col},{prediction_name}, ImportDateTime, from `{table_data_id}` 
    where (Date>='{startFinData}' and Date<='{endFindData}') and Symbol='{asset_name}'
    order by ImportDateTime,Date
    """
    dfRealData=_load_frame_indexed_by_date(sqlData,[date_col])

    print(dfRealData.info())
    print(dfRealData[[prediction_name]])
    print(separator)

    return {'actual_price':dfRealData,'input':dfInput,'output':dfOutput }



In [163]:
# Prediction dates of the window as 'YYYY-MM-DD' strings.
listDate=dateRange.strftime("%Y-%m-%d").tolist()

# Forecast input / output / actual frames keyed by prediction date.
dictData={}
for prediction_day in listDate:
    print(f"==================================={prediction_day}===================================")
    dictData[prediction_day]=get_forecasting_result_data(
        {
            'start_date':prediction_day,
            'prediction_name':prediction,
            'asset_name':asset_name,
            'model_id':model_id,
        }
    )
    print("========================================================================")

1.How far back in time does model want to apply as input to make prediction

    select t.pred_timestamp,t.asset_name,t.prediction_name,
    t_feature.input_date as date,t_feature.input_feature as SIGNAL
    from  `pongthorn.FinAssetForecast.fin_movement_forecast` t
    cross join unnest(t.feature_for_prediction) t_feature
    where t.prediction_date='2023-08-21' and t.model_id='spy-signal-60t10-ds0115t0523'
    order by  t.pred_timestamp,t_feature.input_date
    
input_sequence_length=60
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 60 entries, 2023-05-25 to 2023-08-21
Data columns (total 4 columns):
 #   Column           Non-Null Count  Dtype              
---  ------           --------------  -----              
 0   pred_timestamp   60 non-null     datetime64[ns, UTC]
 1   asset_name       60 non-null     object             
 2   prediction_name  60 non-null     object             
 3   SIGNAL           60 non-null     float64            
dtypes: datetime64[ns, UTC](1), floa

# Create Predictive and Actual Value dataframe

In [164]:
# Build one comparison frame per prediction date and concatenate them once at the end
# (instead of growing a DataFrame inside the loop).
forecast_columns=['date','pred_value','actual_value','prediction_date']
listCompare=[]

# The loop variable is deliberately not called `date`, which would overwrite the
# `date` class imported from datetime.
for prediction_date,forecast in dictData.items():

    # Predicted and actual series under fixed column names; rename() returns new frames,
    # so the frames stored in dictData are left untouched.
    dfPred=forecast['output'][[prediction]].rename(columns={prediction:'pred_value'})
    dfX=forecast['actual_price'][[prediction]].rename(columns={prediction:'actual_value'})

    # Inner join on the date index keeps only the forecast dates that have an actual value.
    dfCompare=pd.merge(left=dfPred,right=dfX,how='inner',right_index=True,left_index=True)
    dfCompare=dfCompare.reset_index()
    dfCompare['prediction_date']=prediction_date
    print(f"============={prediction_date}=============")
    print(dfCompare)

    listCompare.append(dfCompare)

if listCompare:
    dfAllForecastResult=pd.concat(listCompare,ignore_index=True)
else:
    # Nothing collected: keep the expected columns so later cells still find them.
    dfAllForecastResult=pd.DataFrame(columns=forecast_columns)

print("========================All values dataframe========================")
print(dfAllForecastResult.info())
dfAllForecastResult

        date  pred_value  actual_value prediction_date
0 2023-08-22   -0.467377       -0.7765      2023-08-21
1 2023-08-23   -0.663445       -0.9826      2023-08-21
2 2023-08-24   -0.848760       -1.1870      2023-08-21
3 2023-08-25   -0.956304       -1.5368      2023-08-21
4 2023-08-28   -0.940718       -2.0933      2023-08-21
5 2023-08-29   -0.909320       -2.1338      2023-08-21
6 2023-08-30   -0.874731       -1.9440      2023-08-21
        date  pred_value  actual_value prediction_date
0 2023-08-23   -0.584690       -0.9826      2023-08-22
1 2023-08-24   -0.689455       -1.1870      2023-08-22
2 2023-08-25   -0.784489       -1.5368      2023-08-22
3 2023-08-28   -0.797250       -2.0933      2023-08-22
4 2023-08-29   -0.700850       -2.1338      2023-08-22
5 2023-08-30   -0.581056       -1.9440      2023-08-22
        date  pred_value  actual_value prediction_date
0 2023-08-24   -1.272937       -1.1870      2023-08-23
1 2023-08-25   -1.597689       -1.5368      2023-08-23
2 2023-08-

Unnamed: 0,date,pred_value,actual_value,prediction_date
0,2023-08-22,-0.467377,-0.7765,2023-08-21
1,2023-08-23,-0.663445,-0.9826,2023-08-21
2,2023-08-24,-0.84876,-1.187,2023-08-21
3,2023-08-25,-0.956304,-1.5368,2023-08-21
4,2023-08-28,-0.940718,-2.0933,2023-08-21
5,2023-08-29,-0.90932,-2.1338,2023-08-21
6,2023-08-30,-0.874731,-1.944,2023-08-21
7,2023-08-23,-0.58469,-0.9826,2023-08-22
8,2023-08-24,-0.689455,-1.187,2023-08-22
9,2023-08-25,-0.784489,-1.5368,2023-08-22


# Calculate Metric

In [165]:
# Metric recorded for this collection run (one per scheduled job).
# NOTE(review): metric_value is a hard-coded placeholder; no MAE is computed from the
# collected values here - confirm before relying on the stored number.
metric_name,metric_value='mae',1

# Create Collection Performance Info Dataframe and Store 


In [166]:
# One summary row for this model and this collection run.
perf_record={
    "collection_date":log_date,
    "model_id":model_id,
    "metric_name":metric_name,
    "metric_value":metric_value,
    "collection_timestamp":log_timestamp,
}
df=pd.DataFrame([perf_record])
print(df.info())

# Keep the summary row together with the detailed pred-vs-actual rows of the same model.
dictCollectPerf[model_id]=(df,dfAllForecastResult)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 5 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   collection_date       1 non-null      datetime64[ns]
 1   model_id              1 non-null      object        
 2   metric_name           1 non-null      object        
 3   metric_value          1 non-null      int64         
 4   collection_timestamp  1 non-null      datetime64[ns]
dtypes: datetime64[ns](2), int64(1), object(2)
memory usage: 168.0+ bytes
None


# End Loop

# Create Json Data 

In [167]:
# Turn every collected (summary, detail) pair into one JSON record:
# the summary fields plus the detail rows nested under "pred_actual_data".
jsonDataList=[]
for perf_model_id,(masterDF,detailDF) in dictCollectPerf.items():
    print(perf_model_id)

    # Format the dates on copies (assign() returns a new frame) so the frames kept in
    # dictCollectPerf keep their datetime columns and this cell can be re-run safely.
    masterJsonDF=masterDF.assign(
        collection_date=masterDF["collection_date"].dt.strftime('%Y-%m-%d'),
        collection_timestamp=masterDF["collection_timestamp"].dt.strftime('%Y-%m-%d %H:%M:%S'),
    )
    master_perf=json.loads(masterJsonDF.to_json(orient='records'))[0] # the summary frame has exactly 1 record

    # "prediction_date" is already a date string; only "date" needs formatting.
    detailJsonDF=detailDF.assign(date=detailDF["date"].dt.strftime('%Y-%m-%d'))
    master_perf["pred_actual_data"]=json.loads(detailJsonDF.to_json(orient='records'))

    jsonDataList.append(master_perf)

# Keep a local copy of what is about to be ingested.
with open("fin_forecast_performance.json", "w") as outfile:
    json.dump( jsonDataList, outfile)

spy-signal-60t10-ds0115t0523


# Ingest Data to BigQuery

In [168]:
# Informational only: show the schema of the destination table when it can be fetched.
# Any failure is printed and the load below is still attempted.
try:
    table=client.get_table(table_perf_id)
    print("Table {} already exists.".format(table_perf_id))
    print(table.schema)
except Exception as ex :
    print(str(ex))

job_config = bigquery.LoadJobConfig()

job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
# job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
job = client.load_table_from_json(jsonDataList,table_perf_id, job_config = job_config)

# The load job runs asynchronously: wait for it to finish before reporting the outcome,
# otherwise job.errors is still empty and success would be reported unconditionally.
try:
    job.result()
except BadRequest:
    print(job.error_result)
    print(job.errors)
else:
    print(f"Import to bigquery successfully  {len(jsonDataList)} records")

#job_config.schema
# truncate table`pongthorn.FinAssetForecast.model_forecast_performance` 

Table pongthorn.FinAssetForecast.fin_movement_forecast already exists.
[SchemaField('collection_timestamp', 'TIMESTAMP', 'NULLABLE', None, None, (), None), SchemaField('model_id', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('metric_value', 'INTEGER', 'NULLABLE', None, None, (), None), SchemaField('pred_actual_data', 'RECORD', 'REPEATED', None, None, (SchemaField('prediction_date', 'DATE', 'NULLABLE', None, None, (), None), SchemaField('actual_value', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('pred_value', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('date', 'DATE', 'NULLABLE', None, None, (), None)), None), SchemaField('metric_name', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('collection_date', 'DATE', 'NULLABLE', None, None, (), None)]
Import to bigquery successfully  1 records


In [169]:
# TODO: load all stored values, including the recently ingested ones, to calculate MAE
# - get sum(ABS(error)) and the row count of all existing rows
# - add the sum and the row count of the recently ingested rows to those totals

# from sklearn.metrics import mean_absolute_error
# MAE = mean_absolute_error(dfAllForecastResult[f'actual_value'], dfAllForecastResult[f'pred_value'])
# MAE