In [9]:
# Initial imports
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
import hvplot.pandas

# LSTM Functions
---

In [10]:
# Set the random seed for reproducibility
# Note: This is used for model prototyping, but it is good practice to comment this out and run multiple experiments to evaluate your model.
from numpy.random import seed

seed(1)
from tensorflow import random

random.set_seed(2)

In [14]:
'''
This function accepts the column number for the features (X) and the target (y).
It chunks the data up with a rolling window of X - window to predict y.
It returns two numpy arrays of X and y.
'''

def window_data(df, window, feature_col_1, feature_col_2, target_col):
    X = []
    y = []
    for i in range(len(df) - window):
        features = df.iloc[i : (i + window), feature_col_1:feature_col_2]
        target = df.iloc[(i + window), target_col]
        X.append(features)
        y.append(target)
    return np.array(X), np.array(y).reshape(-1, 1)


## 4D LSTM

In [38]:
def window_data(df, window, chunk_rows, feature_col_1, feature_col_2, target_col):
    X_list = df.iloc[:,feature_col_1:feature_col_2].values.tolist()
    X_chunks=[X_list[i:i + chunk_rows] for i in range(0, len(X_list), chunk_rows)]
    X = [X_chunks[i:i+window] for i in range (len(X_chunks)-window)]
    y_list=df.iloc[:,target_col].values.tolist()
    y_chunks = [y_list[i + chunk_rows-1] for i in range(0, len(y_list), chunk_rows)]
    y = [y_chunks[i+window] for i in range(len(y_chunks)-window)]
    return np.array(X), np.array(y)

In [19]:
'''
This function splits X and y into training and testing sets, scales the data with MinMaxScaler and reshapes features data for the LSTM model .
'''
def data_splited_scaled(df, window, feature_col_1,feature_col_2, target_col):  
    X, y = window_data(df, window, feature_col_1,feature_col_2, target_col)
    # Use 70% of the data for training and the remainder for testing
    split = int(0.7 * len(X))
    X_train = X[: split]
    X_test = X[split:]
    y_train = y[: split]
    y_test = y[split:]

    # Create a MinMaxScaler object
    scaler = MinMaxScaler()

    # Fit the MinMaxScaler object with the training feature data X_train
    scaler.fit(X_train.ravel().reshape(-1,1))

    # Scale the features training and testing sets
    X_train_scaled= scaler.transform(X_train.ravel().reshape(-1,1))
    X_test_scaled = scaler.transform(X_test.ravel().reshape(-1,1))

    # Fit the MinMaxScaler object with the training target data y_train
    scaler.fit(y_train)

    # Scale the target training and testing sets
    y_train_scaled = scaler.transform(y_train)
    y_test_scaled = scaler.transform(y_test)

    # Reshape the features for the model
    feature_num = feature_col_2 - feature_col_1
    X_train_scaled = X_train_scaled.reshape((X_train.shape[0], X_train.shape[1], feature_num))
    X_test_scaled = X_test_scaled.reshape((X_test.shape[0], X_test.shape[1], feature_num))

    return X_train_scaled, X_test_scaled, y_train_scaled, y_test_scaled, scaler


In [32]:
'''
This function builds and trains a 3-layer LSTM model

'''
def lstm_model(df, window, feature_col_1, feature_col_2, target_col, number_units):

    X_train_scaled, _, y_train_scaled, _ ,_= data_splited_scaled(df, window, feature_col_1,feature_col_2, target_col)

    # Define the LSTM RNN model.
    lstm_model = Sequential()

    dropout_fraction = 0.2
    # calculate
    X_train_scaled, _, _, _ ,_= data_splited_scaled(df, window, feature_col_1, feature_col_2, target_col)
    # Layer 1
    feature_num = feature_col_2 - feature_col_1
    lstm_model.add(LSTM(
    units=number_units,
    return_sequences=True,
    input_shape=(X_train_scaled.shape[1], feature_num))
    )
    lstm_model.add(Dropout(dropout_fraction))
    # Layer 2
    lstm_model.add(LSTM(units=number_units, return_sequences=True))
    lstm_model.add(Dropout(dropout_fraction))
    # Layer 3
    lstm_model.add(LSTM(units=number_units))
    lstm_model.add(Dropout(dropout_fraction))
    # Output layer
    lstm_model.add(Dense(1))

    # Compile the lstm_model
    lstm_model.compile(optimizer="adam", loss="mean_squared_error")

    # Train the lstm_model
    lstm_model.fit(X_train_scaled, y_train_scaled, epochs=10, shuffle=False, batch_size=1, verbose=1)

    return lstm_model

In [33]:
'''
This function evaluates the LSTM model
'''
def lstm_evaluation(df, window, feature_col_1, feature_col_2, target_col, number_units):
    _, X_test_scaled, _, y_test_scaled,_ =data_splited_scaled(df, window, feature_col_1, feature_col_2, target_col)
    model = lstm_model(df, window, feature_col_1, feature_col_2, target_col, number_units)
    score = model.evaluate(X_test_scaled, y_test_scaled,verbose=0)
    return score
    

In [34]:
'''
This function predicts y values and recover the original prices, and then creates a dataframe of Acural and Predicted values of y
'''
def lstm_prediction(df, window, feature_col_1, feature_col_2,target_col, number_units):
    _, X_test_scaled, _, y_test_scaled,scaler =data_splited_scaled(df, window, feature_col_1,feature_col_2, target_col)
    model= lstm_model(df, window, feature_col_1, feature_col_2, target_col, number_units)
    y_predicted = model.predict(X_test_scaled)

    # Recover the original prices instead of the scaled version
    predicted_prices = scaler.inverse_transform(y_predicted)
    actual_prices = scaler.inverse_transform(y_test_scaled.reshape(-1, 1))

    prediction_df = pd.DataFrame({
        "Actual":actual_prices.ravel(),
        "Predicted":predicted_prices.ravel(),
    })

    return prediction_df


# Ploting Funtions
---

In [95]:
def history_plotting(df, title):

    '''
    This function plots the history data.
    '''

    return df.hvplot(
                    ylabel = 'Price in $',
                    width= 1000,
                    height=400,
                    title=title)
     

In [94]:
def moving_avgs_plotting(df,ma_column_1, ma_column_2, title):
    '''
    This function plots the overlay of the original prices and moving averages.
    '''
    prices = df['close'].hvplot(line_color='lightgray',
                        ylabel='Price in $',
                        width=1000,
                        height=400,
                        title=title
                        )

    moving_avgs = df[[ma_column_1, ma_column_2]].hvplot(
                            ylabel='Price in $',
                            width=1000,
                            height=400
    )   

    return prices*moving_avgs

    

In [91]:
def predicted_price_plotting(df,title):
    '''
    This function plots the actual prices vs. the predicted prices.
    '''
    return df[["actual","predicted"]].hvplot(
                    ylabel='Price in $',
                    width=1000,
                    height=400,
                    title=title
)


In [88]:
def bands_plotting(df, title ):
    '''
    This function plots the range of the prices.
    '''
    close = df['close'].hvplot(
        line_color="lightgray",
        ylabel="Price in $",
        width=1000,
        height=400,
        title=title
    )

    upper_band = df['upper'].hvplot(
        line_color="purple",
        ylabel = "Price in $",
        width=1000,
        height=400,
    )

    middle = df['middle'].hvplot(
        line_color="orange",
        ylabel = "Price in $",
        width = 1000,
        height = 400
    )


    lower_band = df['lower'].hvplot(
        line_color="blue",
        ylabel = "Price in $",
        width = 1000,
        height = 400
    )

    # Overlay plots
    return close * upper_band *middle *lower_band

In [87]:
def entry_exit_positions(df, title):
    '''
    This function plots the entry/exit positions using ranges.
    '''
    entry = df[df["signal"] == 1.0]["close"].hvplot.scatter(
    color="green",
    marker="^",
    size=200,
    legend=False,
    ylabel="Price in $",
    title=title,
    width=1000,
    height=400,
)

# Visualize exit position relative to close price
    exit = df[df["signal"] == -1.0]["close"].hvplot.scatter(
    color="red",
    marker="v",
    size=200,
    legend=False,
    ylabel="Price in $",
    width=1000,
    height=400
)

# Visualize close price for the investment
    close = df[["close"]].hvplot(
    line_color="lightgray",
    ylabel="Price in $",
    width=1000,
    height=400
)

    upper = df[["upper"]].hvplot(
    line_color="purple",
    ylabel="Price in $",
    width=1000,
    height=400
)

    middle = df[["middle"]].hvplot(
    line_color="orange",
    ylabel="Price in $",
    width=1000,
    height=400
)

    lower = df[["Lower"]].hvplot(
    line_color="blue",
    ylabel="Price in $",
    width=1000,
    height=400
)


# Overlay plots
    return close * upper * middle * lower * entry * exit