In [2]:
import pandas as pd

# Load the raw CSV snapshot.
# NOTE(review): this is an absolute, machine-specific path, so the cell only runs where that
# exact file exists — consider a configurable data directory.
data = pd.read_csv('/Users/zhongshijie/Desktop/01-Projects/big_quant/_data/2024-12-08.csv')
print(data.shape)
# Parse the '时间' column into datetimes so the cutoff below is a time comparison.
data['时间'] = pd.to_datetime(data['时间'])
# Keep only rows strictly after the cutoff timestamp, then report the shape again.
data = data[data['时间'] > '2024-12-05 20:59']
print(data.shape)

(1876010, 40)
(1875972, 40)


## 采集数据 

In [None]:
from typing import Callable

import efinance as ef
import pandas as pd
import sklearn.metrics

pd.set_option('display.max_columns', None)


def collect_future_day_history(deal_func: Callable):
    """Fetch the quote history of selected futures and stack them into one frame.

    Contracts are selected when their '期货名称' contains '主' and does not
    contain '次'. Each contract's history is passed through ``deal_func`` on
    its own before the results are concatenated.

    Args:
        deal_func: Callable applied to every per-contract history frame; its
            return value is what gets concatenated.

    Returns:
        The concatenation (via ``pd.concat``) of all processed histories.
    """
    # 1. Collect the quote IDs of the selected contracts.
    base_info = ef.futures.get_futures_base_info()
    names = base_info['期货名称']
    has_main_marker = names.str.contains('主')
    has_secondary_marker = names.str.contains('次')
    quote_ids = list(base_info[has_main_marker & (~has_secondary_marker)]['行情ID'])

    # 2. Download the histories, then process each one independently.
    histories = ef.futures.get_quote_history(quote_ids)
    processed = [deal_func(history) for history in histories.values()]

    # 3. Merge the processed histories and hand back the combined frame.
    return pd.concat(processed)


def ft(df: pd.DataFrame):
    """Add the next-row percentage change of '收盘' as the '次日涨幅' column.

    The value for a row is ``(next 收盘 - 收盘) / 收盘 * 100``. Rows for which
    that value is missing (for example the last row, which has no successor)
    are dropped.

    Args:
        df: Frame with a '收盘' column, ordered so that the following row is
            the next period. It is left unmodified.

    Returns:
        A new frame containing the extra '次日涨幅' column, without the rows
        where it could not be computed.
    """
    col_name = '次日涨幅'
    # Work on a copy so the caller's frame does not silently gain a column,
    # matching how the other transforms in this notebook treat their input.
    df = df.copy()
    df[col_name] = (df['收盘'].shift(-1) - df['收盘']) / df['收盘'] * 100
    df = df[df[col_name].notna()]

    return df

In [None]:
# Collect the futures histories; ft adds the '次日涨幅' column to each one before merging.
data = collect_future_day_history(ft)

# Split on the '日期' column: rows before '2024' go to train, the rest to test.
# NOTE(review): the bound is the string '2024', so this presumably relies on '日期' holding
# ISO-formatted date strings — confirm.
data_dict = {
    'train': data[data['日期'] < '2024'],
    'test': data[data['日期'] >= '2024'],
}

# Only the two splits are used from here on, so release the combined frame's name.
del data

## 处理数据

In [None]:
# Preview the first rows of the training split.
data_dict['train'].head()

In [None]:
def deal_data(dd, func_list):
    """Run every transform in ``func_list`` over every entry of ``dd``, in place.

    Args:
        dd: Mapping whose values are rewritten; each value is replaced by the
            result of the transforms applied one after another.
        func_list: Callables invoked as ``func(value, key)``, applied in order.

    Returns:
        None. ``dd`` itself is updated.
    """
    for key in dd.keys():
        for transform in func_list:
            current = dd[key]
            dd[key] = transform(current, key)


def make_flag(odf, k):
    """Turn the '次日涨幅' column into an integer 0/1 flag.

    Values greater than 0 become 1 and values less than or equal to 0 become
    0; the column is then cast to ``int``.

    Args:
        odf: Source frame with a '次日涨幅' column; it is not modified.
        k: Key of the split being processed (unused here).

    Returns:
        A copy of ``odf`` with the flag column.
    """
    target = '次日涨幅'
    flagged = odf.copy()
    # Both masks are taken from the untouched values; writing 1 into the
    # positive rows cannot move any of them into the non-positive set.
    rose = flagged[target] > 0
    did_not_rise = flagged[target] <= 0
    flagged.loc[rose, target] = 1
    flagged.loc[did_not_rise, target] = 0
    flagged[target] = flagged[target].astype(int)
    return flagged


def make_ft(odf, k):
    """Append derived price/volume ratio features to a copy of ``odf``.

    Reads the '开盘', '收盘', '最高', '最低', '成交量', '振幅' and '涨跌幅'
    columns and appends twenty derived columns. Divisions are plain pandas
    divisions, so a zero denominator is not treated specially.

    NOTE(review): '价格中间线' is computed as (收盘 - 开盘) / 2, i.e. half the
    open-to-close difference rather than the midpoint (开盘 + 收盘) / 2 —
    confirm this is intended.

    Args:
        odf: Source frame; it is not modified.
        k: Key of the split being processed (unused here).

    Returns:
        A copy of ``odf`` with the derived columns appended.
    """
    df = odf.copy()
    open_px, close_px = df['开盘'], df['收盘']
    high_px, low_px = df['最高'], df['最低']
    volume, pct_change = df['成交量'], df['涨跌幅']

    # Offsets of open/close from the "middle line", absolute and relative.
    middle = (close_px - open_px) / 2
    df['价格中间线'] = middle
    df['开盘-价格中间线'] = open_px - middle
    df['收盘-价格中间线'] = close_px - middle
    df['开盘-价格中间线@收盘_幅'] = df['开盘-价格中间线'] / open_px
    df['收盘-价格中间线@收盘_幅'] = df['收盘-价格中间线'] / close_px

    # Distance from the base price up to the high and down to the low,
    # relative to the base price, first for the close and then for the open.
    for base_name, base_px in (('收盘', close_px), ('开盘', open_px)):
        up_col = f'{base_name}_上攀幅'
        down_col = f'{base_name}_下攀幅'
        df[up_col] = (high_px - base_px) / base_px
        df[down_col] = (low_px - base_px) / base_px
        df[f'{base_name}_上下攀幅差'] = df[up_col] - df[down_col]

    # Close-based ranges scaled by traded volume.
    df['收盘_上攀幅/成交量'] = df['收盘_上攀幅'] / volume
    df['收盘_下攀幅/成交量'] = df['收盘_下攀幅'] / volume

    # Everything below is scaled by the percentage change column.
    df['振幅/涨跌幅'] = df['振幅'] / pct_change
    for base_name in ('收盘', '开盘'):
        for suffix in ('上攀幅', '下攀幅', '上下攀幅差'):
            source_col = f'{base_name}_{suffix}'
            df[f'{source_col}/涨跌幅'] = df[source_col] / pct_change

    return df


def clean_data(odf, k):
    """Return a copy of ``odf`` without the '期货代码', '日期' and '换手率' columns.

    Args:
        odf: Source frame; it is not modified. All three columns must exist,
            otherwise a ``KeyError`` is raised.
        k: Key of the split being processed (unused here).

    Returns:
        A new frame holding the remaining columns.
    """
    unwanted = ['期货代码', '日期', '换手率']
    return odf.drop(columns=unwanted)


def make_category(odf, k):
    """Cast every object-dtype column of ``odf`` to the pandas 'category' dtype.

    Args:
        odf: Source frame; it is not modified.
        k: Key of the split being processed (unused here).

    Returns:
        A new frame in which the object columns are categorical and all other
        columns are unchanged.
    """
    object_columns = odf.select_dtypes(include=['object']).columns.to_list()
    return odf.astype({column: 'category' for column in object_columns})

In [None]:
# Apply the transforms, in this order, to every split; deal_data rewrites data_dict in place.
deal_data(data_dict, [make_flag, clean_data, make_ft, make_category])

In [None]:
# Show the (rows, columns) shape of the training split.
data_dict['train'].shape

## 使用数据

In [None]:
from typing import Dict, Any, List
import sklearn
import lightgbm as lgb


def train(
        x: pd.DataFrame,
        y: pd.Series,
        n_folds: int = 5,
        params: dict = None,
        feval: Callable = None,
        num_boost_round: int = 100,
        seed: int = 2024,
):
    """Train one LightGBM booster per stratified fold and collect diagnostics.

    Args:
        x: Feature frame. Columns with the pandas 'category' dtype are passed
            to LightGBM as categorical features.
        y: Target series, positionally aligned with ``x``.
        n_folds: Number of stratified folds.
        params: LightGBM parameters; an empty dict is used when omitted.
        feval: Optional custom evaluation function forwarded to ``lgb.train``.
        num_boost_round: Maximum number of boosting rounds per fold. Progress
            is logged every ``int(num_boost_round / 5)`` rounds.
        seed: Random state of the shuffled fold split.

    Returns:
        Dict keyed by fold index (0-based). Each value is a dict with:
        'gbm' (the trained booster), 'eval' (the metric history recorded for
        the 'train' and 'val' sets) and 'feature_importance' (a frame with a
        'feature_name' column and a column named after the fold index).

    NOTE(review): folds are shuffled, so rows of different dates end up in the
    same fold — confirm that is acceptable for this data.
    """
    # Imported explicitly instead of reaching through `sklearn.model_selection`,
    # which only resolves if some other import already loaded that submodule.
    from sklearn.model_selection import StratifiedKFold

    if params is None:
        params = {}
    # The categorical columns do not depend on the fold, so look them up once.
    categorical_cols = x.select_dtypes(include=['category']).columns.to_list()
    stratified_k_fold = StratifiedKFold(n_folds, shuffle=True, random_state=seed)

    result = {}
    for k, (train_idx, val_idx) in enumerate(stratified_k_fold.split(x, y)):
        # Build the datasets of this fold.
        print(f'------------ {k} ------------')
        train_data = lgb.Dataset(x.iloc[train_idx], y.iloc[train_idx], categorical_feature=categorical_cols)
        val_data = lgb.Dataset(x.iloc[val_idx], y.iloc[val_idx], categorical_feature=categorical_cols)
        # Train while recording the per-round metrics of both sets.
        eval_result = {}
        gbm = lgb.train(
            params=params,
            train_set=train_data,
            valid_sets=[train_data, val_data],
            valid_names=['train', 'val'],
            feval=feval,
            callbacks=[lgb.log_evaluation(int(num_boost_round / 5)), lgb.record_evaluation(eval_result)],
            num_boost_round=num_boost_round,
        )
        best_score = dict(gbm.best_score['val'])
        print(f'best-best_iteration:[{gbm.best_iteration}], best-score[{best_score}]]')
        result[k] = {
            'gbm': gbm,
            'eval': eval_result,
            'feature_importance': pd.DataFrame({'feature_name': gbm.feature_name(), f'{k}': gbm.feature_importance()})
        }
    return result


def show_importance(tr: Dict):
    """Combine the per-fold feature importances into one ranked table.

    Args:
        tr: Training result keyed by the fold indices 0..len(tr)-1. Each value
            holds a 'feature_importance' frame with a 'feature_name' column
            and a column named after the fold index (as a string).

    Returns:
        A frame with 'feature_name', one column per fold and the row-wise mean
        in 'feature_importance', sorted by that mean in descending order.
    """
    table = pd.DataFrame({'feature_name': tr[0]['feature_importance']['feature_name']})
    for fold_result in tr.values():
        table = table.merge(fold_result['feature_importance'], on='feature_name', how='left')
    fold_columns = [str(fold) for fold in range(len(tr))]
    table['feature_importance'] = table[fold_columns].mean(axis=1)
    return table.sort_values(by='feature_importance', ascending=False)


def pred(
        tr: Dict[str, Any],
        x: pd.DataFrame,
        y: pd.Series = None,
        n_folds: int = 5,
        check_func_list: List[Callable] = None,
):
    """Average the predictions of all fold models and optionally score them.

    Args:
        tr: Training result; every value must hold a fitted model under 'gbm'
            that exposes ``predict``.
        x: Feature frame to predict on; it is not modified.
        y: Optional true labels. When given they are stored in the 'real'
            column of the result.
        n_folds: Kept for backward compatibility. The average is taken over
            the number of models actually present in ``tr``, so a value that
            disagrees with ``len(tr)`` can no longer mis-scale the mean.
        check_func_list: Optional metric functions called as
            ``func(pred_mean, real)``; each result is printed. Only used when
            ``y`` is given.

    Returns:
        A copy of ``x`` with a 'pred-mean' column and, when ``y`` is given, a
        'real' column.
    """
    result = x.copy()
    result['pred-mean'] = 0
    # Divide by the real ensemble size rather than the caller-supplied n_folds.
    n_models = len(tr)
    for fold_result in tr.values():
        result['pred-mean'] += (fold_result['gbm'].predict(x) / n_models)
    if y is not None:
        result['real'] = y
        if check_func_list is not None:
            for check_func in check_func_list:
                print('回测结果', check_func(result['pred-mean'], result['real']))
    return result


def roc_auc(y_pred, y_true):
    """Score predictions with the area under the ROC curve.

    Args:
        y_pred: Predicted scores.
        y_true: True labels.

    Returns:
        Tuple ``(metric name, score, is_higher_better)``; the name is
        'ROC_AUC' and the flag is always True.
    """
    return 'ROC_AUC', sklearn.metrics.roc_auc_score(y_true, y_pred), True


def ks(y_pred, y_true):
    """Score predictions with the KS statistic taken from the ROC curve.

    The score is the largest gap between the true-positive rate and the
    false-positive rate over all thresholds returned by ``roc_curve``.

    Args:
        y_pred: Predicted scores.
        y_true: True labels.

    Returns:
        Tuple ``(metric name, score, is_higher_better)``; the name is 'KS'
        and the flag is always True.
    """
    false_pos_rate, true_pos_rate, _ = sklearn.metrics.roc_curve(y_true, y_pred)
    largest_gap = max(true_pos_rate - false_pos_rate)
    return 'KS', largest_gap, True


In [None]:
# Number of folds, shared by training and by the prediction call below.
kn = 5

# Train one model per fold on the training split; every column except the
# '次日涨幅' target is used as a feature.
train_result = train(
    x=data_dict['train'].iloc[:, ~data_dict['train'].columns.isin(['次日涨幅'])],
    y=data_dict['train']['次日涨幅'],
    n_folds=kn,
    params={
        'objective': 'binary',
        'metric': ['auc'],
        'verbose': -1,
        'n_jobs': 10,
        'learning_rate': 0.01,
        'early_stopping_round': 300,
    },
    num_boost_round=1000,
)

# Ranked feature-importance table across the folds.
# NOTE(review): `display` is not imported in this notebook; presumably it is the one IPython
# provides inside a notebook kernel — confirm if this code is ever run as a plain script.
display(show_importance(train_result))

# Predict on the test split with all fold models and print the ROC-AUC and KS checks.
pred_result = pred(
    tr=train_result,
    x=data_dict['test'].iloc[:, ~data_dict['test'].columns.isin(['次日涨幅'])],
    y=data_dict['test']['次日涨幅'],
    n_folds=kn,
    check_func_list=[roc_auc, ks]
)