In [1]:
import os

import pandas as pd
from pyarrow import csv
from fastparquet import *

In [2]:
import torch
import numpy as np

## 读取数据

In [3]:
# Location of the CSMAR daily-trading parquet file. Set the CSMAR_STOCK_PARQUET
# environment variable to point elsewhere instead of editing this absolute path.
stock_parquet_url = os.environ.get('CSMAR_STOCK_PARQUET', 'E://Study/2023/dataset/CSMAR/stock.parquet.gzip')

In [4]:
# Load the full daily-trading table (one row per stock per trade date).
stock_data = pd.read_parquet(path=stock_parquet_url)

In [7]:
np.shape(stock_data)  # (rows, columns) of the raw table

(6436907, 21)

In [8]:
stock_data.iloc[1, :]  # peek at one raw row

Stkcd                  000001
Trddt              2016-01-05
Opnprc                  11.27
Hiprc                   11.57
Loprc                   11.15
Clsprc                   11.4
Dnshrtrd             66326995
Dnvaltrd         755531353.72
Dsmvosd           134566222.2
Dsmvtll          163118907.98
Dretwd               0.006178
Dretnd               0.006178
Adjprcwd           942.851875
Adjprcnd           786.944484
Markettype                  4
Capchgdt           2015-05-20
Trdsta                      1
Ahshrtrd_D                NaN
Ahvaltrd_D                NaN
PreClosePrice           11.33
ChangeRatio          0.006178
Name: 1, dtype: object

## 预处理数据

### 1. 日期股票信息汇总

In [5]:
# Sorted vocabularies of stock codes, trade dates and capital-change dates,
# each with a value -> position lookup and a position -> value lookup.
# Deduplicate with pandas instead of materialising a Python list of every row
# (~6.4M) three times; iterating the deduplicated Series yields the same element
# types as iterating the full column did.
sectors = sorted(stock_data['Stkcd'].drop_duplicates())
timestamps = sorted(stock_data['Trddt'].drop_duplicates())
cap_chg_date = sorted(stock_data['Capchgdt'].drop_duplicates())

sector_to_index = {code: i for i, code in enumerate(sectors)}
index_to_sector = dict(enumerate(sectors))
timestamps_to_index = {day: i for i, day in enumerate(timestamps)}
index_to_timestamps = dict(enumerate(timestamps))
cap_chg_date_to_index = {day: i for i, day in enumerate(cap_chg_date)}
index_to_cap_chg_date = dict(enumerate(cap_chg_date))

In [10]:
timestamps[10]  # trade date at position 10 of the sorted date vocabulary

'2016-01-18'

In [6]:
# Encode trade dates, capital-change dates and stock codes as integer positions
# in the sorted lookups built above (vectorised `map` instead of a per-row
# Python lambda). `map` yields NaN for values missing from the lookup, so check
# before assigning: re-running this cell on already-encoded columns raises
# KeyError (as the original per-row lookups did) instead of writing NaN.
trddt_codes = stock_data['Trddt'].map(timestamps_to_index)
capchgdt_codes = stock_data['Capchgdt'].map(cap_chg_date_to_index)
idx_codes = stock_data['Stkcd'].map(sector_to_index)
for col_name, codes in (('Trddt', trddt_codes), ('Capchgdt', capchgdt_codes), ('Stkcd', idx_codes)):
    if codes.isna().any():
        raise KeyError(f'{col_name}: values not found in the lookup table '
                       '(has this cell already been run?)')
stock_data['Trddt'] = trddt_codes
stock_data['Capchgdt'] = capchgdt_codes
stock_data['idx'] = idx_codes
del trddt_codes, capchgdt_codes, idx_codes

### 2. 按照个股划分训练集测试集和验证集

In [12]:
len(sectors)  # number of distinct stock codes

5275

In [13]:
# Rows of the first 300 stocks (by encoded stock index).
df_bool = stock_data.loc[stock_data['idx'].lt(300)]
df_bool.shape

(477991, 22)

In [7]:
# Number of stocks (by encoded `idx`) in each split. For every stock, all
# trading days except the last are inputs; the last day's close is the target.
train_size, valid_size, test_size = 300, 100, 100

In [8]:
# Split by stock into contiguous, disjoint blocks of the encoded stock index:
# [0, train) -> train, [train, valid_end) -> valid, [valid_end, test_end) -> test.
stock_pos = stock_data['idx']
valid_end = train_size + valid_size
test_end = valid_end + test_size
train_data = stock_data.loc[stock_pos < train_size]
valid_data = stock_data.loc[(stock_pos >= train_size) & (stock_pos < valid_end)]
test_data = stock_data.loc[(stock_pos >= valid_end) & (stock_pos < test_end)]

In [16]:
tuple(frame.shape for frame in (train_data, valid_data, test_data))

((477991, 22), (163580, 22), (110732, 22))

In [9]:
def sort_by_trade_date(data):
    """Extract per-stock targets from the two most recent trading days.

    Each stock's rows (grouped by 'Stkcd') are ordered by 'Trddt'. Every
    returned dict is keyed 'stock<code>'.

    Returns:
        (last_close, previous_close, up_flag, last_trade_date) where
        up_flag is 1 if the last close exceeds the previous close, else 0.
        A stock with fewer than two rows raises IndexError.
    """
    last_close, prev_close, up_flag, last_day = {}, {}, {}, {}
    for stock_id, group in data.groupby('Stkcd'):
        ordered = group.sort_values(by='Trddt')
        key = f'stock{stock_id}'
        final_row = ordered.iloc[-1]
        prior_row = ordered.iloc[-2]
        last_close[key] = final_row['Clsprc']
        prev_close[key] = prior_row['Clsprc']
        last_day[key] = final_row['Trddt']
        up_flag[key] = int(prior_row['Clsprc'] < final_row['Clsprc'])
    return last_close, prev_close, up_flag, last_day

In [10]:
# Per split: last-day close, previous-day close, up/down flag, and the encoded
# trade date of the last day (the latter is what the `*_data_new` names hold).
split_results = {
    split: sort_by_trade_date(frame)
    for split, frame in (('train', train_data), ('valid', valid_data), ('test', test_data))
}
train_label, train_last_label, train_cat_label, train_data_new = split_results['train']
valid_label, valid_last_label, valid_cat_label, valid_data_new = split_results['valid']
test_label, test_last_label, test_cat_label, test_data_new = split_results['test']

In [12]:
# Spot-check one stock: (last close, previous close, up/down flag, last trade date index).
tuple(d['stock000001'] for d in (train_label, train_last_label, train_cat_label, train_data_new))

(13.16, 13.03, 1, 1702)

In [11]:
from copy import deepcopy

# Merge the per-split dicts into one mapping per label type. Keys are
# 'stock<code>' and the splits are disjoint sets of stocks, so nothing collides.
labels = {**train_label, **valid_label, **test_label}
last_labels = {**train_last_label, **valid_last_label, **test_last_label}
# Up/down flags must come from the *_cat_label dicts, not the previous-close dicts.
cat_labels = {**train_cat_label, **valid_cat_label, **test_cat_label}
assert set(cat_labels.values()) <= {0, 1}, 'cat_labels must be binary up/down flags'

len(labels), len(last_labels), len(cat_labels)

(500, 500, 500)

### 3. 生成 ground truth 文件

In [21]:
def generate_ground_truth(task_type='reg', reg_labels=None, cls_labels=None, out_dir='data'):
    """Write a ground-truth CSV with one ``key,value`` line per stock and no header.

    Args:
        task_type: 'reg' writes the target-day closing prices to
            ``<out_dir>/ground_truth_reg.csv``. Any other value writes the
            classification labels (1 if the target day closed above the
            previous day, else 0) to ``<out_dir>/ground_truth_cat.csv``.
        reg_labels: mapping written for 'reg'; defaults to the notebook-level ``labels``.
        cls_labels: mapping written otherwise; defaults to the notebook-level ``cat_labels``.
        out_dir: output directory; created if it does not exist yet.
    """
    if task_type == 'reg':
        mapping = labels if reg_labels is None else reg_labels
        file_name = 'ground_truth_reg.csv'
    else:
        mapping = cat_labels if cls_labels is None else cls_labels
        file_name = 'ground_truth_cat.csv'
    # The data directory is otherwise only created further down the notebook,
    # so a fresh Run All would fail here without this.
    os.makedirs(out_dir, exist_ok=True)
    with open(f'{out_dir}/{file_name}', 'w') as f:
        f.writelines(f'{key},{value}\n' for key, value in mapping.items())

In [58]:
# Write both ground-truth files: regression targets first, then up/down classes.
for task in ('reg', 'cat'):
    generate_ground_truth(task_type=task)

### 4. 按照个股划分 records

In [13]:
import pathlib
import pickle

In [14]:
# Output root (data/) and the per-stock record directory (data/records/).
record_path = pathlib.Path('data', 'records')
path = record_path.parent
record_path.mkdir(parents=True, exist_ok=True)

In [79]:
stock_data.keys()  # column labels of the (encoded) table

Index(['Stkcd', 'Trddt', 'Opnprc', 'Hiprc', 'Loprc', 'Clsprc', 'Dnshrtrd',
       'Dnvaltrd', 'Dsmvosd', 'Dsmvtll', 'Dretwd', 'Dretnd', 'Adjprcwd',
       'Adjprcnd', 'Markettype', 'Capchgdt', 'Trdsta', 'Ahshrtrd_D',
       'Ahvaltrd_D', 'PreClosePrice', 'ChangeRatio'],
      dtype='object')

In [19]:
# Columns written to each per-stock record: every raw column except the stock
# code 'Stkcd', which is carried in the record file name instead.
columns = [
    'Trddt',
    'Opnprc', 'Hiprc', 'Loprc', 'Clsprc',
    'Dnshrtrd', 'Dnvaltrd',
    'Dsmvosd', 'Dsmvtll',
    'Dretwd', 'Dretnd',
    'Adjprcwd', 'Adjprcnd',
    'Markettype', 'Capchgdt', 'Trdsta',
    'Ahshrtrd_D', 'Ahvaltrd_D',
    'PreClosePrice', 'ChangeRatio',
]

In [16]:
stock_data.iloc[11, :]  # peek at one row after encoding

Stkcd                  000001
Trddt                      11
Opnprc                  10.45
Hiprc                   10.78
Loprc                   10.41
Clsprc                  10.71
Dnshrtrd             50110908
Dnvaltrd          532074699.5
Dsmvosd          126421424.54
Dsmvtll          153245921.45
Dretwd               0.028818
Dretnd               0.028818
Adjprcwd           885.784525
Adjprcnd           739.313632
Markettype                  4
Capchgdt                  554
Trdsta                      1
Ahshrtrd_D                NaN
Ahvaltrd_D                NaN
PreClosePrice           10.41
ChangeRatio          0.028818
idx                         0
Name: 11, dtype: object

In [86]:
f'{record_path}\\'

'data\\records\\'

In [17]:
def records_to_file(data, out_dir=None, cols=None):
    """Write one CSV of historical rows per stock, excluding its last trading day.

    The last day is held out because its close is the prediction target.
    Used identically for the train, valid and test splits.

    Args:
        data: daily rows with a 'Stkcd' stock code, a 'Trddt' trade-date index
            and the feature columns.
        out_dir: destination directory; defaults to the notebook-level ``record_path``.
        cols: columns to write, in order; defaults to the notebook-level ``columns``.
            Columns absent from ``data`` and missing values are written as 0.

    Writes ``<out_dir>/stock<code>.csv`` with a header row and no index column.
    """
    out_dir = pathlib.Path(record_path if out_dir is None else out_dir)
    cols = list(columns if cols is None else cols)
    for stock_id, group in data.groupby('Stkcd'):
        history = group.sort_values(by='Trddt').reindex(columns=cols).fillna(0)
        # Drop the last day by position: dropping by index label would remove
        # every row sharing that label if the index is not unique.
        history.iloc[:-1].to_csv(out_dir / f'stock{stock_id}.csv', header=True, index=False)

In [18]:
records_to_file(train_data)
records_to_file(valid_data)
records_to_file(test_data)

### 5. info 文件生成

In [74]:
# Empty metadata table with columns 'col' (feature column name) and 'dtype'.
attributes_info = pd.DataFrame(columns=['col', 'dtype'])

In [76]:
# Feature metadata: one row per record column, all marked numeric ('num').
# Built in one step rather than appended row by row, so re-running this cell
# does not duplicate rows.
attributes_info = pd.DataFrame({'col': list(columns), 'dtype': 'num'}, columns=['col', 'dtype'])
attributes_info.to_csv(path / 'info.csv', header=True, index=False)

## TODO：用 GRU 在原始数据上跑出训练集、验证集、测试集的预测结果（目前下面两个单元格只是把真实标签直接写成 *_preds_*.csv，作为占位）

In [77]:
# Placeholder regression "predictions": until a model (GRU) produces real ones,
# each split's true last-day closes are written in the prediction-file format.
for split_name, split_labels in (('train', train_label), ('valid', valid_label), ('test', test_label)):
    with open(f'data/{split_name}_preds_reg.csv', 'w') as f:
        f.writelines(f'{key},{value}\n' for key, value in split_labels.items())

In [88]:
# Placeholder classification "predictions": until a model produces real ones,
# each split's true up/down flags are written in the prediction-file format.
for split_name, split_labels in (('train', train_cat_label), ('valid', valid_cat_label), ('test', test_cat_label)):
    with open(f'data/{split_name}_preds_cat.csv', 'w') as f:
        f.writelines(f'{key},{value}\n' for key, value in split_labels.items())

## 构造训练集测试集验证集以及对应标签

In [22]:
from tqdm import tqdm

In [37]:
def generate_data(data, cols=None):
    """Flatten per-stock daily rows into one table labelled with the next-day close.

    For every stock in ``data`` (grouped by 'Stkcd'):
    - rows are ordered by 'Trddt';
    - the feature columns are selected, with absent columns and missing values set to 0;
    - 'label' is that stock's closing price on its next trading day.
    Each stock's last day has no next day and is dropped.

    Args:
        data: daily rows with at least 'Stkcd', 'Trddt' and 'Clsprc'.
        cols: feature columns to keep, in order; defaults to the notebook-level
            ``columns``. The list passed in is never modified.

    Returns:
        DataFrame with the feature columns, then 'label', then 'Stkcd', indexed
        0..n-1 (stocks in groupby order, days in date order).
    """
    # Work on a copy and skip 'label' (added below) in case the shared list
    # already contains it, so repeated calls never grow the column set.
    feature_cols = [c for c in (columns if cols is None else cols) if c != 'label']
    pieces = []
    for stock_id, group in data.groupby('Stkcd'):
        ordered = group.sort_values(by='Trddt')
        piece = ordered.reindex(columns=feature_cols).fillna(0).assign(
            # next-day close of *this* stock, aligned on its own sorted rows
            label=ordered['Clsprc'].shift(-1),
            Stkcd=stock_id,
        )
        pieces.append(piece.iloc[:-1])
    if not pieces:
        return pd.DataFrame(columns=feature_cols + ['label', 'Stkcd'])
    # A single concat instead of growing a frame row by row (quadratic).
    return pd.concat(pieces, ignore_index=True)

In [None]:
# Flatten the training split into one labelled table and save it as CSV
# (the default integer index is written as the first column).
train_data_f = generate_data(data=train_data)
train_data_f.to_csv(path_or_buf='./data/train_data.csv')

000001
000002
000004
000005
000006


### 1. 得到全部训练集按照30天划分的数据和label

In [None]:
# Work in progress (per the heading above): per-stock preparation for
# splitting the training data into 30-day windows with labels.
# NOTE(review): no visible cell defines a notebook-level `data` (it only exists
# as a function parameter), so this cell raises NameError on a fresh kernel;
# presumably `train_data` was intended — confirm.
grouped_train_stock = data.groupby('Stkcd')
for stock_id, group in grouped_train_stock:
    # This stock's rows ordered by trade date, restricted to `columns`, NaN -> 0.
    df = pd.DataFrame(group.sort_values(by='Trddt'), columns=columns).fillna(0)
    # NOTE(review): shifts the close column of the *whole* stock_data table and
    # aligns it by index label, rather than shifting this stock's own sorted
    # closes (e.g. group.sort_values('Trddt')['Clsprc'].shift(-1)); verify this
    # really yields the next-day close.
    df['label'] = stock_data['Clsprc'].shift(-1)
    # Drop the last row (no next-day close). Drops by label, so a non-unique
    # index would remove more than one row.
    df.drop(df.index[-1], inplace=True)
    # Moves the original row labels into a new 'index' column.
    df.reset_index(inplace=True)