
```
-- Data queries
-- Futures tradable in a given month: a contract qualifies when the target month
-- lies between the month of its first trading date (ftdate) and the month of its
-- last trading date (lasttrade_date), both inclusive.
SELECT
    Name,
    Date(ftdate)         AS start,
    Date(lasttrade_date) AS end
FROM future
WHERE Name LIKE 'IH%'
  AND toYYYYMM(Date('2021-12-01')) BETWEEN toYYYYMM(ftdate) AND toYYYYMM(lasttrade_date)
ORDER BY Name;

-- Tradable options
-- Logic:
--   1. Collect every 50ETF option whose trading window (startdate .. lasttradingdate)
--      covers the target month.
--   2. Pair calls (认购) with puts (认沽) that share the same strike and trading window.
--   3. Pair output columns: C = call code, P = put code, K = strike,
--      Pair = characters 7-10 of the call code, '-', characters 7-10 of the put code,
--      begin / end = start date and last trading date.
-- The strike is the number after '月' in Name; puts (Name contains '沽') get a negative
-- directionprice so the two legs can be separated by sign.
-- Note: the Python copy of this query ends statements with ';;' because it splits on
-- that delimiter; as standalone SQL a single ';' is used.
CREATE TEMPORARY TABLE opt AS
SELECT
    Name,
    startdate,
    lasttradingdate,
    -- swap the two '.'-separated parts of Code (assumed 'NUMBER.EXCHANGE' -> 'EXCHANGENUMBER')
    concat(splitByChar('.', Code)[2], splitByChar('.', Code)[1]) AS code,
    toInt64(extract(Name, '月(\d+)')) AS price,
    -- reuse the parsed strike instead of re-running the regex
    IF(match(Name, '沽'), -price, price) AS directionprice
FROM option
WHERE Name LIKE '50ETF%'
  AND toYYYYMM(Date('2021-12-01')) BETWEEN toYYYYMM(Date(startdate)) AND toYYYYMM(Date(lasttradingdate))
ORDER BY code;

-- Join each call (directionprice > 0) with the put (directionprice < 0) that has the
-- same strike and trading window. Columns are qualified because both sides of the
-- join expose `code` and `price`.
SELECT
    a.code AS C,
    b.code AS P,
    a.price AS K,
    concat(substring(a.code, 7, 4), '-', substring(b.code, 7, 4)) AS Pair,
    date(a.startdate) AS begin,
    date(a.lasttradingdate) AS end
FROM (SELECT * FROM opt WHERE directionprice > 0) AS a
INNER JOIN (SELECT * FROM opt WHERE directionprice < 0) AS b
    ON a.startdate = b.startdate
   AND a.lasttradingdate = b.lasttradingdate
   AND a.price = b.price
-- Several expiries share a strike; tie-break so LIMIT returns a reproducible set.
ORDER BY K, a.lasttradingdate, C
LIMIT 30
```

In [50]:
import pandas as pd
import pymysql
from clickhouse_driver import Client


class pickup_db(object):
    """Context manager that opens a database connection and yields a query handle.

    ``storage`` selects the backend:

    * ``"mysql"``      -- pymysql connection (autocommit on); the context yields a cursor.
    * ``"clickhouse"`` -- clickhouse_driver ``Client``; the context yields the client
      itself, so callers run ``db.execute(sql)``.

    Any other value raises ``Exception("error storage")``. Host, database name and the
    MySQL credentials are hard-coded for a local setup.
    """

    def __init__(self, storage):
        if storage == "mysql":
            self.client = pymysql.connect(host='127.0.0.1', user='root', password='root', database='stock_arbitrage',
                                          autocommit=True)
            self.cursor = self.client.cursor()
        elif storage == "clickhouse":
            self.client = Client('127.0.0.1', database='stock_arbitrage')
            # clickhouse_driver has no separate cursor; queries go through the client.
            self.cursor = self.client
        else:
            raise Exception("error storage")

    def __enter__(self):
        return self.cursor

    def __exit__(self, type, value, traceback):
        """Release the cursor (if distinct) and the connection.

        Returns None, so an exception raised inside the ``with`` body still propagates.
        """
        if self.cursor is not self.client:
            self._release(self.cursor)
        self._release(self.client)

    @staticmethod
    def _release(handle):
        """Best-effort close of a connection-like object; failures become warnings.

        pymysql connections/cursors expose ``close()``, while clickhouse_driver's
        ``Client`` is closed with ``disconnect()``. Calling only ``close()`` on the
        ClickHouse client fails and, if swallowed, leaves the connection open.
        """
        import warnings

        closer = getattr(handle, "disconnect", None) or getattr(handle, "close", None)
        if closer is None:
            return
        try:
            closer()
        except Exception as exc:
            warnings.warn("failed to close database handle: %r" % (exc,))


def load(type, codes, start, end, field="Price"):
    """Load day-level averaged market data for ``codes`` from ClickHouse.

    Args:
        type: instrument table to read -- "stock" (sh_sz_bin), "future" (cfe) or
            "option" (ashr_option). Any other value raises KeyError.
        codes: iterable of instrument codes to include.
        start, end: inclusive bounds compared against the ``Time`` column
            (e.g. "2021-12-01 00:00:00").
        field: SQL expression that is averaged and returned as ``Price``; defaults to
            the ``Price`` column, callers also pass e.g. "BP1" or
            "(SP1 + SP2 + SP3 + SP4 + SP5) / 5". It is inserted verbatim into the SQL,
            so it must come from trusted code, never from user input.

    Returns:
        DataFrame indexed by ``Time`` (``toStartOfDay`` of the raw time) with columns
        Code, Price, Volume, Amount, grouped by Code and Time and ordered by them.
        An empty ``codes`` returns an empty frame of the same layout without querying.
    """
    types_table = {
        "stock": "sh_sz_bin",
        "future": "cfe",
        "option": "ashr_option",
    }
    table = types_table[type]
    columns = ['Code', 'Time', 'Price', 'Volume', 'Amount']

    codes = list(codes)
    if not codes:
        # ``IN ()`` is a ClickHouse syntax error, so answer an empty request locally.
        return pd.DataFrame(columns=columns).set_index("Time")

    # Codes become SQL string literals; escape backslashes and quotes so an unusual
    # code cannot terminate the literal.
    quoted = ",".join("'%s'" % str(c).replace("\\", "\\\\").replace("'", "\\'") for c in codes)
    sql = (
        "SELECT Code, toStartOfDay(toDateTime(Time)) as Time, avg({field}) as Price, "
        "avg(Volume) as Volume, avg(Amount) as Amount"
        " FROM {table} WHERE Code in ({codes}) AND Time between '{start}' and '{end}' "
        "group by Code, Time  order by Code, Time "
    ).format(table=table, codes=quoted, start=start, end=end, field=field)

    with pickup_db("clickhouse") as db:
        rows = db.execute(sql)

    data = [list(row[:5]) for row in rows]
    return pd.DataFrame(data, columns=columns).set_index("Time")


def load_options(month="2021-12-01"):
    """Return the 50ETF call/put pairs tradable during the month containing ``month``.

    Runs two statements on one ClickHouse connection: the first builds the temporary
    table ``opt`` (50ETF options whose trading window covers the month, with the
    strike parsed from Name and puts given a negative ``directionprice``); the second
    joins calls and puts with the same strike and trading window.

    Args:
        month: any date inside the target month; parsed with ``pd.Timestamp`` so only
            a well-formed ``YYYY-MM-DD`` literal reaches the SQL (invalid input raises
            ValueError).

    Returns:
        list of ``[call_code, put_code, strike, pair_name, start_yyyymmdd, end_yyyymmdd]``
        rows, ordered by strike, then last trading date, then call code.
    """
    month = pd.Timestamp(month).strftime("%Y-%m-%d")
    # Raw string: '\d' is an invalid escape in a normal Python string literal; the
    # text ClickHouse receives is unchanged. Statements are separated by ';;'.
    sql = r"""
-- 查询可交易的期权
CREATE temporary TABLE opt AS
    SELECT Name,
       startdate,
       lasttradingdate,
       concat(splitByChar('.', Code)[2], splitByChar('.', Code)[1]) AS code,
       toInt64(extract(Name, '月(\d+)')) AS price,
       IF(match(Name, '沽'), -toInt64(extract(Name, '月(\d+)')), toInt64(extract(Name, '月(\d+)'))) AS directionprice
       FROM option WHERE Name LIKE '50ETF%'
       AND toYYYYMM(Date('{month}')) between toYYYYMM(Date(startdate)) and toYYYYMM(Date(lasttradingdate))
    ORDER BY code;;

select
    a.code as C, b.code as P, a.price as K,
    concat(substring(a.code, 7, 4), '-', substring(b.code, 7, 4)) as Pair,
    toYYYYMMDD(date(a.startdate)) as begin, toYYYYMMDD(date(a.lasttradingdate)) as end
from
    (select * from opt where directionprice > 0) a
    join
    (select * from opt where directionprice < 0) b
on a.startdate = b.startdate and a.lasttradingdate = b.lasttradingdate
and a.price = b.price
order by K, a.lasttradingdate, C
""".format(month=month)

    rows = []
    with pickup_db("clickhouse") as db:
        # The temporary table only lives on this connection, so both statements run here.
        for statement in sql.split(";;"):
            if not statement.strip():
                continue
            rows = db.execute(statement)

    # Only the final SELECT's rows are kept:
    # 0: call  1: put  2: strike  3: pair name  4: start  5: end
    return [list(row[:6]) for row in rows]


# Arbitrage chain: 50ETF options -> synthetic 50ETF -> SSE 50 index futures (IH) returns
start, end = "2021-12-01 00:00:00", "2021-12-31 23:59:59"
# Trade direction:
#   0: sell futures, buy the option-synthesised ETF copy
#   1: buy futures, sell the option-synthesised ETF copy
reverse_arbitrage_direction = 0

# 50ETF prices
stock_50etf = load("stock", ["SH510050"], start, end)
# SSE 50 index prices
stock_50index = load("stock", ["SH000016"], start, end)
# Futures prices: selling futures uses the bid (BP1), buying uses the ask (SP1).
future_names = ['IH2112', 'IH2201', 'IH2202', 'IH2203', 'IH2206']
future_field = "BP1" if reverse_arbitrage_direction == 0 else "SP1"
stock_50future = load("future", future_names, start, end, future_field)

# Every option pair tradable during December 2021
options = load_options()

# Option prices: a bought leg uses the mean of the 5 ask levels, a sold leg the mean
# of the 5 bid levels. Direction 0 buys calls / sells puts; direction 1 the reverse.
ask_avg = '(SP1 + SP2 + SP3 + SP4 + SP5) / 5'
bid_avg = '(BP1 + BP2 + BP3 + BP4 + BP5) / 5'
call_field, put_field = (ask_avg, bid_avg) if reverse_arbitrage_direction == 0 else (bid_avg, ask_avg)
stock_50option_0 = load("option", [opt[0] for opt in options], start, end, call_field)
stock_50option_1 = load("option", [opt[1] for opt in options], start, end, put_field)
# DataFrame.append was removed in pandas 2.0; pd.concat is the equivalent.
stock_50option = pd.concat([stock_50option_0, stock_50option_1])

# Align every option leg on a daily calendar, one column per option code.
freq = '1d'
time = pd.date_range(pd.Timestamp(start), pd.Timestamp(end), freq=freq)
stock_50options = pd.DataFrame({'Tm': time}).set_index("Tm")

for opt in options:
    for leg_code in opt[:2]:
        right = stock_50option[stock_50option['Code'] == leg_code]['Price']
        right = right[~right.index.duplicated(keep='first')]
        stock_50options = stock_50options.join(right)
        stock_50options.rename(columns={"Price": leg_code}, inplace=True)

stock_50options.dropna(axis=0, how='all', inplace=True)
# Synthetic ETF copy per pair: call - put for direction 0, put - call for direction 1.
# Column name: 50etf_copy_<pair>.<strike>.<last trading day YYYYMMDD>
for opt in options:
    copy_name = '50etf_copy_%s.%d.%d' % (opt[3], opt[2], opt[5])
    call_px, put_px = stock_50options[opt[0]], stock_50options[opt[1]]
    stock_50options[copy_name] = call_px - put_px if reverse_arbitrage_direction == 0 else put_px - call_px

In [51]:
import numpy as np

# The previous cell produced the base prices of each instrument:
#     stock_50options  synthetic ETF copies from every option pair tradable this month
#     stock_50future   the futures listed in future_names
#     stock_50etf      50ETF prices
#     stock_50index    SSE 50 index prices
# Align them on a daily calendar into ``stocks_price``, then rescale with the index
# log-return (50index_returns):
#     50etf          = 50etf * (1 + 50index_returns) * 1000
#     50etf_copy_xxx = 50etf_copy_xxx * (1 + 50index_returns) * 1000 + K (the pair's strike)


def _join_price(frame, series, name=None):
    """Left-join ``series`` onto ``frame`` keeping the first value per timestamp.

    When ``name`` is given, the joined column (named after ``series.name``) is renamed to it.
    """
    series = series[~series.index.duplicated(keep='first')]
    frame = frame.join(series)
    if name is not None:
        frame = frame.rename(columns={series.name: name})
    return frame


time = pd.date_range(pd.Timestamp(start), pd.Timestamp(end), freq=freq)
stocks_price = pd.DataFrame({'Tm': time}).set_index("Tm")

for name in future_names:
    stocks_price = _join_price(stocks_price, stock_50future[stock_50future['Code'] == name]['Price'], name)
stocks_price = _join_price(stocks_price, stock_50index['Price'], "50index")
stocks_price = _join_price(stocks_price, stock_50etf['Price'], "50etf")

copy_names = ['50etf_copy_%s.%d.%d' % (opt[3], opt[2], opt[5]) for opt in options]
for name in copy_names:
    stocks_price = _join_price(stocks_price, stock_50options[name])

stocks_price.dropna(axis=0, how='all', inplace=True)
stocks_price['50index_returns'] = np.log(stocks_price['50index'] / stocks_price['50index'].shift(1))
# The first row has no previous close. Assign through .loc: chained
# ``stocks_price['50index_returns'][0] = 0`` relies on deprecated positional [] on a
# DatetimeIndex and silently does nothing under pandas copy-on-write.
stocks_price.loc[stocks_price.index[0], '50index_returns'] = 0
stocks_price["50etf"] = stocks_price["50etf"] * (1 + stocks_price['50index_returns']) * 1000

for opt, name in zip(options, copy_names):
    stocks_price[name] = stocks_price[name] * (1 + stocks_price['50index_returns']) * 1000 + opt[2]


invalid value encountered in log



In [52]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

# Expose the date index as a plain column to use as the x axis.
stocks_price['id'] = stocks_price.index
colors = px.colors.qualitative.Plotly
fig = go.Figure()
x_axis = stocks_price['id']

# Futures: dashed lines, palette colours starting at index 3.
for color_idx, contract in enumerate(future_names, start=3):
    fig.add_trace(go.Scatter(x=x_axis, y=stocks_price[contract], mode='lines',
                             line=dict(color=colors[color_idx], dash='dash'), name=contract))

# Index and ETF: solid lines.
for color_idx, series_name in ((0, "50index"), (1, "50etf")):
    fig.add_trace(go.Scatter(x=x_axis, y=stocks_price[series_name], mode='lines',
                             line=dict(color=colors[color_idx]), name=series_name))

# Synthetic ETF copies: thin long-dashed lines sharing one colour.
for opt in options:
    copy_name = '50etf_copy_%s.%d.%d' % (opt[3], opt[2], opt[5])
    fig.add_trace(go.Scatter(x=x_axis, y=stocks_price[copy_name], mode='lines',
                             line=dict(color=colors[2], dash='longdash', width=0.5), name=copy_name))

fig.update_layout(autosize=False, width=1200, height=800,
                  margin=dict(l=50, r=50, b=100, t=100, pad=4))
fig.show(width=16, height=16)

In [53]:
columns = stocks_price.columns
# Separate the aligned price columns into futures (IH*) and synthetic ETF copies.
ih_columns = [col for col in columns if col.startswith("IH")]
etf_copy_columns = [col for col in columns if col.startswith("50etf_copy")]

def load_future_end(names):
    """Map each future name in ``names`` to its last trading date from ClickHouse.

    Values are what ``toYYYYMMDD(date(lasttrade_date))`` returns (a YYYYMMDD number).
    Names absent from the ``future`` table are missing from the result. An empty
    ``names`` returns ``{}`` without querying, since ``IN ()`` is a syntax error.
    """
    names = list(names)
    if not names:
        return {}
    # Escape backslashes and quotes so each name stays inside its SQL string literal.
    quoted = ",".join("'%s'" % str(n).replace("\\", "\\\\").replace("'", "\\'") for n in names)
    sql = """
        select Name as code, toYYYYMMDD(date(lasttrade_date)) as end from future
        where Name in ({names});
""".format(names=quoted)
    with pickup_db("clickhouse") as db:
        rows = db.execute(sql)
    return {row[0]: row[1] for row in rows}

# Last trading date (YYYYMMDD) of every IH contract present in stocks_price.
future_end_map = load_future_end(names=ih_columns)

def latest_future_field(idx, columns=None, end_map=None):
    """Build a row function (for ``DataFrame.apply(axis=1)``) selecting the latest-expiring future.

    Futures are scanned by name in descending order (e.g. IH2206 before IH2112), so for
    IHyymm contract names the first non-NaN price belongs to the latest expiry.

    Args:
        idx: 0 -> the function returns that future's price; otherwise it returns the
            future's last trading date as a ``pd.Timestamp``.
        columns: future columns to scan; defaults to the global ``ih_columns``.
        end_map: name -> YYYYMMDD last-trading-date mapping; defaults to the global
            ``future_end_map`` (only consulted when ``idx != 0``).

    The returned function yields NaN when every scanned column is NaN.
    """
    def latest_future(row):
        cols = ih_columns if columns is None else columns
        for col in sorted(cols, reverse=True):
            if not np.isnan(row[col]):
                if idx == 0:
                    return row[col]
                ends = future_end_map if end_map is None else end_map
                # str() so the YYYYMMDD number parses identically across pandas versions.
                return pd.to_datetime(str(ends[col]), format='%Y%m%d')
        # np.NAN was removed in NumPy 2.0; np.nan is the portable spelling.
        return np.nan
    return latest_future

def _extreme_option_field(idx, columns, prefer):
    """Build a row function picking the option copy that ``prefer`` ranks best.

    ``prefer(candidate, incumbent)`` is a strict comparison, so on ties the first
    column in order wins. NaN entries are skipped; if all are NaN the function
    returns NaN (no sentinel value leaks out, and no ``.split`` on NaN).
    ``columns`` defaults to the global ``etf_copy_columns``.
    """
    def pick(row):
        cols = etf_copy_columns if columns is None else columns
        best_col, best_val = None, np.nan
        for col in cols:
            val = row[col]
            if np.isnan(val):
                continue
            if best_col is None or prefer(val, best_val):
                best_col, best_val = col, val
        if best_col is None:
            return np.nan
        if idx == 0:
            return best_val
        # Copy columns are named '50etf_copy_<pair>.<strike>.<YYYYMMDD>'.
        return pd.to_datetime(best_col.split('.')[-1], format='%Y%m%d')
    return pick


def lower_option_field(idx, columns=None):
    """Row function giving the lowest option-copy price (idx 0) or its expiry date (otherwise)."""
    return _extreme_option_field(idx, columns, lambda candidate, best: candidate < best)


def higher_option_field(idx, columns=None):
    """Row function giving the highest option-copy price (idx 0) or its expiry date (otherwise)."""
    return _extreme_option_field(idx, columns, lambda candidate, best: candidate > best)

# IH_latest:         price of the latest-expiring future quoted that day
# 50etf_copy_lower:  lowest synthetic-ETF (option copy) price that day
# 50etf_copy_higher: highest synthetic-ETF (option copy) price that day
# Returns and cumulative returns are computed from these columns further below.
for target_column, field_factory in (('IH_latest', latest_future_field),
                                     ('50etf_copy_lower', lower_option_field),
                                     ('50etf_copy_higher', higher_option_field)):
    stocks_price[target_column] = stocks_price.apply(field_factory(0), axis=1)

def returns(row, direction=None, threshold=0.01, ih_margin=0.10, option_margin=0.12):
    """Return of the futures vs. synthetic-ETF spread for one row, net of the exit threshold.

    The spread is divided by the margin capital tied up: ``ih_margin`` (10%) of the IH
    price plus ``option_margin`` (12%) of the option-copy price. ``threshold`` (1%) is
    the spread-reversion close-out line: only the return above it counts, anything
    below (including NaN inputs) yields 0.

    Args:
        row: Series with 'IH_latest' and '50etf_copy_lower' (direction 0) or
            '50etf_copy_higher' (direction 1).
        direction: 0 = sell futures / buy copy, 1 = buy futures / sell copy;
            defaults to the global ``reverse_arbitrage_direction``.
    """
    if direction is None:
        direction = reverse_arbitrage_direction
    ih_px = row['IH_latest']
    if direction == 0:
        copy_px = row['50etf_copy_lower']
        spread = ih_px - copy_px
    else:
        copy_px = row['50etf_copy_higher']
        spread = copy_px - ih_px
    v = spread / (ih_px * ih_margin + copy_px * option_margin)
    # NaN compares False, so missing prices fall through to 0.
    return v - threshold if v >= threshold else 0

daily_returns = stocks_price.apply(returns, axis=1)
stocks_price['returns'] = daily_returns
stocks_price['cum_returns'] = daily_returns.cumsum()
# Show the strategy's key series side by side.
report_columns = ['IH_latest', '50etf_copy_lower', 'returns', 'cum_returns', '50etf_copy_higher']
print(stocks_price[report_columns])

              IH_latest  50etf_copy_lower   returns  cum_returns  \
Tm                                                                 
2021-12-01  3176.053957       3181.891487  0.000000     0.000000   
2021-12-02  3187.688496       3194.092408  0.000000     0.000000   
2021-12-03  3206.132987       3209.078712  0.000000     0.000000   
2021-12-06  3237.036703       3236.840401  0.000000     0.000000   
2021-12-07  3243.889005       3247.828722  0.000000     0.000000   
2021-12-08  3277.737752       3278.469690  0.000000     0.000000   
2021-12-09  3367.215926       3350.692261  0.012365     0.012365   
2021-12-10  3362.658555       3349.809562  0.007405     0.019770   
2021-12-13  3405.064313       3383.725740  0.018583     0.038353   
2021-12-14  3368.797467       3343.239969  0.024628     0.062981   
2021-12-15  3346.366280       3324.659691  0.019589     0.082570   
2021-12-16  3331.566324       3309.451914  0.020282     0.102851   
2021-12-17  3314.772517       3286.644427  0.028

In [54]:
colors = px.colors.qualitative.Plotly
import plotly.subplots as sp

# Three stacked panels: prices (row 1), daily returns (row 2), cumulative returns (row 3).
# add_trace(row=, col=) replaces the deprecated append_trace.
figure = sp.make_subplots(rows=3, cols=1)

figure.add_trace(go.Scatter(x=stocks_price['id'], y=stocks_price['IH_latest'],
                            mode='lines', line=dict(color=colors[0], dash='dash'), name="IH_latest"),
                 row=1, col=1)

# Plot the option-copy leg that this trade direction actually uses.
if reverse_arbitrage_direction == 0:
    copy_column, copy_color = "50etf_copy_lower", colors[1]
else:
    copy_column, copy_color = "50etf_copy_higher", colors[2]
figure.add_trace(go.Scatter(x=stocks_price['id'], y=stocks_price[copy_column],
                            mode='lines', line=dict(color=copy_color, dash='dash'), name=copy_column),
                 row=1, col=1)

figure.add_trace(go.Scatter(x=stocks_price['id'], y=stocks_price['returns'],
                            mode='lines', line=dict(color=colors[3]), name="returns"),
                 row=2, col=1)
figure.add_trace(go.Scatter(x=stocks_price['id'], y=stocks_price['cum_returns'],
                            mode='lines', line=dict(color=colors[4]), name="cum_returns"),
                 row=3, col=1)

figure.update_layout(autosize=False, width=1200, height=800, margin=dict(l=50, r=50, b=100, t=100, pad=4))
figure.show(width=16, height=16)