# 0. Установка и импортирование библиотек

In [None]:
import numpy as np
import scipy.stats as st

from scipy.special import kolmogorov
from scipy.optimize import minimize

In [None]:
!pip install pyjanitor
import janitor

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pandas as pd

import plotly.express as px
import plotly.graph_objects as go

from os import listdir
from os.path import isfile, join

from functools import lru_cache

from numba import jit, njit

# 1. Работа с данными

## 1.1. Сбор данных в один датасет

Функция ниже получает папку с файлами котировок.
Из каждого файла с расширением .csv получаем даты и цены закрытия, соединяем все в одну единственную таблицу.

In [None]:
def get_data(* , column='<CLOSE>', path='./', sep=','):
  files = [f for f in listdir(path) if isfile(join(path, f)) and f[-4:] == '.csv']

  df = pd.DataFrame(columns=['date'])

  for file in files:
    current = (
    pd.read_csv(path+file, sep=sep)
    .rename(columns={'<DATE>':'date', column: file[:-4]})
    .to_datetime('date')
    )

    df = pd.merge(df, current[['date', file[:-4]]], on = "date", how = "outer")

  df.sort_values(by='date', inplace=True, ignore_index=True)

  return df

In [None]:
df = get_data(path='./')
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2511 entries, 0 to 2510
Data columns (total 14 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    2511 non-null   datetime64[ns]
 1   SNGSP   2511 non-null   float64       
 2   MFGSP   2364 non-null   float64       
 3   KRKNP   1504 non-null   float64       
 4   TATN    2511 non-null   float64       
 5   BANEP   2037 non-null   float64       
 6   TRNFP   2511 non-null   float64       
 7   NVTK    2511 non-null   float64       
 8   ROSN    2511 non-null   float64       
 9   GAZP    2511 non-null   float64       
 10  TATNP   2511 non-null   float64       
 11  LKOH    2511 non-null   float64       
 12  RNFT    784 non-null    float64       
 13  SNGS    2511 non-null   float64       
dtypes: datetime64[ns](1), float64(13)
memory usage: 274.8 KB


## 1.2. Предварительный анализ

### 1.2.1. Число торговых дней по годам

In [None]:
days = df.drop(['date'], axis=1).groupby(df.date.dt.year).count()
px.imshow(days)

Исключим тикеры RNFT, BANEP, MFGSP и KRKNP т.к. они торговались не стабильно на временном отрезке: RNFT, BANEP и KRKNP не торговались в первые годы, а MFGSP имеет пропуски

In [None]:
df = df.drop(['RNFT', 'BANEP', 'KRKNP', 'MFGSP'], axis=1)

Оставшиеся тикеры:

In [None]:
tickers = list(set(df.columns) - {'date'})
tickers

['NVTK', 'ROSN', 'SNGSP', 'LKOH', 'TATN', 'SNGS', 'TATNP', 'TRNFP', 'GAZP']

### 1.2.2. Анализ дневных скачков

In [None]:
def delta(X):
  "Относительные изменения от элемента к элементу"
  res = [0]
  for i in range(1, len(X)):
    res.append((X[i] - X[i-1])/X[i-1] *100)
  return res

In [None]:
grad = df[tickers].apply(delta)
px.imshow(grad.groupby(df.date.dt.year).max())\
  .update_layout(title='Максимальные дневные роста')\
  .show()

In [None]:
px.imshow(grad.groupby(df.date.dt.year).min())\
  .update_layout(title='Максимальные дневные падения')\
  .show()

### 1.2.3. Визуализация

На всех графиках котировок можно:


*   Масштабировть, выделяя прямоугольники мышью
*   Двойной щелчок, чтобы вернуться к первоначальному масштабу
*   Двойной щелчок по тикеру справа позволит рассмотреть только график,
    относящийся к данному тикеру



In [None]:
fig = px.line(df.sort_values(by='date'), x="date", y=tickers,
              title='Цены закрытия')
fig.update_layout(xaxis_title='Дата', yaxis_title='Цена закрытия')
fig.show()

### 1.2.4. Расчет логарифмической доходности

In [None]:
def to_log(X):
  res = [0]
  for i in range(1, len(X)):
    res.append(np.log(X[i] / X[i-1]))
  return res

In [None]:
log_df = df.apply(lambda x: to_log(x) if x.name in tickers else x)

In [None]:
fig = px.line(log_df, x="date", y=tickers,
              title='Динамика дневной логарифмической доходности')
fig.update_layout(xaxis_title='Дата', yaxis_title='Лог. доходность')
fig.show()

# 2. Реализации критериев

## 2.1. Критерий 𝜒²

In [None]:
def p_val(n, chi_val, estimated=2):
  """ n - число степеней свободы
      chi_val - полученное значение статистики хи-квадрат
      estimated - число оцененных параметров.
        по умолчанию =2, т.к. в случае проверки нормальности, 
        данные предварительно центрируются:
        вычитается выборочное среднее и выборка делится на
        свое стандартное отколнение"""

  return st.chi2(n - estimated - 1).sf(chi_val)

def chi2val(n, alpha=.05, estimated=2):
  """ n - число степеней свободы
      alpha - уровень значимости
      estimated - число оцененных параметров.
        по умолчанию =2, т.к. в случае проверки нормальности, 
        данные предварительно центрируются:
        вычитается выборочное среднее и выборка делится на
        свое стандартное отколнение"""
  return st.chi2(n - estimated -1).isf(alpha)
  
assert chi2val(42, .42) == 40.127881667399166
assert p_val(42, 40.127881667399166) == 0.4200000000000004
assert chi2val(10) == 14.067140449340167
#тесты


In [None]:
def center(X):
  X = np.array(X)
  return (X - np.mean(X))/np.std(X)

In [None]:
def segmentate(X, bins=15):
  """ Делим выборку на bins=15 сегментов.
      Первый сегмент = (-oo; Q1 - 1.5 IQR)
      Последний = (Q3 + 1.5 IQR; +oo)"""
      
  Q1 = np.quantile(X, .25)
  Q3 = np.quantile(X, .75)
  IQR = Q3 - Q1

  return np.linspace(Q1 - 1.5*IQR, Q3 + 1.5*IQR, bins - 1)

In [None]:
def form_bins(X, *, bounds=None, bins=None, getBounds=False):
  """По полученным границам отрезков функция подсчитывает число точек,
     попавших в каждый из них"""

  if not (bounds is None) ^ (bins is None):
    raise SyntaxError("Должно быть указано что-то одно: число отрезков или границы")

  if bounds is None:
    bounds = segmentate(X, bins)

  res = [sum(X < bounds[0])]
  # Первый и последний сегмент - особые случаи. Они не ограничены с одной из сторон

  for i in range(1, len(bounds)):
    # сегменты посередине имеют обе конечные границы
    res.append(sum((X >= bounds[i-1]) * (X < bounds[i])))
  
  res.append(sum(X >= bounds[-1]))
  
  if getBounds: return np.array(res), bounds
  return np.array(res)

In [None]:
def Chi2(X, *, bins=10):

  X = np.array(X)
  X = center(X)

  assert len(X) > 50

  chi2 = 0
 
  counts, bounds = form_bins(X, bins=bins, getBounds=True)

  cdf = lambda x: st.norm().cdf(x)
  sf = lambda x: st.norm().sf(x)

  chi2 = (counts[0] - cdf(bounds[0]) * len(X))**2 / (cdf(bounds[0]) * len(X))\
        +(counts[-1] - sf(bounds[-1])* len(X))**2 / (sf(bounds[-1]) * len(X))

  for i in range(bins - 2):
    expected = (sf(bounds[i]) - sf(bounds[i+1])) * len(X)
    chi2 += (counts[i+1] - expected)**2 / expected

  return chi2, p_val(bins, chi2)

In [None]:
def Chi2_test(X, *, bins=10, alpha=.05, show_values=False):

  chi, p_value = Chi2(X=X, bins=bins)
  crit_value = chi2val(bins, alpha=alpha)

  if show_values:
    print(f'Статистика критерия: {chi:.4f}',
          f'Выборка была разделена на {bins} сегментов',
          f'P-value: {p_value:.4f}',
          f'Критическое значение для α = {alpha}: {crit_value:.4f}',
          sep='\n')

  return chi <= crit_value

## 2.2. Критерий Колмогорова-Смирнова. Критерий Лиллиефорса

In [None]:
@np.vectorize
def K_cdf(x, max_k=100):

  if x < .2:
    return 0.

  elif x > 2.2:
    return 1.

  k = np.arange(-max_k, max_k + 1)

  return sum((-1.)**k * np.exp(-2 * k**2 * x**2))

@np.vectorize
def K_sf(x, max_k=100):
  return 1 - K_cdf(x, max_k)
  
@np.vectorize
@lru_cache(maxsize=5)
def K_isf(alpha, max_k=100):
  return minimize(lambda x: (K_sf(x, max_k) - alpha)**2,
                  x0=1.5,
                  method='Nelder-Mead').x

In [None]:
x = np.linspace(0, 2, 10**3)

fig = px.line(
    pd.DataFrame(np.transpose([K_sf(x), kolmogorov(x), K_cdf(x)]),
                 index=x,
                 columns=['My SF', 'scipy.stats SF', 'My CDF']))
fig.update_layout(xaxis_title='x')
fig.show()

In [None]:
fig = px.line(x=x, y=np.gradient(K_cdf(x)))
fig.update_layout(title='Kolmogorov distribution pdf')
fig.show()

In [None]:
t = np.linspace(0, 1, 10**3)
px.line(x=t, y=K_isf(t))

In [None]:
def KS_stat(X, *, F=st.norm().cdf):
  X = np.array(sorted(X))
  n = len(X)
  
  D_upper = max([i/n - F(X[i-1]) for i in range(1, len(X)+1)])
  D_lower = max([F(X[i-1]) - (i-1)/n for i in range(1, len(X)+1)])

  D = max(D_upper, D_lower)

  return D

In [None]:
def L_test(X, *, alpha=.05, show_values=False):
  n = len(X)
  mean, std = X.mean(), X.std()
  stat = KS_stat(X, F=st.norm(mean, std).cdf)

  crit_values = {.15: .775, .1: .819, .05: .895, .03: .955, .01: 1.035}
  assert alpha in crit_values, "Нет табличного значения для заданного alpha!"
  crit_val = crit_values[alpha]

  val = stat * (n**.5 - .01 + .85*n**-.5)

  if show_values:
    print(f'Статистика КС: {stat:.4f}',
          f'Статистика критерия: {val:.4f}',
          f'Критическое значение для α = {alpha}: {crit_val:.4f}',
          sep='\n')
    
  return val <= crit_val

Построим распределение Лиллиефорса с помощью моделирования

In [None]:
times = 10000
lil = []
for _ in range(times):
  X = st.norm(10, 5).rvs(size=250)
  mean, std = X.mean(), X.std()
  lil.append(KS_stat(X, F=st.norm(mean, std).cdf))
lil = np.array(lil)

10000

In [None]:
fig = px.histogram(lil* 250**.5, histnorm='probability')
fig.show()

## 2.3. Критерий Харке-Бера

Статистика критерия распределена по закону 𝜒² с двумя степенями свободы. Ниже приведены функции для рассчета критических значений и *p*-значений

In [None]:
@np.vectorize
@lru_cache(maxsize=5)
def JB_chi_val(alpha):
  return st.chi2(2).isf(alpha)

def JB_p_val(chi_val):
  return st.chi2(2).sf(chi_val)

assert JB_chi_val(0.05) == 5.991464547107983
assert float(JB_p_val(9)) == 0.011108996538242308

Рассчитаем критические значения для каждого α в списке

In [None]:
JB_chi_val([.1, .05, .03, .01])

array([4.60517019, 5.99146455, 7.01311579, 9.21034037])

In [None]:
def S(X):
  Y = X - np.mean(X)
  return np.mean(Y**3) / np.mean(Y**2)**1.5

def K(X):
  Y = X - np.mean(X)
  return np.mean(Y**4) / np.mean(Y**2)**2


def JB(X, use_stats=False):

  if use_stats:
    s = st.skew(X)
    k = st.kurtosis(X) + 3
  
  else:
    s, k = S(X), K(X)

  n = len(X)

  stat = (n / 6) * (s**2 + (k - 3)**2 / 4)


  return stat, float(JB_p_val(stat))
  

In [None]:
def JB_test(X, *, alpha=.05, show_values=False):
  
  stat, p_value =  JB(X)
  crit_val = JB_chi_val(alpha)

  if show_values:
    print(f'Статистика критерия: {stat:.4f}',
          f'P-value: {p_value:.4f}',
          f'Критическое значение для α = {alpha}: {JB_chi_val(alpha):.4f}',
          sep='\n')

  return stat <= crit_val

Сравним по времени работу моей реализации с реализацией в библиотеке scipy.stats. Значения, возвращаемые функцией идентичны

In [None]:
X = st.norm(12, 3).rvs(size=10**6)

In [None]:
%time JB(X)

CPU times: user 196 ms, sys: 1.9 ms, total: 198 ms
Wall time: 201 ms


(1.445920624092368, 0.48531344810400323)

In [None]:
%time st.jarque_bera(X)

CPU times: user 195 ms, sys: 2.82 ms, total: 198 ms
Wall time: 202 ms


(1.445920624092368, 0.48531344810400323)

# 3. Практическая часть

## 3.1. Модельные данные

Рассчитаем квантили распределений критических значений

### Chi-2

In [None]:
# Рассчитаем статистику критерия для 10000 выборок
times = 10000
chi = []
chi_pv = []
for _ in range(times):
  X = st.norm(10, 5).rvs(size=250)
  stat = Chi2(X)
  chi.append(stat[0])
  chi_pv.append(stat[1])

chi = np.array(chi)
chi_pv = np.array(chi_pv)

In [None]:
px.histogram(chi_pv, histnorm='probability').update_layout(title='Распределение p-значений')

In [None]:
px.histogram(chi, histnorm='probability').update_layout(title='Распределение статистики критерия')

### Jarque-Bera

In [None]:
# Харке-Бера
times = 10000
jb, jb_pv = [], []
for _ in range(times):
  X = st.norm(10, 5).rvs(size=250)
  stat = JB(X)
  jb.append(stat[0])
  jb_pv.append(stat[1])
jb = np.array(jb)
jb_pv = np.array(jb_pv)

In [None]:
px.histogram(jb_pv, histnorm='probability').update_layout(title='Распределение p-значений')

In [None]:
px.histogram(jb, histnorm='probability').update_layout(title='Распределение статистики критерия')

### Lilliefors

In [None]:
# Воспользуемся данными, сгенерированными в п. 2.2. для рассчета квантилей распределения Лиллиефорса
# Статистики критерия были рассчитаны для выборок размером 250. Рассчитаем статистики с поправкой из модификации критерия
n = 250
lil_vals = lil * (n**.5 - .01 + .85*n**-.5)
qs = np.arange(.001, 1, .001)
lil_quantiles = np.quantile(lil_vals, qs)
lil_q_df = pd.DataFrame(lil_quantiles, index=qs, columns=['Значение'])
lil_q_df

Unnamed: 0,Значение
0.001,0.317781
0.002,0.333545
0.003,0.345937
0.004,0.350251
0.005,0.352957
...,...
0.995,1.072070
0.996,1.078233
0.997,1.082372
0.998,1.110926


In [None]:
# Короткая версия
short_qs = np.array([.5, .75, .8, .85, .9, .95, .97, .98, .99])
lil_qs_short = np.quantile(lil_vals, short_qs)
lil_q_short = pd.DataFrame(lil_qs_short, index=short_qs, columns=['Значение'])
lil_q_short

Unnamed: 0,Значение
0.5,0.616106
0.75,0.717408
0.8,0.741648
0.85,0.778745
0.9,0.825842
0.95,0.892885
0.97,0.952172
0.98,0.992478
0.99,1.031681


In [None]:
#сохраним обе версии
lil_q_df.to_excel('quantiles.xlsx')
lil_q_short.to_excel('qs_short.xlsx')

## 3.2. Оценка мощности

In [None]:
def bench(test, *, distr=st.norm(), size=250, times=200, target=True, alpha=.05, **kwargs):
  count = 0
  Xs = distr.rvs(size=(times, size))

  for X in Xs:
    count += test(X, alpha=alpha, **kwargs)
  
  accuracy = count/times
  accuracy = accuracy if target else 1. - accuracy

  return accuracy

In [None]:
if not input('Внимание! следующие функции работают долго.\n'
             'Для того, чтобы продолжить введите любой символ: '):
  raise KeyboardInterrupt

Внимание! следующие функции работают долго.
Для того, чтобы продолжить введите любой символ: e


In [None]:
distrs = [st.norm(31, 41), st.cauchy(), st.chi2(10), st.t(5), st.t(15)]
bins = [10, 20]
size = 250
times = 1000
alpha = .05
bench_results= []

bench_results += [[bench(L_test, distr=distr, size=size, times=times, alpha=alpha) for distr in distrs]]

bench_results += [[bench(Chi2_test, distr=distr, size=size, times=times, bins=b, alpha=alpha) for distr in distrs] for b in bins]

bench_results += [[bench(JB_test, distr=distr, size=size, times=times, alpha=alpha) for distr in distrs]]

In [None]:
bench_results = pd.DataFrame(bench_results,
                             columns=['N(31, 41)', 'Cauchy', 'Chi2(10)', 't(5)', 't(15)'],
                             index=['Lilliefors', 'Chi-2 (10 bins)', 'Chi-2 (20 bins)', 'Jarque-Bera'])

bench_results[['Cauchy', 'Chi2(10)', 't(5)', 't(15)']] = 1 - bench_results[['Cauchy', 'Chi2(10)', 't(5)', 't(15)']]
bench_results 

Unnamed: 0,"N(31, 41)",Cauchy,Chi2(10),t(5),t(15)
Lilliefors,0.938,1.0,0.948,0.619,0.087
Chi-2 (10 bins),0.956,1.0,0.966,0.526,0.1
Chi-2 (20 bins),0.966,1.0,0.917,0.326,0.077
Jarque-Bera,0.961,1.0,1.0,0.905,0.298


In [None]:
# сохранить для вставки в работу
bench_results.to_excel('pow.xlsx')

## 3.3. Реальные данные

Для каждого тикера и каждого года найду *p*-значения критерия χ²

In [None]:
def apply_by_years(df, func, index=1, **kwargs):
  res_df = []
  for year in set(df.date.dt.year):
    cur = []
    for ticker in tickers:
      if index is None:
        cur.append(func(df.where(df.date.dt.year == year)[ticker].dropna(), **kwargs))
      else:
        cur.append(func(df.where(df.date.dt.year == year)[ticker].dropna(), **kwargs)[index])
        
    res_df.append(cur)
  return pd.DataFrame(res_df, columns=tickers)

### Chi-2

In [None]:
chisquare_df = apply_by_years(log_df, Chi2, bins=20)
chisquare_df

In [None]:
px.imshow(chisquare_df,
          y=sorted(list(set(df.date.dt.year))))

Двойным кликом по названию тикера справа можно отобразить данные по определенному тикеру

In [None]:
fig = px.histogram(chisquare_df, range_x=[0., 1.], nbins=10)
fig.update_layout(title='Гистограмма p-value Chi2',
                  xaxis_title='p-value',
                  yaxis_title='кол-во')
fig.show()

### Lilliefors

In [None]:
lil_df = apply_by_years(log_df, L_test, index=None)
lil_df.index = range(2010, 2020)
px.imshow(lil_df)

### Jarque-Bera

In [None]:
jb_df = apply_by_years(log_df, JB)
jb_df

In [None]:
px.imshow(jb_df,
          y=sorted(list(set(log_df.date.dt.year))))