In [1]:
import os
# Run from the parent of the directory the kernel started in. The start
# directory is remembered on first execution so that re-running this cell does
# not climb one more level each time (a bare os.chdir('..') is not idempotent).
try:
    NOTEBOOK_DIR
except NameError:
    NOTEBOOK_DIR = os.getcwd()
os.chdir(os.path.dirname(NOTEBOOK_DIR))
print('working dir:', os.getcwd())

import sys
import pandas as pd
import numpy as np
import polars as pl
import datetime

# Input: directory of 15-minute bar parquet files.
raw_data_path = '/mnt/datassd2/crypto/15minbar'
# Output root for the DL datasets; keep the trailing slash, since
# sub-directory names are appended to it.
data_save_path = '/mnt/datassd2/crypto/dl_data/'
# Scan lazily, then materialise everything into an eager polars DataFrame.
df = (
    pl.scan_parquet(raw_data_path)
    .collect()
)

working dir: /home/yuheng/mydata_mgmt/nerv_ml/tools


In [2]:
# Live trading: cut the data down to the recent window (bars after 2024-01-01),
# ordered by timestamp.
df = (
    df
    .filter(pl.col('datetime') > datetime.datetime(2024, 1, 1))
    .sort(by='datetime')
)

In [3]:
# Label columns named la<h>; presumably h is a look-ahead horizon in bars — TODO confirm.
las = [f'la{h}' for h in (1, 2, 5, 10, 15, 30, 60, 120, 180, 240, 300, 360)]

def ema(col, length=100)->pl.Expr:
    '''Normalise column `col` by its own exponentially weighted moving mean.

    Returns col / ewm_mean(col, span=length). A value near 1 means the series
    sits close to its EWM level.
    '''
    series = pl.col(col)
    return series / series.ewm_mean(span=length)

def p_change(col, length=1, lower_bound=1e-5)->pl.Expr:
    '''Log change ("percentage change") of `col` over `length` rows.

    Returns log(x_t) - log(x_{t-length}). The column is first clipped from
    below at `lower_bound`, so zeros/negatives cannot produce -inf/NaN logs.
    The first `length` rows of each evaluation group are null (from shift).

    Args:
        col: column name.
        length: lag in rows.
        lower_bound: floor applied before the log. Defaults to 1e-5, the
            previously hard-coded value.
    '''
    # TODO: make the floor dynamic (e.g. scaled per column).
    clip_col = pl.col(col).clip(lower_bound=lower_bound)
    target_col = clip_col.log()
    return target_col - target_col.shift(length)

def zscore(col, length=100)->pl.Expr:
    '''Rolling z-score of `col` over a trailing window of `length` rows, clipped to [-4, 4].

    A window with zero variance gives (x - mean) / 0 = 0/0 = NaN. That NaN is
    mapped to 0 (the value equals the window mean). In polars NaN is *not* a
    null, so otherwise it would survive a later drop_nulls() and leak into the
    dataset. Rows before the window is full stay null, as before.
    '''
    target_col = pl.col(col)
    mean = target_col.rolling_mean(length)
    std = target_col.rolling_std(length)
    zscore = (target_col - mean) / std
    # fill_nan leaves nulls untouched, so warm-up rows keep being dropped downstream.
    zscore = zscore.fill_nan(0.0).clip(-4, 4)
    return zscore

# De-duplicate (symbol, datetime) bars BEFORE computing the rolling / EWM
# features: a duplicated bar would otherwise be counted twice inside every
# window that contains it. keep="first" makes the retained row explicit
# (the default keep="any" gives no such guarantee).
dedup_df = df.unique(subset=["symbol", "datetime"], keep="first", maintain_order=True)

# Per-symbol features on close price and traded amount; `.over('symbol')`
# evaluates each expression within a single symbol.
raw_pldf = dedup_df.with_columns(
    ema('close', 100).over('symbol').alias('ema'),
    p_change('close', 1).over('symbol').alias('p_change'),
    zscore('close', 100).over('symbol').alias('zscore'),
    ema('amount', 100).over('symbol').alias('ema_amt'),
    p_change('amount', 1).over('symbol').alias('p_change_amt'),
    zscore('amount', 100).over('symbol').alias('zscore_amt'),
)

# Drop bars with no trading activity (features were computed on all bars).
pldf = raw_pldf.filter(pl.col('amount') > 0)

feats = ['ema', 'p_change', 'zscore', 'ema_amt', 'p_change_amt', 'zscore_amt']
cols = feats + las + ['symbol', 'datetime', 'date', 'close']
# (symbol, datetime) is already unique here thanks to the de-duplication above.
pldf = pldf.select(cols)

In [4]:
# (datetime x symbol) matrix of the la240 column, with missing entries set to 0.
# NOTE(review): not used by the cells below; la_df is rebuilt (from la60) in the
# cross-sectional section.
la_df = (
    pldf.to_pandas()
    .pivot(index='datetime', columns='symbol', values='la240')
    .fillna(0)
)


In [5]:
# Live-trading universe.
# Traded amount as a (datetime x symbol) matrix; missing bars count as 0.
amt_df = (
    raw_pldf.to_pandas()
    .pivot(index='datetime', columns='symbol', values='amount')
    .fillna(0)
)

# Keep gaps to a minimum: require at least 33000 bars with non-zero amount.
not_null_cols = amt_df.ne(0).sum()
not_null_cols = not_null_cols.loc[not_null_cols >= 33000]
amt_df = amt_df.loc[:, not_null_cols.index]

# The 120 symbols with the largest total traded amount over the window.
large_rank_cols = amt_df.sum().sort_values(ascending=False).head(120).index
print(list(large_rank_cols))

['BTCUSDT', 'ETHUSDT', 'SOLUSDT', 'DOGEUSDT', 'XRPUSDT', '1000PEPEUSDT', 'BNBUSDT', 'SUIUSDT', '1000SHIBUSDT', 'WLDUSDT', 'ADAUSDT', 'ORDIUSDT', 'AVAXUSDT', 'LINKUSDT', 'LTCUSDT', 'BCHUSDT', 'FILUSDT', '1000BONKUSDT', 'NEARUSDT', 'ARBUSDT', 'PEOPLEUSDT', 'TIAUSDT', 'APTUSDT', 'DOTUSDT', '1000FLOKIUSDT', 'OPUSDT', 'ETCUSDT', 'FETUSDT', 'TRBUSDT', 'HBARUSDT', 'SEIUSDT', 'XLMUSDT', 'UNIUSDT', 'GALAUSDT', 'INJUSDT', '1000SATSUSDT', 'AAVEUSDT', 'ENSUSDT', 'APEUSDT', 'CRVUSDT', 'SANDUSDT', 'STXUSDT', 'EOSUSDT', 'TRXUSDT', 'JTOUSDT', 'LDOUSDT', 'ATOMUSDT', 'GMTUSDT', 'RUNEUSDT', 'JASMYUSDT', 'DYDXUSDT', 'MASKUSDT', 'PENDLEUSDT', 'ICPUSDT', '1000RATSUSDT', 'CKBUSDT', 'BIGTIMEUSDT', 'ARKMUSDT', 'ALGOUSDT', 'CFXUSDT', 'MEMEUSDT', 'PYTHUSDT', 'UMAUSDT', 'BLURUSDT', 'POLYXUSDT', 'ARUSDT', 'LPTUSDT', 'THETAUSDT', 'NEOUSDT', 'MKRUSDT', 'CHZUSDT', '1000LUNCUSDT', 'BNXUSDT', 'GRTUSDT', 'AGLDUSDT', 'IDUSDT', 'YGGUSDT', 'AXSUSDT', 'API3USDT', 'RSRUSDT', 'ZENUSDT', 'HIGHUSDT', 'ZRXUSDT', 'SUSHIUSDT', 'VE

In [6]:
# Live trading: recently active symbols, selected together with Currency_picking.ipynb.
used_cols = [
    'DOTUSDT', 'SEIUSDT', 'TRXUSDT', 'AUCTIONUSDT', 'ACHUSDT',
    'INJUSDT', 'ARBUSDT', 'XRPUSDT', 'APTUSDT', '1000BONKUSDT',
    'ARKUSDT', 'DOGEUSDT', 'AAVEUSDT', 'CAKEUSDT', 'UNIUSDT',
    'GALAUSDT', 'ARKMUSDT', '1000SHIBUSDT', 'JTOUSDT', 'NEARUSDT',
    'ETCUSDT', 'CRVUSDT', 'XLMUSDT', 'RUNEUSDT', 'ETHUSDT',
    'SANDUSDT', 'FILUSDT', 'APEUSDT', 'LDOUSDT', 'AVAXUSDT',
    'ZENUSDT', 'FETUSDT', 'BAKEUSDT', 'EOSUSDT', 'BNBUSDT',
    'ORDIUSDT', 'MKRUSDT', 'SUIUSDT', '1000SATSUSDT', 'BTCUSDT',
    'BCHUSDT', 'ALGOUSDT', 'HBARUSDT', 'OPUSDT', '1000FLOKIUSDT',
    'ADAUSDT', 'ATOMUSDT', 'WLDUSDT', 'TIAUSDT', '1000PEPEUSDT',
]
in_universe = pl.col('symbol').is_in(used_cols)
pldf = pldf.filter(in_universe)

In [7]:
# Treat floating-point NaN as missing as well. polars' drop_nulls() removes only
# nulls, so a NaN feature or label would otherwise pass through, escape the
# null-based mask built from the pivot, and end up in the saved arrays.
pldf = pldf.with_columns(pl.col(pl.Float32, pl.Float64).fill_nan(None)).drop_nulls()
# Pack each row's feature values into one list column so that a single pivot
# yields a (datetime x symbol) grid of feature vectors.
pldf = pldf.with_columns(
    pl.concat_list(feats).alias("feats")
)
pivot_feats = pldf.pivot(on="symbol", index="datetime", values="feats", maintain_order=True)

## 保存数据

1. eod_data(cs_data)
- 处理 eod_data (S,T,F) = (n_stock, n_datetime, n_feats)  
- 维护一个字典，映射 S,T,F 到对应的股票代码/日期/特征，后续预测值通过这个字典可以反向映射回原dataframe

2. mask_data
- eod 为 null 的位置 mask 为 False，有效数据的位置 mask 为 True

3. gt_data(return_data)
- 取 la60 作为标签存起来（文件名沿用 la15_data.pkl）
4. extra_data 其他数据
- symbols 顺序
- dt 顺序

In [8]:
# extra_data records the symbol order (columns) and datetime order (rows) of the
# pivoted arrays, so predictions can be mapped back to (symbol, datetime).
exp_symbols = [c for c in pivot_feats.columns if c != 'datetime']
exp_dt = pivot_feats['datetime']
extra_data = {
    'symbols': exp_symbols,
    'dt': exp_dt.to_list()
}

pivot_feats = pivot_feats.drop('datetime')

# task2. mask_data: True where a symbol has a feature vector at a datetime and
# False where the pivot produced null; transposed to (n_symbols, n_T).
mask_data = pivot_feats.with_columns(
    pl.col(s).is_not_null() for s in exp_symbols
).to_numpy()
mask_data = mask_data.T

# Missing feature vectors become all-zero vectors (presumably ignored
# downstream via mask_data).
pivot_feats = pivot_feats.with_columns(
    pl.col(s).fill_null([0.] * len(feats)) for s in exp_symbols
)
# task1. cross-sectional features: each row of feats_numpy holds one feature
# vector per symbol; stacking along axis=1 gives (n_feats, n_symbols) per datetime.
feats_numpy = pivot_feats.to_numpy()
feats_array = np.array([np.stack(sublist, axis=1) for sublist in feats_numpy])
feats_array = feats_array.transpose(2, 0, 1)   # shape: (n_symbols, n_T, n_feats)

# task3. label array built from the la60 column.
la_data = pldf.pivot(on="symbol", index="datetime", values="la60", maintain_order=True)
# The label grid must line up with the feature grid row-for-row and column-for-column.
assert la_data['datetime'].to_list() == extra_data['dt'], 'label/feature datetime order mismatch'
assert [c for c in la_data.columns if c != 'datetime'] == exp_symbols, 'label/feature symbol order mismatch'
la_data = la_data.drop('datetime')
la_array = la_data.fill_null(0.).to_numpy()
la_array = la_array.T   # shape: (n_symbols, n_T)

In [9]:
print('feats_array.shape:', feats_array.shape)
print('mask_data.shape:', mask_data.shape)
# la_array holds the la60 labels, so the label names the variable rather than a horizon.
print('la_array.shape:', la_array.shape)

feats_array.shape: (50, 39308, 6)
mask_data.shape: (50, 39308)
la15_array.shape: (50, 39308)


In [10]:
dataset_path = os.path.join(data_save_path, 'stock_mix_data')
# exist_ok=True avoids the exists()/makedirs() race and keeps the cell re-runnable.
os.makedirs(dataset_path, exist_ok=True)
print('save to:', dataset_path)

save to: /mnt/datassd2/crypto/dl_data/stock_mix_data


In [11]:
import pickle

# Persist each artefact with pickle, one file per object, in a fixed order.
# NOTE: la15_data.pkl receives la_array, which was pivoted from the la60 column;
# the file name is presumably kept for downstream loaders.
for file_name, payload in (
    ("cs_data.pkl", feats_array),
    ("mask_data.pkl", mask_data),
    ("la15_data.pkl", la_array),
    ("extra_data.pkl", extra_data),
):
    with open(os.path.join(dataset_path, file_name), 'wb') as f:
        pickle.dump(payload, f)

In [12]:
# Number of timestamps recorded in extra_data (the T axis of the saved arrays).
len(extra_data['dt'])

39308

In [13]:
# Mask shape after the transpose: (n_symbols, n_T).
mask_data.shape

(50, 39308)

In [14]:
# Spot-check a slice of the timestamp axis for gaps in the bar spacing.
extra_data['dt'][15000:15010]

[datetime.datetime(2024, 6, 6, 7, 0),
 datetime.datetime(2024, 6, 6, 7, 15),
 datetime.datetime(2024, 6, 6, 7, 30),
 datetime.datetime(2024, 6, 6, 7, 45),
 datetime.datetime(2024, 6, 6, 8, 0),
 datetime.datetime(2024, 6, 6, 8, 15),
 datetime.datetime(2024, 6, 6, 8, 30),
 datetime.datetime(2024, 6, 6, 8, 45),
 datetime.datetime(2024, 6, 6, 9, 0),
 datetime.datetime(2024, 6, 6, 9, 15)]

In [15]:
# Last timestamps included in the dataset.
extra_data['dt'][-10:]

[datetime.datetime(2025, 2, 14, 15, 30),
 datetime.datetime(2025, 2, 14, 15, 45),
 datetime.datetime(2025, 2, 14, 16, 0),
 datetime.datetime(2025, 2, 14, 16, 15),
 datetime.datetime(2025, 2, 14, 16, 30),
 datetime.datetime(2025, 2, 14, 16, 45),
 datetime.datetime(2025, 2, 14, 17, 0),
 datetime.datetime(2025, 2, 14, 17, 15),
 datetime.datetime(2025, 2, 14, 17, 30),
 datetime.datetime(2025, 2, 14, 17, 45)]

In [16]:
# Sanity check of the metadata that maps array axes back to symbols / datetimes.
print('extra_data.keys(): ', extra_data.keys())
print('len(extra_data[\'dt\']): ', len(extra_data['dt']))
print('len(extra_data[\'symbols\']): ', len(extra_data['symbols']))

extra_data.keys():  dict_keys(['symbols', 'dt'])
len(extra_data['dt']):  39308
len(extra_data['symbols']):  50


## 预处理MLP用的截面数据

In [23]:
import numpy as np
import os
import pickle
import polars as pl

def ema(col, length=100)->pl.Expr:
    """Return `col` divided by its exponentially weighted moving mean (span=`length`).

    NOTE(review): identical to the `ema` defined earlier in this notebook;
    this later definition shadows it.
    """
    base = pl.col(col)
    smoothed = base.ewm_mean(span=length)
    return base / smoothed

def featclip(feat: pl.Expr, lower_q: float = 0.01, upper_q: float = 0.99) -> pl.Expr:
    """Winsorise `feat` to its [lower_q, upper_q] quantile range, then sanitise it.

    Quantiles are computed over the context the expression is evaluated in.
    With a plain `with_columns` (no `.over`), that context is the whole column.
    NOTE(review): on a full history this lets future values set the bounds.
    That is usually fine for labels but leaky for features — confirm intended use.

    Args:
        feat: expression to clip.
        lower_q: lower quantile (default 0.01).
        upper_q: upper quantile (default 0.99).

    Returns:
        The clipped expression, with null / NaN / +-inf mapped to 0 by `sanitise`.

    Raises:
        ValueError: if the quantiles are not 0 <= lower_q <= upper_q <= 1.
    """
    if not 0.0 <= lower_q <= upper_q <= 1.0:
        raise ValueError(f"need 0 <= lower_q <= upper_q <= 1, got {lower_q}, {upper_q}")
    clipped = feat.clip(
        feat.quantile(lower_q),
        feat.quantile(upper_q),
    )
    return sanitise(clipped)

def sanitise(pl_col) -> pl.Expr:
    """Map every missing or non-finite value of `pl_col` (null, NaN, +inf, -inf) to 0.

    Finite values pass through unchanged.
    """
    is_bad = pl_col.is_null() | pl_col.is_nan() | pl_col.is_infinite()
    return pl.when(is_bad).then(0).otherwise(pl_col)

df = pl.scan_parquet(raw_data_path)
df = df.filter(pl.col('datetime') > datetime.datetime(2024, 1, 1))
df = df.sort(by='datetime')
# De-duplicate (datetime, symbol) bars BEFORE clipping and the per-symbol EWM.
# Duplicates would otherwise skew the global quantiles and be counted twice in
# the EWM. keep='first' matches the pandas drop_duplicates(keep='first') used previously.
df = df.unique(subset=['datetime', 'symbol'], keep='first', maintain_order=True)

las = ['la1', 'la2', 'la5', 'la10', 'la15', 'la30', 'la60', 'la120', 'la180', 'la240', 'la300', 'la360']
# Winsorise every label column over the whole column (null / NaN / inf -> 0).
df = df.with_columns([featclip(pl.col(la)).alias(la) for la in las])
df = df.with_columns(ema('close', 100).over('symbol').alias('ema'))
df = df.collect().to_pandas()

# (datetime x symbol) grids: EMA-ratio factor and la60 label, gaps filled with 0.
factors_df = df.pivot(index='datetime', columns='symbol', values='ema').fillna(0)
la_df = df.pivot(index='datetime', columns='symbol', values='la60').fillna(0)

# The full matrix is too sparse, so restrict to the symbols in large_rank_cols
# (top 120 by total traded amount, from the live-trading universe cell).
# NOTE(review): the *_top50 names are historical and the selection is 120 wide;
# confirm whether the 50-symbol `used_cols` list was intended here.
factors_df_top50 = factors_df[large_rank_cols].copy()
la_df_top50 = la_df[large_rank_cols].copy()
X = factors_df_top50
y = la_df_top50

exp_symbols = X.columns
exp_dt = X.index.values
extra_data = {
    'symbols': exp_symbols,
    'dt': exp_dt
}

In [24]:
dataset_path = os.path.join(data_save_path, 'cs_dataset')
# exist_ok=True avoids the exists()/makedirs() race and keeps the cell re-runnable.
os.makedirs(dataset_path, exist_ok=True)
print('save to:', dataset_path)

save to: /mnt/datassd2/crypto/dl_data/cs_dataset


In [30]:
# Preview of the EMA-ratio factor grid (rows: datetime, columns: selected symbols).
factors_df_top50

symbol,BTCUSDT,ETHUSDT,SOLUSDT,DOGEUSDT,XRPUSDT,1000PEPEUSDT,BNBUSDT,SUIUSDT,1000SHIBUSDT,WLDUSDT,...,BELUSDT,CAKEUSDT,BEAMXUSDT,TOKENUSDT,QTUMUSDT,KASUSDT,SUPERUSDT,LEVERUSDT,1INCHUSDT,FLOWUSDT
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2024-01-01 00:15:00,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,...,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000
2024-01-01 00:30:00,1.000186,1.000509,1.000499,1.000275,0.999277,1.001406,0.999149,1.001842,1.001001,1.000743,...,1.002511,1.000863,0.997628,1.000455,1.003551,1.000745,1.000648,1.001455,1.003337,1.006414
2024-01-01 00:45:00,1.000574,1.000974,0.999173,0.999668,1.000272,1.001940,0.999249,1.002567,1.000089,1.005234,...,1.001656,1.002378,0.994370,1.005749,0.999538,1.001600,1.004303,1.001348,1.003410,0.998531
2024-01-01 01:00:00,1.000323,1.001085,1.002502,0.999423,1.001045,1.002008,1.001074,1.003708,1.000280,1.006882,...,1.002273,0.998437,0.997819,1.010361,0.988960,0.999612,1.003675,0.996634,0.990844,0.998106
2024-01-01 01:15:00,1.000762,1.001724,1.001795,1.000952,1.001212,1.006194,0.999741,1.005159,1.002583,1.019081,...,1.003245,1.002176,1.002822,1.008427,0.997170,1.001584,1.003552,0.996866,1.010220,0.999356
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025-02-14 22:45:00,1.000395,1.002171,1.003098,1.001035,1.015676,0.991907,0.975182,0.989843,0.998552,1.005225,...,1.039377,0.948989,0.989818,0.983309,0.995162,1.003272,1.010200,0.973111,0.994883,1.000554
2025-02-14 23:00:00,1.001269,1.004767,1.003826,1.003671,1.021015,0.995104,0.979838,0.992903,1.002595,1.013239,...,1.038679,0.954175,0.995331,0.989962,1.000809,1.006172,1.016264,0.979250,0.996063,1.002397
2025-02-14 23:15:00,1.002259,1.004843,1.003010,1.005381,1.021068,1.001026,0.977605,0.995739,1.002839,1.013128,...,1.038219,0.950850,0.999517,0.990510,1.001085,1.006790,1.017372,0.985342,0.996500,1.002349
2025-02-14 23:30:00,1.003249,1.004302,1.002359,1.007857,1.025521,1.004255,0.981720,0.997227,1.006913,1.015259,...,1.031075,0.957269,0.999962,0.992806,1.001940,1.006840,1.017805,0.988135,0.995130,1.000449


In [25]:
# Features and labels go to parquet; the axis metadata is pickled alongside them.
for frame, file_name in ((X, "X.parquet"), (y, "y.parquet")):
    frame.to_parquet(os.path.join(dataset_path, file_name))
with open(os.path.join(dataset_path, "extra_data.pkl"), 'wb') as f:
    pickle.dump(extra_data, f)

In [26]:
# Round-trip check of the saved features. Load into a new name so that X stays
# a DataFrame and the save cell above can still be re-run.
X_loaded = pd.read_parquet(os.path.join(dataset_path, "X.parquet")).values
np.testing.assert_array_equal(X_loaded, X.values)

In [27]:
# Feature matrix shape: (n_datetimes, n_selected_symbols).
X.shape

(39455, 120)

In [28]:
# Label matrix shape; should match X.shape.
y.shape

(39455, 120)