In [1]:
import os
import pandas as pd
from sqlalchemy import create_engine

In [2]:
def load_data_by_date_from_db(start_date, end_date, table_name='t_stocks_5min', user='dg', host='localhost', password='123456', db='trading_db'):
    """Read bar rows whose ``date`` lies in ``[start_date, end_date]`` from MySQL.

    Parameters
    ----------
    start_date, end_date :
        Inclusive bounds of the SQL ``BETWEEN`` filter on the ``date`` column.
        They are converted with ``str()`` and sent as bound parameters.
    table_name : str
        Table to read from. It is interpolated into the SQL text (identifiers
        cannot be bound), so it must come from trusted code, never user input.
    user, host, password, db : str
        Connection settings for the ``mysql+pymysql`` engine.
        NOTE(review): the defaults embed credentials in the notebook; consider
        reading them from environment variables instead.

    Returns
    -------
    pandas.DataFrame
        Columns ``date, code, time, open, close, volume`` as returned by the
        database.
    """
    # Function-scope import so the cell does not depend on an extra top-level import.
    from sqlalchemy import text

    engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}/{db}', echo=False)
    query = text(
        f"SELECT date, code, time, open, close, volume FROM {table_name} "
        "WHERE date BETWEEN :start_date AND :end_date"
    )
    # str() reproduces what the previous f-string interpolation sent, but the
    # driver now quotes/escapes the values instead of the query text.
    bounds = {'start_date': str(start_date), 'end_date': str(end_date)}
    try:
        return pd.read_sql(query, con=engine, params=bounds)
    finally:
        # Release pooled connections even when the query fails.
        engine.dispose()

In [3]:
def clean_data(df: pd.DataFrame):
    """Deduplicate bars on (code, time), cast dtypes and build a sorted index.

    The input frame is left untouched. As a side effect the function prints
    the duplicated (code, time) pairs, the original frame and the
    deduplicated frame.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain the columns ``date, code, time, open, close, volume``.

    Returns
    -------
    pd.DataFrame
        New frame indexed by ``(date, code, time)`` and sorted on that index,
        with ``open``/``close`` as float and ``volume`` as int. ``date`` is
        parsed as UTC and then converted to the ``Asia/Shanghai`` timezone.
    """
    key_cols = ['code', 'time']

    # Occurrence count per (code, time) pair; pairs seen more than once are duplicates.
    pair_counts = df.value_counts(subset=key_cols).reset_index(name='counts')
    duplicates = pair_counts.loc[pair_counts['counts'] > 1, key_cols]
    print("重复行：", duplicates)

    # The first row of every (code, time) pair is the one that survives.
    deduped = df.drop_duplicates(subset=key_cols)

    print("原始数据：")
    print(df)
    print("\n保留第一个重复行后的数据：")
    print(deduped)

    # Type conversion, then index construction and ordering, as one pipeline.
    shanghai_dates = pd.to_datetime(deduped['date'].values, utc=True).tz_convert('Asia/Shanghai')
    return (
        deduped
        .assign(date=shanghai_dates)
        .astype({'open': float, 'close': float, 'volume': int})
        .set_index(['date', 'code', 'time'])
        .sort_index()
    )

In [4]:
def get_open_price(df_stock: pd.DataFrame):
    """Return, for every ``date`` group, the first ``open`` value in row order.

    ``df_stock`` must expose ``date`` as a group key for the ``open`` Series
    (an index level in frames produced by ``clean_data``). The result is a
    Series keyed by date.
    """
    def _first_value(day_opens):
        # Positional first element of the group's underlying array.
        return day_opens.values[0]

    opens_by_day = df_stock['open'].groupby('date', group_keys=False)
    return opens_by_day.apply(_first_value)

In [5]:
def PV_Corr(df_stock: pd.DataFrame):
    """Per-``date`` correlation between ``close`` and ``volume``.

    Each date group's two-column frame is passed through ``DataFrame.corr()``
    and the off-diagonal (close vs. volume) entry is returned, giving a
    Series keyed by date.
    """
    def _close_volume_corr(day_bars):
        # Row 0 / column 1 of the 2x2 matrix is corr(close, volume).
        return day_bars.corr().iloc[0, 1]

    price_volume = df_stock[['close', 'volume']]
    return price_volume.groupby('date', group_keys=False).apply(_close_volume_corr)

def PV_Corr_P(df_stock: pd.DataFrame):
    """Per-``date`` correlation of ``close`` with the *next* row's ``volume``.

    Within each date group ``volume`` is shifted by -1, so every close is
    paired with the following bar's volume; the last bar of the group has no
    partner and drops out of the correlation. Returns a Series keyed by date.
    """
    def _close_vs_next_volume(day_bars):
        next_volume = day_bars['volume'].shift(-1)
        paired = pd.concat([day_bars['close'], next_volume], axis=1)
        return paired.corr().iloc[0, 1]

    grouped = df_stock[['close', 'volume']].groupby('date', group_keys=False)
    return grouped.apply(_close_vs_next_volume)

def PV_Corr_V(df_stock: pd.DataFrame):
    """Per-``date`` correlation of ``volume`` with the *next* row's ``close``.

    Within each date group ``close`` is shifted by -1, so every volume is
    paired with the following bar's close; the last bar of the group has no
    partner and drops out of the correlation. Returns a Series keyed by date.
    """
    def _next_close_vs_volume(day_bars):
        next_close = day_bars['close'].shift(-1)
        paired = pd.concat([next_close, day_bars['volume']], axis=1)
        return paired.corr().iloc[0, 1]

    grouped = df_stock[['close', 'volume']].groupby('date', group_keys=False)
    return grouped.apply(_next_close_vs_volume)

In [6]:
def get_auto_corr(x, bound=None):
    daily_auto_corr = 0
    if bound is None:
        daily_auto_corr = pd.concat([x.diff(), x.shift(-1)], axis=1).corr().iloc[0, 1]
    elif bound == 'u':
        x = x[x.diff() > 0]
        if len(x) >= 2:
            daily_auto_corr = pd.concat([x.diff(), x.shift(-1)], axis=1).corr().iloc[0, 1]   
    elif bound == 'd':
        x = x[x.diff() < 0]
        if len(x) >= 2:
            daily_auto_corr = pd.concat([x.diff(), x.shift(-1)], axis=1).corr().iloc[0, 1]   
    elif bound == 'abs':
        daily_auto_corr = pd.concat([abs(x.diff()), x.shift(-1)], axis=1).corr().iloc[0, 1] 
    return daily_auto_corr

In [7]:
def dP_P_Corr(df_stock: pd.DataFrame):
    """Apply ``get_auto_corr`` (default ``bound``) to ``close`` for each ``date`` group.

    Returns a Series keyed by date.
    """
    closes_by_day = df_stock['close'].groupby('date', group_keys=False)
    return closes_by_day.apply(get_auto_corr)

def dP_u_P_Corr(df_stock: pd.DataFrame):
    """Apply ``get_auto_corr`` with ``bound='u'`` to ``close`` for each ``date`` group.

    Returns a Series keyed by date.
    """
    closes_by_day = df_stock['close'].groupby('date', group_keys=False)
    # Extra keyword arguments of apply() are forwarded to get_auto_corr.
    return closes_by_day.apply(get_auto_corr, bound='u')

def dP_d_P_Corr(df_stock: pd.DataFrame):
    """Apply ``get_auto_corr`` with ``bound='d'`` to ``close`` for each ``date`` group.

    Returns a Series keyed by date.
    """
    closes_by_day = df_stock['close'].groupby('date', group_keys=False)
    # Extra keyword arguments of apply() are forwarded to get_auto_corr.
    return closes_by_day.apply(get_auto_corr, bound='d')

def dP_abs_P_Corr(df_stock: pd.DataFrame):
    """Apply ``get_auto_corr`` with ``bound='abs'`` to ``close`` for each ``date`` group.

    Returns a Series keyed by date.
    """
    closes_by_day = df_stock['close'].groupby('date', group_keys=False)
    # Extra keyword arguments of apply() are forwarded to get_auto_corr.
    return closes_by_day.apply(get_auto_corr, bound='abs')

In [8]:
def dV_V_Corr(df_stock: pd.DataFrame):
    """Apply ``get_auto_corr`` (default ``bound``) to ``volume`` for each ``date`` group.

    Returns a Series keyed by date.
    """
    volumes_by_day = df_stock['volume'].groupby('date', group_keys=False)
    return volumes_by_day.apply(get_auto_corr)

def dV_u_V_Corr(df_stock: pd.DataFrame):
    """Apply ``get_auto_corr`` with ``bound='u'`` to ``volume`` for each ``date`` group.

    Returns a Series keyed by date.
    """
    volumes_by_day = df_stock['volume'].groupby('date', group_keys=False)
    # Extra keyword arguments of apply() are forwarded to get_auto_corr.
    return volumes_by_day.apply(get_auto_corr, bound='u')

def dV_d_V_Corr(df_stock: pd.DataFrame):
    """Apply ``get_auto_corr`` with ``bound='d'`` to ``volume`` for each ``date`` group.

    Returns a Series keyed by date.
    """
    volumes_by_day = df_stock['volume'].groupby('date', group_keys=False)
    # Extra keyword arguments of apply() are forwarded to get_auto_corr.
    return volumes_by_day.apply(get_auto_corr, bound='d')

def dV_abs_V_Corr(df_stock: pd.DataFrame):
    """Apply ``get_auto_corr`` with ``bound='abs'`` to ``volume`` for each ``date`` group.

    Returns a Series keyed by date.
    """
    volumes_by_day = df_stock['volume'].groupby('date', group_keys=False)
    # Extra keyword arguments of apply() are forwarded to get_auto_corr.
    return volumes_by_day.apply(get_auto_corr, bound='abs')

In [9]:
# Data is processed in half-year windows (the final window is a partial period)
# so that only one window of bars is held in memory at a time.
date_list = [
    ('2018-01-01', '2018-06-30'),
    ('2018-07-01', '2018-12-31'),
    ('2019-01-01', '2019-06-30'),
    ('2019-07-01', '2019-12-31'),
    ('2020-01-01', '2020-06-30'),
    ('2020-07-01', '2020-12-31'),
    ('2021-01-01', '2021-06-30'),
    ('2021-07-01', '2021-12-31'),
    ('2022-01-01', '2022-06-30'),
    ('2022-07-01', '2022-12-31'),
    ('2023-01-01', '2023-06-30'),
    ('2023-07-01', '2023-12-31'),
    ('2024-01-01', '2024-05-16'),
]

# Output column -> function evaluated once per stock code.
# Insertion order determines the column order of the factor table.
factor_funcs = {
    'open': get_open_price,
    'PV_Corr': PV_Corr,
    'dP_P_Corr': dP_P_Corr,
    'dP_abs_P_Corr': dP_abs_P_Corr,
    'dV_V_Corr': dV_V_Corr,
    'dV_abs_V_Corr': dV_abs_V_Corr,
}

list_factors = []
for start_date, end_date in date_list:
    df = clean_data(load_data_by_date_from_db(start_date, end_date))
    by_code = df.groupby('code')

    # Columns are assigned one at a time; the first assignment ('open')
    # establishes the index that later columns are aligned to.
    df_factors = pd.DataFrame()
    for factor_name, factor_func in factor_funcs.items():
        df_factors[factor_name] = by_code.apply(factor_func)
    print(df_factors)

    list_factors.append(df_factors)

    # Drop references to this window's data before loading the next one.
    del df, by_code, df_factors

# Stack all windows, put 'date' ahead of 'code', order by date, and expose
# the index levels as ('date', 'asset').
df_factors_total = (
    pd.concat(list_factors)
    .swaplevel('date', 'code')
    .sort_index(level='date')
    .rename_axis(['date', 'asset'])
)
df_factors_total

重复行：              code               time
0       sh.600004  20180102093500000
1       sh.600073  20180521145000000
2       sh.600073  20180522102000000
3       sh.600073  20180522101500000
4       sh.600073  20180522101000000
...           ...                ...
271339  sh.600161  20180620135500000
271340  sh.600161  20180620140000000
271341  sh.600161  20180620140500000
271342  sh.600161  20180620141000000
271343  sh.600161  20180620142000000

[271344 rows x 2 columns]
原始数据：
               date       code               time     open    close  volume
0        2018-01-02  sh.600004  20180102093500000  14.7300  14.6800  287900
1        2018-01-02  sh.600004  20180102094000000  14.6800  14.6900  520500
2        2018-01-02  sh.600004  20180102094500000  14.6900  14.6700  255300
3        2018-01-02  sh.600004  20180102095000000  14.6600  14.7100  556420
4        2018-01-02  sh.600004  20180102095500000  14.7300  14.7500  384406
...             ...        ...                ...      ...    

Unnamed: 0_level_0,Unnamed: 1_level_0,open,PV_Corr,dP_P_Corr,dP_abs_P_Corr,dV_V_Corr,dV_abs_V_Corr
date,asset,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2018-01-02 08:00:00+08:00,sh.600004,14.73,0.090534,0.270058,-0.277863,0.097419,0.189974
2018-01-02 08:00:00+08:00,sh.600006,5.86,0.413978,0.304839,0.420312,0.253404,0.106000
2018-01-02 08:00:00+08:00,sh.600017,3.88,-0.018362,-0.004690,0.238512,-0.126166,-0.008230
2018-01-02 08:00:00+08:00,sh.600022,2.14,-0.192313,-0.095678,-0.109664,0.034059,-0.054035
2018-01-02 08:00:00+08:00,sh.600026,6.11,0.086897,0.036088,-0.122444,0.076649,0.092814
...,...,...,...,...,...,...,...
2024-05-16 08:00:00+08:00,sz.300287,2.54,-0.190364,0.079971,-0.182289,0.243220,0.119812
2024-05-16 08:00:00+08:00,sz.300291,4.83,-0.348482,-0.214735,-0.219475,-0.054340,0.085721
2024-05-16 08:00:00+08:00,sz.300324,2.53,-0.186222,0.029957,0.151901,0.086165,-0.001680
2024-05-16 08:00:00+08:00,sz.300376,5.16,-0.384478,0.275499,0.087942,0.175358,0.143703


In [None]:
# Write the combined factor table to <save_folder_path>/factors.parquet,
# creating the folder first if it does not exist yet.
save_folder_path = 'tests'
os.makedirs(save_folder_path, exist_ok=True)
output_path = f'{save_folder_path}/factors.parquet'
df_factors_total.to_parquet(output_path)