# 概要

- 目的
    - AWS IoT Analyticsの理解を深める
    - Amazon SageMakerで異常検出モデル（ランダムカットフォレスト）をAWS IoT Analyticsのデータセットを使って学習する
- やること
    - AWS IoT Analyticsでチャネル、データストア、パイプライン、データセット(学習用とテスト用)を作成
    - 検証用データセットのデータをチャネルに入れる
    - データセットの作成処理を実行(クエリ実行)
    - ノートブックで学習用とテスト用データセットを読み込む
    - ランダムカットフォレストで異常値検出モデルを作る
    - 結果確認



## ノートブック実行に必要なパッケージをインストール

ノートブックを分けて各処理を書いているため、別のノートブックを実行することができるPapermillを使う。

- <a target="_blank" href="https://github.com/nteract/papermill">nteract/papermill: 📚 Parameterize, execute, and analyze notebooks</a>


In [None]:
!pip install papermill

## パラメータ定義

In [None]:
from os import path
# 処理ノートブック
notebook_dir = './notebook/'
# papermillでノートブックを実行した場合の出力結果があるノートブック
output_notebook_dir = './output_notebook/'


# 対象のURLを指定
data_source = 'https://raw.githubusercontent.com/numenta/NAB/master/data/realKnownCause/machine_temperature_system_failure.csv'
label_data_source = 'https://raw.githubusercontent.com/numenta/NAB/master/labels/raw/known_labels_v1.0.json'

# s3にデータを保存する場所定義
prefix = 'machine_temperature_iot'
bucket_name = 'bucket-name'

# モデル学習ジョブ名
base_job_name = 'machine-temperature-iot'

# モデル学習周辺でのパス定義
working_dir = './working/'
train_dataset_path = path.join(working_dir, 'train_dataset.csv')
test_dataset_path = path.join(working_dir, 'test_dataset.csv')
train_data_s3_path = path.join('s3://', bucket_name, prefix, 'train.csv')
test_data_s3_path = path.join('s3://', bucket_name, prefix, 'test.csv')
model_artifact_path = path.join('s3://', bucket_name, prefix)
transformed_data_s3_path = path.join('s3://', bucket_name, prefix)

# AWS IoT Analytics関連のリソース名
channel_name = prefix
pipeline_name = prefix
datastore_name = prefix
train_dataset_name = prefix+'_train'
test_dataset_name = prefix+'_test'

# データのシングリングサイズ(１つのデータポイントの長さ)
shingle_size = 12*24

# 前準備
## AWS IoT Analyticsのリソース作成

- チャネル、データストア、パイプライン、データセット


In [None]:
create_resource_parameters = dict(
    channel_name = channel_name,
    pipeline_name = pipeline_name,
    datastore_name = datastore_name,
    train_dataset_name = train_dataset_name,
    test_dataset_name = test_dataset_name,
)

In [None]:
import papermill as pm
from os import path
exec_notebook_name = 'create_resource.ipynb'
pm.execute_notebook(
    path.join(notebook_dir, exec_notebook_name),
    path.join(output_notebook_dir, exec_notebook_name),
    parameters = create_resource_parameters,
    kernel_name ='python3'
)

## AWS IoT Analyticsのチャネルへデータを入れる

In [None]:
put_data_parameters = dict(
    channel_name = channel_name,
    csv_data_source = data_source,
    data_split_size = 100
)

In [None]:
import papermill as pm
from os import path
exec_notebook_name = 'put_data.ipynb'
pm.execute_notebook(
    path.join(notebook_dir, exec_notebook_name),
    path.join(output_notebook_dir, exec_notebook_name),
    parameters = put_data_parameters,
    kernel_name ='python3'
)

## データセットの作成

データストアからデータを抽出してデータセットを作る。  
実行してから数分後にデータセットの作成が完了する。

In [None]:
import boto3
iota_client = boto3.client('iotanalytics')
for dataset_name in [train_dataset_name, test_dataset_name]:
    response = iota_client.create_dataset_content(datasetName=dataset_name)
    print(response)

# 異常検出モデルの作成
## 学習用とテスト用データセットをダウンロード
*データセットの作成が終わっていない場合はエラーが出るので注意*

In [None]:
dataset_params = [
    dict(
        dataset_name = train_dataset_name,
        file_path = train_dataset_path
    ),
    dict(
        dataset_name = test_dataset_name,
        file_path = test_dataset_path
    ),
]

In [None]:
import boto3
from urllib.request import urlretrieve

iota_client = boto3.client('iotanalytics')

for dataset_param in dataset_params:
    # データセットの情報を取得
    response = iota_client.get_dataset_content(datasetName = dataset_param['dataset_name'])
    # データセットの署名つきURLを取りだす
    data_uri = response['entries'][0]['dataURI']
    
    # ファイルデータを取り出す
    urlretrieve(data_uri, dataset_param['file_path'])
    

## データの前処理

In [None]:
preprocess_data_parameters_list = [('train', dict(
    dataset_path = train_dataset_path,
    data_s3_path = train_data_s3_path,
    label_data_source = '',
    shingle_size = shingle_size,
)),('test', dict(
    dataset_path = test_dataset_path,
    data_s3_path = test_data_s3_path,
    label_data_source = label_data_source,
    shingle_size = shingle_size,
))]

In [None]:
import papermill as pm
from os import path
exec_notebook_name = 'preprocess_data.ipynb'
for phase_name, preprocess_data_parameters in preprocess_data_parameters_list:
    pm.execute_notebook(
        path.join(notebook_dir, exec_notebook_name),
        path.join(output_notebook_dir, phase_name+'_'+exec_notebook_name),
        parameters = preprocess_data_parameters,
        kernel_name ='python3'
    )


## 学習
SageMakerで異常検出(ランダムカットフォレスト)モデルを学習させる

In [None]:
import sagemaker
hyperparameters = dict(
    num_samples_per_tree=256,
    num_trees=100,
    feature_dim=shingle_size
)
train_parameters = dict(
    train_s3_path = train_data_s3_path,
    test_s3_path = test_data_s3_path,
    execution_role = sagemaker.get_execution_role(),
    hyperparameters = hyperparameters,
    model_artifact_path = model_artifact_path,
    base_job_name = base_job_name,
)

In [None]:
import papermill as pm
from os import path
exec_notebook_name = 'train.ipynb'
pm.execute_notebook(
    path.join(notebook_dir, exec_notebook_name),
    path.join(output_notebook_dir, exec_notebook_name),
    parameters = train_parameters,
    kernel_name ='python3'
)

### 学習ジョブ名を取得

In [None]:
nb = pm.read_notebook(path.join(output_notebook_dir, exec_notebook_name))
training_job_name = nb.data['job_name']

## モデルの確認

In [None]:
confirm_model_parameters = dict(
    training_job_name = training_job_name,
    labeled_test_data_s3_path = test_data_s3_path,
    output_data_s3_path = transformed_data_s3_path,
    shingle_size = shingle_size    
)

In [None]:
import papermill as pm
from os import path
exec_notebook_name = 'confirm_model.ipynb'
pm.execute_notebook(
    path.join(notebook_dir, exec_notebook_name),
    path.join(output_notebook_dir, exec_notebook_name),
    parameters = confirm_model_parameters,
    kernel_name ='python3'
)

In [None]:
s3path = 's3://osawa-test-ml/machine_temperature_iot/test_transform.csv'
import pandas as pd
df = pd.read_csv(s3path, header=None)


# リソースの削除
IoT Analytics関連のリソースを削除

In [None]:
delete_resource_parameters = dict(
    channel_name = channel_name,
    pipeline_name = pipeline_name,
    datastore_name = datastore_name,
    train_dataset_name = train_dataset_name,
    test_dataset_name = test_dataset_name,
)

In [None]:
import papermill as pm
from os import path
exec_notebook_name = 'delete_resource.ipynb'
pm.execute_notebook(
    path.join(notebook_dir, exec_notebook_name),
    path.join(output_notebook_dir, exec_notebook_name),
    parameters = delete_resource_parameters,
    kernel_name ='python3'
)