In [None]:
#import the packages
import os
import sys
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import TimeSeriesSplit
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LSTM, Bidirectional, Dense, Conv1D, Flatten, MaxPooling1D, RepeatVector, \
    TimeDistributed, LayerNormalization, Dropout, MultiHeadAttention, Input
from tensorflow.keras.optimizers import Adam
import statsmodels.api as sm
from pmdarima import auto_arima
from scipy import stats
from statsmodels.tsa.arima.model import ARIMA
import matplotlib.pyplot as plt
import tensorflow as tf

In [None]:
# import packages for measuring the performance
import quantstats as qs
import yfinance as yf
from datetime import datetime

In [None]:
tf.random.set_seed(7)

In [None]:
# Read the input data
def read_data(path, dim_type, gold_data=None, fed_rate=None, use_percentage=1):
    df = pd.read_csv(path)
    data_len = df.shape[0]
    data = None
    if dim_type!='Multi':
        data = df[dim_type].values.reshape((data_len, 1))
    else:
        # Multi
        df["Date"]=pd.to_datetime(df["Date"], format='%Y-%m-%d %H:%M:%S').dt.strftime('%Y-%m-%d')
        #df['CO'] = df['Close'] - df['Open']
        #diff_data = df['CO'].values.reshape((data_len, 1))
        open_data = df["Open"].values.reshape((data_len, 1))
        high_data = df["High"].values.reshape((data_len, 1))
        low_data = df["Low"].values.reshape((data_len, 1))
        close_data = df["Close"].values.reshape((data_len, 1))
        volume_data = df["Volume"].values.reshape((data_len, 1))
        # input gold price
        if gold_data is not None:
            gold_data['Date']=gold_data['Date'].dt.strftime('%Y-%m-%d')
            df['Gold']=df['Date']
            
            # calc the gold series
            df['Gold'] = df['Gold'].apply(lambda x: gold_data['Close'][x==gold_data['Date']].values[0] if x in gold_data['Date'].values else np.nan)  
            # fillna using interpolating
            df['Gold'] = df['Gold'].interpolate(limit_direction="both")

            gold_data_fill=df["Gold"].values.reshape((data_len, 1))
            #input the federal rate 
            if fed_rate is not None:
                fed_rate['DATE']=fed_rate['DATE'].dt.strftime('%Y-%m-%d')
                df['fed']=df['Date']
                
                # calc the gold series
                df['fed'] = df['fed'].apply(lambda x: fed_rate['FEDFUNDS'][x==fed_rate['DATE']].values[0] if x in fed_rate['DATE'].values else np.nan)  
                # fillna using interpolating
                df['fed'] = df['fed'].interpolate(limit_direction="both")
    
                fed_data=df["fed"].values.reshape((data_len, 1))
                data = np.hstack((close_data, open_data, high_data, low_data, volume_data, gold_data_fill, fed_data))
            else:
                data = np.hstack((close_data, open_data, high_data, low_data, volume_data, gold_data_fill))
        else:
            data = np.hstack((close_data, open_data, high_data, low_data, volume_data))
    return data[0:int(np.floor(data_len * use_percentage))], np.floor(data_len * use_percentage)


In [None]:
# Set the X and Y value
def split_sequence(sequence, dim_type, n_steps_in, n_steps_out):
    X, y = list(), list()
    for i in range(len(sequence)):
        # find the end of the input pattern
        end_ix = i + n_steps_in
        # find the end of the output pattern
        out_end_ix = end_ix + n_steps_out
        # check if we are beyond the dataset
        if out_end_ix > len(sequence):
            break
        if dim_type == 'Multi':
            # gather input and output parts of the pattern
            seq_x = sequence[i:end_ix, 1:]
            seq_y = sequence[end_ix:out_end_ix, 0]
        else:
            seq_x, seq_y = sequence[i:end_ix], sequence[end_ix:out_end_ix]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)

In [None]:
# Normalize the data 
def data_trasform(data, anti=False, scaler=None):
    '''
    说明以及例子
    MinMax data and anti MinMax data
    :param data: the data source
    :param model: MinMax and anti MinMax
    :param scaler: anti MinMax scaler
    :return: the transformed data

    '''
    if not anti:
        # 归一化
        # 创建一个空字典来存储每一列的 scaler
        scalers = {}
        # 归一化数据的容器
        normalized_data = np.zeros_like(data)
        # 循环每一列
        for i in range(data.shape[1]):  # data.shape[1] 是列的数量
            # 为每一列创建一个新的 MinMaxScaler
            scaler = MinMaxScaler()
            # 将列数据调整为正确的形状，即(-1, 1)
            column_data = data[:, i].reshape(-1, 1)
            # 拟合并转换数据
            normalized_column = scaler.fit_transform(column_data)
            # 将归一化的数据存回容器中
            normalized_data[:, i] = normalized_column.ravel()
            # 存储scaler以便后续使用
            scalers[i] = scaler
        # 现在 normalized_data 是完全归一化的数据
        # scalers 字典包含每一列的 MinMaxScaler 实例
        return normalized_data, scalers
    else:
        # 反归一化
        # 如果data是三维数组，去除最后一个维度
        if data.ndim == 3 and data.shape[2] == 1:
            data = data.squeeze(axis=2)

        restored_data = np.zeros_like(data)
        for i in range(data.shape[1]):  # 遍历所有列
            column_data = data[:, i].reshape(-1, 1)
            restored_data[:, i] = scaler.inverse_transform(column_data).ravel()
        return restored_data

In [None]:
def create_model(model_type, n_features, n_steps_in, n_steps_out):
    '''
        create model
        :param model_type:  LSTM,BD LSTM(bidirectional LSTM),ED LSTM(Encoder-Decoder LSTM),CNN
        :param n_features:
        :param n_steps_in:
        :param n_steps_out:
        :return: the created model
    '''
    model = Sequential()
    adam_optimizer = Adam(learning_rate=0.001)
    if model_type == 'LSTM':
        # LSTM
        model.add(LSTM(100, activation='sigmoid', return_sequences=True, input_shape=(n_steps_in, n_features)))
        model.add(LSTM(100, activation='sigmoid'))
        model.add(Dense(n_steps_out))

    elif model_type == 'BD LSTM':
        # bidirectional LSTM
        model.add(Bidirectional(LSTM(50, activation='sigmoid'), input_shape=(n_steps_in, n_features)))
        model.add(Dense(n_steps_out))

    elif model_type == 'ED LSTM':
        # Encoder-Decoder LSTM
        # Encoder
        model.add(LSTM(100, activation='sigmoid', input_shape=(n_steps_in, n_features)))
        # Connector
        model.add(RepeatVector(n_steps_out))
        # Decoder
        model.add(LSTM(100, activation='sigmoid', return_sequences=True))
        model.add(TimeDistributed(Dense(1)))

    elif model_type == 'CNN':
        # CNN
        model.add(Conv1D(filters=64, kernel_size=2, activation='relu', input_shape=(n_steps_in, n_features)))
        model.add(MaxPooling1D(pool_size=2))
        model.add(Flatten())
        model.add(Dense(20, activation='relu'))
        model.add(Dense(n_steps_out))

    elif model_type == 'Convolutional LSTM':
        # Convolutional LSTM
        model.add(Conv1D(filters=64, kernel_size=2, activation='relu', input_shape=(n_steps_in, n_features)))
        model.add(LSTM(20, activation='relu', return_sequences=False))
        model.add(Dense(20, activation='relu'))
        model.add(Dense(n_steps_out))

    elif model_type == 'Transformer':
        model = create_transformer_model(n_steps_in, n_steps_out, n_features, d_model=64,
                                         num_heads=12, ff_dim=64, num_transformer_blocks=3)

    elif model_type == 'MLP':
        # 多层感知机 (MLP)
        model.add(Dense(20, activation='relu', input_shape=(n_steps_in, n_features)))
        model.add(Flatten())
        model.add(Dense(20, activation='relu'))
        model.add(Dense(n_steps_out))

    elif model_type == 'ARIMA':
        if n_features != 1:
            print("ARIMA model only supports univariate time series data")
            print("ARIMA model has no parameter n_steps_in")
            return None
        model = 'ARIMA'
        return model
    
    else:
        print("no model")
    model.compile(optimizer=adam_optimizer, loss='mse')
    return model

In [None]:
def train_and_forecast(model, n_features, dim_type, data_X, data_Y, n_steps_in, n_steps_out, ech):
    # train the model 
    # hide the output 
    sys.stdout = open(os.devnull, 'w')
    sys.stderr = open(os.devnull, 'w')

    X, y = split_sequence(data_X, dim_type, n_steps_in, n_steps_out)
    # 对于多维数据，调整最后一个维度为特征数
    X = X.reshape((X.shape[0], X.shape[1], n_features))

#######################################################################################
    if model == 'ARIMA':
        # # 自动确定 ARIMA 模型的参数
        # auto_model = auto_arima(data_X, seasonal=False, trace=True, error_action='ignore', suppress_warnings=True)
        # # 输出最佳 ARIMA 模型的参数
        # print(auto_model.summary())
        
        # 使用最佳参数拟合 ARIMA 模型
        # ARIMA 模型只接受单变量时间序列，这里假设 data_X 和 data_Y 是一维数组
        # order = auto_model.order
        order = (6,1,6)
        arma_model = ARIMA(data_X, order=order)
        model_fit = arma_model.fit()
        # 拟合结果
        fit_result = model_fit.fittedvalues
        # 使用模型进行滚动预测
        history = list(data_X)
        test_result = []
        for t in range(len(data_Y)):
            model = ARIMA(history, order=order)
            model_fit = model.fit()
            output = model_fit.forecast(n_steps_out)
            test_result.append(output)
            history.append(data_Y[t])  # update the history state
        test_result = np.array(test_result)
        fit_result = fit_result.reshape(len(fit_result), 1)
        return fit_result, test_result
#######################################################################################

    # traing the model
    model.fit(X, y, epochs=ech, batch_size=32, verbose=1)

    # store the fitting result
    fit_result = []
    for index, ele in enumerate(X):
        print(f'Fitting {index}th data')
        pred = model.predict(ele.reshape((1, n_steps_in, n_features)))
        fit_result.append(pred)
    fr = np.array(fit_result)
    fit_result = fr.reshape(len(fit_result), n_steps_out)
    # show the test result
    test_x, test_y = split_sequence(data_Y, dim_type, n_steps_in, n_steps_out)
    test_x = test_x.reshape((test_x.shape[0], test_x.shape[1], n_features))
    test_result = []
    for index, ele in enumerate(test_x):
        print(f'Predicting {index}th data')
        pred = model.predict(ele.reshape((1, n_steps_in, n_features)))
        test_result.append(pred)
    tr = np.array(test_result)
    test_result = tr.reshape(len(test_result), n_steps_out)

    # 恢复输出
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    return fit_result, test_result

# Predict Close Price using 6 Neural Networks

In [None]:
gold = yf.download(tickers="GC=F")

start = datetime(2013, 4, 29)
end = datetime(2021, 7, 6)

filtered = gold[start: end]

In [None]:
fed_rate = pd.read_csv('FEDFUNDS.csv')
fed_rate["DATE"] = pd.to_datetime(fed_rate["DATE"])

In [None]:
path = 'coin_Bitcoin.csv'
gold_data = None 
fed_rate = None 
dim_type = 'Multi'
use_percentage = 1

n_steps_in = 5  # take prior 5 values as input 
n_steps_out = 1 # predict t+1 value 

percentage = 0.7  # 训练集百分比
epochs = 25  # 迭代次数
#rounds = 3  # Number of exp

In [None]:
data, data_len = read_data(path, dim_type, gold_data, fed_rate, use_percentage)
data, scalers = data_trasform(data)

# split into train and test
train_set = data[0:int(np.floor(data_len * percentage))]  # train
test_set = data[int(np.floor(data_len * percentage)):]  # test 

In [None]:
# find the test dataset for the backtesting
test_bit_data = bit_data[int(np.floor(data_len * percentage)):]
update_dataset_model = test_bit_data[n_steps_in:]

In [None]:
# extract the number of features
n_features = len(train_set[0]) - 1 if len(train_set[0]) > 1 else 1 # features of the input 

In [1]:
#set the model type
model_type =['LSTM', 'BD LSTM', 'CNN', 'MLP', 'ED LSTM', 'Convolutional LSTM']

pred_name = []
for i in model_type:
    pred_name.append('prediction_'+i)

In [None]:
for i in range(len(model_type)):
    model_use = model_type[i]
    Model = create_model(model_use, n_features, n_steps_in, n_steps_out)
    
    train_result, test_result = train_and_forecast(Model, n_features, dim_type, train_set, test_set, n_steps_in,
                                                    n_steps_out, epochs)
    
    # ----------------------evaluation--------------------
    train_result = data_trasform(train_result, True, scalers[0])  # 反归一化
    test_result = data_trasform(test_result, True, scalers[0])  # 反归一化
    
    test_result_list = test_result.tolist()
    t_list = list()
    for j in range(len(test_result_list)):
        number = test_result_list[j][0]
        t_list.append(number)
    update_dataset_model[pred_name[i]] = t_list

# Predict Close Price Using Arima

In [None]:
path = 'coin_Bitcoin.csv'
gold_data_ARIMA = None  
fed_rate_ARIMA = None 
dim_type_ARIMA = 'Close' #ARIMA ony accepts one dimensional data
use_percentage = 1

n_steps_in = 5  # take prior 5 values as input 
n_steps_out = 1 # predict t+1 value 

percentage = 0.7  # 训练集百分比
epochs = 25  # 迭代次数
#rounds = 3  # Number of exp

In [None]:
data_ARIMA, data_len_ARIMA = read_data(path, dim_type_ARIMA, gold_data_ARIMA, fed_rate_ARIMA, use_percentage)
data_ARIMA, scalers_ARIMA = data_trasform(data_ARIMA)

# split into train and test
train_set_ARIMA = data_ARIMA[0:int(np.floor(data_len_ARIMA * percentage))]  
test_set_ARIMA = data_ARIMA[int(np.floor(data_len_ARIMA * percentage)):]  

In [None]:
n_features_ARIMA = len(train_set_ARIMA[0]) - 1 if len(train_set_ARIMA[0]) > 1 else 1 # features of the input 

In [None]:
model_type = 'ARIMA' # ARIMA

Model = create_model(model_type, n_features_ARIMA, n_steps_in, n_steps_out)

order = (6,1,6)
arma_model = ARIMA(train_set_ARIMA, order=order)
model_fit = arma_model.fit()
# 拟合结果
fit_result = model_fit.fittedvalues
# 使用模型进行滚动预测
history = list(train_set_ARIMA)
test_result = []
for t in range(len(test_set_ARIMA)):
    model = ARIMA(history, order=order)
    model_fit = model.fit()
    output = model_fit.forecast(n_steps_out)
    test_result.append(output)
    history.append(test_set_ARIMA[t])  
test_result = np.array(test_result)
fit_result = fit_result.reshape(len(fit_result), 1)

In [None]:
# ----------------------evaluation--------------------
test_result_ARIMA = data_trasform(test_result, True, scalers[0])  # denormalize

test_result_list_ARIMA = test_result_ARIMA.tolist()
t_list_ARIMA = list()
for j in range(len(test_result_list_ARIMA)):
    number = test_result_list_ARIMA[j][0]
    t_list_ARIMA.append(number)

In [None]:
update_dataset_model['Prediction_ARIMA'] = t_list_ARIMA_updated

In [None]:
# show the prediction result
update_dataset_model