In [None]:
from typing import Optional, Tuple

import matplotlib.pyplot as plt
import mplfinance as mpf
import numpy as np
import pandas as pd
import yfinance
from sklearn import preprocessing
from statsmodels.tsa.stattools import adfuller
from stockstats import StockDataFrame

# Download Data

In [None]:
# Download settings, named so they are easy to find and tweak.
TICKERS = ['AAPL']
START_DATE = '2021-01-01'
BAR_INTERVAL = '1d'

# end=None leaves the end of the range to yfinance's default
# (presumably "up to the latest available bar" - TODO confirm).
data = yfinance.download(
    tickers=TICKERS,
    start=START_DATE,
    end=None,
    interval=BAR_INTERVAL,
    auto_adjust=True,
    prepost=False,
)

In [None]:
data[['Close']].plot()

In [None]:
# Compute technical indicators
# Indicator names that are looked up on the StockDataFrame wrapper below and
# copied back into `data` as columns of the same name.
TECHNICAL_INDICATORS = ['macd', 'macds', 'rsi_30', 'rsi_12', 'rsi_26']
# The wrapper is built from a copy, so `data` itself is not handed to stockstats.
stock = StockDataFrame.retype(data.copy())
for technical_indicator_name in TECHNICAL_INDICATORS:
    # Indexing the wrapper by indicator name yields that indicator's series.
    oscillator_data = stock[technical_indicator_name]
    data[technical_indicator_name] = oscillator_data
# Back-fill along the rows: every NaN takes the next valid value in its column.
# Presumably this fills the indicators' warm-up period - TODO confirm.
# NOTE(review): back-filling copies later values into earlier rows (look-ahead).
data = data.bfill(axis='rows')

In [None]:
data.head()

In [None]:
data.tail()

# Bring Prices & Volume to Log Scale

In [None]:
# Price and volume columns that are replaced by their natural logarithm.
LOG_SCALE_COLUMNS = ['Open', 'High', 'Low', 'Close', 'Volume']

# NOTE: the columns are overwritten in place, so re-running this cell without
# re-running the cells above applies the logarithm a second time.
data[LOG_SCALE_COLUMNS] = data[LOG_SCALE_COLUMNS].apply(np.log)
data.head()

In [None]:
data.tail()

# Compute Weights Functions

In [None]:
def get_weights(d: float, thres: float) -> np.ndarray:
    """Build fractional-differentiation weights, truncated by magnitude.

    The weights follow the recurrence ``w_0 = 1`` and
    ``w_k = -w_{k-1} * (d - k + 1) / k``. Generation stops at the first
    weight whose absolute value is below ``thres``; that weight is dropped.

    Args:
        d: Differentiation order. Must be finite and greater than -1,
            otherwise the weights never shrink and the loop cannot end.
        thres: Strictly positive cut-off for the weight magnitude.

    Returns:
        Column vector of shape ``(n, 1)`` ordered from the largest lag down
        to lag 0, so the last entry is always ``1.0``.

    Raises:
        ValueError: If ``thres`` is not strictly positive (including NaN) or
            ``d`` is not a finite number greater than -1. Both cases would
            otherwise loop forever.
    """
    # `not thres > 0` (rather than `thres <= 0`) also rejects NaN.
    if not thres > 0:
        raise ValueError(f'thres must be strictly positive, got {thres!r}')
    if not (np.isfinite(d) and d > -1):
        raise ValueError(f'd must be finite and greater than -1, got {d!r}')

    weights = [1.]
    k = 1
    while True:
        next_weight = -weights[-1] / k * (d - k + 1)
        if abs(next_weight) < thres:
            break
        weights.append(next_weight)
        k += 1

    # Reverse so the weight for the most recent observation comes last.
    return np.array(weights[::-1]).reshape(-1, 1)



def get_fixed_weights(d: float, size: int) -> np.ndarray:
    """Build a fixed number of fractional-differentiation weights.

    Uses the recurrence ``w_0 = 1``, ``w_k = -w_{k-1} / k * (d - k + 1)``
    for ``k = 1 .. size - 1``.

    Args:
        d: Differentiation order.
        size: Number of weights to generate. Values below 1 still yield the
            single weight ``1.0``.

    Returns:
        Column vector of shape ``(max(size, 1), 1)`` ordered from the largest
        lag down to lag 0, so the last entry is always ``1.0``.
    """
    weights = np.ones(max(size, 1), dtype=float)
    for lag in range(1, size):
        weights[lag] = -weights[lag - 1] / lag * (d - lag + 1)

    # Flip so the lag-0 weight ends up in the last row.
    return weights[::-1].copy().reshape(-1, 1)



def plot_weights(d_range: list, n_plots: int, size: int) -> None:
    """Plot the fixed-size differentiation weights for one or more d values.

    Args:
        d_range: Either a single-element list ``[d]`` (plot exactly that d)
            or ``[start, stop]``, in which case ``n_plots`` evenly spaced
            values between start and stop (inclusive) are plotted.
        n_plots: Number of d values to draw when ``d_range`` has two ends;
            ignored when ``d_range`` has a single element.
        size: Number of weights computed for every d.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    if len(d_range) == 1:
        d_values = d_range
    else:
        d_values = np.linspace(d_range[0], d_range[1], n_plots)

    # Build the frame in one go: one column per d, row k holds the weight at
    # lag k. get_fixed_weights returns the largest lag first, hence the flip.
    weights_by_d = pd.DataFrame(
        {d: get_fixed_weights(d, size).ravel()[::-1] for d in d_values}
    )

    ax = weights_by_d.plot()
    ax.legend(loc='upper left')
    ax.set_xlabel('K = Number of Observations')
    ax.set_ylabel('W = Weights')
    plt.show()

In [None]:
plot_weights(d_range=[0, 1], n_plots=5, size=6)

In [None]:
plot_weights(d_range=[1, 2], n_plots=5, size=6)

In [None]:
plot_weights(d_range=[4, 5], n_plots=5, size=8)

# Apply Weights to Time Series

In [None]:
def _apply_weights(data: pd.DataFrame, w: np.ndarray) -> pd.DataFrame:
    """Apply one fixed-width weight window to every column of ``data``.

    For each column the values are forward-filled, rows that are still NaN
    (those before the first valid observation) are dropped, and the dot
    product of ``w`` with every run of ``len(w)`` consecutive values is taken.
    Each result is labelled with the index of the last row in its window.
    Rows whose original (unfilled) value was not finite produce no output.

    Args:
        data: Frame whose columns are differentiated independently.
        w: Weights ordered from the largest lag to lag 0; any shape that
            flattens to one dimension.

    Returns:
        Frame with one column per input column, joined on the index.
    """
    weights = np.asarray(w, dtype=float).ravel()
    width = len(weights) - 1

    differentiated = {}
    for name in data.columns:
        original = data[name]
        filled = original.ffill()

        # Work positionally so a non-unique index cannot distort the windows.
        has_value = filled.notna().to_numpy()
        values = filled.to_numpy(dtype=float, na_value=np.nan)[has_value]
        index = original.index[has_value]
        # Forward-filled gaps and infinities may feed a window but must not
        # end one, so remember which rows were finite before the fill.
        was_finite = np.isfinite(
            original.to_numpy(dtype=float, na_value=np.nan)
        )[has_value]

        if len(values) <= width:
            # Not enough observations for a single full window.
            differentiated[name] = pd.Series(dtype=np.float64, index=index[:0])
            continue

        # correlate(..., 'valid')[k] == sum_n values[k + n] * weights[n],
        # i.e. the dot product of the weights with the window starting at k.
        windowed = np.correlate(values, weights, mode='valid')
        keep = was_finite[width:]
        differentiated[name] = pd.Series(windowed[keep], index=index[width:][keep])

    if not differentiated:
        # pd.concat refuses an empty mapping; a frame without columns in,
        # a frame without columns out.
        return pd.DataFrame(index=data.index[:0])
    return pd.concat(differentiated, axis=1)


def frac_diff(data: pd.DataFrame, d: float, thres: float = 1e-3) -> Tuple[pd.DataFrame, np.ndarray]:
    """Fractionally differentiate every column with threshold-truncated weights.

    Args:
        data: Frame whose columns are differentiated independently.
        d: Differentiation order passed to ``get_weights``.
        thres: Weight-magnitude cut-off passed to ``get_weights``; it
            determines the window width.

    Returns:
        Tuple ``(differentiated_frame, weights)`` where ``weights`` is the
        column vector returned by ``get_weights``.
    """
    # Constant width window
    w = get_weights(d, thres)
    return _apply_weights(data, w), w


def frac_diff_fixed(data: pd.DataFrame, d: float, size: int) -> Tuple[pd.DataFrame, np.ndarray]:
    """Fractionally differentiate every column with a fixed number of weights.

    Args:
        data: Frame whose columns are differentiated independently.
        d: Differentiation order passed to ``get_fixed_weights``.
        size: Number of weights, i.e. the window length in observations.

    Returns:
        Tuple ``(differentiated_frame, weights)`` where ``weights`` is the
        column vector returned by ``get_fixed_weights``.
    """
    # Constant width window
    w = get_fixed_weights(d, size)
    return _apply_weights(data, w), w


def search_parameters(data: pd.DataFrame) -> Tuple[pd.DataFrame, Optional[float]]:
    """Scan d in [0, 1] and report ADF statistics of the differentiated close.

    For each of 11 evenly spaced d values the daily ``Close`` series is
    differentiated with a fixed 5-weight window, tested with ``adfuller``
    and correlated with the undifferentiated series. The ADF statistic and
    the correlation are plotted against d, with a horizontal line at the
    mean 5% critical value and, when one exists, a vertical line at the d
    chosen by ``find_d_value``.

    Args:
        data: Frame with a ``Close`` column and an index that supports
            ``resample('1D')``.

    Returns:
        Tuple ``(parameters, d_value)``. ``parameters`` is indexed by d with
        columns ``'ADF value'``, ``'p value'``, ``'lags'``, ``'nObs'``,
        ``'95% conf'`` and ``'corr'``. ``d_value`` is the result of
        ``find_d_value(parameters)`` and may be ``None``.
    """
    column_names = ['ADF value', 'p value', 'lags', 'nObs', '95% conf', 'corr']
    d_values = np.linspace(0, 1, 11)

    # Does not depend on d, so resample once instead of once per iteration.
    close = data[['Close']].resample('1D').last()

    rows = []
    for d in d_values:
        close_diff, _ = frac_diff_fixed(close, d, size=5)
        corr = np.corrcoef(close.loc[close_diff.index, 'Close'], close_diff['Close'])[0, 1]
        # With autolag=None the result is (statistic, p-value, lags used,
        # number of observations, critical values by level).
        adf_result = adfuller(close_diff['Close'], maxlag=1, regression='c', autolag=None)
        rows.append(list(adf_result[:4]) + [adf_result[4]['5%'], corr])

    # Build the frame in one step rather than enlarging it row by row.
    parameters = pd.DataFrame(rows, index=d_values, columns=column_names)

    ax = parameters[['ADF value', 'corr']].plot(secondary_y='ADF value')
    ax.set_xlabel('d value')
    ax.set_ylabel('correlation')
    plt.axhline(parameters['95% conf'].mean(), linewidth=1, color='r', linestyle='dotted')

    d_value = find_d_value(parameters)
    if d_value is not None:
        plt.axvline(d_value, linewidth=1, color='g', linestyle='dotted')

    return parameters, d_value


def find_d_value(parameters: pd.DataFrame) -> Optional[float]:
    """Return the first d whose ADF statistic is at or below the threshold.

    The threshold is the mean of the ``'95% conf'`` column. Rows are scanned
    in index order and the index label of the first row whose
    ``'ADF value'`` is less than or equal to that threshold is returned.

    Args:
        parameters: Frame indexed by d with ``'ADF value'`` and
            ``'95% conf'`` columns.

    Returns:
        The matching index label, or ``None`` when no row qualifies
        (including when ``parameters`` has no rows).
    """
    conf_95 = parameters['95% conf'].mean()
    passing = (
        d_value
        for d_value, adf_value in parameters['ADF value'].items()
        if adf_value <= conf_95
    )
    return next(passing, None)

In [None]:
data_frac_diff = data.copy()

In [None]:
parameters, d_value = search_parameters(data_frac_diff)

# One print call, newline-separated: a blank line, the heading, the first
# rows of the parameter table, then the selected d.
print('\nPossible parameters', parameters.head(), f'd_value: {d_value}', sep='\n')

In [None]:
# Fail early with a clear message: search_parameters returns None for d_value
# when no candidate d passed its ADF threshold, and frac_diff cannot use None.
if d_value is None:
    raise ValueError('search_parameters found no d value passing the ADF threshold; '
                     'cannot fractionally differentiate.')

# Both transforms start from the same undifferentiated `data` frame, so the two
# results are directly comparable and re-running this cell gives the same output.
data_frac_diff, w_frac_diff = frac_diff(data, d_value)
# d = 1 is the classic integer (first-order) differentiation.
# (The spelling `w_interger_diff` is kept because a later cell uses that name.)
data_integer_diff, w_interger_diff = frac_diff(data, 1)

In [None]:
plot_weights(d_range=[d_value], n_plots=1, size=len(w_frac_diff))
print(w_frac_diff[:10])

In [None]:
plot_weights(d_range=[1], n_plots=1, size=len(w_interger_diff))

### Let's See How the Prices & Volume Are Looking

In [None]:
data[['Open', 'High', 'Low', 'Close']].plot()

In [None]:
data_integer_diff[['Open', 'High', 'Low', 'Close']].plot()

We can see that the classic method of integer differentiation is almost flat & very sensitive to outliers.

In [None]:
data_frac_diff[['Open', 'High', 'Low', 'Close']].plot()

We can definitely see that the mean & variance are almost the same for any random slice of the time series.

In [None]:
data[['Volume']].plot()

In [None]:
data_integer_diff[['Volume']].plot()

In [None]:
data_frac_diff[['Volume']].plot()

### Let's See How the Technical Indicators Are Looking

In [None]:
data['macd'].plot()

In [None]:
data_integer_diff['macd'].plot()

In [None]:
data_frac_diff['macd'].plot()

In [None]:
data['rsi_30'].plot()

In [None]:
data_integer_diff['rsi_30'].plot()

In [None]:
data_frac_diff['rsi_30'].plot()