In [1]:
from scipy.stats import rankdata
import scipy as sp
import numpy as np
import pandas as pd

### 读取数据

In [None]:
# Load the full table from a CSV file located next to the notebook.
# Later cells filter it on 'Trddt' and pivot it on 'Trddt' / 'Stkcd'.
df = pd.read_csv('./alldata.csv')

In [None]:
# Inspect the dtype pandas inferred for each column of the loaded table.
df.dtypes

In [None]:
# Keep only rows whose 'Trddt' is on or after 2017-01-03.
# NOTE(review): the right-hand side is a string, so this presumably relies on
# 'Trddt' holding ISO-formatted (YYYY-MM-DD) date strings — confirm.
df = df.loc[df['Trddt']>='2017-01-03']

### 辅助函数

In [None]:
def get_data(column, data=None):
    """Build a standardized date-by-stock matrix for one column of the long table.

    Steps, in order:
      1. Pivot the long table so rows are 'Trddt', columns are 'Stkcd' and
         cells hold ``column``.
      2. Fill missing cells with the mean of the same stock (column mean).
      3. Winsorize every row (date) to [mean - 3*std, mean + 3*std] of that row.
      4. Z-score every row with the row's mean and standard deviation.

    Parameters
    ----------
    column : str
        Name of the value column to pivot.
    data : pandas.DataFrame, optional
        Long table with 'Trddt', 'Stkcd' and ``column``. Defaults to the
        notebook-level ``df``.

    Returns
    -------
    pandas.DataFrame
        Matrix indexed by 'Trddt' with one column per 'Stkcd'. Stocks that
        have no observation at all stay NaN.
    """
    source = df if data is None else data
    wide = source.pivot(index='Trddt', columns='Stkcd', values=column)

    # Compute the fill values on the numeric matrix only. Taking the mean
    # after reset_index() would include the 'Trddt' column, which fails on
    # recent pandas versions.
    wide = wide.fillna(wide.mean())

    # Row-wise 3-sigma winsorization. clip() leaves NaN cells untouched,
    # whereas an element-wise comparison chain would map NaN to the lower bound.
    row_mean = wide.mean(axis=1)
    row_std = wide.std(axis=1)
    wide = wide.clip(lower=row_mean - 3 * row_std,
                     upper=row_mean + 3 * row_std,
                     axis=0)

    # Row-wise z-score, using the statistics of the winsorized rows.
    return wide.sub(wide.mean(axis=1), axis=0).div(wide.std(axis=1), axis=0)
                    

In [None]:
import statsmodels.api as sm 
def compute_IC_t(df,return_nextday):
    """Per-date cross-sectional regression of returns on a single factor.

    For every date in ``df`` the row of ``return_nextday`` is regressed on the
    row of ``df`` with OLS. No constant is added, so the fit has no intercept.

    Parameters
    ----------
    df : pandas.DataFrame
        Factor matrix (rows: dates, columns: stocks).
    return_nextday : pandas.DataFrame
        Return matrix; it must contain every row and column label of ``df``
        (a KeyError is raised otherwise).

    Returns
    -------
    pandas.DataFrame
        Indexed like ``df`` with columns:
        "factor_return" (the regression coefficient),
        "tvalue" (its t statistic) and
        "IC" (Pearson correlation between return and factor).
        Dates with fewer than two usable observations keep NaN in all three.
    """
    # Align the returns with the factor matrix (same rows, same columns).
    return_nextday = return_nextday.loc[df.index, df.columns]

    # Result container, pre-filled with NaN.
    result = pd.DataFrame(np.nan, index=df.index, columns=["factor_return", "tvalue", "IC"])

    # Regress each date's returns on that date's factor values.
    for date in df.index:
        x = df.loc[date].astype(float)
        y = return_nextday.loc[date].astype(float)

        # Use the same sample for the regression and for the IC: only stocks
        # where both factor and return are finite. Non-finite inputs would
        # otherwise make the fit fail or produce NaN, while corr() would
        # silently drop them.
        usable = np.isfinite(x) & np.isfinite(y)
        if usable.sum() < 2:
            continue
        x = x[usable]
        y = y[usable]

        model = sm.OLS(y, x).fit()
        # Single regressor: read the only coefficient positionally instead of
        # relying on the regressor being labelled with the date.
        result.loc[date, "factor_return"] = model.params.iloc[0]
        result.loc[date, "tvalue"] = model.tvalues.iloc[0]
        result.loc[date, "IC"] = y.corr(x)

    return result

In [None]:
# Build one date-by-stock matrix per raw column through get_data, which also
# fills gaps, winsorizes and z-scores each row.
# Column meanings presumably follow the variable names — verify against the data dictionary.
df_high = get_data('Hiprc')
df_low = get_data('Loprc')
df_open = get_data('Opnprc')
df_close = get_data('Clsprc')
df_volume = get_data('Dnshrtrd')
df_value = get_data('Dnvaltrd')
df_marketcap = get_data('Dsmvosd')
# Show the matrix built from 'Loprc' as the cell output.
df_low

In [None]:
# Show the matrix built from 'Hiprc'.
df_high

In [None]:
# Daily simple returns have to come from raw closing prices: df_close has been
# winsorized and z-scored per date by get_data, so the ratio of two of its rows
# is not a price return.
close_raw = df.pivot(index='Trddt', columns='Stkcd', values='Clsprc')
# Fill gaps with each stock's own mean price — the same gap rule get_data
# applies — so the return matrix has the same coverage as the factor matrices.
close_raw = close_raw.fillna(close_raw.mean())
df_return = close_raw / close_raw.shift(1) - 1
df_return

### 因子

In [None]:
# The seven single-column matrices, each tested as a factor on its own below.
simple_factors = [df_high, df_low, df_open, df_close, df_volume, df_value, df_marketcap]

### 因子回归结果

In [None]:
# Run the per-date regression / IC computation for every single-column factor.
res = []
for i in simple_factors:
    # shift(2) lags the factor by two rows; [2:] drops the two leading rows
    # that the shift leaves empty.
    res.append(compute_IC_t(i.shift(2)[2:], df_return))
# Show the result table of the first factor (df_high).
res[0]

In [None]:
# Show the result table of the second factor (df_low).
res[1]

# 其他因子

<ol>
    <li>
        收益率比五天前收益率 <b>(不显著)</b>
        <code>((df_return / df_return.shift(5)) - 1).shift(2)[8:]</code>
    </li>
    <li>
        五日反转因子
        <code>((df_close / df_close.shift(5)) - 1)[5:]</code>
    </li>
    <li>
        二十日反转因子
        <code>((df_close / df_close.shift(20)) - 1)[20:]</code>
    </li>
    <li>
        开/收 开/高 等等（6个）
        <pre>
            <code>
            price = [df_high, df_low, df_open, df_close]
            res = []
            for i in range(4):
                for j in range(i+1, 4):
                    res.append(get_t_value(price[i]/price[j]))
            </code>
        </pre>
    </li>
    <li>
        单个数据因子（7个）
    </li>
    <li>
        成交额比市值 <code>df_value/df_marketcap</code>
    </li>
    <li>
        rolling 最大值比最小值 <code>df_high.rolling(20).max() / df_high.rolling(20).min()</code>
    </li>
</ol>

In [None]:
# Keep only the last 1236 rows of the return matrix.
# NOTE(review): 1236 is a hard-coded row count — presumably chosen to match
# the sample used by the cells below; confirm and consider naming it.
df_return = df_return.tail(1236)

In [None]:
def get_t_value(df):
    """Summarize the per-date regression results of one factor in a single row.

    Runs compute_IC_t on ``df`` against the notebook-level ``df_return`` and
    returns a one-row DataFrame with: the mean coefficient, the mean absolute
    t value, the share of dates with t value above 2 (positive side only),
    the mean of the positive and of the negative t values, and the mean of
    the positive and of the negative IC values.
    """
    stats = compute_IC_t(df, df_return)
    t_values = stats['tvalue']
    ic_values = stats['IC']

    # Plain Python division so an empty result still raises ZeroDivisionError.
    share_above_two = int((t_values > 2).sum()) / len(stats)

    summary = {
        '平均收益率': [stats['factor_return'].mean()],
        't值绝对值均值': [t_values.abs().mean()],
        't值大于2的占比': [share_above_two],
        '正t值均值': [t_values[t_values > 0].mean()],
        '负t值均值': [t_values[t_values < 0].mean()],
        '正ic值均值': [ic_values[ic_values > 0].mean()],
        '负ic值均值': [ic_values[ic_values < 0].mean()],
    }
    return pd.DataFrame(data=summary)

In [None]:
from math import sqrt
def choose(df, rev: bool = True, top: int = 100, returns=None):
    """Daily return series of an equal-weighted portfolio picked by factor rank.

    On every date the stocks are ranked against each other by the factor value
    of the previous row, and the stocks with rank <= ``top`` are held. The
    portfolio return recorded on a row is the mean of the selected stocks'
    returns taken from the following row of the return matrix.

    Parameters
    ----------
    df : pandas.DataFrame
        Factor matrix (rows: dates, columns: stocks).
    rev : bool, default True
        Passed to ``rank(ascending=...)``: True selects the smallest factor
        values, False the largest.
    top : int, default 100
        Maximum rank that is still selected.
    returns : pandas.DataFrame, optional
        Return matrix to use. Defaults to the notebook-level ``df_return``.

    Returns
    -------
    pandas.Series
        One mean return per date; NaN where nothing is selected or no return
        is available.
    """
    realized = df_return if returns is None else returns

    lagged_factor = df.shift(1)
    # Rank across stocks within each date (axis=1). The default axis=0 would
    # rank every stock against its own history, so ``top`` would pick dates
    # rather than stocks.
    ranks = lagged_factor.rank(axis=1, ascending=rev)
    return realized.shift(-1)[ranks <= top].mean(axis=1)

def MaxDrawdown(return_list):
    """Maximum drawdown of a sequence, as a fraction of the preceding peak.

    The sequence is treated as a value curve: at every position the drop from
    the running maximum is divided by that running maximum, and the largest
    such ratio is reported.

    Parameters
    ----------
    return_list : sequence of float
        Values in chronological order (list, numpy array or pandas Series).

    Returns
    -------
    float or int
        (peak - trough) / peak for the deepest drawdown; 0 when the input is
        empty or the deepest drawdown is found at the first position.
    """
    # Work on a plain float array so indexing is always positional, even when
    # a pandas Series with a non-default index is passed in.
    values = np.asarray(return_list, dtype=float)
    if values.size == 0:
        return 0

    running_peak = np.maximum.accumulate(values)
    i = np.argmax((running_peak - values) / running_peak)  # trough position
    if i == 0:
        return 0
    j = np.argmax(values[:i])  # peak position before the trough
    return (values[j] - values[i]) / values[j]

def evaluate(df, periods_per_year: int = 250):
    """Performance summary of a series of periodic returns.

    Parameters
    ----------
    df : pandas.Series
        Periodic (daily, in this notebook) portfolio returns; NaN allowed.
    periods_per_year : int, default 250
        Number of periods per year used to annualize the Sharpe ratio.

    Returns
    -------
    tuple
        (sharpe, total, max_drawdown) where ``sharpe`` is
        mean / std * sqrt(periods_per_year), ``total`` is the plain sum of the
        returns and ``max_drawdown`` is measured on the compounded value curve.
    """
    sharpe = df.mean() / df.std() * sqrt(periods_per_year)
    total = df.sum()

    # MaxDrawdown divides by a running maximum, which is only meaningful for a
    # value curve, so compound the returns first. The leading 1.0 is the
    # starting capital, so a loss on the very first period counts as drawdown.
    value_curve = (1 + df.dropna()).cumprod()
    max_drawdown = MaxDrawdown([1.0] + list(value_curve))
    return sharpe, total, max_drawdown

def all_choose(df, rev: bool = True, top: int = 100):
    """Back-test one factor and return its performance as a one-row table.

    The factor matrix ``df`` is turned into a return series by choose(), using
    ``rev`` and ``top`` unchanged, and that series is scored by evaluate().
    The result has the columns '夏普', '累计收益率' and '最大回撤'.
    """
    daily_returns = choose(df, rev, top)
    sharpe, total, drawdown = evaluate(daily_returns)
    return pd.DataFrame(data={
        '夏普': [sharpe],
        '累计收益率': [total],
        '最大回撤': [drawdown],
    })

In [None]:
# Relative change of df_close over 20 rows; [20:] drops the first 20 rows,
# which shift(20) leaves empty.
# NOTE(review): df_close is the matrix returned by get_data, i.e. winsorized
# and z-scored per date, so this ratio is taken on standardized values rather
# than raw prices — confirm this is intended.
df_reverse20 = ((df_close / df_close.shift(20)) - 1)[20:]
df_reverse20

In [None]:
# Back-test the 20-row factor with rev=False, i.e. ranks are computed in descending order.
all_choose(df_reverse20, False)

In [None]:
# Ratio of the 20-row rolling maximum to the 20-row rolling minimum of df_high.
# The first 19 rows are NaN because the window is not yet full.
# NOTE(review): df_high is standardized by get_data, so the rolling minimum can
# be zero or negative — confirm the ratio is meaningful on these values.
df_rolling = df_high.rolling(20).max() / df_high.rolling(20).min()

In [None]:
# Back-test the rolling max/min factor with the default arguments (rev=True, top=100).
all_choose(df_rolling)