# 使用 SageMaker 进行超参优化
在超参优化的过程中，可能会存在多个局部高低点，不同的超参数也可能存在不同的寻优路径。SageMaker 本身提供了贝叶斯超参优化的功能，这对于一些超参数量多、模型复杂、不适合网格搜索的场景非常适用。SageMaker 超参优化可以在几百次尝试之内快速找到一组对应局部高低点的超参。

这部分实验将展示如何定义一个 SageMaker 超参优化任务，并快速找到交易策略（以双均线策略为例）历史上的最优参数组合。

### 定义参数

我们首先定义环境变量：

In [1]:
import boto3

aws_account_id = boto3.client('sts').get_caller_identity().get('Account')
repository_name = 'sagemaker-hpo'
aws_region = 'us-east-1'
my_name = 'peng'
bucket = 'algo-trading-workshop-{}'.format(my_name)

### 获取数据

定义一个函数，通过 Athena 获取行情数据：

In [2]:
!pip install awswrangler

import awswrangler as wr

s3_output = wr.athena.create_athena_bucket()

def execute_query(database, sql):
    """Run ``sql`` on Athena and return the location of its result file.

    Args:
        database: Name of the Athena database the query runs against.
        sql: SQL statement to execute.

    Returns:
        The ``ResultConfiguration.OutputLocation`` value of the query
        execution record.

    Note:
        Failure handling is delegated to ``wr.athena.wait_query``
        (presumably it raises for failed/cancelled queries - confirm against
        the installed awswrangler version).
    """
    query_execution_id = wr.athena.start_query_execution(database=database, sql=sql)
    # Block until the query has finished and read the location from the
    # execution record returned by the wait call, i.e. from the query's final
    # state rather than from a snapshot taken while it may still be running.
    response = wr.athena.wait_query(query_execution_id=query_execution_id)
    return response['ResultConfiguration']['OutputLocation']



In [3]:
# Query parameters: Athena database and table, selected columns, the ticker
# to filter on, and the ordering of the result.
database = 'stock-data'
table = 'stock_day'
fields = '*'
ticker = '600519'
orderby = 'tradedate'
sort = 'ASC'

# Build the SQL text by interpolating the values above; rows are restricted
# to trade dates on or after 2019-01-01.
sql = f'''
SELECT {fields}
FROM "{database}"."{table}"
WHERE ticker='{ticker}'
AND tradedate>='2019-01-01'
ORDER BY {orderby}
{sort}
'''

# Run the query and print the result location returned by execute_query.
output_location = execute_query(database, sql)
print(output_location)

s3://aws-athena-query-results-364198545638-us-east-1/7d8c94cc-df83-4cd9-af0f-a3a1d9b2c35c.csv


In [4]:
# Load the Athena query result (a CSV object on S3) into a DataFrame.
df = wr.s3.read_csv(path=[output_location])

# Left-pad the ticker with zeros to 6 characters (leading zeros are
# presumably lost when the CSV column is parsed as a number).
df['ticker'] = df['ticker'].astype(str).str.zfill(6)

# Rescale every price column by the cumulative adjustment factor, expressed
# relative to the factor of the last row in the query result.
last_factor = df['accumadjfactor'].iloc[-1]
for price_col in ('openprice', 'closeprice', 'highestprice', 'lowestprice'):
    df[price_col] = df[price_col] * df['accumadjfactor'] / last_factor

# Keep only rows flagged as open. `.copy()` makes the filtered frame
# independent of the original so later edits do not act on a slice.
df = df[df['isopen'] == True].copy()  # noqa: E712 - element-wise comparison
# Keyword arguments (`columns=`, `axis=`) are used because passing the axis
# positionally is deprecated and rejected by pandas 2.x.
df = df.drop(columns=['isopen', 'accumadjfactor'])
df = df.set_index('tradedate').sort_index(axis=0)

# Rename the price/volume columns to open/high/low/close/volume.
df = df.rename(columns={
    'openprice': 'open',
    'closeprice': 'close',
    'highestprice': 'high',
    'lowestprice': 'low',
    'turnovervol': 'volume',
})
# Constant placeholder column. `openinterest` denotes open interest (a
# derivatives concept), not an interest rate, so 0 is used for this stock.
df['openinterest'] = 0
df.head()

Unnamed: 0_level_0,ticker,secid,open,high,low,close,volume,turnovervalue,openinterest
tradedate,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2019-01-02,600519,600519.SH,588.598322,590.547514,574.153066,577.983905,6228649.0,3754388000.0,0
2019-01-03,600519,600519.SH,578.939203,580.569963,565.265905,569.318682,3097735.0,1838179000.0,0
2019-01-04,600519,600519.SH,566.694027,586.369294,561.618406,580.898045,3768347.0,2258501000.0,0
2019-01-07,600519,600519.SH,586.687727,590.547514,581.110333,584.26571,3475013.0,2105981000.0,0
2019-01-08,600519,600519.SH,584.275359,590.547514,579.238336,583.590247,2883813.0,1750413000.0,0


将结果保存在这个实验专用的路径下，这个路径还将用于保存批量任务的输入和输出结果：

In [5]:
wr.s3.to_csv(df=df, path='s3://{}/{}/data.csv'.format(bucket, repository_name))

{'paths': ['s3://algo-trading-workshop-peng/sagemaker-hpo/data.csv'],
 'partitions_values': {}}

### 定义回测任务

使用 SageMaker 运行任务时，默认使用 /opt/ml/ 路径。任务运行时会默认运行 /opt/ml/code/ 路径下的 train 文件。因此我们需要对回测代码做一些简单改写：

In [6]:
!mkdir -p {repository_name}/model

In [7]:
%%writefile {repository_name}/model/train
#!/usr/bin/env python
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from datetime import datetime
import json
import numpy as np
import pandas as pd
import os.path
import sys

import backtrader as bt

# 这里定义 SageMaker 任务默认的环境路径
prefix = '/opt/ml/'

config_path = os.path.join(prefix, 'input/config')
input_path = os.path.join(prefix, 'input/data/training')
data_path = os.path.join(input_path, 'data.csv')
output_path = os.path.join(prefix, 'output')
model_path = os.path.join(prefix, 'model')


class MyStrategy(bt.Strategy):
    """Dual moving-average strategy.

    Buys with (almost) all available cash while flat and the fast SMA is
    above the slow SMA; sells the bought size while in the market and the
    fast SMA is below the slow SMA.

    Params:
        fastmaperiod: Window length of the fast simple moving average.
        slowmaperiod: Window length of the slow simple moving average.
        printlog: When true, every ``log`` message is printed.
    """

    params = (('fastmaperiod', 10),
              ('slowmaperiod', 30),
              ('printlog', False),)

    def __init__(self):
        """Set up order bookkeeping and the two moving-average indicators."""
        # Pending order, plus price and commission of the last executed buy.
        self.order = None
        self.buyprice = None
        self.buycomm = None
        # Size of the most recent buy and the bar count at the last fill.
        # Both are defined here so that no method can hit a missing attribute
        # before the first buy has happened.
        self.size = 0
        self.bar_executed = None

        # Simple moving averages over the first data feed.
        self.fastma = bt.indicators.SimpleMovingAverage(self.datas[0], period=self.params.fastmaperiod)
        self.slowma = bt.indicators.SimpleMovingAverage(self.datas[0], period=self.params.slowmaperiod)

    def next(self):
        """Core trading logic, evaluated once per bar."""
        # Do nothing while a previously issued order is still pending.
        if self.order:
            return

        if not self.position:
            # Flat: buy when the fast average is above the slow average.
            if self.fastma[0] > self.slowma[0]:
                self.log('买入委托：%.2f' % self.datas[0].close[0])
                # Whole number of units affordable at the current close.
                self.size = int(self.broker.cash / self.datas[0].close[0])
                self.order = self.buy(size=self.size)
        elif self.fastma[0] < self.slowma[0]:
            # In the market: sell when the fast average is below the slow one.
            self.log('卖出委托：%.2f' % self.datas[0].close[0])
            self.order = self.sell(size=self.size)

    def log(self, txt, dt=None, doprint=False):
        """Print ``txt`` prefixed with a date when logging is enabled.

        Args:
            txt: Message text.
            dt: Date to prefix; defaults to the current bar's date.
            doprint: Force printing even when ``params.printlog`` is false.
        """
        if self.params.printlog or doprint:
            dt = dt or self.datas[0].datetime.date(0)
            print(f'{dt.isoformat()},{txt}')

    def notify_order(self, order):
        """Log the outcome of an order and clear the pending-order marker."""
        # Submitted/accepted orders are still in flight: nothing to report.
        if order.status in [order.Submitted, order.Accepted]:
            return

        if order.status in [order.Completed]:
            # NOTE: the leading spaces on the continuation lines below are
            # part of the logged message; do not re-indent these literals.
            if order.isbuy():
                self.log('买入：\n价格：%.2f,\
                交易金额：-%.2f,\
                手续费：%.2f' % (order.executed.price, order.executed.value, order.executed.comm))
                self.buyprice = order.executed.price
                self.buycomm = order.executed.comm
            else:
                self.log('卖出:\n价格：%.2f,\
                交易金额：%.2f,\
                手续费：%.2f' % (order.executed.price, order.executed.price*self.size, order.executed.comm))
            # Remember how many bars had been processed at the time of the fill.
            self.bar_executed = len(self)
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            # The order was cancelled, hit a margin problem or was rejected.
            self.log('交易失败')

        # Allow next() to issue a new order.
        self.order = None

    def notify_trade(self, trade):
        """Log gross and net profit once a trade has been closed."""
        if not trade.isclosed:
            return
        self.log(f'策略收益：\n毛收益 {trade.pnl:.2f}, 净收益 {trade.pnlcomm:.2f}')

    def stop(self):
        """Always print the MA periods and the final portfolio value."""
        self.log('(MA均线： %2d日  %2d日) 期末总资金 %.2f' %
                 (self.params.fastmaperiod, self.params.slowmaperiod, self.broker.getvalue()), doprint=True)

        
if __name__ == '__main__':
    # Entry point of the training container: run one back-test with the
    # hyperparameters found under `config_path` and print the result lines.

    # Starting cash; also the denominator of the reported return, so both
    # uses are guaranteed to stay in sync.
    initial_cash = 100000.0

    # Create the Cerebro engine.
    cerebro = bt.Cerebro()

    # Load the price data and index it by trade date.
    df = pd.read_csv(data_path)
    df.set_index('tradedate', inplace=True)
    df.index = pd.to_datetime(df.index)

    # Build the data feed over the full date range and register it.
    start = df.index[0]
    end = df.index[-1]
    print(start, '-', end)
    data = bt.feeds.PandasData(dataname=df, fromdate=start, todate=end)
    cerebro.adddata(data)

    # Load the hyperparameters. A missing/invalid file or value used to be
    # swallowed here and then surfaced as an unrelated NameError on `params`;
    # now the run stops with an explicit message and a non-zero exit code.
    hyperparameters_file = os.path.join(config_path, 'hyperparameters.json')
    try:
        with open(hyperparameters_file) as json_file:
            params = json.load(json_file)
        fastmaperiod = int(params['fastmaperiod'])
        slowmaperiod = int(params['slowmaperiod'])
    except (OSError, KeyError, TypeError, ValueError) as e:
        print('Failed to load hyperparameters from {}: {!r}'.format(hyperparameters_file, e))
        sys.exit(1)
    print('Parameter load success')

    cerebro.addstrategy(MyStrategy, fastmaperiod=fastmaperiod, slowmaperiod=slowmaperiod, printlog=True)

    # Starting cash and a commission of 0.02% per trade.
    cerebro.broker.setcash(initial_cash)
    cerebro.broker.setcommission(commission=0.0002)

    print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())

    # Run the back-test.
    cerebro.run()

    # NOTE: keep the text of the two lines below unchanged; the notebook's
    # tuning metric regexes ('Final Portfolio Value: ...', 'Return: ...')
    # match them in the job log.
    print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())
    print('Return: %.4f' % (float(cerebro.broker.getvalue()) / initial_cash - 1))

    sys.exit(0)

Writing sagemaker-hpo/model/train


### 创建镜像仓库和容器镜像

接下来构建一个 python3 容器镜像和镜像仓库：

In [8]:
ecr = boto3.client('ecr', region_name=aws_region)
s3 = boto3.client('s3', region_name=aws_region)

In [47]:
ecr.create_repository(repositoryName=repository_name)

{'repository': {'repositoryArn': 'arn:aws:ecr:us-east-1:364198545638:repository/sagemaker-hpo',
  'registryId': '364198545638',
  'repositoryName': 'sagemaker-hpo',
  'repositoryUri': '364198545638.dkr.ecr.us-east-1.amazonaws.com/sagemaker-hpo',
  'createdAt': datetime.datetime(2021, 12, 9, 8, 58, 17, tzinfo=tzlocal()),
  'imageTagMutability': 'MUTABLE',
  'imageScanningConfiguration': {'scanOnPush': False},
  'encryptionConfiguration': {'encryptionType': 'AES256'}},
 'ResponseMetadata': {'RequestId': 'bf685ac2-fd24-42e2-ad06-ac1d9d6573bb',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'bf685ac2-fd24-42e2-ad06-ac1d9d6573bb',
   'date': 'Thu, 09 Dec 2021 08:58:16 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '393'},
  'RetryAttempts': 0}}

In [10]:
%%writefile {repository_name}/Dockerfile
# Base image: official Python 3.6.
FROM python:3.6

# Python packages installed into the image for the back-test script.
RUN pip --no-cache-dir install \
    backtrader\
    boto3 \
    pandas

# Unbuffered stdout/stderr and no .pyc files.
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
# Put /opt/ml/code on PATH so the `train` script copied below can be invoked
# by name.
ENV PATH="/opt/ml/code:${PATH}"

# Create the /opt/ml directory layout used by the job.
RUN mkdir -p /opt/ml/input/data
RUN mkdir -p /opt/ml/input/config
RUN mkdir -p /opt/ml/code
RUN mkdir -p /opt/ml/failure
RUN mkdir -p /opt/ml/model
RUN mkdir -p /opt/ml/output

# Copy the contents of the local `model` directory (including `train`) into
# the image and make them executable.
COPY model /opt/ml/code
RUN chmod -R 777 /opt/ml/code
WORKDIR /opt/ml/code

Writing sagemaker-hpo/Dockerfile


In [11]:
!docker build {repository_name} -t {repository_name}

Sending build context to Docker daemon  15.36kB
Step 1/14 : FROM python:3.6
 ---> ea3a963a14d5
Step 2/14 : RUN pip --no-cache-dir install     backtrader    boto3     pandas
 ---> Using cache
 ---> d2a36b097292
Step 3/14 : ENV PYTHONUNBUFFERED=TRUE
 ---> Running in 1f41b94cadf8
Removing intermediate container 1f41b94cadf8
 ---> 4c79f5303d19
Step 4/14 : ENV PYTHONDONTWRITEBYTECODE=TRUE
 ---> Running in 07c9e2dac957
Removing intermediate container 07c9e2dac957
 ---> b6a9dd6f90c1
Step 5/14 : ENV PATH="/opt/ml/code:${PATH}"
 ---> Running in c3a197472a8b
Removing intermediate container c3a197472a8b
 ---> 31e04be56c44
Step 6/14 : RUN mkdir -p /opt/ml/input/data
 ---> Running in 99c67b959861
Removing intermediate container 99c67b959861
 ---> a7abe0b77afc
Step 7/14 : RUN mkdir -p /opt/ml/input/config
 ---> Running in 60ab5eed38ca
Removing intermediate container 60ab5eed38ca
 ---> 80f90688a516
Step 8/14 : RUN mkdir -p /opt/ml/code
 ---> Running in 25f1e3e09937
Removing intermediate container 25f

SageMaker 提供了本地测试功能。在提交 HPO 任务之前，可以先在本地尝试运行镜像：

In [None]:
!pip uninstall -y sagemaker
!pip install sagemaker==1.72.0
!pip install sagemaker
import sagemaker
from sagemaker.estimator import Estimator

role = sagemaker.get_execution_role()
session = sagemaker.Session()

In [None]:
# 默认参数
params = { 
    'fastmaperiod': 6,
    'slowmaperiod': 11
}

estimator = Estimator(
    repository_name,
    role=role,
    train_instance_count=1,
    train_instance_type='local',
    output_path='s3://{}/{}/output'.format(bucket, repository_name),
    base_job_name=repository_name, 
    hyperparameters=params,
)

estimator.fit('s3://{}/{}/data.csv'.format(bucket, repository_name))

测试成功之后再将镜像推送到 ECR：

In [16]:
!docker tag {repository_name} {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/{repository_name}

In [45]:
!docker images

REPOSITORY                                                             TAG       IMAGE ID       CREATED             SIZE
364198545638.dkr.ecr.us-east-1.amazonaws.com/sagemaker-custom-rnn      latest    be842643bd29   5 minutes ago       1.92GB
sagemaker-custom-rnn                                                   latest    be842643bd29   5 minutes ago       1.92GB
364198545638.dkr.ecr.us-east-1.amazonaws.com/sagemaker-hpo             latest    7479592629f6   47 minutes ago      1.08GB
sagemaker-hpo                                                          latest    7479592629f6   47 minutes ago      1.08GB
364198545638.dkr.ecr.us-east-1.amazonaws.com/ecs-gridsearch            latest    198b80057241   About an hour ago   1.23GB
ecs-gridsearch                                                         latest    198b80057241   About an hour ago   1.23GB
364198545638.dkr.ecr.us-east-1.amazonaws.com/ecs-demo-php-simple-app   latest    b7100a00e52a   About an hour ago   529MB
ecs-demo-php-simple

In [18]:
!aws ecr get-login-password | docker login --username AWS --password-stdin {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded


In [48]:
!docker push {aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/{repository_name}

Using default tag: latest
The push refers to repository [364198545638.dkr.ecr.us-east-1.amazonaws.com/sagemaker-hpo]

[1Bbbdf31c6: Preparing 
[1B3b0b5d14: Preparing 
[1Bde92c571: Preparing 
[1B01edd1ce: Preparing 
[1B0c6351a4: Preparing 
[1Bc5a0f407: Preparing 
[1B8e0b146c: Preparing 
[1Bef60b179: Preparing 
[1Bc5f9f96f: Preparing 
[1B46b4d89f: Preparing 
[1B703d63f9: Preparing 
[1B4bfcf7fc: Preparing 
[1B6d85febd: Preparing 
[1B1e7436b1: Preparing 
[1B1d7cb46b: Preparing 
[1B0de04cb3: Preparing 
[1B336bbfff: Preparing 
[5B1e7436b1: Pushed   537.7MB/528.4MB[18A[2K[16A[2K[14A[2K[13A[2K[12A[2K[10A[2K[9A[2K[10A[2K[9A[2K[11A[2K[12A[2K[9A[2K[8A[2K[9A[2K[6A[2K[10A[2K[6A[2K[7A[2K[6A[2K[7A[2K[9A[2K[6A[2K[9A[2K[10A[2K[6A[2K[7A[2K[6A[2K[7A[2K[9A[2K[7A[2K[7A[2K[7A[2K[9A[2K[10A[2K[7A[2K[9A[2K[5A[2K[6A[2K[5A[2K[7A[2K[5A[2K[6A[2K[7A[2K[5A[2K[5A[2K[5A[2K[7A[2K[5A[2K[7A[2K[7A[2K[10A[

### SageMaker 超参优化

接下来我们将定义一个超参优化任务。首先，我们需要定义超参的类型和取值范围。在超参优化的过程中我们还需要定义一个目标参数（收益率）。SageMaker 将通过正则表达式从日志中提取目标参数所需的值，并判断最优的优化路径。这些配置信息通过 JSON 格式定义：

In [20]:
# Parameter-range and tuner classes used to describe the tuning job.
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

# Search space: integer ranges for the two moving-average window lengths.
hyperparameter_ranges = {
    'fastmaperiod': IntegerParameter(5, 10),
    'slowmaperiod': IntegerParameter(10, 30),
}

# Metrics extracted from the job log with regular expressions; the capture
# group of each regex is the metric value.
metric_definitions = [
    {
        'Name': 'portfolio value',
        'Regex': 'Final Portfolio Value: ([0-9\\.]+)'
    },
    {
        'Name': 'return',
        # The optional leading minus sign lets losing runs (negative return)
        # match as well; without it such runs emit no objective metric.
        'Regex': 'Return: (-?[0-9\\.]+)'
    }
]
# Name of the metric the tuner optimises; must match a 'Name' above.
objective_metric_name = "return"

In [23]:
# Estimator for the custom image pushed to ECR. The keyword names
# (train_instance_count / train_instance_type) are those of the SageMaker
# Python SDK v1 that this notebook pins.
estimator = Estimator(
    "{}.dkr.ecr.{}.amazonaws.com/{}".format(aws_account_id, aws_region, repository_name),
    role,
    train_instance_count=1,
    train_instance_type='ml.m5.large',
#     train_instance_type='ml.m5.medium',
    output_path='s3://{}/{}/output'.format(bucket, repository_name),
    sagemaker_session=session
)

from sagemaker.tuner import HyperparameterTuner

# Bayesian search that maximises the objective metric: at most 100 training
# jobs in total, 4 running in parallel, with automatic early stopping.
tuner = HyperparameterTuner(
    estimator, 
    objective_metric_name,
    hyperparameter_ranges,
    metric_definitions=metric_definitions,
    strategy='Bayesian',
    objective_type='Maximize', 
    max_jobs=100,
    max_parallel_jobs=4,
    base_tuning_job_name=repository_name,
    early_stopping_type='Auto'
)

# Start the tuning job using the CSV uploaded earlier as the input data.
tuner.fit('s3://{}/{}/data.csv'.format(bucket, repository_name))

Parameter image_name will be renamed to image_uri in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


在超参优化任务结束之后，我们可以通过以下代码将结果加载到一个 DataFrame 中，并随时查看优化的结果：

In [54]:
tuner_analytics = tuner.analytics()
tuning_result = tuner_analytics.dataframe().sort_values(['FinalObjectiveValue'], ascending=False)
# tuning_result.head(10)
tuning_result

Unnamed: 0,fastmaperiod,slowmaperiod,TrainingJobName,TrainingJobStatus,FinalObjectiveValue,TrainingStartTime,TrainingEndTime,TrainingElapsedTimeSeconds
76,7.0,20.0,sagemaker-hpo-211209-0819-024-f52b8fdd,Completed,1.4249,2021-12-09 08:42:16+00:00,2021-12-09 08:43:27+00:00,71.0
91,8.0,17.0,sagemaker-hpo-211209-0819-009-28c20220,Completed,1.4038,2021-12-09 08:29:39+00:00,2021-12-09 08:30:54+00:00,75.0
54,9.0,14.0,sagemaker-hpo-211209-0819-046-95c97de8,Completed,1.3440,2021-12-09 09:01:54+00:00,2021-12-09 09:03:09+00:00,75.0
97,7.0,18.0,sagemaker-hpo-211209-0819-003-6811f2f0,Completed,1.3268,2021-12-09 08:21:52+00:00,2021-12-09 08:23:03+00:00,71.0
92,8.0,18.0,sagemaker-hpo-211209-0819-008-99caad80,Completed,1.3009,2021-12-09 08:26:13+00:00,2021-12-09 08:27:23+00:00,70.0
...,...,...,...,...,...,...,...,...
59,7.0,15.0,sagemaker-hpo-211209-0819-041-9cacb6cd,Failed,,NaT,NaT,
60,6.0,21.0,sagemaker-hpo-211209-0819-040-5fe00377,Failed,,2021-12-09 08:58:02+00:00,2021-12-09 08:59:54+00:00,112.0
61,6.0,16.0,sagemaker-hpo-211209-0819-039-11a25a28,Failed,,2021-12-09 08:58:02+00:00,2021-12-09 08:59:02+00:00,60.0
62,10.0,19.0,sagemaker-hpo-211209-0819-038-49450938,Failed,,2021-12-09 08:57:47+00:00,2021-12-09 08:59:20+00:00,93.0


In [55]:
!pip uninstall -y sagemaker
!pip install sagemaker==1.72.0

Found existing installation: sagemaker 2.70.0
Uninstalling sagemaker-2.70.0:
  Successfully uninstalled sagemaker-2.70.0
Collecting sagemaker==1.72.0
  Using cached sagemaker-1.72.0-py2.py3-none-any.whl
Collecting smdebug-rulesconfig==0.1.4
  Using cached smdebug_rulesconfig-0.1.4-py2.py3-none-any.whl (10 kB)
Installing collected packages: smdebug-rulesconfig, sagemaker
  Attempting uninstall: smdebug-rulesconfig
    Found existing installation: smdebug-rulesconfig 1.0.1
    Uninstalling smdebug-rulesconfig-1.0.1:
      Successfully uninstalled smdebug-rulesconfig-1.0.1
Successfully installed sagemaker-1.72.0 smdebug-rulesconfig-0.1.4


可以观察一下，SageMaker 是否很快就找到了最优参数组合？您还可以将均线范围再扩大一些，看看在实验组数更多的时候，SageMaker HPO 的效果是否会比网格搜索更好。