In [6]:
import numpy as np 
import pandas as pd 

# Data Pre-processing
from gtda.time_series import SlidingWindow
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler, MaxAbsScaler
from imblearn.under_sampling import RandomUnderSampler

# Persistent Homology
from gtda.homology import VietorisRipsPersistence
from gtda.diagrams import BettiCurve

# Artificial neural network
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.metrics import accuracy_score

import warnings
warnings.filterwarnings('ignore')

In [7]:
def getBettiSeq (VRdiagram_array, dimension:int, binscount = 1000):
    '''create and obtain (x,y) coordinates on filtration parameters and betti numbers from the collection of Betti Curve plots representing the PSE Vietoris Rips diagrams'''
    # Generating the Betti Curve plot for the Vietoris Rips diagrams
    BC = BettiCurve(n_bins=binscount)
    bettiNumbers = BC.fit_transform(VRdiagram_array)
    if dimension == 0:
        for index in range(len(bettiNumbers)):
            bettiNumbers[index][0] = [x+1 for x in bettiNumbers[index][0]]
    filtrationParameters = list(BC.samplings_.values())

    # Creating the tuple for the Betti numbers and Filtration parameters in the Betti Curve plot
    bettiCollection = []
    for index in range(len(bettiNumbers)):
        bettiCoordinates = [
            (x, y) 
            for x, y in zip(filtrationParameters[0], bettiNumbers[index][0])
        ]
        bettiCollection.append(bettiCoordinates)
    return bettiCollection

def createBettiSeqPartition (homology_list, size:int):
    '''outputs the Betti Sequence from a collection of giotto Betti Curve plot coordinates as a list'''  
    #obtaining the minimum and maximum filtration parameters (provided, the parameter range is the whole plot)
    minParameter = homology_list[0][0][0]
    maxParameter = homology_list[0][-1][0]

    #obtaining the middle filtration parameter values of each partition size
    midPartition = (maxParameter - minParameter) / size
    midPartitionParameters = [
        minParameter + ((((2*i) - 1) * midPartition) / 2)
        for i in range(1, size + 1)
    ]

    # creating the Betti sequence of each partition of specified size
    sequences = []
    for homList in homology_list:
        midBettiNumbers = []
        for partition in range(size):
            midParameter = midPartitionParameters[partition]
            for index, element in enumerate(homList):
                if element[0] > midParameter: 
                    midBettiNumbers.append(homList[index-1][1])
                    break
        sequences.append(midBettiNumbers)

    return sequences

def filterDataOnYear(data, date_column, year: int):
    '''obtaining the data and column based on indicated year'''
    yearDates = list(data[date_column][data[date_column].dt.year == year])
    filteredData = data.loc[(data[date_column] >= yearDates[0]) & (data[date_column] <= yearDates[-1])]
    return filteredData

def filterDataSlidingWindow(data, date_column,year:int, window_size: int):
    '''obtaining the data to be used for the Sliding Windows'''
    yearDates = list(data[date_column][data[date_column].dt.year == year])
    prevYearDates = list(data[date_column][data[date_column].dt.year == (year-1)])
    prevYearDatesForSW = prevYearDates[(-window_size+1):]
    SWDates = prevYearDatesForSW + yearDates
    filteredData = data.loc[(data[date_column] >= SWDates[0]) & (data[date_column] <= SWDates[-1])]
    return filteredData


In [8]:
# VARIABLES AND PARAMETERS

# Years covered
startYear = 2012
endYearExclusive = 2024 # Range will cover up to this year minus one

# Scaler used
scaler = MinMaxScaler()

# Persistent homology dimensions
dimensionPH = [0,1]

# Persistent homology sequence parameters (i.e., sliding window size and sequence partition size)
windowSizes = [80,100,120]
partitionSizes = [30,60,100]

# Activation function
activationANN = 'sigmoid'
# 'ReLU' for Rectified Linear Unit function

# Parameter values for Artificial Neural Network model combinations
unitParameters = [x for x in range(5,21,5)]
epochParameters = [y for y in range(20,101,20)]
parameterGrid = {'hidden_layer_sizes': unitParameters,  
    'max_iter': epochParameters,
    } 

In [9]:
# DATA EXTRACTION

# Importing the main dataframe
dataFrameStockReturns = pd.read_csv(r'..\PSE-SR.csv')
dataFrameStockReturns = dataFrameStockReturns.dropna()
dataFrameStockReturns['Date'] = pd.to_datetime(dataFrameStockReturns['Date'])

# Separating the stock returns columns to append Persistent Homology input vectors
stockReturnsColumns = ['PSEi_SR','PSE SER_SR','PSE IND_SR']
dataFramePHVectors = dataFrameStockReturns.drop(stockReturnsColumns,axis=1)
underSampler = RandomUnderSampler(random_state=42, sampling_strategy = 'majority')

In [None]:
# Running the code on each year and the dimensions

gridResults = []

for dimension in dimensionPH:

    for year in range(startYear, endYearExclusive):

        baseDFBestParams = filterDataOnYear(dataFramePHVectors, 'Date', year)

        # Creating the persistent homology features in 
        for window in windowSizes:

            #Converting the data to sliding windows
            slidingWindow = SlidingWindow(size = window)
            dataFrameByYear = filterDataSlidingWindow(dataFrameStockReturns, 'Date', year, window)
            stockReturnsByYear = dataFrameByYear.loc[:, stockReturnsColumns].values.tolist()
            stockReturnsForVR = np.array(slidingWindow.fit_transform(stockReturnsByYear))

            # Initiating Vietoris Rips Persistence
            VR = VietorisRipsPersistence(
                    homology_dimensions=[dimension]
                )
            VRdiagrams = VR.fit_transform(stockReturnsForVR)
            
            # Creating partitions on generated Betti Curve coordinates
            bettiSeqlist = getBettiSeq(VRdiagrams, dimension=dimension)
            for partition in partitionSizes:
                bettiSeqVectors = createBettiSeqPartition(bettiSeqlist, partition)
                headerName = 'Hom'+str(dimension)+'_W'+ str(window) + '_P' + str(partition)  
                baseDFBestParams[headerName] = bettiSeqVectors

        # Running the model on all combinations of homology on the machine learning model
        homColumns = baseDFBestParams.drop(['Date','Class'], axis=1)
        
        bestGridScore = 0
        bestParams_svc = 0
        bestParams_ph = None

        for column in homColumns.columns:

            # Setting and splitting the train and test data (with undersampling)
            X = pd.DataFrame(homColumns[column])
            y = baseDFBestParams.Class 
            X, y = underSampler.fit_resample(X, y)

            # Splitting the training data of each year with training testing split (with under sampling)
            XTrain, XTest, yTrain, yTest = train_test_split(X, y, test_size=0.5,stratify=y, random_state=5)
            XTrain, yTrain = underSampler.fit_resample(XTrain, yTrain)
            XTest, yTest = underSampler.fit_resample(XTest, yTest)
            
            # Apply scalers on the list of vectors
            XTrain = list(XTrain[column])
            XTest = list(XTest[column])
            scaler = MinMaxScaler()
            XTrain = scaler.fit_transform(XTrain)
            XTest = scaler.transform(XTest)
            
            # Running the SVMs on the combinations on kernel functions, cost values and deg/gamma
            classifier = MLPClassifier(activation='logistic', random_state=20, learning_rate_init=0.01)
            gridSearch = GridSearchCV(
                estimator=classifier,
                param_grid=parameterGrid,
                scoring='accuracy',
                cv = StratifiedKFold(n_splits=12,shuffle=True, random_state=50),
                n_jobs=10)
            gridSearch.fit(XTrain, yTrain) 
            gridSearchScore = gridSearch.best_score_ 

            # Defining the variables for the Grid Search results
            if gridSearchScore >= bestGridScore:
                bestGridScore = gridSearchScore
                bestParams_svc = gridSearch.best_params_
                bestSVC = gridSearch.best_estimator_
                bestParams_ph = str(column)

        # Running the best model on the parameters to predict on the Test Data
        bestSVC.fit(XTrain,yTrain)
        bestTrainAcc = bestSVC.fit(XTrain,yTrain).score(XTrain,yTrain)
        yPred = bestSVC.predict(XTest)
        bestTestAcc = accuracy_score(yTest, yPred)

        gridResults.append({
            'Dimension': dimension,
            'Year': year, 
            'Best Window and Partition': bestParams_ph, 
            'Best Parameters': bestParams_svc, 
            'Best Grid Search Score': bestGridScore, 
            'Train Acc on Best Model' : bestTrainAcc, 
            'Test Accuracy on Best Model': bestTestAcc
        })

gridResultsPerYear = pd.DataFrame(gridResults) 

print(scaler)
gridResultsPerYear