In [2]:
import pandas as pd
import numpy as np
from tqdm import tqdm
from itertools import product
from scipy.spatial.distance import cdist
from qlib.data import D

from utlis import get_base_company, get_data

In [3]:
selected_tickers = get_base_company('2014-01-01', '2020-01-01')
all_timestamps, all_tickers, all_data = get_data('2014-01-01', '2016-06-30', selected_tickers, market='nasdaq100')

In [4]:
def attractor(series, E, tau, L):
    att = np.full((L, E), np.nan)
    for i in range((1 + (E-1)*tau), L):
        for j in range(E):
            att[i, j] = series[i - (j-1)*tau]
    return pd.DataFrame(att).dropna()

def PC_estimater(X, Y, E, tau, L):
    Mx = attractor(X, E, tau, L-1)
    My = attractor(Y, E, tau, L-1)
    L_m = Mx.shape[0]

    # Distance Matrix, metric='euclidean'/'cityblock'
    Dx = cdist(Mx.to_numpy(), Mx.to_numpy(), metric='euclidean')
    Dy = cdist(My.to_numpy(), My.to_numpy(), metric='euclidean')

    # The nearest neighbors' time indices for all dot in Mx or My
    X_nn = np.zeros((L_m, E+1), dtype=int)
    Y_nn = np.zeros((L_m, E+1), dtype=int)

    for i in range(L_m):
        X_nn[i, :] = np.argsort(Dx[i, :])[1:(E+2)]
        Y_nn[i, :] = np.argsort(Dy[i, :])[1:(E+2)]

    # The nearest neighbors' weights
    Wx = np.zeros((L_m, E+1))
    Wy = np.zeros((L_m, E+1))

    for i in range(L_m):
        Wx[i, :] = np.exp(-Dx[i, X_nn[i, :]])
        Wy[i, :] = np.exp(-Dy[i, Y_nn[i, :]])
        Wx[i, :] /= np.sum(Wx[i, :])
        Wy[i, :] /= np.sum(Wy[i, :])

    sx = np.zeros((L_m, E-1))
    sy = np.zeros((L_m, E-1))

    for i in range(E-1):
        sx[:, i] = (Mx.iloc[:, i+1] - Mx.iloc[:, i]) / Mx.iloc[:, i]
        sy[:, i] = (My.iloc[:, i+1] - My.iloc[:, i]) / My.iloc[:, i]

    # Average signature
    Sx = np.zeros((L_m, E-1))
    # Sy = np.zeros((L_m, E-1))

    for i in range(L_m):
        for j in range(E-1):
            Sx[i, j] = np.sum(sx[X_nn[i, :], j] * Wx[i, :])
            # Sy[i, j] = np.sum(sy[Y_nn[i, :], j] * Wy[i, :])


    Sx_hat = np.zeros((L_m, E-1))

    for i in range(L_m):
        for j in range(E-1):
            Sx_hat[i, j] = np.sum(sx[Y_nn[i, :], j] * Wy[i, :])

    hyper_k = 0.2
    kx = hyper_k*Sx.std()
    
    # ky = hyper_k*Sy.std()
    kx_hat = hyper_k*Sx_hat.std()

    Px = np.where(Sx > kx, 1, np.where(Sx < -kx, -1, 0))
    # Py = np.where(Sy > ky, 1, np.where(Sy < -ky, -1, 0))
    Px_hat = np.where(Sx_hat > kx_hat, 1, np.where(Sx_hat < -kx_hat, -1, 0))

    signature = [-1, 0, 1] # ↘，→，↗
    pattren_num = 3**(E-1)
    PC_x2y = np.zeros((pattren_num, pattren_num))

    patterns = list(product(signature, repeat=(E-1)))
    for i, pattern_x in enumerate(patterns):
        idx = np.where(np.all(Px == pattern_x, axis=1))[0]
        for j, pattern_x_hat in enumerate(patterns):
            true_num = np.all(Px_hat[idx] == pattern_x_hat, axis=1).sum()
            causal_strength = true_num/len(idx)
            PC_x2y[i,j] = causal_strength

    return PC_x2y

In [5]:
data = all_data.loc[:,'feature'].loc[:,'$close'].swaplevel()
L = len(all_timestamps) - 1
E = 3
tau = 1
h = 1
stock_num = len(all_tickers)

In [3]:
all_PC = np.zeros([stock_num, stock_num, 3**(E-1), 3**(E-1)])
## pattern causality from i to j
for id_i, stock_i in enumerate(tqdm(all_tickers)):
    for id_j, stock_j in enumerate(all_tickers):
        # if id_i == id_j:
        #     continue
        X = data.loc[stock_i, :][:-1]
        Y = data.loc[stock_j, :].shift(-1)[:-1]
        PC_x2y = PC_estimater(X, Y, E, tau, L)
        all_PC[id_i, id_j] = PC_x2y

np.save('./PC_x2y_nasdaq100_t2t+1_14-16.npy', all_PC)