# PBC BigData/MLハンズオン
- S3上に配置したデータをAthenaを経由してSageMakerに取得する
- SageMaker上でデータの可視化やデータ加工を行う
- 広く利用されている機械学習ライブラリーである、Scikit-Learnを使ったモデル構築を試してみる
- 構築したモデルをSageMakerの推論Endpointとしてデプロイし、推論APIを構築する

# <font color="MediumSlateBlue">1. データを理解する</font>

In [None]:
!pip install awswrangler

In [None]:
import awswrangler
import pandas as pd
import numpy as np

%matplotlib inline
pd.set_option('display.max_rows', 2000)

In [None]:
# Python上からAthenaを操作するために便利なライブラリー AWS Data Wranglerを利用する
session = awswrangler.Session()

# S3に格納されたjson_salesの件数を確認する
# この関数はPandas DataFrameを戻すが、表示させるだけの場合は変数に格納する必要はない
session.pandas.read_sql_athena(
    sql="select count(*) as json_sales_count from json_sales",
    database="workshop"
)

In [None]:
# 少量のデータを取得してJupyter上で見てみる
session.pandas.read_sql_athena(
    sql="select * from json_sales where prod_id is not null limit 5",
    database="workshop"
)

In [None]:
# 後続処理で利用するデータは変数に格納する
# ここでは全期間のamaount_soldを日ごとに合計する処理をAthenaで行い、サマリーした結果をJupyter上で保持する
df_sales_daily = session.pandas.read_sql_athena(
    sql="select time_id, sum(amount_sold) as daily_sum from json_sales \
         where prod_id is not null \
         group by time_id order by time_id",
    database="workshop"
)
print('取得したデータの件数（日数）: {}'.format(df_sales_daily.shape[0]))

In [None]:
df_sales_daily.head(5)

In [None]:
# 月単位の売り上げを集計してグラフ化する
df_sales_daily['time_id'] = pd.to_datetime(df_sales_daily.time_id)
df_sales_daily.set_index('time_id', inplace=True)
df_sales_monthly = df_sales_daily.resample('M').sum()

df_sales_monthly.rename(columns={'daily_sum':'monthly_sum'}, inplace=True)
df_sales_monthly.plot(figsize=(12,5))

#  <font color="MediumSlateBlue">2. データを加工する</font>

In [None]:
# 過去の実績値の推移を入力データにするため、1ヶ月前、2ヶ月前、3ヶ月前、12ヶ月前の実績値を取得する
df_data = df_sales_monthly['2007-01-01':].copy()
df_data['monthly_sum'] = df_data.monthly_sum / 1000000
df_data['1month_ago'] = df_data.monthly_sum.shift(1)
df_data['2month_ago'] = df_data.monthly_sum.shift(2)
df_data['3month_ago'] = df_data.monthly_sum.shift(3)
df_data['12month_ago'] = df_data.monthly_sum.shift(12)

# 過去のデータをシフトしているため、期間の開始から12レコード分は欠損値が発生する
df_data.head(20)

In [None]:
# 欠損値が発生した部分のデータは削除する
df_data = df_data['2008-01-01':].copy()
df_data.head(5)

#  <font color="MediumSlateBlue">3. モデルを構築する</font>

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score

# モデルのトレーニングを行う鑵子を定義する
def train_lr_model(feature_cols, target_col, train_test_split):
    # 学習データとテストデータを分離
    train_x = df_data[:train_test_split][feature_cols].values
    train_y = df_data[:train_test_split][[target_col]].values
    train_index = df_data[:train_test_split].index
    test_x = df_data[train_test_split:][feature_cols].values
    test_y = df_data[train_test_split:][[target_col]].values
    test_index = df_data[train_test_split:].index

    # モデルのトレーニングを実行
    lr_model = LinearRegression(normalize=True)
    lr_model.fit(train_x, train_y)

    # テストデータに対して予測を実行
    test_pred = lr_model.predict(test_x)

    # 実績データと予測結果を結合して返却
    df_test = pd.DataFrame({'label': test_y[:,0], 
                            'pred': test_pred[:,0]}, index=test_index)
    df_train = pd.DataFrame({'label': train_y[:,0], 
                           }, index=train_index)
    df_result = pd.concat([df_train, df_test], sort=False).sort_index()

    return lr_model, df_test, df_result

# 1回目のモデル作成試行
feature_cols1 = ['1month_ago', '2month_ago', '3month_ago', '12month_ago']
target_col = 'monthly_sum'
train_test_split = '2011-12-31'

model1, df_test1, df_result1 = train_lr_model(feature_cols1, target_col, train_test_split)

In [None]:
print('r2', r2_score(df_test1.label, df_test1.pred))
df_result1.plot(figsize=(10,6))

### モデル1の予測傾向
- r2乗値は約0.76となった
- 2012年の予測はある程度実績に追随している
- 2013年は、2012年までの増加傾向を反映して上振れした予測値となっている

#  <font color="MediumSlateBlue">3-2. モデルの改善を試みる</font>

In [None]:
# 年々増えていたり、月ごとに周期的な動きをしている傾向を取り込みたい
# そのために特徴量を追加する

df_data.reset_index(inplace=True)

df_data['month'] = df_data.time_id.dt.month
starting_year = df_data.time_id.dt.year.min()
df_data['year_delta'] = df_data.time_id.dt.year - starting_year
df_data.set_index('time_id', inplace=True)

In [None]:
# もう一度モデルの学習を実行
feature_cols2 = ['1month_ago', '2month_ago', '3month_ago', '12month_ago', 'month', 'year_delta']
model2, df_test2, df_result2 = train_lr_model(feature_cols2, target_col, train_test_split)

In [None]:
print('r2', r2_score(df_test2.label, df_test2.pred))
df_result2.plot(figsize=(10,6))

### モデル2の予測傾向
- r2乗値は約0.84に増加して、全体としての予実差は改善された
- 2013年の上振れ傾向は是正されているが、2012年がやや下振れした予測となった

In [None]:
# モデルはどんなパラメーターが算出されたのか
def WriteCoef(model, feature_cols):
    [print('coefficient[', v, ']=', model.coef_[0][i]) for i, v in enumerate(feature_cols)]
    print('intercept = ', model.intercept_)
    
print('model1:')
WriteCoef(model1, feature_cols1)
print()
print('model2:')
WriteCoef(model2, feature_cols2)

#  <font color="MediumSlateBlue">4. モデルを推論用に展開する</font>

In [None]:
import sagemaker
from sagemaker import get_execution_role

# Sagemaker session object
sagemaker_session = sagemaker.Session()

# Sagemakerの実行ロールを取得
role = get_execution_role()

In [None]:
# S3上のデータ出力先を定義
default_bucket = sagemaker_session.default_bucket()
s3_prefix = 'sagemaker-handson'
s3_path = 's3://{}/{}/monthly_sum'.format(default_bucket, s3_prefix)

# 事前に出力先のS3パスを掃除しておく
session.s3.delete_objects(path=s3_path)

out_cols = ['monthly_sum', '1month_ago', '2month_ago', '3month_ago', '12month_ago', 'month', 'year_delta']
df_train_estimator = df_data[:train_test_split][out_cols]
df_test_estimator = df_data[train_test_split:][out_cols]

# Jupyter上で処理していたデータをS3にdumpする
data_on_s3 = session.pandas.to_csv(dataframe=df_train_estimator, path=s3_path, preserve_index=False )
data_on_s3

In [None]:
# S3上のデータ出力先を定義
default_bucket = sagemaker_session.default_bucket()
s3_prefix = 'sagemaker-handson'
s3_path = 's3://{}/{}/monthly_sum'.format(default_bucket, s3_prefix)

# 事前に出力先のS3パスを掃除しておく
session.s3.delete_objects(path=s3_path)

out_cols = ['monthly_sum', '1month_ago', '2month_ago', '3month_ago', '12month_ago', 'month', 'year_delta']
df_train_estimator = df_data[:train_test_split][out_cols]
df_test_estimator = df_data[train_test_split:][out_cols]

# Jupyter上で処理していたデータをS3にdumpする
data_on_s3 = session.pandas.to_csv(dataframe=df_data, path=s3_path, preserve_index=False )
data_on_s3

In [None]:
from sagemaker.sklearn.estimator import SKLearn

script_path = 'sklearn_monthly_sum.py'

sklearn = SKLearn(
    entry_point=script_path,
    train_instance_type="local",
    role=role,
    hyperparameters={'normalize': True})

【参考】大量のデータを使用する重い学習に、学習用の別インスタンスを利用する場合は以下のように記述する
```python
from sagemaker.sklearn.estimator import SKLearn

script_path = 'sklearn_monthly_sum.py'

sklearn = SKLearn(
    entry_point=script_path,
    train_instance_type="ml.c4.xlarge",
    role=role,
    sagemaker_session=sagemaker_session,
    hyperparameters={'normalize': True})
```

In [None]:
# S3に出力したデータを指定してモデルを学習させる
sklearn.fit({'train': s3_path})

In [None]:
# 学習が完了したので予測のためのpredictorを作成する
predictor = sklearn.deploy(initial_instance_count=1, instance_type="local")

【参考】実運用に利用するEndpointをdeployする際はインスタンスタイプの記述を変更する
```python
predictor = sklearn.deploy(initial_instance_count=1, instance_type="ml.m4.xlarge")
```

#  <font color="MediumSlateBlue">5. endpointを使用してpredictionを行う</font>

In [None]:
df_test_estimator.head(10)

In [None]:
# Estimatorに合わせたテストデータを作成する
test_x = df_test_estimator.values[:,1:]
test_y = df_test_estimator.values[:,0]

In [None]:
pred = predictor.predict(test_x)
df_test_result = pd.DataFrame({'label':test_y, 'pred':pred})

In [None]:
df_test_result.plot()