In [2]:
import os
import pandas as pd
import numpy as np
import scipy.stats as stats
import matplotlib.pylab as plt

In [3]:
def simulate_Y_m(x, m):
    n = len(x)
    Y = []
    for t in range(m, n):
        Y_m = np.log(x[t] / x[t-m])
        Y.append(Y_m)
    return Y

In [4]:
def AR1_Estimation(data):
    import numpy as np
    
    n = len(data)
    
    # Define the objective function
    def obj_function(k):
        a1 = sum([data[i-1]*data[i] for i in range(1, int(k)+1)])/sum([data[i-1] for i in range(1, int(k)+1)])
        a2 = sum([data[i-1]*data[i] for i in range(int(k)+1, n)])/sum([data[i-1] for i in range(int(k)+1, n)])
        sigma2 = (1/(2*n)) * (sum([(data[i]- a1*data[i-1])**2 for i in range(1, int(k)+1)])+ sum([(data[i]- a2*data[i-1])**2 for i in range(int(k)+1, n)]))
        return -n * np.log(sigma2)
    
    # Calculate the likelihood for each k
    mle = [obj_function(i) for i in range(1, n-1)]
    
    # Find the index of the maximum likelihood
    k = np.argmax(mle)  +1

    # Calculate a1 and a2 using optimized k
    a1 = sum([data[i-1]*data[i] for i in range(1, k+1)]) / sum([data[i-1] for i in range(1, k+1)])
    a2 = sum([data[i-1]*data[i] for i in range(k+1, n)]) / sum([data[i-1] for i in range(k+1, n)])
    
    # Calculate sigma2 using optimized k
    sigma2 = (1/(2*n)) * (sum([(data[i]- a1*data[i-1])**2 for i in range(1, k+1)]) + sum([(data[i]- a2*data[i-1])**2 for i in range(k+1, n)]))
    
    # Store results in a dictionary
    Resultats = {
        'a1': a1,
        'a2': a2,
        'sigma2': sigma2,
        'k': k
    }
    # print(f'a1={a1}')
    # print(f'a2={a2}')
    # print(f'sigma2={sigma2}')
    # print(f'k={k}')
    return Resultats


In [5]:
Liste_donnees=os.listdir('.\.\DonCAC40\\')

m_values = [1, 2, 3, 4]
result_dict = {}

for m in m_values:
    Frame_Dict = {}
    Frame_Dict['actions'] = [t[:-4] for t in Liste_donnees]
    
    estimator = ['a1', 'a2', 'sigma2', 'k']
    for x in estimator:
        Frame_Dict[x] = []
        
    for i in range(len(Liste_donnees)): 
        F = '.\.\DonCAC40\\' + Liste_donnees[i]
        data = pd.read_csv(F, delimiter=',')
        data = data[data[data.columns[5]].notnull()]  # Remove missing data
        X = data[data.columns[5]]
        Y = simulate_Y_m(X, m)
        
        D = AR1_Estimation(Y)
        for cle, valeur in D.items():
            Frame_Dict[cle].append(valeur)
    
    Resultats_S = pd.DataFrame.from_dict(Frame_Dict)
    Resultats_S = Resultats_S.set_index('actions')

    result_dict[m] = Resultats_S


In [None]:
result_dict

In [None]:
#m=1
result_dict[1]


In [None]:
# m = 2
result_dict[2]

# K-S test

In [9]:

def eCDF(sample, t, sort = False):
    # sorts the sample, if unsorted
    if sort:
        sample.sort()
    # counts how many observations are below x
    res = sum(sample <= t)
    # divides by the total number of observations
    res = res / len(sample)
    return res
def decision(p_value, alpha):
        if (p_value <= alpha):
            return "reject null hypothesis" #X1...Xk and Xk...XN do not follow the same distribution (breakpoint exists)
        else:
            return "accept null hypothesis" #otherwise
def ks_2sample_test(sample1, sample2):
    # gets all observations
    observations = np.concatenate((sample1, sample2))
    observations.sort()
    # sorts the samples
    sample1.sort()
    sample2.sort()
    # find K-S statistic
    list_D = [] 
    for x in observations:
        cdf_sample1 = eCDF(sample = sample1, t = x)
        cdf_sample2 = eCDF(sample = sample2, t = x)
        list_D.append(abs(cdf_sample1 - cdf_sample2))
    ks_stat = max(list_D)
    # calculates the p-value based on the two-sided test
    # the p-Value comes from the KS Distribution Survival Function (SF = 1-CDF)
    n1, n2 = float(len(sample1)), float(len(sample2))
    en = n1 * n2 / (n1 + n2)
    p_value = stats.kstwo.sf(ks_stat, np.round(en))
    dec = decision(p_value, 0.05) # alpha = 0.05
        
    return {"ks_stat": ks_stat, "p_value" : p_value,"decision": dec}

In [10]:
result_test = {}
for m in [1,2,3,4]:
    Frame_Dict = {}
    Frame_Dict['actions'] = [t[:-4] for t in Liste_donnees]
    result = ['ks_stat', 'p_value','decision']
    for x in result:
        Frame_Dict[x] = []
        
    for i in range(len(Liste_donnees)): 
        F = '.\.\DonCAC40\\' + Liste_donnees[i]
        data = pd.read_csv(F, delimiter=',')
        data = data[data[data.columns[5]].notnull()]  # Remove missing data
        X = data[data.columns[5]]
        k = result_dict[m]['k'][i]
        data1 = np.asarray(X[:k])
        data2 = np.asarray(X[k:])
        D = ks_2sample_test(data1, data2)
        for cle, valeur in D.items():
            Frame_Dict[cle].append(valeur)
    
    Resultats_test = pd.DataFrame.from_dict(Frame_Dict)
    Resultats_test = Resultats_test.set_index('actions')

    result_test[m] = Resultats_test
    

In [None]:
result_test

In [None]:
#m=1
result_test[1]

In [None]:
#m=2
result_test[2]

In [None]:
for i in range(len(Liste_donnees)): 
    F='.\.\DonCAC40\\'+Liste_donnees[i]
    data = pd.read_csv(F, delimiter=',')
    data=data[data[data.columns[5]].notnull()]  # supprimer les données manquantes

    ### colonnes 6  : AdjClose
    ### print(data.columns[5])
    k1 = result_dict[1]['k'][i]
    k2 = result_dict[2]['k'][i]
    k3 = result_dict[3]['k'][i]
    k4 = result_dict[4]['k'][i]

    fig = plt.figure(figsize=(10, 7))
    ax = fig.add_subplot(111)
    # cacher axes  'haut' , 'droit' et 'gauche':
    for side in ['right','top']:
        ax.spines[side].set_visible(False)

    data[data.columns[5]].plot()
    
    ### on affiche les dates modulo 60
    indices=[i for i in range(len(data[data.columns[0]])) if i%70==0]
    XX_ticks=[data[data.columns[0]][i] for i in indices]
    plt.xticks(indices,XX_ticks, rotation=45, fontsize=9)
    #####
    plt.axvline(k1,color='red',label='k1',lw=2)
    plt.axvline(k2,color='yellow',label='k2',lw=2)
    plt.axvline(k3,color='green',label='k3',lw=2)
    plt.axvline(k4,color='blue',label='k4',lw=2)
    
    plt.title("données de : %s"%Liste_donnees[i][:-4])
    
    plt.legend()
    plt.show()

In [None]:
for i in range(len(Liste_donnees)): 
    F = './DonCAC40/' + Liste_donnees[i]
    data = pd.read_csv(F, delimiter=',')
    data = data[data[data.columns[5]].notnull()]  # Remove missing data

    # Assuming that k_values are indices within the DataFrame
    k_values = [result_dict[t]['k'][i] for t in range(1, 5)]

    fig = plt.figure(figsize=(10, 7))
    ax = fig.add_subplot(111)

    # Hide unwanted spines
    for side in ['right', 'top']:
        ax.spines[side].set_visible(False)

    data[data.columns[5]].plot(ax=ax)
    
    # Customizing x-ticks to show specific dates
    indices = [j for j in range(len(data[data.columns[0]])) if j % 70 == 0]
    XX_ticks = [data[data.columns[0]][j] for j in indices]
    plt.xticks(indices, XX_ticks, rotation=45, fontsize=9)

    # Drawing vertical lines at rupture points and adding annotations
    colors = ['red', 'black', 'green', 'blue']
    for idx, k in enumerate(k_values):
        plt.axvline(k, color=colors[idx], label=f'k{idx+1}', lw=2)
        # Ensure the date is properly formatted as a string if needed
        day_at_rupture = data[data.columns[0]].iloc[k]
        plt.annotate(f'{day_at_rupture}', xy=(k, data[data.columns[5]].iloc[k]), 
                     xytext=(k, data[data.columns[5]].max() * 0.95),
                     fontsize=9, color=colors[idx], rotation=45)

    plt.title(f"Data from: {Liste_donnees[i][:-4]}")
    plt.legend()

    # Adding a table of k values with dates
    cell_text = [[f'k{index+1}: {k:.2f}, Date: {data[data.columns[0]].iloc[k]}'] for index, k in enumerate(k_values)]
    the_table = plt.table(cellText=cell_text, colLabels=['K_Value'], 
                          bbox=[0.98, 0.25, 0.5, 0.5], edges="open", cellLoc='left')  # Adjust bbox for text size
    
    # Customize table aesthetics
    for i, _ in enumerate(cell_text):
        the_table[i, 0]._loc = 'left'
        the_table[i, 0]._text.set_horizontalalignment('left') 
    
    the_table.auto_set_font_size(False)
    the_table.set_fontsize(12)
    the_table.scale(1, 2.5)  # Adjust scaling for better readability
    
    plt.subplots_adjust(left=0.2, right=0.8)  # Adjust right margin to fit the table
    plt.show()