# Bведение

Данный ноутбук основан на посте https://quantrum.me/875-parnyj-trejding-opisanie-strategii-na-python/

Стратегия парного трейдинга очень популярна на рынке. Она основана на чистой статистике, что делает ее привлекательной для алгоритмической торговли. Общий смысл сводится к нескольким шагам: найти пару, проверить ее поведение, определить границы входа в позицию и направление (лонг/шорт).

Пары ищут с помощью корреляции, но корреляция в чистом виде может сослужить плохую службу. Спред пар должен быть стационарным и обладать коинтегрированностью.

В статье рассмотрены:

Введение в корреляцию/коинтеграцию на простом примере.
Корреляция без коинтеграции.
Коинтеграция без корреляции.

Основные понятия
Временной ряд — статистические данные исследуемого процесса собранные в разные моменты времени.

Корреляция — статистическая взаимосвязь двух и более случайных величин. В нашем случае временных рядов.

Коинтеграция — свойство нескольких нестационарных временных рядов, заключающееся в существовании некоторой их стационарной линейной комбинации.

Стационарность — свойство процесса не менять свои характеристики со временем.

P-значение — величина используемая при тестировании статистических гипотез.

Простыми словами: две акции будут коинтегрированы, когда спред разницы их истории цен будет находиться в пределах фиксированных границ и не будет обладать трендом.

In [None]:
## Пример на сгенерированных данных

Для начала сгенерируем два датасета.
Второй - это то же самое что и первый, сдвинутый на некоторую величину и с добавленным случайным шумом.


In [None]:
import numpy as np
import pandas as pd
import statsmodels
from statsmodels.tsa.stattools import coint,adfuller
import seaborn
import matplotlib.pyplot as plt

 
# устанавливаем зерно для повторимости случайных чисел
np.random.seed(107)
 
X_returns = np.random.normal(0, 1, 100) # генерируем историю доходности
X = pd.Series(np.cumsum(X_returns), name='X') + 50 # суммируем и смещаем на произвольную величину
 
some_noise = np.random.normal(0, 1, 100) # немного шума для второго ряда
Y = X + 5 + some_noise
Y.name = 'Y'
pd.concat([X, Y], axis=1).plot() # рисуем оба ряда


In [None]:
(Y-X).plot() # рисуем спрэд
plt.axhline((Y-X).mean(), color='red', linestyle='--') # добавляем среднее

Проверив значения коинтеграции и корреляции, видим что спред пары стационарен, p-значение рядом с нулем, и имеет высокую корреляцию около 95%. Код для получения значений оценки ниже:

In [None]:

def check_coint_corr(X, Y):
    # проверим стационарность двух рядов
    score, pvalue, _ = coint(X,Y)
    # проверим корреляцию двух рядов
    corr = X.corr(Y)
    print("p-значение: %.4f" % pvalue, "корреляция: %s" % corr)
check_coint_corr(X, Y)

Дополнительно проведем тест Дики-Фуллера:
    Он показывает, есть ли  зависимость от времени (тренд) внутри самой серии.
    Если серия стационарна, полученное значение должно быть меньше чем critical value  (1% 5% или 10%- выбирать вам)

In [None]:
def dft(X):
    """
    X- pandas series.
    """
    result = adfuller(X)
    print(f'Checking {X.name} for stationarity')
    score = result[0]
    pvalue = result[1]
    crit = result[4]
    print(f'score is {score}')
    print (f'p-value is {pvalue}')
    print (f'critical values are {crit}')

In [None]:
dft(X)


В данном случае мы не можем сказать, что серия стационарна (score > critical value для любого доверительного интервала).

Теперь проверим разницу X-Y:


In [None]:
dft(X-Y)


В данном случае (как и ожидалось) разница двух рядов стационарна.

Однако, есть случаи, когда ряды имеют высокую корреляцию без коинтеграции. Пара с таким свойством не подходит для парного трейдинга. Для изучения сформируем два независимых произвольных времянных ряда:

In [None]:
X_returns = np.random.normal(1, 1, 100)
Y_returns = np.random.normal(2, 1, 100)
X_diverging = pd.Series(np.cumsum(X_returns), name='X')
Y_diverging = pd.Series(np.cumsum(Y_returns), name='Y')
pd.concat([X_diverging, Y_diverging], axis=1).plot()
 
check_coint_corr(X_diverging, Y_diverging)

И нарисуем спред:

In [None]:
(Y_diverging-X_diverging).plot() # рисуем спрэд
plt.axhline((Y_diverging-X_diverging).mean(), color='red', linestyle='--') # добавляем среднее

Видим, что оба ряда имеют высокую корреляцию около 99% и проваливают тест на коинтеграцию с p-значением 0.881 (рядом с единицей). Мы видим, что спред имеет тенденцию роста со временем.

Так же проведем тест Дики-фуллера на одной из переменных и на их разнице:

In [None]:
dft(X_diverging)


Серия не стационарна

In [None]:
dft(Y_diverging-X_diverging)


То же самое можно сказать о разнице: если мы сравним с доверительным интервалом хотя бы 5% ( это означает что есть 5% вероятность того что полученные результаты случайны)- увидим что разница имеет тренд- что опять же не есть хорошо.

## Реальный пример

В данном примере мы возьмем две случайные акции, и проведем такой же визуальный анализ как показано выше.

In [None]:
#importing the libraries
import pandas as pd
import sys
sys.path.append('../src/data')
import ClickhouseHelper as chh

Скачаем минутные данные по ETF с начала 2020 года:

    
Кстати, ETF я использую исключительно для примера- размер получаемого датафрейма самый маленький.
В дальнейшем подобный подход будем использовать на акциях.

In [None]:
df=chh.query_data_by_time(
    channels_list=[],
    startTime=None,
    endTime=None,
    days_span=360,    
    server_ip="192.168.1.128",
    table_name="minutes",
    instrument_type="Stock",
    data_freq="min",
)

In [None]:
df.shape

In [None]:
df.columns

In [None]:
df.tail()

In [None]:
# отфильтруем только по рублевым ETF
df=df[df.currency=='RUB']

In [None]:
# опять же для примера- нарисуем графики ETF vs Time

In [None]:
fig, ax = plt.subplots(figsize=(15,15))

for n, grp in df.groupby('ticker'):
    ax.scatter(x = "day", y = "c", data=grp, label=n)
ax.legend(title="Label")

plt.show()

Видим, что по коричневой и красным кривым ( FXUK, FXWO данные почему то скачались только до середины февраля.)

In [None]:
#Проверим длину ( количество значений) по каждому из ETF.
#Для анализа критичным является то, что при сравнении и расчете коинтеграции и корелляции длина временных рядов должна быть одинаковой.
all_lengths=[]
for etf in df.ticker.unique():
    print(f'shape of ETF {etf} {df[df.ticker==etf].name.unique()[0]} is: {df[df.ticker==etf].shape[0]}')
    
    all_lengths.append(df[df.ticker==etf].shape[0])

In [None]:
length_limit = int(np.percentile(all_lengths, 10))

In [None]:
length_limit

In [None]:
#keeping only tickers with len>len_limit:
for etf in df.ticker.unique():
    if df[df.ticker==etf].shape[0]<=length_limit:
        df = df[df.ticker != etf]


In [None]:
def get_performance(a):
    """
    Конвертируем историю в относительные величины
    
    """
    a=a.values
    return np.insert(np.cumsum(np.diff(a) / a[:-1] * 100.), 0, 0)

In [None]:
#Готовим данные для анализа: рассчитываем кумулятивную сумму дельт ( в % роста или падения от предыдущего дня)
df['c_cumsum']=df.groupby('ticker')['c'].transform(get_performance)
df['o_cumsum']=df.groupby('ticker')['o'].transform(get_performance)


In [None]:
# проверяем коинтеграцию и удаляем стационарные ряды
for ticker in df.ticker.unique():
    result = adfuller(df[df.ticker==ticker].c_cumsum.values)
    score = result[0]
    pvalue = result[1]
    crit = result[4]
    if score<crit['1%']: 
        print(f'ticker {ticker} is cointegrated. Deleting it.')
        df=df[df.ticker!=ticker]

In [None]:


for n, grp in df.groupby('ticker'):
    fig, ax = plt.subplots(figsize=(8,2))
    ax.scatter(x = "day", y = "c_cumsum", data=grp, label=n)
    ax.legend(title="Label")
    plt.show()



In [None]:
#let's pivot table for simplicity
c_cumsum=df.pivot(index='day',columns='ticker',values='c_cumsum')
o_cumsum=df.pivot(index='day',columns='ticker',values='o_cumsum')

Следующий шаг- выберем случайно две колонки и построим их значения, так же значения их разницы.


In [None]:
c_cumsum.columns

In [None]:
ABRD=c_cumsum.ABRD
AFKS=c_cumsum.AFKS
pd.concat([ABRD, AFKS], axis=1).plot()
 
check_coint_corr(ABRD, AFKS)

plt.show()
plt.figure()
(ABRD-AFKS).plot() # рисуем спрэд
plt.axhline((ABRD-AFKS).mean(), color='red', linestyle='--') # добавляем среднее
plt.show()

Видим, что оба ряда имеют низкую корреляцию около 10% и проваливают тест на коинтеграцию с p-значением 0.91 (рядом с единицей). Мы видим, что спред имеет временную зависимость.

Проведем тест Дики-Фуллера:
    

In [None]:
dft(ABRD)

Серия не стационарна

In [None]:
dft(ABRD-AFKS)

Аналогично, данная серия не стационарна ( на горизонте 12-ти месяцев, и использовать ее нельзя)

In [None]:
## Автоматический подбор пар

Ну а сейчас - немного мяса. 
Давайте поищем пары среди ETF для парной торговли.


Данный тест проверяет времянной ряд (историю изменения цены) на стационарность (наличие коинтеграции). Осуществляется проверка наличия у времянного ряда единичного корня, о чем подробнее написано в Вики. Реализован в библиотеке statsmodels.

Функция проверки стационарности:
[code python]statsmodels.tsa.stattools.adfuller(X)[/code]

Выбираем пары с оценкой ниже 5% критического порога и p-значением меньше 0,001. Код поиска пар ниже:

In [None]:
c_cumsum.columns

In [None]:
c_cumsum.head()

In [None]:
c_cumsum.shape

In [None]:
c_cumsum.isnull().sum()

In [None]:
c_cumsum.dropna(axis=0, how='any', inplace=True)

In [None]:
c_cumsum.shape

In [None]:
n = len(c_cumsum.columns)
score_matrix = np.zeros((n, n))
pvalue_matrix = np.ones((n, n))

symbols = c_cumsum.columns.to_list()
pairs = []

In [None]:
symbols

In [None]:
for i in range(n):
        for j in range(i+1, n):
            S1 = c_cumsum[symbols[i]]
            S2 = c_cumsum[symbols[j]]
            
            # подготавливаем ряды, если надо
            #if need_preparation:
            #   S1, S2 = prepare_vectors(S1, S2, to_performance=True)
                
            # проверяем коинтеграцию
            print(f"comparing {S1.name} and {S2.name}")
            if S1.shape[0]==S2.shape[0]:
                result = adfuller(S1-S2)
                # заполняем матрицы значений
                score = result[0]
                pvalue = result[1]
                crit = result[4]
                score_matrix[i, j] = score
                pvalue_matrix[i, j] = pvalue
                # добавляем пары без единичных корней с p-значением менее 0.001
                if score < crit['5%'] and pvalue < 0.001:
                    pairs.append((symbols[i], symbols[j], pvalue))



In [None]:
pairs

In [None]:
# сортируем пары по возрастанию p-значения
import operator
sorted_pairs = sorted(pairs, key=operator.itemgetter(2))

In [None]:
sorted_pairs

In [None]:
#Давайте теперь сравним парочку значений более детально:
MGNT=c_cumsum.MGNT
TRMK=c_cumsum.TRMK
pd.concat([MGNT, TRMK], axis=1).plot()
 
check_coint_corr(MGNT, TRMK)

plt.show()
plt.figure()
(MGNT-TRMK).plot() # рисуем спрэд
plt.axhline((MGNT-TRMK).mean(), color='red', linestyle='--') # добавляем среднее
plt.show()

In [None]:
def find_cointegrated_pairs_adf(df, need_preparation=False):
    # готовим матрицы для сбора оценок и p-значений
    n = len(df.columns)
    score_matrix = np.zeros((n, n))
    pvalue_matrix = np.ones((n, n))
    
    symbols = df.columns.to_list()
    pairs = []
    
    for i in range(n):
        for j in range(i+1, n):
            S1 = symbol_prices[symbols[i]]
            S2 = symbol_prices[symbols[j]]
            
            # подготавливаем ряды, если надо
            if need_preparation:
                S1, S2 = prepare_vectors(S1, S2, to_performance=True)
                
            # проверяем коинтеграцию
            result = adfuller(S1-S2)
            # заполняем матрицы значений
            score = result[0]
            pvalue = result[1]
            crit = result[4]
            score_matrix[i, j] = score
            pvalue_matrix[i, j] = pvalue
            # добавляем пары без единичных корней с p-значением менее 0.001
            if score < crit['5%'] and pvalue < 0.001:
                pairs.append((symbols[i], symbols[j], pvalue))
                
    # сортируем пары по возрастанию p-значения
    sorted_pairs = sorted(pairs, key=operator.itemgetter(2))
    return score_matrix, pvalue_matrix, sorted_pairs