# SageMaker のノートブックから Athena を使ってデータの前処理を行う

## 概要

このノートブックでは、Amazon SageMaker上で Athena を実行して S3 上のデータを読み込み，結果データを pandas に取り込んでさらに処理を行うための例をおみせします．また，より大規模なデータを効率的に前処理するために，非同期な形で Athena を実行するやり方についても説明します

## セットアップ

### IAM のセットアップ

ノートブックインスタンスから Athena に接続できるようにするため，このインスタンスにアタッチしている IAM ロールに，以下のポリシーを付与する

- AmazonAthenaFullAccess

### テーブルの作成

Athena のマネジメントコンソールにいって，クエリエディタ上で以下の DDL を実行する．

```
CREATE EXTERNAL TABLE default.taxis (
     vendorid STRING,
     pickup_datetime TIMESTAMP,
     dropoff_datetime TIMESTAMP,
     ratecode INT,
     passenger_count INT,
     trip_distance DOUBLE,
     fare_amount DOUBLE,
     total_amount DOUBLE,
     payment_type INT
    )
PARTITIONED BY (YEAR INT, MONTH INT, TYPE string)
STORED AS PARQUET
LOCATION 's3://serverless-analytics/canonical/NY-Pub/'
```

上記テーブルはパーティションに分かれているため，続いて以下のコマンドを実行して，パーティションを認識させる．

```
MSCK REPAIR TABLE default.taxis
```

## Athena に対してインタラクティブなクエリを実行

### Athena のセットアップ

Python から Athena に接続するための PyAthenaJDBC モジュールをインストール．こちらは同期的な処理でクエリを実行するためのツールとなる．

In [None]:
!pip install PyAthenaJDBC

S3 から Athena に接続．最初に Athena クエリの結果を吐き出すディレクトリをセットする．デフォルトバケットと揃えるため，以下の `region` と `account_id` に現在のリージョン名とアカウント ID を入力して，Athena との接続をセットアップする．

In [None]:
from pyathenajdbc import connect

region = 'us-east-1' # 別のリージョンで実施する場合は，適宜変更する
account_id = '666254511816' #'YOUR-ACCOUNT-ID'

bucket_name = 'aws-athena-query-results-{}-{}'.format(account_id, region)
staging_dir = 's3://{}'.format(bucket_name)

con = connect(s3_staging_dir=staging_dir, region_name=region)

### 結果データを読み出して，ノートブック上で可視化

あとは Athena に対してインタラクティブなクエリを実行する．乗車回数および乗客者数の年月推移を可視化してみる．

In [None]:
query = """
select
  date_trunc('month', pickup_datetime) as date
  , count(1) as ride_count
  , sum(passenger_count) as passenger_count
from
  default.taxis
where
  year is not null
group by
  date_trunc('month', pickup_datetime)
order by
  date_trunc('month', pickup_datetime)
"""

cursor = con.cursor()
cursor.execute(query)
data = cursor.fetchall()

得られた結果データを pandas に突っ込んで，matplotlib で可視化してみる．

In [None]:
% matplotlib inline

import pandas as pd
import matplotlib.pyplot as plt

df = pd.DataFrame(data)
df.columns = ['date', 'ride_count', 'passenger_count']
df.index = pd.to_datetime(df.date)
df = df.drop('date', axis=1)
df.plot()

### 非同期にクエリを実行して結果を S3 に保存

上記の PyAthenaJDBC は，あくまでクエリを同期的な形で実行するものであるため，長時間のクエリ実行や，大きなデータの取得には向かない．ノートブックインスタンスでハンドリングするには大きすぎるデータを出力したい場合は，boto3 クライアント経由で，Athena API を直接叩く形で，非同期でのクエリ実行を行うのがよい．結果データは必ず S3 にも保存されるため，あとはそれを入力データとして，機械学習を行えば良い．

まずは `start_query_execution` API でクエリを実行．レスポンスを確認してみる．

In [None]:
import boto3
import pprint

athena = boto3.client('athena', region_name=region)
response = athena.start_query_execution(
    QueryString = query,
    QueryExecutionContext = {
        'Database': 'default'
    },
    ResultConfiguration = {
        'OutputLocation': staging_dir,
    }
)
pprint.pprint(response)

続いて `get_query_execution` API でクエリの進行状況を確認．ここでは JSON レスポンスのうち，クエリステータスの部分だけ取り出してみた．

In [None]:
import time
status = 'None'

while status != 'SUCCEEDED':
    query_status = athena.get_query_execution(
        QueryExecutionId=response['QueryExecutionId']
    )
    status = query_status['QueryExecution']['Status']['State']
    print(status)
    time.sleep(2)

全体を出力すると，以下のようになる．

In [None]:
pprint.pprint(query_status)

クエリ結果の保存先は，`s3://{OutputLocation}/{QueryExecutionId}.csv` となる．今回は，試しに手元に取ってきて，実際に結果が取得できるかを確認してみる．

In [None]:
s3 = boto3.resource('s3')

s3_key = '{}.csv'.format(response['QueryExecutionId'])
s3.Bucket(bucket_name).download_file(s3_key, 'output.csv')

さっきと同じ結果がローカルにあるかを確認．

In [None]:
!cat output.csv