# 演示使用SageMaker/DeepAR预测上证50成分股票某日收盘价

这个notebook来源于 [DeepAR introduction notebook](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_amazon_algorithms/deepar_synthetic/deepar_synthetic.ipynb). 

这里我们将通过预测上海证券交易所上证50的成分指数股票（50支股票）的收盘价格来演示SageMaker内置的DeepAR算法的使用方法。

我们会重点介绍以下内容：
* 准备数据集
* 使用SageMaker Python SDK训练一个DeepAR模型并部署
* 使用交互模式来调用我们部署好的模型

运行这个notebook将使用ml.c5.2xlarge机型来进行训练，推理使用ml.m4.xlarge机型。

更多DeepAR详细信息请参看[文档](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html)或者[论文](https://arxiv.org/abs/1704.04110)。

In [None]:
from __future__ import print_function
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from ipywidgets import IntSlider, FloatSlider, Checkbox

%matplotlib inline

import sys
!{sys.executable} -m pip install -U s3fs

from urllib.request import urlretrieve
import zipfile
from dateutil.parser import parse
import json
from random import shuffle
import random
import datetime
import os

import boto3
import s3fs
import sagemaker
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

In [None]:
sagemaker_session = sagemaker.Session()

在开始之前，我们可以使用自己定义的一些属性来覆盖默认值：
- 训练和模型数据使用的。这个S3桶的位置应该和notebook运行的Notebook Instance或者SageMaker Studio在同一个AWS区域。
- IAM角色arn用来给训练和获取数据授权。要创建新的IAM角色请参看文档。

In [None]:
s3_bucket = sagemaker.Session().default_bucket()  # replace with an existing bucket if needed
s3_prefix = 'deepar-shstock-demo-notebook'    # prefix used for all data stored within the bucket

role = sagemaker.get_execution_role()             # IAM role to use by SageMaker

In [None]:
region = sagemaker_session.boto_region_name

s3_data_path = "s3://{}/{}/data".format(s3_bucket, s3_prefix)
s3_output_path = "s3://{}/{}/output".format(s3_bucket, s3_prefix)

接下来，我们配置在AWS区域范围内的容器镜像。

In [None]:
image_name = sagemaker.amazon.amazon_estimator.get_image_uri(region, "forecasting-deepar", "latest")

### 导入上证50成分股票的价格数据，上传的S3以便SageMaker使用

第一步，安装并导入第三方股票价格历史数据的开源库。我们将使用开源的[证券数据平台BaoStock](http://baostock.com/)提供的库来导入证券历史价格。

In [None]:
!{sys.executable} -m pip install -U baostock

import baostock as bs
lg = bs.login()

第二步，使用BaoStock导入上证50股票代码。

In [None]:
rs = bs.query_sz50_stocks()

sz50_stocks = []
while (rs.error_code == '0') & rs.next():
    # 获取一条记录，将记录合并在一起
    sz50_stocks.append(rs.get_row_data())

第三步，循环调用证券历史价格接口，调取50支股票的历史价格。

In [None]:
start = '2010-01-01'
end = '2020-01-01'
ts = pd.date_range(start, end)
data = pd.DataFrame(index=ts)
for s in sz50_stocks:
    rs = bs.query_history_k_data_plus(s[1], 'date, close', adjustflag='1', start_date=start, end_date=end)
    data_list = []
    while (rs.error_code == '0') & rs.next():
        data_list.append(rs.get_row_data())
    stock_code = s[1]+'-'+s[2]
    result = pd.DataFrame(data_list, columns=['date', stock_code])
    result['date'] = pd.to_datetime(result['date'])
    result[stock_code] = pd.to_numeric(result[stock_code])
    result = result.set_index('date').asfreq('D')
    data = pd.concat([data, result], axis=1, join='outer')


In [None]:
num_timeseries = data.shape[1]
# data_kw = data.resample('2H').sum() / 8
timeseries = []
for i in range(num_timeseries):
    timeseries.append(np.trim_zeros(data[~pd.isna(data.iloc[:,i])].iloc[:,i] , trim='f'))

我们把按照时间序列股票收盘价格用图表的形式展现出来，取10支股票。

In [None]:
fig, axs = plt.subplots(5, 2, figsize=(20, 20), sharex=True)
axx = axs.ravel()
for i in range(0, 10):
    timeseries[i].plot(ax=axx[i])
    axx[i].set_xlabel("date") 
    axx[i].set_ylabel("Close Price")
    axx[i].grid(which='minor', axis='x')

### 训练集和测试集划分


通常，通过查看保留测试集上的误差指标来评估模型或调整其超参数。在这里，我们将可用数据分为训练集和测试集，以评估训练后的模型。对于诸如分类和回归之类的标准机器学习任务，通常通过将样本随机分为训练集和测试集进行划分。但是，在时间序列预测中，我们基于按时间的序列对数据集进行训练/测试的划分却是非常重要的。

在此示例中，我们将保留每个时间序列的最后一部分以进行评估，并且仅将第一部分用作训练数据。

In [None]:
freq = 'D'

# we predict for 7 days
prediction_length = 7
# we also use 7 days as context length, this is the number of state updates accomplished before making predictions
context_length = 7

我们这里指定训练集的范围为：从2010年1月1日到2019年1月1日。

In [None]:
start_dataset = pd.Timestamp(start, freq=freq)
end_training = pd.Timestamp("2019-01-01", freq=freq)

DeepAR JSON格式需要每个时间序列作为一个JSON对象作为输入，在最简单的使用场景中每个时间序列只包含开始的时间戳（``start``）和一个数组（``target``）。更为复杂的使用场景DeepAR可以支持使用字段``dynamic_feat``指定时间序列特征和``cat``作为类别特征。

In [None]:
from datetime import timedelta

training_data = [
    {
        "start": str(start_dataset),
        "target": ts[start_dataset:end_training - timedelta(days=1)].tolist()  # We use -1, because pandas indexing includes the upper bound 
    }
    for ts in timeseries
]
print(len(training_data))

作为测试数据，我们将考虑扩展训练范围之外的时间序列：这些时间序列将用于计算训练模型的测试分数，方法是使用训练过的模型来预测其追踪的7天，并将预测与实际值进行比较。
为了评估我们在一周以上的模型性能，我们生成的测试数据超出了训练范围，延伸了1、2、3、4周。 这样，我们就可以对模型进行“滚动评估”。

In [None]:
num_test_windows = 4

test_data = [
    {
        "start": str(start_dataset),
        "target": ts[start_dataset:end_training + timedelta(days=k * prediction_length)].tolist()
    }
    for k in range(1, num_test_windows + 1) 
    for ts in timeseries
]
print(len(test_data))

现在让我们将字典写成DeepAR可以理解的`jsonlines`文件格式（它也支持gzip压缩的jsonlines和parquet）。

In [None]:
def write_dicts_to_file(path, data):
    with open(path, 'wb') as fp:
        for d in data:
            fp.write(json.dumps(d).encode("utf-8"))
            fp.write("\n".encode('utf-8'))

In [None]:
%%time
write_dicts_to_file("train.json", training_data)
write_dicts_to_file("test.json", test_data)

现在我们已经有了本地数据文件，我们将它们复制到DeepAR可以访问它们的S3上。由于连接速度的不同，这可能需要几分钟。

In [None]:
s3 = boto3.resource('s3')
def copy_to_s3(local_file, s3_path, override=False):
    assert s3_path.startswith('s3://')
    split = s3_path.split('/')
    bucket = split[2]
    path = '/'.join(split[3:])
    buk = s3.Bucket(bucket)
    
    if len(list(buk.objects.filter(Prefix=path))) > 0:
        if not override:
            print('File s3://{}/{} already exists.\nSet override to upload anyway.\n'.format(s3_bucket, s3_path))
            return
        else:
            print('Overwriting existing file')
    with open(local_file, 'rb') as data:
        print('Uploading file to {}'.format(s3_path))
        buk.put_object(Key=path, Body=data)

In [None]:
%%time
copy_to_s3("train.json", s3_data_path + "/train/train.json")
copy_to_s3("test.json", s3_data_path + "/test/test.json")

我们看一下在S3写入数据文件的情况。

In [None]:
s3filesystem = s3fs.S3FileSystem()
with s3filesystem.open(s3_data_path + "/train/train.json", 'rb') as fp:
    print(fp.readline().decode("utf-8")[:100] + "...")

现在所有需要处理的数据集我们都准备好了，可以调用DeepAR来训练模式并进行推理了。

### 训练模型

这里定义一个sagemaker的estimator来启动训练任务。

In [None]:
estimator = sagemaker.estimator.Estimator(
    sagemaker_session=sagemaker_session,
    image_name=image_name,
    role=role,
    train_instance_count=1,
    train_instance_type='ml.c5.2xlarge',
    base_job_name='deepar-shstock-demo',
    output_path=s3_output_path
)

接下来，我们需要为训练任务设置超参。 例如，使用的时间序列的频率，模型将在过去查看的数据点数，预测数据点数。 其他超参数涉及要训练的模型（层数，每层像元数，似然函数）和训练选项（历元数，批处理大小，学习率...）。 在这种情况下，我们为每个可选参数使用默认参数（您可以使用[Sagemaker Automated Model Tuning](https://aws.amazon.com/blogs/aws/sagemaker-automatic-model-tuning/)进行自动化调优） 。


In [None]:
hyperparameters = {
    "time_freq": freq,
    "epochs": "400",
    "early_stopping_patience": "40",
    "mini_batch_size": "64",
    "learning_rate": "5E-4",
    "context_length": str(context_length),
    "prediction_length": str(prediction_length)
}

In [None]:
estimator.set_hyperparameters(**hyperparameters)

我们准备开始培训任务。SageMaker将启动一个EC2实例，从S3下载数据，开始训练模型并保存训练后的模型。

如果像我们在本例中一样提供“测试”数据通道，则DeepAR还将在该测试中计算训练模型的准确性指标。这是通过预测测试集中每个时间序列的最后``prediction_length``个点并将其与时间序列的实际值进行比较来完成的。

**注意：下一个单元格可能需要几分钟才能完成，具体取决于数据大小，模型复杂性和训练选项。**


In [None]:
%%time
data_channels = {
    "train": "{}/train/".format(s3_data_path),
    "test": "{}/test/".format(s3_data_path)
}

estimator.fit(inputs=data_channels, wait=True)


由于在此示例中输入了测试集，因此将计算并记录预测的准确性度量标准（请参阅日志底部）。
您可以从[我们的文档](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html)中找到这些指标的定义。 您可以使用这些参数优化参数并调整模型，也可以使用SageMaker的[Sagemaker Automated Model Tuning](https://aws.amazon.com/blogs/aws/sagemaker-automatic-model-tuning/)对模型进行自动调优。


### 创建endpoint和predictor

现在我们已经有了训练好的模型，可以使用部署的endpoint来进行预测了。

**注意：不要忘记在完成这个实验后删除endpoint，在notebook的最下面有一个cell用于专门删除这个endpoint。**


要查询endpoint并执行预测，我们可以定义以下工具类：可以使用`pandas.Series`对象而不是原始JSON字符串进行请求。

In [None]:
class DeepARPredictor(sagemaker.predictor.RealTimePredictor):
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, content_type=sagemaker.content_types.CONTENT_TYPE_JSON, **kwargs)
        
    def predict(self, ts, cat=None, dynamic_feat=None, 
                num_samples=100, return_samples=False, quantiles=["0.1", "0.5", "0.9"]):
        """Requests the prediction of for the time series listed in `ts`, each with the (optional)
        corresponding category listed in `cat`.
        
        ts -- `pandas.Series` object, the time series to predict
        cat -- integer, the group associated to the time series (default: None)
        num_samples -- integer, number of samples to compute at prediction time (default: 100)
        return_samples -- boolean indicating whether to include samples in the response (default: False)
        quantiles -- list of strings specifying the quantiles to compute (default: ["0.1", "0.5", "0.9"])
        
        Return value: list of `pandas.DataFrame` objects, each containing the predictions
        """
        prediction_time = ts.index[-1] + timedelta(days=1)
        quantiles = [str(q) for q in quantiles]
        req = self.__encode_request(ts, cat, dynamic_feat, num_samples, return_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req)
        return self.__decode_response(res, ts.index.freq, prediction_time, return_samples)
    
    def __encode_request(self, ts, cat, dynamic_feat, num_samples, return_samples, quantiles):
        instance = series_to_dict(ts, cat if cat is not None else None, dynamic_feat if dynamic_feat else None)

        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles", "samples"] if return_samples else ["quantiles"],
            "quantiles": quantiles
        }
        
        http_request_data = {
            "instances": [instance],
            "configuration": configuration
        }
        
        return json.dumps(http_request_data).encode('utf-8')
    
    def __decode_response(self, response, freq, prediction_time, return_samples):
        # we only sent one time series so we only receive one in return
        # however, if possible one will pass multiple time series as predictions will then be faster
        predictions = json.loads(response.decode('utf-8'))['predictions'][0]
        prediction_length = len(next(iter(predictions['quantiles'].values())))
        prediction_index = pd.date_range(prediction_time, freq=freq, periods=prediction_length)        
        if return_samples:
            dict_of_samples = {'sample_' + str(i): s for i, s in enumerate(predictions['samples'])}
        else:
            dict_of_samples = {}
        return pd.DataFrame(data={**predictions['quantiles'], **dict_of_samples}, index=prediction_index)

    def set_frequency(self, freq):
        self.freq = freq
        
def encode_target(ts):
    return [x if np.isfinite(x) else "NaN" for x in ts]        

def series_to_dict(ts, cat=None, dynamic_feat=None):
    """Given a pandas.Series object, returns a dictionary encoding the time series.

    ts -- a pands.Series object with the target time series
    cat -- an integer indicating the time series category

    Return value: a dictionary
    """
    obj = {"start": str(ts.index[0]), "target": encode_target(ts)}
    if cat is not None:
        obj["cat"] = cat
    if dynamic_feat is not None:
        obj["dynamic_feat"] = dynamic_feat        
    return obj

现在，我们可以部署模型并创建endpoint，可以使用我们的自定义DeepARPredictor类进行查询。

In [None]:
predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.m4.xlarge',
    predictor_cls=DeepARPredictor)

### 进行预测并用图表展示结果

我们可以使用 `predictor` 对象产生预测。

In [None]:
predictor.predict(ts=timeseries[2], quantiles=[0.10, 0.5, 0.90]).head()

在下面，我们定义了一个绘图函数，用于查询模型并显示预测。

In [None]:
def plot(
    predictor, 
    target_ts, 
    cat=None, 
    dynamic_feat=None, 
    forecast_date=end_training, 
    show_samples=False, 
    plot_history=7 * 12,
    confidence=80
):
    print("calling served model to generate predictions starting from {}".format(str(forecast_date)))
    assert(confidence > 50 and confidence < 100)
    low_quantile = 0.5 - confidence * 0.005
    up_quantile = confidence * 0.005 + 0.5
        
    # we first construct the argument to call our model
    args = {
        "ts": target_ts[:forecast_date],
        "return_samples": show_samples,
        "quantiles": [low_quantile, 0.5, up_quantile],
        "num_samples": 100
    }


    if dynamic_feat is not None:
        args["dynamic_feat"] = dynamic_feat
        fig = plt.figure(figsize=(20, 6))
        ax = plt.subplot(2, 1, 1)
    else:
        fig = plt.figure(figsize=(20, 3))
        ax = plt.subplot(1,1,1)
    
    if cat is not None:
        args["cat"] = cat
        ax.text(0.9, 0.9, 'cat = {}'.format(cat), transform=ax.transAxes)

    # call the end point to get the prediction
    prediction = predictor.predict(**args)

    # plot the samples
    if show_samples: 
        for key in prediction.keys():
            if "sample" in key:
                prediction[key].plot(color='lightskyblue', alpha=0.2, label='_nolegend_')
                
                
    # plot the target
    target_section = target_ts[forecast_date-timedelta(days=plot_history):forecast_date+timedelta(days=prediction_length)]
    target_section.plot(color="black", label='target')
    
    # plot the confidence interval and the median predicted
    ax.fill_between(
        prediction[str(low_quantile)].index, 
        prediction[str(low_quantile)].values, 
        prediction[str(up_quantile)].values, 
        color="b", alpha=0.3, label='{}% confidence interval'.format(confidence)
    )
    prediction["0.5"].plot(color="b", label='P50')
    ax.legend(loc=2)    
    
    # fix the scale as the samples may change it
    ax.set_ylim(target_section.min() * 0.5, target_section.max() * 1.5)
    
    if dynamic_feat is not None:
        for i, f in enumerate(dynamic_feat, start=1):
            ax = plt.subplot(len(dynamic_feat) * 2, 1, len(dynamic_feat) + i, sharex=ax)
            feat_ts = pd.Series(
                index=pd.DatetimeIndex(start=target_ts.index[0], freq=target_ts.index.freq, periods=len(f)),
                data=f
            )
            feat_ts[forecast_date-plot_history:forecast_date+prediction_length].plot(ax=ax, color='g')

我们可以与先前定义的功能进行交互，以查看（未来）时间在任何时候的50支股票收盘价格的预测。

对于每个请求，可以通过动态调用我们的服务模型来获得预测。

您可以选择任何时间序列和任何预测日期，只需单击 `Run Interact` 即可从我们提供的endpoint生成预测并查看图表。


In [None]:
style = {'description_width': 'initial'}

In [None]:
@interact_manual(
    stock_id=IntSlider(min=0, max=49, value=91, style=style), 
    forecast_day=IntSlider(min=0, max=100, value=51, style=style),
    confidence=IntSlider(min=60, max=95, value=80, step=5, style=style),
    history_weeks_plot=IntSlider(min=1, max=20, value=1, style=style),
    show_samples=Checkbox(value=False),
    continuous_update=False
)
def plot_interact(stock_id, forecast_day, confidence, history_weeks_plot, show_samples):
    plot(
        predictor,
        target_ts=timeseries[stock_id],
        forecast_date=end_training + datetime.timedelta(days=forecast_day),
        show_samples=show_samples,
        plot_history=history_weeks_plot * 12 * 7,
        confidence=confidence
    )

### 删除endpoints

In [None]:
predictor.delete_endpoint()