In [62]:
import pandas as pd
import numpy as np
import os
import talib as ta
from utils import classify_stock_exchange, pre_adj_factor

In [63]:
# Directory holding one CSV per stock. A raw string is used because the
# backslashes in this Windows path ("\股", "\s") are not valid escape sequences;
# the resulting value is unchanged.
raw_data_path = r"D:\股票数据\stock_temporal"

# Stock codes are the CSV file names up to the first dot. os.listdir returns
# entries in arbitrary order, so sort them to keep every downstream artefact
# reproducible from run to run.
stock_list = sorted(
    name.split(".")[0]
    for name in os.listdir(raw_data_path)
    if name.endswith('.csv')
)

# Project helper from utils; the result is used later as a table with
# 'exchange' and 'code' columns.
exchange_codes = classify_stock_exchange(stock_list)

# Make sure the output directory exists before the first write into it.
os.makedirs("data", exist_ok=True)
exchange_codes.to_csv("data/exchange_codes.csv", encoding='utf-8-sig')

In [64]:
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

# --- Parameters -------------------------------------------------------------
# Column names used in the per-stock CSV files.
ticker_date = "交易日期"
ticker_price = "收盘价"
ticker_factor = "复权因子"

# Inclusive date range to keep (YYYYMMDD) and the exchanges of interest.
start_date = '20210501'
end_date = '20250501'
target_exchange = ['上交所', '深交所']

# Codes of the stocks listed on the target exchanges.
on_target_exchange = exchange_codes['exchange'].isin(target_exchange)
hs_codes = exchange_codes.loc[on_target_exchange, 'code'].tolist()

# Per-stock close-price series, keyed by stock code (filled in below).
data = {}

def process_stock(code):
    """Load one stock's price series restricted to the configured date range.

    Reads ``<raw_data_path>/<code>.csv``, parses the date column, sorts by date,
    passes the frame through ``pre_adj_factor`` (utils helper; presumably a
    forward price adjustment using the factor column - confirm in utils), and
    keeps rows between ``start_date`` and ``end_date`` inclusive.

    Args:
        code: Stock code; also the CSV file name without extension.

    Returns:
        Tuple ``(code, series)`` where ``series`` is the price column indexed by
        trading date. If the CSV file does not exist, ``series`` is an empty
        float Series named after the price column.
    """
    csv_path = os.path.join(raw_data_path, f"{code}.csv")
    if not os.path.exists(csv_path):
        return code, pd.Series(name=ticker_price, dtype=float)

    quotes = pd.read_csv(csv_path, dtype={'code': str})
    quotes[ticker_date] = pd.to_datetime(quotes[ticker_date], format='%Y%m%d')
    quotes = quotes.sort_values(by=ticker_date, ascending=True)

    # Adjustment runs on the full history, before the date filter is applied.
    quotes = pre_adj_factor(quotes, ticker_price, ticker_factor)

    first_day = pd.to_datetime(start_date)
    last_day = pd.to_datetime(end_date)
    in_range = (quotes[ticker_date] >= first_day) & (quotes[ticker_date] <= last_day)
    quotes = quotes[in_range]
    return code, quotes.set_index(ticker_date)[ticker_price]

# Load every stock concurrently; the work is dominated by file reads.
with ThreadPoolExecutor() as executor:
    futures = {executor.submit(process_stock, code): code for code in hs_codes}
    for future in tqdm(as_completed(futures), total=len(futures), desc="Processing stocks"):
        code, series = future.result()
        data[code] = series

# Combine into one DataFrame: index = trading date, columns = stock code.
# as_completed yields futures in completion order, so the insertion order of
# `data` changes from run to run (and `data` may still hold keys from an earlier
# execution of this cell). Build the panel in hs_codes order instead so the
# column layout is deterministic and limited to the requested codes.
df_panel = pd.DataFrame({code: data[code] for code in hs_codes})

# Later cells slice rows by position and shift along the index, so make the
# chronological ordering explicit rather than relying on constructor behaviour.
df_panel = df_panel.sort_index()

Processing stocks: 100%|██████████| 3767/3767 [00:45<00:00, 82.63it/s] 


In [65]:
# Persist the raw (uncleaned) close-price panel.
# NOTE(review): the cleaned panel is written to this exact path further down,
# so this raw copy is overwritten. The "2205" in the file name also does not
# match start_date ('20210501') - confirm the intended name.
df_panel.to_csv("data/hs_close_prices_2205_2505.csv", encoding='utf-8-sig')

In [66]:
# Cleaning thresholds.
na_ratio_eps = 0.01  # maximum tolerated fraction of missing prices per stock
price_eps = 3        # minimum tolerated price over the whole period

# Step 1: drop stocks whose share of missing prices exceeds na_ratio_eps.
nan_ratio = df_panel.isna().mean()
has_enough_data = nan_ratio <= na_ratio_eps
df_panel_clean = df_panel.loc[:, has_enough_data]

# Step 2: drop stocks whose lowest price ever fell below price_eps,
# then order the remaining columns by stock code.
min_price = df_panel_clean.min()
priced_high_enough = min_price >= price_eps
df_panel_clean = df_panel_clean.loc[:, priced_high_enough].sort_index(axis=1)

# Report how many stocks survived the cleaning.
print(f"清洗后剩余股票数量: {df_panel_clean.shape[1]}")

清洗后剩余股票数量: 2520


In [67]:
# Persist the cleaned close-price panel.
# NOTE(review): this is the same path the raw panel was written to earlier in
# the notebook, so the raw export is replaced by this one - confirm intended.
df_panel_clean.to_csv("data/hs_close_prices_2205_2505.csv", encoding='utf-8-sig')

In [68]:
# Export the stock codes that survived cleaning (the columns of df_panel_clean),
# one code per line with no header, into the data folder.
selected_codes = sorted(df_panel_clean.columns.tolist())
codes_to_save = pd.Series(selected_codes)
codes_to_save.to_csv(
    "data/selected_stock_codes.csv",
    index=False,
    header=False,
    encoding='utf-8-sig',
)

In [69]:
windows = [5, 10, 20, 30, 60]


def _ema_ignoring_gaps(prices, window):
    """EMA of one price series computed over its non-missing values only.

    Missing entries are dropped before calling ta.EMA, and the result is
    reindexed back onto the original index so that dropped dates come back
    as NaN and the output stays aligned with the input.
    """
    observed = prices.dropna()
    ema_values = ta.EMA(observed.values, timeperiod=window)
    return pd.Series(ema_values, index=observed.index).reindex(prices.index)


# One EMA panel per window, each with the same index and columns as df_panel_clean.
# The values are raw EMA levels; normalisation via log(ma(close)/close) is left
# disabled, as in:
#     ma_dfs[w] = np.log(ma_dfs[w] / df_panel_clean)
ma_dfs = {
    w: df_panel_clean.apply(_ema_ignoring_gaps, window=w)
    for w in windows
}


In [70]:
import numpy as np

# Build the [date, stock, feature] array.
# Feature layout along the last axis:
#   0                  date ordinal
#   1 .. len(windows)  moving-average value for each window
#   -2                 validity mask (1.0 = label usable, 0.0 = not)
#   -1                 future return over return_days
max_ma_days = max(windows)
return_days = 5
trading_dates = df_panel_clean.index.to_list()
stock_codes = df_panel_clean.columns.to_list()

# Rows used as samples: skip the first max_ma_days rows and the last
# return_days rows (the latter have no price return_days ahead).
sample_rows = slice(max_ma_days, -return_days)

num_dates = len(trading_dates[sample_rows])
num_stocks = len(stock_codes)
num_features = len(windows) + 3  # date ordinal + MA features + mask + future return

feature_matrix = np.zeros((num_dates, num_stocks, num_features))

# 1. Date ordinal, broadcast across all stocks.
date_idx = np.arange(num_dates)
feature_matrix[:, :, 0] = date_idx[:, None]

# 2. Moving-average features occupy slots 1..len(windows).
for slot, w in enumerate(windows, start=1):
    ma = ma_dfs[w].iloc[sample_rows].values  # [date, stock]
    feature_matrix[:, :, slot] = ma

# 3. Future return label: price return_days ahead relative to today's price.
price_ahead = df_panel_clean.shift(-return_days)
future_returns = price_ahead / df_panel_clean - 1

# The mask is True where both the label and today's price are present.
return_mask = (
    future_returns.iloc[sample_rows].notna()
    & df_panel_clean.iloc[sample_rows].notna()
)
feature_matrix[:, :, -2] = return_mask.astype(float)

# Wherever either price endpoint is missing, write the sentinel -1e9 into the
# label; training is expected to skip those entries through the mask.
missing_endpoint = price_ahead.isna() | df_panel_clean.isna()
future_returns[missing_endpoint] = -1e9

# Remaining NaNs in the feature array get the same sentinel. This runs before
# the label column is written, exactly as the label assignment order requires.
feature_matrix[np.isnan(feature_matrix)] = -1e9

future_returns = future_returns.iloc[sample_rows].values  # [date, stock]
feature_matrix[:, :, -1] = future_returns

# Feature names, in the same order as the last axis of feature_matrix.
features = ['date_idx'] + [f'MA_{w}' for w in windows] + ['掩码', 'future_return']

assert feature_matrix.shape[0] == return_mask.shape[0] == future_returns.shape[0], "时间点数量不一致"
assert feature_matrix.shape[1] == return_mask.shape[1] == future_returns.shape[1], "股票数量不一致"


In [71]:
# Chronological train / validation / test split over the sample dates.
valid_ratio = 0.2
test_ratio = 0.2

# Train and validation sizes are truncated to whole dates; the test set takes
# whatever remains so the three sizes always add up to num_dates.
train_ratio = 1 - valid_ratio - test_ratio
num_train = int(num_dates * train_ratio)
num_valid = int(num_dates * valid_ratio)
num_test = num_dates - (num_train + num_valid)

# First row index of the validation and test segments.
valid_start_idx, test_start_idx = num_train, num_train + num_valid

print(f"训练集大小: {num_train}, 验证集大小: {num_valid}, 测试集大小: {num_test}")

训练集大小: 541, 验证集大小: 180, 测试集大小: 182


In [72]:
import yaml

# Save the feature array together with its feature names and stock codes.
np.save('data/feature_matrix.npy', feature_matrix)
np.save('data/feature_names.npy', features)
np.save('data/stock_codes.npy', stock_codes)

config_path = 'configs/rank_lstm_config.yaml'

# Load the existing config. safe_load returns None for an empty file, so fall
# back to an empty mapping instead of failing on the lookup below.
with open(config_path, 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f) or {}

# Update the split info, creating the 'data' section if it is absent or empty.
data_section = config.get('data') or {}
data_section['split_info'] = {
    'num_train': num_train,
    'num_valid': num_valid,
    'num_test': num_test,
    'valid_start_idx': valid_start_idx,
    'test_start_idx': test_start_idx,
}
config['data'] = data_section

# Serialise before opening the file for writing, so a serialisation error
# cannot leave a truncated config behind. sort_keys=False keeps the key order
# of the loaded file rather than rewriting it alphabetically.
serialized_config = yaml.dump(config, allow_unicode=True, sort_keys=False)
with open(config_path, 'w', encoding='utf-8') as f:
    f.write(serialized_config)