## インタラクティブ デモ 2: Data API を Jupyter ノートブックで使用して Amazon Redshift クラスターに接続する
このデモでは、Amazon SageMaker Jupyter ノートブックを Redshift クラスターに接続し、Python で Data API コマンドを実行します。次のアクティビティを実行します。

* Redshift のスキーマとテーブルを作成する
* Amazon S3 バケットから株価データをロードする
* Data API を使用して Jupyter ノートブックからデータをクエリする

## 前提条件
このデモでは、次の Python モジュールと、現在の SQL ステートメントの実行が完了するのを待機する Amazon Redshift Data API のカスタム ウェイターが必要です。

In [None]:
# Upgrade boto3 (IPython %pip magic) before it is imported in the next cell.
# NOTE(review): presumably required so the "redshift-data" client used later is available — confirm.
%pip install --upgrade boto3

In [None]:
# These are libraries required for the demo activities.

import botocore.session as s
from botocore.exceptions import ClientError
import boto3.session
import json
import boto3
import sagemaker
import operator
from botocore.exceptions import WaiterError
from botocore.waiter import WaiterModel
from botocore.waiter import create_waiter_with_client

import pandas as pd
import numpy as np


# Custom waiter for the Amazon Redshift Data API: it polls DescribeStatement
# until the current SQL statement reaches a terminal status.
waiter_name = 'DataAPIExecution'

delay = 2          # seconds between DescribeStatement polls
max_attempts = 3   # polls before the waiter gives up with WaiterError

# DescribeStatement returns ``Status`` as a single string, so every acceptor uses
# the scalar "path" matcher with exactly one expected value. botocore's "pathAny"
# matcher only matches when the JMESPath result is a *list* containing the
# expected value; applied to a string status it never matches, which meant
# FAILED/ABORTED statements were not reported as failures and the waiter just
# polled until it ran out of attempts.
_retry_statuses = ["PICKED", "STARTED", "SUBMITTED"]
_failure_statuses = ["FAILED", "ABORTED"]


def _status_acceptor(status, state):
    """Build one waiter acceptor mapping a statement ``Status`` value to a waiter state."""
    return {
        "matcher": "path",
        "expected": status,
        "argument": "Status",
        "state": state,
    }


# Configure the waiter settings.
waiter_config = {
    'version': 2,
    'waiters': {
        waiter_name: {
            'operation': 'DescribeStatement',
            'delay': delay,
            'maxAttempts': max_attempts,
            'acceptors': (
                [_status_acceptor("FINISHED", "success")]
                + [_status_acceptor(status, "retry") for status in _retry_statuses]
                + [_status_acceptor(status, "failure") for status in _failure_statuses]
            ),
        },
    },
}

## AWS Secrets Manager からデータベース接続用のシークレットを取得し、Redshift クラスターへの接続を確立する

AWS Secrets Manager のシークレットから、以下を取得する必要があります。
* クラスター識別子
* シークレット ARN
* データベース名
* データファイルを格納した S3 バケット名

In [None]:
secret_name = 'demolab-secrets'  # Replace the secret name with yours.
session = boto3.session.Session()
region = session.region_name

# Secrets Manager client used to read the connection details of the demo cluster.
client = session.client(
    service_name='secretsmanager',
    region_name=region,
)

try:
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
    print("Error retrieving secret. Error: " + e.response['Error']['Message'])
    # Nothing below can work without the secret: stop here instead of
    # continuing and failing later with a NameError on `secret` / `secret_arn`.
    raise

secret_arn = get_secret_value_response['ARN']

# Depending on whether the secret is a string or binary, one of these fields is
# populated. boto3 already base64-decodes binary (blob) response fields, so
# SecretBinary arrives as raw bytes; decoding it a second time would corrupt it.
# json.loads accepts both str and bytes.
if 'SecretString' in get_secret_value_response:
    secret = get_secret_value_response['SecretString']
else:
    secret = get_secret_value_response['SecretBinary']

# The secret must be a JSON object with the keys dbClusterIdentifier, db and dataBucket.
secret_json = json.loads(secret)

cluster_id = secret_json['dbClusterIdentifier']
db = secret_json['db']
s3_data_path = "s3://{}/data/stock_prices.csv".format(secret_json['dataBucket'])
print(
    "Region: " + region
    + "\nCluster_id: " + cluster_id
    + "\nDB: " + db
    + "\nSecret ARN: " + secret_arn
    + "\ndata file location: " + s3_data_path
)

# Create a Redshift Data API client in the same region and test it.
bc_session = s.get_session()
session = boto3.Session(
    botocore_session=bc_session,
    region_name=region,
)

client_redshift = session.client("redshift-data")
print("Data API client successfully loaded")

# List all the schemas in the current database `db`; as the last expression of
# the cell, Jupyter displays the returned list.
client_redshift.list_schemas(
    Database=db,
    SecretArn=secret_arn,
    ClusterIdentifier=cluster_id,
)["Schemas"]

## テーブルスキーマとテーブルを作成する
Data API を使用して、`stocksummary` スキーマと `stocks` テーブルを作成します。

In [None]:
# First, build the custom waiter from `waiter_config` so each statement can be
# waited on until DescribeStatement reports a terminal status.
waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client(waiter_name, waiter_model, client_redshift)


def _run_and_report(sql, success_message):
    """Run one SQL statement through the Data API, wait for it, and print its status.

    Waiter errors (failure status or too many polls) are printed rather than
    raised, matching the rest of this notebook. Returns the DescribeStatement
    response.
    """
    statement_id = client_redshift.execute_statement(
        Database=db, SecretArn=secret_arn, Sql=sql, ClusterIdentifier=cluster_id
    )["Id"]

    # Waiter in try block and wait for DATA API to return.
    try:
        custom_waiter.wait(Id=statement_id)
        print(success_message)
    except WaiterError as e:
        print(e)

    desc = client_redshift.describe_statement(Id=statement_id)
    # Duration is reported in nanoseconds; show it in milliseconds.
    print("Status: " + desc["Status"] + ". Run time: %d milliseconds" % (desc["Duration"] / 10**6))
    return desc


# Script for schema create.
_run_and_report("create schema if not exists stocksummary;", "Schema creation is successful.")

# All price columns use DECIMAL(8,2); Close was previously DECIMAL(8,1), which
# silently rounded close prices to one decimal place.
# NOTE: because of IF NOT EXISTS, a table created earlier keeps its old definition.
create_table_sql = """
CREATE TABLE IF NOT EXISTS stocksummary.stocks (
    Trade_Date VARCHAR(15) NOT NULL,
    Ticker VARCHAR(5) NOT NULL,
    High DECIMAL(8,2),
    Low DECIMAL(8,2),
    Open_value DECIMAL(8,2),
    Close DECIMAL(8,2),
    Volume DECIMAL(15),
    Adj_Close DECIMAL(8,2) NOT NULL
)
sortkey (Trade_Date);
"""
_run_and_report(create_table_sql, "Table creation is successful.")


## データのロード
ここで、Amazon S3 から `stocks` テーブルにデータをロードします。

In [None]:
redshift_iam_role = sagemaker.get_execution_role()
print("IAM Role: " + redshift_iam_role)

# COPY usually runs longer than DDL, so poll DescribeStatement every 10 seconds.
# maxAttempts is unchanged, so this also lengthens the waiter's total wait time.
waiter_config["waiters"]["DataAPIExecution"]["delay"] = 10
waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client(waiter_name, waiter_model, client_redshift)

# Load the CSV file (skipping its header row) into stocksummary.stocks using the
# notebook's execution role.
# NOTE(review): the role must be usable by the Redshift cluster for S3 reads — confirm in your setup.
query = "COPY stocksummary.stocks FROM '" + s3_data_path + "' IAM_ROLE '" + redshift_iam_role + "' CSV IGNOREHEADER 1;"

print("COPY query: " + query)
# Submit the single COPY statement; execute_statement returns an Id that is
# then polled by the waiter.
resp = client_redshift.execute_statement(Database=db, SecretArn=secret_arn, Sql=query, ClusterIdentifier=cluster_id)

print("Redshift COPY started ...")

statement_id = resp["Id"]
print("\nID: " + statement_id)

# Waiter in try block and wait for DATA API to return.
try:
    custom_waiter.wait(Id=statement_id)
    print("Done waiting to finish Data API for the COPY statement.")
except WaiterError as e:
    print(e)

desc = client_redshift.describe_statement(Id=statement_id)
# Duration is reported in nanoseconds; show it in milliseconds.
print("[COPY] Status: " + desc["Status"] + ". Run time: %d milliseconds" % (desc["Duration"] / 10**6))

# Switch the waiter to a 5-second poll interval for the shorter queries in the
# following cells (this is not the 2-second value from the setup cell).
waiter_config["waiters"]["DataAPIExecution"]["delay"] = 5
waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client(waiter_name, waiter_model, client_redshift)

## データのクエリ (インプレース分析)

Amazon Redshift Data API を使用して、インプレース データ分析を実行できます。

In [None]:
#1. Number of stock records in the dataset.
# Runs a count(*) through the Data API, waits for it, and shows the single
# result row as a DataFrame.

query_str = "select  count(*) as record_count from stocksummary.stocks"

statement_id = client_redshift.execute_statement(
    Database=db,
    SecretArn=secret_arn,
    Sql=query_str,
    ClusterIdentifier=cluster_id,
)["Id"]
print("Redshift Data API execution  started ...")

# Block until the statement finishes; waiter problems are only printed.
try:
    custom_waiter.wait(Id=statement_id)
    print("Done waiting to finish Data API.")
except WaiterError as e:
    print(e)

result = client_redshift.get_statement_result(Id=statement_id)
column_names = [column["label"] for column in result["ColumnMetadata"]]

# Each field is a dict such as {"longValue": 42}; count(*) is returned as longValue.
df = pd.DataFrame(
    [[field["longValue"] for field in record] for record in result["Records"]],
    columns=column_names,
)

df

In [None]:
#2. Find out top 10 high stock prices for dis (Disney) ticker.

query_str = (
    "select * from stocksummary.stocks "
    "where ticker = 'dis' "
    "order by adj_close desc limit 10;"
)

res = client_redshift.execute_statement(Database=db, SecretArn=secret_arn, Sql=query_str, ClusterIdentifier=cluster_id)
print("Redshift Data API execution  started ...")
statement_id = res["Id"]

# Waiter in try block and wait for DATA API to return.
try:
    custom_waiter.wait(Id=statement_id)
    print("Done waiting to finish Data API.")
except WaiterError as e:
    print(e)


def _field_to_python(field):
    """Return the value held by one Data API result field, or None for SQL NULL.

    A field is a dict with one typed value key (e.g. "stringValue",
    "longValue"), or {"isNull": True} for NULL. The nullable columns of
    stocksummary.stocks (High, Low, Open_value, Close, Volume) would otherwise
    raise KeyError when looked up by a fixed key such as "stringValue".
    """
    if field.get("isNull"):
        return None
    return next(value for key, value in field.items() if key != "isNull")


output = client_redshift.get_statement_result(Id=statement_id)
col_labels = [column["label"] for column in output["ColumnMetadata"]]

# Load the results into a dataframe, converting each field to a plain value.
df = pd.DataFrame(
    [[_field_to_python(field) for field in record] for record in output["Records"]],
    columns=col_labels,
)

df

## チャレンジアクティビティ

銘柄 tsla (テスラ) の取引量 (Volume) が最も少ない 10 日を見つけてください。

In [None]:
# Write your code here and run the cell.
# Hint - Except for the query, the rest of the code is the same as the previous cell.


## 以上です。