In [None]:
!pip install tushare # -i https://opentuna.cn/pypi/web/simple

In [1]:
import datetime
import json
import os
import pickle
from datetime import date

import boto3
import numpy as np
import pandas as pd
import sagemaker
import tushare as ts
from sklearn.preprocessing import LabelEncoder

In [2]:
# Read the Tushare Pro API token from the environment so that the credential
# is never stored in (or committed with) the notebook.
tushare_token = os.environ.get('TUSHARE_TOKEN')
if not tushare_token:
    raise RuntimeError('Set the TUSHARE_TOKEN environment variable to your Tushare Pro API token.')
ts.set_token(tushare_token)

# Client object used by the following cells for every Tushare query.
pro = ts.pro_api()

In [3]:
# Query the list of all stocks that are currently listed and trading normally
# (list_status='L'), keeping only the columns named in `stock_basic_fields`.
stock_basic_fields = 'ts_code,symbol,name,area,industry,list_date'
data = pro.stock_basic(exchange='', list_status='L', fields=stock_basic_fields)

In [4]:
# Parse the 'list_date' column from YYYYMMDD text into pandas datetimes (the column is overwritten).
data['list_date'] = pd.to_datetime(data['list_date'], format='%Y%m%d')

In [5]:
data.shape

(4260, 6)

In [6]:
# Keep an independent copy of the stock list as fetched, before `data` is cut down to a sample.
origin_data = data.copy()

In [7]:
# Write the full stock list to stock_basic.csv in the working directory, without the index column.
origin_data.to_csv('stock_basic.csv', index=False)

In [8]:
# Work with only the first 10 rows of the stock list from here on.
data = data[:10]

In [9]:
data.head()

Unnamed: 0,ts_code,symbol,name,area,industry,list_date
0,000001.SZ,1,平安银行,深圳,银行,1991-04-03
1,000002.SZ,2,万科A,深圳,全国地产,1991-01-29
2,000004.SZ,4,国华网安,深圳,软件服务,1991-01-14
3,000005.SZ,5,世纪星源,深圳,环境保护,1990-12-10
4,000006.SZ,6,深振业A,深圳,区域地产,1992-04-27


In [10]:
# Download window as YYYYMMDD strings: a fixed start day and the day the cell is run.
start_time = '20200101'
end_time = datetime.date.today().strftime("%Y%m%d")

In [11]:
def get_daily(ts_code, start_date, end_date):
    """Fetch the daily rows of one stock through ``pro.daily``.

    Args:
        ts_code: stock code, passed to ``pro.daily`` as ``ts_code``.
        start_date: passed to ``pro.daily`` as ``start_date``.
        end_date: passed to ``pro.daily`` as ``end_date``.

    Returns:
        Whatever ``pro.daily`` returns for that stock and window
        (used below as a DataFrame).
    """
    return pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)


# Download each stock once and concatenate a single time; growing a frame with
# pd.concat inside a loop re-copies all previously collected rows on every step.
daily_frames = [get_daily(ts_code, start_time, end_time) for ts_code in data['ts_code']]
# With no stocks to fetch, `alldata` stays None, as it did before.
alldata = pd.concat(daily_frames, axis=0) if daily_frames else None

In [12]:
# Parse the 'trade_date' column from YYYYMMDD text into pandas datetimes (the column is overwritten).
alldata['trade_date'] = pd.to_datetime(alldata['trade_date'], format='%Y%m%d')

In [13]:
def get_list_day(ts_code, trade_date):
    """Return the number of days between a stock's listing date and `trade_date`.

    The listing date is looked up in the module-level `data` frame (first row
    whose 'ts_code' matches).

    Args:
        ts_code: stock code to look up in ``data['ts_code']``.
        trade_date: datetime-like value the listing date is subtracted from.

    Returns:
        ``(trade_date - list_date).days``, or None when `ts_code` is not in `data`.
    """
    matches = data.loc[data['ts_code'] == ts_code, 'list_date']
    if matches.empty:
        return None
    return (trade_date - matches.iloc[0]).days


# Vectorised equivalent of calling get_list_day on every row: build the
# ts_code -> list_date lookup once (first occurrence wins, like iloc[0] above)
# instead of filtering `data` again for each row of `alldata`.
list_date_by_code = data.drop_duplicates('ts_code').set_index('ts_code')['list_date']
# Codes missing from `data` map to NaT and therefore produce NaN day counts.
alldata['list_day'] = (alldata['trade_date'] - alldata['ts_code'].map(list_date_by_code)).dt.days

In [14]:
alldata.shape

(3139, 12)

In [15]:
alldata.head()

Unnamed: 0,ts_code,trade_date,open,high,low,close,pre_close,change,pct_chg,vol,amount,list_day
0,000001.SZ,2021-04-20,21.08,21.95,20.92,21.69,21.15,0.54,2.5532,834772.72,1798249.024,10975
1,000001.SZ,2021-04-19,20.03,21.24,19.91,21.15,20.26,0.89,4.3929,1112282.62,2304021.478,10974
2,000001.SZ,2021-04-16,20.36,20.43,19.81,20.26,20.36,-0.1,-0.4912,729998.26,1466509.057,10971
3,000001.SZ,2021-04-15,20.76,20.77,20.08,20.36,20.67,-0.31,-1.4998,735865.05,1494016.04,10970
4,000001.SZ,2021-04-14,20.79,20.94,20.42,20.67,20.78,-0.11,-0.5294,537438.49,1113001.541,10969


In [16]:
alldata.describe()

Unnamed: 0,open,high,low,close,pre_close,change,pct_chg,vol,amount,list_day
count,3139.0,3139.0,3139.0,3139.0,3139.0,3139.0,3139.0,3139.0,3139.0,3139.0
mean,11.504817,11.743778,11.288611,11.516903,11.516301,0.000602,0.034724,377981.5,565625.8,10437.597324
std,9.411287,9.616231,9.212036,9.412513,9.413363,0.478168,2.964855,487745.5,964899.0,516.142814
min,1.83,1.85,1.76,1.82,1.82,-4.16,-10.101,4300.0,2777.865,8833.0
25%,3.79,3.885,3.74,3.81,3.81,-0.11,-1.4081,56087.03,34587.63,10299.0
50%,8.0,8.13,7.85,8.0,8.0,0.0,0.0,145248.8,84509.77,10551.0
75%,17.5,17.855,17.195,17.5,17.49,0.09,1.228,579005.0,632479.3,10773.5
max,48.0,49.0,42.63,44.77,44.77,4.07,10.1639,4711461.0,8382664.0,11089.0


In [17]:
# Save the downloaded rows to "<start_time>_<end_time>.csv" without the index column.
# Both names must still hold the string form of the download window at this point.
daily_csv_name = start_time + '_' + end_time + '.csv'
alldata.to_csv(daily_csv_name, index=False)

In [18]:
# Sampling frequency of the series and window sizes.
freq = '1D'
prediction_length = 7
context_length = 365

# Roles of the columns in `alldata` / `data`.
id_feature, label_feature, time_feature = 'ts_code', 'close', 'trade_date'
sparse_features = ['area', 'industry']
dynamic_dense_features = ['list_day']

# From here on start_time / end_time are the first and last observed trade
# dates (datetime values), no longer the strings used for the download.
trade_dates = alldata[time_feature]
start_time, end_time = trade_dates.min(), trade_dates.max()
print('start_time:', start_time)
print('end_time:', end_time)

start_time: 2020-01-02 00:00:00
end_time: 2021-04-20 00:00:00


In [19]:
# For each categorical column print: number of distinct values, the first
# five of them, and how many entries are missing.
for sparse_feature in sparse_features:
    distinct_values = data[sparse_feature].unique()
    missing_count = sum(data[sparse_feature].isna())
    print(sparse_feature+':', len(distinct_values), distinct_values[:5], '... na:', missing_count)

area: 2 ['深圳' '北京'] ... na: 0
industry: 9 ['银行' '全国地产' '软件服务' '环境保护' '区域地产'] ... na: 0


In [20]:
%%time

# Collect the string form of every group key obtained by grouping on id_feature.
data_group = alldata.groupby(id_feature)
ids = []
for cnt, (name, group) in enumerate(data_group):
    if cnt % 1000 == 0:  # progress marker every 1000 groups
        print('cnt:', cnt)
    ids.append(str(name))
cnt = len(ids)  # final counter value, as left behind by a manual increment

num_timeseries = len(ids)
print('num_timeseries:', num_timeseries)

cnt: 0
num_timeseries: 10
CPU times: user 1.93 ms, sys: 385 µs, total: 2.32 ms
Wall time: 2.05 ms


In [21]:
ids

['000001.SZ',
 '000002.SZ',
 '000004.SZ',
 '000005.SZ',
 '000006.SZ',
 '000007.SZ',
 '000008.SZ',
 '000009.SZ',
 '000010.SZ',
 '000011.SZ']

In [22]:
def get_timeseries(df, dense_feature):
    """Build one regularly sampled series per id for a single column of `df`.

    Relies on the module-level settings `id_feature`, `time_feature`,
    `label_feature`, `freq`, `start_time` and `end_time`.

    Args:
        df: frame containing the `id_feature`, `time_feature` and
            `dense_feature` columns.
        dense_feature: name of the column to turn into time series.

    Returns:
        A list with one Series per group of `id_feature` (in groupby order),
        each indexed by every `freq` step from `start_time` to `end_time`.
        The label column is aggregated with sum(); any other column is
        aggregated with mean(), then forward-filled, back-filled and finally
        filled with 0.
    """
    is_label = dense_feature == label_feature

    # Column-less frame whose index spans start_time..end_time at `freq`.
    dense_df = pd.DataFrame({time_feature: [start_time, end_time]})
    dense_df = dense_df.set_index(time_feature).resample(freq).asfreq()

    for name, group in df.groupby(id_feature):
        tmp_df = pd.DataFrame({name: group[dense_feature], time_feature: group[time_feature]})
        resampled = tmp_df.set_index(time_feature).resample(freq)
        tmp_df = resampled.sum() if is_label else resampled.mean()  # aggregate
        dense_df = dense_df.join(tmp_df)

    if is_label:
        # NOTE(review): sum() yields 0 for every step without rows, so the
        # label is 0 (not NaN) on days with no data such as weekends and
        # holidays -- confirm that this is intended for a price column.
        dense_df = dense_df.resample(freq).sum()  # aggregate
    else:
        dense_df = dense_df.resample(freq).mean()  # aggregate
        # Treat infinities as missing, then fill gaps from the previous value,
        # then from the next value, and with 0 if a column has no value at all.
        # (.ffill()/.bfill() replace the deprecated fillna(method=...).)
        dense_df = dense_df.replace([np.inf, -np.inf], np.nan).ffill().bfill().fillna(0)
    print('dense_df.shape:', dense_df.shape)

    # One Series per column actually built, independent of any global counter.
    return [dense_df.iloc[:, i] for i in range(dense_df.shape[1])]

In [23]:
# Per-id series of the label column (label_feature), one list entry per id.
timeseries = get_timeseries(alldata, label_feature)

dense_df.shape: (475, 10)


In [24]:
# Build the per-id series for every dynamic dense feature:
# dynamic_dense_timeseries[k][i] is feature k for series i.
dynamic_dense_timeseries = []
for dense_feature in dynamic_dense_features:
    print(dense_feature)
    # NOTE: `dense_timeseries` stays bound after the loop (last feature); the
    # summary prints in later cells read it, so do not rename or inline it.
    dense_timeseries = get_timeseries(alldata, dense_feature)
    dynamic_dense_timeseries.append(dense_timeseries)

list_day
dense_df.shape: (475, 10)


In [25]:
# Integer-encode every categorical column, one list per feature with entries
# in the same order as `ids`.
property_cats = []

ids_df = pd.DataFrame({id_feature: ids})
# The merge does not depend on the feature being encoded, so it is done once.
new_data = ids_df.merge(data, how='left', on=id_feature)

for sparse_feature in sparse_features:
    le = LabelEncoder()
    features_arr = le.fit_transform(new_data[sparse_feature])
    property_cats.append(features_arr.tolist())
    le_classes = le.classes_.tolist()
    print(sparse_feature, 'features_arr:', len(le_classes))
    # Persist the fitted encoder; '/' in a feature name is replaced so the
    # file name is not read as a path. The file is closed by the `with` block.
    encoder_path = (sparse_feature+'_le.pickle').replace('/', '_')
    with open(encoder_path, 'wb') as encoder_file:
        pickle.dump(le, encoder_file)

area features_arr: 2
industry features_arr: 9


In [26]:
property_cats

[[1, 1, 1, 1, 1, 1, 0, 1, 1, 1], [8, 0, 5, 3, 1, 7, 6, 4, 2, 1]]

In [27]:
# Split boundaries, counted back from the day after the last observation:
# the final `prediction_length` days are the predict window, the
# `prediction_length` days before that are the test window.
one_day = datetime.timedelta(days=1)
horizon = datetime.timedelta(days=prediction_length)
double_horizon = datetime.timedelta(days=2*prediction_length)

DATETIME_START_OF_TRAIN = start_time
DATETIME_END_OF_PREDICT = end_time + one_day
DATETIME_END_OF_TEST = end_time + one_day - horizon
DATETIME_END_OF_TRAIN = end_time + one_day - double_horizon
DATETIME_START_OF_TEST = DATETIME_END_OF_TRAIN
DATETIME_START_OF_PREDICT = DATETIME_END_OF_TEST

In [28]:
# Convert the split boundaries to pandas Timestamps for slicing the series.
# The `freq` argument of pd.Timestamp was removed in pandas 2.0 and is not
# needed for slicing, so it is no longer passed.
start_dataset = pd.Timestamp(DATETIME_START_OF_TRAIN)
end_training = pd.Timestamp(DATETIME_END_OF_TRAIN)
start_test = pd.Timestamp(DATETIME_START_OF_TEST)
end_test = pd.Timestamp(DATETIME_END_OF_TEST)
start_predict = pd.Timestamp(DATETIME_START_OF_PREDICT)
end_predict = pd.Timestamp(DATETIME_END_OF_PREDICT)
print('start_dataset:', start_dataset)
print('end_training:', end_training)
print('start_test:', start_test)
print('end_test:', end_test)
print('start_predict:', start_predict)
print('end_predict:', end_predict)

start_dataset: 2020-01-02 00:00:00
end_training: 2021-04-07 00:00:00
start_test: 2021-04-07 00:00:00
end_test: 2021-04-14 00:00:00
start_predict: 2021-04-14 00:00:00
end_predict: 2021-04-21 00:00:00


In [29]:
# One record per series, covering start_dataset up to (not including) end_training.
training_data = [
    {
        "start": str(timeseries[i].index[0]),
        "target": timeseries[i][start_dataset:end_training][:-1].tolist(),  # We use -1, because pandas indexing includes the upper bound
        "dynamic_feat": [feature_series[i][start_dataset:end_training][:-1].tolist() for feature_series in dynamic_dense_timeseries],
        "cat": [property_cat[i] for property_cat in property_cats],
        "id": ids[i]
    }
    for i in range(num_timeseries)
]
# Summary: number of records, target length and last dynamic-feature length of
# the first record. Read from the records themselves rather than from a loop
# variable left over by an earlier cell.
first_record = training_data[0] if training_data else {"target": [], "dynamic_feat": []}
last_feat_len = len(first_record["dynamic_feat"][-1]) if first_record["dynamic_feat"] else 0
print(len(training_data), len(first_record["target"]), last_feat_len)

10 461 461


In [30]:
# One record per series, covering start_dataset up to (not including) end_test.
test_data = [
    {
        "start": str(timeseries[i].index[0]),
        "target": timeseries[i][start_dataset:end_test][:-1].tolist(),  # drop the last point: the slice includes end_test
        "dynamic_feat": [feature_series[i][start_dataset:end_test][:-1].tolist() for feature_series in dynamic_dense_timeseries],
        "cat": [property_cat[i] for property_cat in property_cats],
        "id": ids[i]
    }
    for i in range(num_timeseries)
]
# Summary: number of records, target length and last dynamic-feature length of
# the first record. Read from the records themselves rather than from a loop
# variable left over by an earlier cell.
first_record = test_data[0] if test_data else {"target": [], "dynamic_feat": []}
last_feat_len = len(first_record["dynamic_feat"][-1]) if first_record["dynamic_feat"] else 0
print(len(test_data), len(first_record["target"]), last_feat_len)

10 468 468


In [31]:
# One record per series, covering start_dataset through end_predict (no point dropped here).
predict_data = [
    {
        "start": str(timeseries[i].index[0]),
        "target": timeseries[i][start_dataset:end_predict].tolist(),
        "dynamic_feat": [feature_series[i][start_dataset:end_predict].tolist() for feature_series in dynamic_dense_timeseries],
        "cat": [property_cat[i] for property_cat in property_cats],
        "id": ids[i]
    }
    for i in range(num_timeseries)
]
# Summary: number of records, target length and last dynamic-feature length of
# the first record. Read from the records themselves rather than from a loop
# variable left over by an earlier cell.
first_record = predict_data[0] if predict_data else {"target": [], "dynamic_feat": []}
last_feat_len = len(first_record["dynamic_feat"][-1]) if first_record["dynamic_feat"] else 0
print(len(predict_data), len(first_record["target"]), last_feat_len)

10 475 475


In [32]:
def _nan_to_str(value):
    """Return `value` with every float NaN (at any nesting depth) replaced by the string "NaN"."""
    if isinstance(value, float) and value != value:  # NaN is the only float unequal to itself
        return "NaN"
    if isinstance(value, dict):
        return {key: _nan_to_str(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_nan_to_str(item) for item in value]
    return value


def write_dicts_to_file(path, data):
    """Write `data` to `path` as JSON Lines: one UTF-8 encoded JSON object per line.

    Float NaN values are written as the quoted string "NaN". The conversion is
    done on the values themselves rather than on the serialized text, so string
    values that merely contain the letters "NaN" are written unchanged.

    Args:
        path: file to create or overwrite.
        data: iterable of JSON-serializable objects, one per output line.
    """
    with open(path, 'wb') as fp:
        for d in data:
            fp.write(json.dumps(_nan_to_str(d)).encode("utf-8"))
            fp.write("\n".encode('utf-8'))

In [33]:
%%time
# Write the three datasets to "<prefix><freq>.json" in the working directory.
for file_prefix, records in (("train_", training_data), ("test_", test_data), ("predict_", predict_data)):
    write_dicts_to_file(file_prefix+freq+".json", records)

CPU times: user 7.44 ms, sys: 0 ns, total: 7.44 ms
Wall time: 6.7 ms


In [34]:
s3 = boto3.resource('s3')
def copy_to_s3(local_file, s3_path, override=False):
    """Upload a local file to the S3 location given as an ``s3://bucket/key`` URI.

    Args:
        local_file: path of the file to upload.
        s3_path: destination URI; must start with 's3://'.
        override: when False (default) an object that already exists under the
            destination key is left untouched and a message is printed;
            when True it is overwritten.

    Raises:
        AssertionError: if `s3_path` does not start with 's3://'.
    """
    assert s3_path.startswith('s3://')
    split = s3_path.split('/')
    bucket = split[2]
    path = '/'.join(split[3:])
    buk = s3.Bucket(bucket)

    # A prefix listing also returns longer keys that merely start with `path`,
    # so compare the keys exactly before reporting that the file exists.
    already_exists = any(obj.key == path for obj in buk.objects.filter(Prefix=path))
    if already_exists:
        if not override:
            # Report the bucket and key parsed from s3_path, not a notebook global.
            print('File s3://{}/{} already exists.\nSet override to upload anyway.\n'.format(bucket, path))
            return
        print('Overwriting existing file')
    with open(local_file, 'rb') as body:
        print('Uploading file to {}'.format(s3_path))
        buk.put_object(Key=path, Body=body)

In [35]:
# SageMaker session, its region, and the execution role.
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()  # IAM role to use by SageMaker

# Everything produced by this notebook is stored under s3://<bucket>/<prefix>/data.
s3_prefix = 'time_series_forecast'  # prefix used for all data stored within the bucket
s3_bucket = sagemaker_session.default_bucket()  # replace with an existing bucket if needed
s3_data_path = "s3://{}/{}/data".format(s3_bucket, s3_prefix)

In [None]:
%%time
# Upload each dataset file to its own sub-folder of s3_data_path, replacing any existing copy.
for split_dir, file_prefix in (("/train/", "train_"), ("/test/", "test_"), ("/predict/", "predict_")):
    file_name = file_prefix+freq+".json"
    copy_to_s3(file_name, s3_data_path + split_dir + file_name, override=True)