# Utilities

**Author**: Maleakhi Agung Wijaya  
**Email**: *maw219@cam.ac.uk*  
**Description**: This file contains utility functions and constants used in other notebooks.

## Constants

In [None]:
## CONSTANTS
# Name of the label column that load_aggregated_datasets adds to every
# market dataframe and that generate_sequential_data reads back.
CLASS_NAME = "MOVEMENT"
# Relative paths of the processed CSV files, one per market index.
# generate_all_sequential_data loads all five when no dataframes are passed.
DATASET_DJI = "Datasets/Processed_DJI.csv"
DATASET_NASDAQ = "Datasets/Processed_NASDAQ.csv"
DATASET_NYSE = "Datasets/Processed_NYSE.csv"
DATASET_RUSSELL = "Datasets/Processed_RUSSELL.csv"
DATASET_SP = "Datasets/Processed_S&P.csv"

## Functions

In [None]:
def load_dataset(file_path):
    """
    Read one market CSV file into a dataframe.

    The "Date" column of the file becomes the index of the returned
    dataframe; every other column is kept as read by pandas.
    """

    return pd.read_csv(file_path, index_col="Date")

In [None]:
def load_aggregated_datasets(file_paths):
    """
    Load and pre-process datasets from various markets.

    Parameters
    ----------
    file_paths : iterable
        Paths passed one by one to ``load_dataset``. Every file must provide
        a "Name" column (market identifier) and a "Close" column.

    Returns
    -------
    market_orders : list
        Market names (first value of each "Name" column) in processing order.
    n_markets : int
        Number of files that were processed.
    aggregated_datasets : dict
        Maps each market name to its processed dataframe: the "Name" column
        is removed, the last row and the first 200 remaining rows are dropped,
        and a ``CLASS_NAME`` label column is added.
    """

    market_orders = []  # store the order in which markets are processed
    aggregated_datasets = {}

    # Iterate over different indices data to load and process them
    for file_path in file_paths:
        df_data = load_dataset(file_path)

        ## Store information on the order of datasets that are processed
        # The frame is indexed by date, so the first row has to be fetched
        # positionally; ``series[0]`` on a non-integer index is deprecated.
        data_name = df_data["Name"].iloc[0]
        market_orders.append(data_name)
        df_data = df_data.drop(columns=["Name"])

        ## Preprocess data
        # Label of a row = integer part of (next close / current close):
        # 1 when the next close is at least the current one, otherwise 0.
        close = df_data["Close"]
        label = (close.iloc[1:] / close.iloc[:-1].values).astype(int)
        # The last row has no following close, hence no label.
        df_data = df_data.iloc[:-1]
        label.index = df_data.index

        # do not use the first 200 data as we use moving average as one of the feature
        # Copy so that the label column is written to an independent frame
        # rather than to a slice of the loaded one.
        df_data = df_data.iloc[200:].copy()
        df_data[CLASS_NAME] = label

        ## Store in dictionary
        aggregated_datasets[data_name] = df_data

    return market_orders, len(market_orders), aggregated_datasets

In [None]:
# The functions above produce the vanilla (tabular) datasets; the function
# below produces sequential (windowed) datasets.
def generate_sequential_data(df_data, sequence_length):
    """
    Cut a labelled dataframe into overlapping windows of consecutive rows.

    The CLASS_NAME column is split off as the label. Every window holds
    `sequence_length` consecutive feature rows, and its target is the label
    of the last row in the window.

    Returns a list of dataframe windows and the list of matching targets.
    """

    labels = list(df_data[CLASS_NAME])
    features = df_data.drop(columns=[CLASS_NAME])

    # One window per possible start row; empty when the frame is too short
    starts = range(features.shape[0] - sequence_length + 1)
    windows = [features[start:start + sequence_length] for start in starts]
    targets = [labels[start + sequence_length - 1] for start in starts]

    ## Notes:
    # - For a conv net, one extra dimension has to be added by reshaping later.
    # - The list of dataframes is converted to a numpy array later.

    return windows, targets

In [None]:
# Variant of the sequential-data generator for the 3D CNNpred model: the
# windows are taken along the second axis of an array input.
def generate_sequential_data_3d(data, target, sequence_length):
    """
    Generate sequential data for the 3d cnn pred model.

    `data` is indexed as an array: windows of `sequence_length` consecutive
    positions are taken along its second axis, keeping the first axis whole.
    The target of a window is the entry of `target` at the window's last
    position.

    Returns the stacked windows and their targets as numpy arrays.
    """
    starts = range(data.shape[1] - sequence_length + 1)

    ## Sequencing data
    windows = [data[:, start:start + sequence_length] for start in starts]
    window_targets = [target[start + sequence_length - 1] for start in starts]

    return np.array(windows), np.array(window_targets)

In [None]:
def generate_all_sequential_data(sequence_length, df_datas=None):
    """
    Build one pooled sequential dataset from several market dataframes.

    When `df_datas` is None, all five market files (DJI, NASDAQ, NYSE,
    RUSSELL, S&P) are loaded and used in their processing order; otherwise
    the given dataframes are used as they are. The windows and targets of
    every dataframe are concatenated into two flat lists.
    """

    if df_datas is None:
        # No frames supplied: fall back to every known market
        default_paths = [
            DATASET_DJI,
            DATASET_NASDAQ,
            DATASET_NYSE,
            DATASET_RUSSELL,
            DATASET_SP,
        ]
        market_orders, _, aggregated_datasets = load_aggregated_datasets(default_paths)
        frames = [aggregated_datasets[market] for market in market_orders]
    else:
        frames = df_datas

    sequential_data = []
    sequential_target = []

    # Generate the sequential version of each frame and pool the results
    for frame in frames:
        windows, targets = generate_sequential_data(frame, sequence_length)
        sequential_data.extend(windows)
        sequential_target.extend(targets)

    return sequential_data, sequential_target

In [None]:
def sequential_reshape(X_seq, reshape_size):
    """
    Stack a sequence of dataframes into one numpy array of a given shape.

    Each element of `X_seq` is converted with `to_numpy()`, the results are
    stacked into a single array, and that array is reshaped to
    `reshape_size`.
    """

    stacked = np.array([window.to_numpy() for window in X_seq])

    return stacked.reshape(reshape_size)

In [None]:
def analyse_cv(model, X_train, y_train, cv, scoring):
    """
    Cross-validate a model and print the resulting statistics.

    The arguments are forwarded to `cross_val_score`. The per-fold scores,
    their mean and their standard deviation are printed; nothing is returned.
    """

    fold_scores = cross_val_score(model, X_train, y_train,
                                  cv=cv, scoring=scoring)

    report = (
        ("Scores:", fold_scores),
        ("Mean Scores:", fold_scores.mean()),
        ("Standard deviation:", fold_scores.std()),
    )
    for caption, value in report:
        print(caption, value)

In [None]:
def stacking_classifier():
    """
    Build an (unfitted) stacking ensemble classifier.

    Five base models with default settings (logistic regression, k-nearest
    neighbours, decision tree, SVM, Gaussian naive Bayes) feed a logistic
    regression meta learner; the ensemble is created with cv=5.
    """

    # Base (level-0) models, each paired with its name
    base_learners = [
        ('lr', LogisticRegression()),
        ('knn', KNeighborsClassifier()),
        ('cart', DecisionTreeClassifier()),
        ('svm', SVC()),
        ('bayes', GaussianNB()),
    ]

    # Meta (level-1) learner that combines the base predictions
    meta_learner = LogisticRegression()

    return StackingClassifier(estimators=base_learners,
                              final_estimator=meta_learner,
                              cv=5)

In [None]:
def cnnpred_2d(sequence_length, n_feature, n_filters, dropout_rate=0.1):
    """
    Build the 2D CNNpred network (architecture from the paper of
    Hoseinzade and Haratizadeh).

    The input has shape (sequence_length, n_feature, 1). `n_filters` must
    provide three filter counts, one per convolution layer, and
    `dropout_rate` is applied just before the sigmoid output unit.
    The returned model is not compiled.
    """

    # Layer 1: the kernel spans the full feature axis of a single time step
    feature_block = [
        keras.Input(shape=(sequence_length, n_feature, 1)),
        layers.Conv2D(n_filters[0], (1, n_feature), activation="relu"),
    ]

    # Layers 2 and 3: convolve over 3 time steps, then pool pairs of steps
    temporal_block = [
        layers.Conv2D(n_filters[1], (3, 1), activation="relu"),
        layers.MaxPool2D(pool_size=(2, 1)),
        layers.Conv2D(n_filters[2], (3, 1), activation="relu"),
        layers.MaxPool2D(pool_size=(2, 1)),
    ]

    # FFNN head
    head = [
        layers.Flatten(),
        layers.Dropout(dropout_rate),
        layers.Dense(1, activation="sigmoid"),
    ]

    return keras.Sequential(feature_block + temporal_block + head)

In [None]:
def cnnpred_3d(n_markets, sequence_length, n_feature, n_filters, dropout_rate=0.1):
    """
    Build model using architecture that is specified on the paper
    (Hoseinzade and Haratizadeh).

    Parameters
    ----------
    n_markets, sequence_length, n_feature : int
        Define the input shape (n_markets, sequence_length, n_feature),
        interpreted as channels-last.
    n_filters : sequence of int
        Filter counts of the three convolution layers (indices 0 to 2).
    dropout_rate : float, optional
        Dropout applied before the output unit. Defaults to 0.1, the value
        that used to be hard-coded, so existing calls are unaffected.

    Returns
    -------
    An uncompiled keras.Sequential model with a single sigmoid output.
    """

    model = keras.Sequential([
        # Explicit input layer instead of the `input_shape` layer argument,
        # matching how cnnpred_2d declares its input.
        keras.Input(shape=(n_markets, sequence_length, n_feature)),

        # layer 1: 1x1 convolution
        layers.Conv2D(n_filters[0], (1, 1), activation="relu",
                      data_format="channels_last"),

        # layer 2: kernel covers the whole market axis and 3 time steps
        layers.Conv2D(n_filters[1], (n_markets, 3), activation="relu"),
        layers.MaxPool2D(pool_size=(1, 2)),

        # layer 3: convolution over 3 time steps
        layers.Conv2D(n_filters[2], (1, 3), activation="relu"),
        layers.MaxPool2D(pool_size=(1, 2)),

        # FFNN
        layers.Flatten(),
        layers.Dropout(dropout_rate),
        layers.Dense(1, activation="sigmoid"),
    ])

    return model

In [1]:
def lstm(win_length, num_features):
    """
    Build the LSTM network used to predict stock market direction.

    The input has shape (win_length, num_features). Two LSTM layers
    (128 units returning sequences, then 64 units) are each followed by
    dropout of 0.3, and the model ends in a single sigmoid unit.
    The returned model is not compiled.
    """

    stack = [
        # First recurrent layer keeps the whole sequence for the next LSTM
        layers.LSTM(128, input_shape=(win_length, num_features), return_sequences=True),
        layers.LeakyReLU(alpha=0.5),
        layers.Dropout(0.3),
        # Second recurrent layer returns only its final output
        layers.LSTM(64, return_sequences=False),
        layers.Dropout(0.3),
        layers.Dense(1, activation="sigmoid"),
    ]

    return tf.keras.Sequential(stack)

In [None]:
def f1(y_true, y_pred):
    """Batch-wise F1 averaged over the positive and the negative class.

    Precision and recall are computed from clipped, rounded values for the
    positive class and again for the negative class (obtained as one minus
    the labels / clipped predictions). The two F1 scores are averaged.
    """

    def _count(values):
        # Number of entries that round to 1 after clipping to [0, 1]
        return K.sum(K.round(K.clip(values, 0, 1)))

    def _precision(truth, pred):
        # Share of selected items that are relevant
        hits = _count(truth * pred)
        return hits / (_count(pred) + K.epsilon())

    def _recall(truth, pred):
        # Share of relevant items that are selected
        hits = _count(truth * pred)
        return hits / (_count(truth) + K.epsilon())

    def _f_score(truth, pred):
        p = _precision(truth, pred)
        r = _recall(truth, pred)
        return 2 * ((p * r) / (p + r + K.epsilon()))

    # Negative class: flip the labels and the clipped predictions
    flipped_true = K.ones_like(y_true) - y_true
    flipped_pred = K.ones_like(y_pred) - K.clip(y_pred, 0, 1)

    f_posit = _f_score(y_true, y_pred)
    f_neg = _f_score(flipped_true, flipped_pred)

    return (f_posit + f_neg) / 2

In [None]:
def plot_confusion_matrix(y_test, y_predict, labels, cmap="Blues"):
    """
    Plot the confusion matrix and its row-normalised version.

    Two separate 4x4 figures are created: the first shows the raw counts,
    the second the matrix computed with normalize="true". `labels` is used
    for the tick labels of both axes, and both figures are shown at the end.
    """

    figures = [plt.figure(figsize=(4,4)) for _ in range(2)]
    axes = [figure.add_subplot(111) for figure in figures]

    matrices = (
        confusion_matrix(y_test, y_predict),
        confusion_matrix(y_test, y_predict, normalize="true"),
    )
    for axis, matrix in zip(axes, matrices):
        ConfusionMatrixDisplay(matrix).plot(ax=axis, cmap=cmap)

    # Same tick labels and axis titles on both plots
    for axis in axes:
        axis.set_xticklabels(labels)
        axis.set_yticklabels(labels, rotation=90)
        axis.set_xlabel("$Predicted$")
        axis.set_ylabel("$True$")

    plt.show()

In [None]:
def plot_pr_vs_threshold(precisions, recalls, thresholds):
    """
    Draw precision and recall as functions of the decision threshold.

    The last element of `precisions` and of `recalls` is left out, so both
    must be one element longer than `thresholds`. Returns the axes that
    were drawn on.
    """

    plt.figure(figsize=(6,4))
    ax = plt.subplot2grid((1,1), (0,0))

    curves = (
        (precisions, "b--", "Precision"),
        (recalls, "g--", "Recall"),
    )
    for values, line_style, caption in curves:
        ax.plot(thresholds, values[:-1], line_style, label=caption)

    ax.set_xlabel("Threshold")
    ax.legend(loc="upper right")
    ax.set_ylim([0, 1])

    return ax

In [None]:
def plot_roc_curve(fpr, tpr, label=None):
    """
    Draw a ROC curve together with the dashed diagonal reference line.

    `fpr` and `tpr` are plotted against each other on a new 6x4 figure;
    `label` is attached to the curve. Returns the axes that were drawn on.
    """

    plt.figure(figsize=(6,4))
    ax = plt.subplot2grid((1,1), (0,0))

    # Dashed diagonal from (0, 0) to (1, 1) drawn after the curve
    ax.plot(fpr, tpr, linewidth=2, label=label)
    ax.plot([0, 1], [0, 1], "k--")

    ax.axis([0, 1, 0, 1.01])
    ax.set_xlabel("False positive rate (fpr)")
    ax.set_ylabel("True positive rate (tpr)")

    return ax