In [83]:
import operator
import time

import boto3
import numpy as np
import pandas as pd
from botocore.exceptions import WaiterError
from botocore.waiter import WaiterModel
from botocore.waiter import create_waiter_with_client
from sqlalchemy import text

# AWS service clients used throughout the notebook.
redshift_data = boto3.client('redshift-data')
redshift = boto3.client('redshift')
forecast = boto3.client('forecast')
iam = boto3.client('iam')

# Export destination and job names for the Forecast export jobs.
s3_export_path = "s3://aws-forecast-demo-examples/aws_forecast_export_jobs/"
predictor_export_name = 'manning_full_csv_predictor_backtest_export'
forecast_export_name = 'manning_full_csv_forecast_90h'
role_name = "AmazonForecast-ExecutionRole-1645141603603"

role_arn = iam.get_role(RoleName=role_name)['Role']['Arn']

# NOTE(review): both ARNs below take the first entry returned by the list
# APIs — confirm that is the intended predictor/forecast when the account
# holds more than one.
predictor_arn = forecast.list_predictors()['Predictors'][0]['PredictorArn']
# Required by create_forecast_export_job in the "Create Export Jobs" cell.
forecast_arn = forecast.list_forecasts()['Forecasts'][0]['ForecastArn']


### Create Export Jobs

In [14]:

# Both export jobs write to the same S3 prefix using the same IAM role.
export_destination = {
    'S3Config': {
        'Path': s3_export_path,
        'RoleArn': role_arn,
    }
}

# Export the predictor's backtest results.
predictor_export = forecast.create_predictor_backtest_export_job(
    PredictorBacktestExportJobName=predictor_export_name,
    PredictorArn=predictor_arn,
    Destination=export_destination,
)

# Export the generated forecast.
forecast_export = forecast.create_forecast_export_job(
    ForecastExportJobName=forecast_export_name,
    ForecastArn=forecast_arn,
    Destination=export_destination,
)

# ARNs used by the status-polling and cleanup cells below.
predictor_export_arn = predictor_export['PredictorBacktestExportJobArn']
forecast_export_arn = forecast_export['ForecastExportJobArn']




### Check job status

Poll both export jobs and wait until each of them reports `ACTIVE` before moving on to the next step.

In [36]:
import time

forecast_status = None
predictor_status = None
status = {forecast_status, predictor_status}
while "ACTIVE" not in status:
    time.sleep(20)
    forecast_status = forecast.describe_forecast_export_job(ForecastExportJobArn=forecast_export_arn)['Status']
    predictor_status = forecast.describe_predictor_backtest_export_job(PredictorBacktestExportJobArn=predictor_export_arn)['Status']
    print(f"Forecast export job status: {forecast_status}")
    print(f"Predictor export job status: {predictor_status}")
    status = {forecast_status, predictor_status}

Forecast export job status: ACTIVE
Predictor export job status: ACTIVE


### Delete export job resources 

In [37]:
# Remove the two export-job resources created earlier in this notebook.
# The calls run in order: forecast export first, then predictor backtest export.
for delete_job, job_arn_kwargs in (
    (forecast.delete_forecast_export_job,
     {"ForecastExportJobArn": forecast_export_arn}),
    (forecast.delete_predictor_backtest_export_job,
     {"PredictorBacktestExportJobArn": predictor_export_arn}),
):
    # After the loop, `response` holds the predictor backtest export deletion result.
    response = delete_job(**job_arn_kwargs)



### Redshift query

In [50]:
# Fill in the cluster to query and the database user the Data API should
# obtain temporary credentials for.
cluster_id = "<enter-cluster-id>"
db_user = "<enter-temp-user>"
db = 'dev'

# ResumeCluster only accepts a paused cluster, so resume conditionally. Then
# block until the cluster is available so the Data API calls in the following
# cells do not race a cluster that is still resuming.
cluster_status = redshift.describe_clusters(ClusterIdentifier=cluster_id)['Clusters'][0]['ClusterStatus']
response = None
if cluster_status == 'paused':
    response = redshift.resume_cluster(ClusterIdentifier=cluster_id)
redshift.get_waiter('cluster_available').wait(ClusterIdentifier=cluster_id)

In [61]:

# List the schemas in database `db`, connecting as `db_user`; the list is
# shown as the cell's output.
schema_listing = redshift_data.list_schemas(
    ClusterIdentifier=cluster_id,
    Database=db,
    DbUser=db_user,
)
schema_listing["Schemas"]

['catalog_history',
 'information_schema',
 'pg_automv',
 'pg_catalog',
 'pg_internal',
 'public',
 'spectrum']

In [97]:
# Read the DDL script verbatim. It is plain SQL destined for the Data API, so
# it is not wrapped in SQLAlchemy's text(): that parses ":word" tokens as
# bind parameters and re-renders the SQL on str().
with open("../redshift/external_tables.sql", encoding="utf-8") as sql_file:
    sql_script = sql_file.read()
print(sql_script)

# Put each statement on a single line (splitlines also handles \r\n), then split
# on ';' so every statement can be submitted separately.
query_list = " ".join(sql_script.splitlines()).split(';')


drop schema if exists spectrum_manning;

create external schema spectrum_manning
from data catalog
database 'spectrumdb'
iam_role 'arn:aws:iam::376337229415:role/myspectrum_role'
create external database if not exists;

drop table IF EXISTS spectrum_manning.manning;

create external table spectrum_manning.manning  (item_id smallint, timestamp date, target_value float)
row format delimited
fields terminated by '\t'
stored as textfile
location 's3://aws-forecast-demo-examples/glue_prep_for_aws_forecast/';

SELECT COUNT(*)
FROM spectrum_manning.manning


In [98]:
# Trim surrounding whitespace (spaces, tabs, stray \r) from every statement and
# drop every empty fragment, such as the one after a trailing ';' or between
# consecutive ';'.
final_query_list = [query.strip() for query in query_list if query.strip()]
final_query_list

['drop schema if exists spectrum_manning',
 "create external schema spectrum_manning from data catalog database 'spectrumdb' iam_role 'arn:aws:iam::376337229415:role/myspectrum_role' create external database if not exists",
 'drop table IF EXISTS spectrum_manning.manning',
 "create external table spectrum_manning.manning  (item_id smallint, timestamp date, target_value float) row format delimited fields terminated by '\\t' stored as textfile location 's3://aws-forecast-demo-examples/glue_prep_for_aws_forecast/'",
 'SELECT COUNT(*) FROM spectrum_manning.manning']

In [99]:
# Custom waiter for the Redshift Data API: poll DescribeStatement until the
# submitted SQL statement reaches a terminal state.
waiter_name = 'DataAPIExecution'
delay = 2          # seconds between DescribeStatement polls
max_attempts = 30  # ~60 s before the waiter gives up with WaiterError


def _status_acceptor(expected_status, state):
    """Build a waiter acceptor that fires when the scalar `Status` equals `expected_status`.

    A "path" matcher is used because `Status` is a single string. botocore's
    "pathAny" matcher only matches list results, so it would never fire here.
    """
    return {
        "matcher": "path",
        "expected": expected_status,
        "argument": "Status",
        "state": state,
    }


# Configure the waiter settings
waiter_config = {
    'version': 2,
    'waiters': {
        waiter_name: {
            'operation': 'DescribeStatement',
            'delay': delay,
            'maxAttempts': max_attempts,
            'acceptors': [
                _status_acceptor("FINISHED", "success"),
                *(_status_acceptor(s, "retry") for s in ("PICKED", "STARTED", "SUBMITTED")),
                *(_status_acceptor(s, "failure") for s in ("FAILED", "ABORTED")),
            ],
        },
    },
}

waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client(waiter_name, waiter_model, redshift_data)


In [102]:
# Run the statements strictly in order. Later statements depend on earlier
# ones (the table is created in the schema created before it), so stop at the
# first statement that does not finish.
for query_str in final_query_list:
    print(f"Query: {query_str}")
    res = redshift_data.execute_statement(
        Database=db, DbUser=db_user, Sql=query_str, ClusterIdentifier=cluster_id
    )
    # NOTE: `id` shadows the builtin but is kept because the result cell below
    # reads it; after the loop it holds the Id of the last statement run.
    id = res["Id"]
    try:
        custom_waiter.wait(Id=id)
        print("Done waiting to finish Data API.")
    except WaiterError as e:
        failed_desc = redshift_data.describe_statement(Id=id)
        raise RuntimeError(
            f"Statement {id} did not finish (status {failed_desc['Status']}): "
            f"{failed_desc.get('Error', e)}"
        ) from e

    desc = redshift_data.describe_statement(Id=id)
    # Duration is reported in nanoseconds; convert to whole milliseconds.
    duration_ms = desc["Duration"] // 1_000_000
    print(f"Status: {desc['Status']}. Execution time: {duration_ms} milliseconds")
    print("")

Query: drop schema if exists spectrum_manning
Done waiting to finish Data API.
Status: FINISHED. Excution time: 46 miliseconds

Query: create external schema spectrum_manning from data catalog database 'spectrumdb' iam_role 'arn:aws:iam::376337229415:role/myspectrum_role' create external database if not exists
Done waiting to finish Data API.
Status: FINISHED. Excution time: 330 miliseconds

Query: drop table IF EXISTS spectrum_manning.manning
Done waiting to finish Data API.
Status: FINISHED. Excution time: 1030 miliseconds

Query: create external table spectrum_manning.manning  (item_id smallint, timestamp date, target_value float) row format delimited fields terminated by '\t' stored as textfile location 's3://aws-forecast-demo-examples/glue_prep_for_aws_forecast/'
Done waiting to finish Data API.
Status: FINISHED. Excution time: 367 miliseconds

Query: SELECT COUNT(*) FROM spectrum_manning.manning
Done waiting to finish Data API.
Status: FINISHED. Excution time: 1718 miliseconds



In [104]:
# `id` is the statement Id left over from the execution loop above, i.e. the
# last statement run (in the run shown, the SELECT COUNT(*)).
output = redshift_data.get_statement_result(Id=id)
result_rows = output['Records']
print(result_rows)

[[{'longValue': 2886}]]


In [105]:
# Pause the cluster identified by `cluster_id`, the same one resumed and
# queried above, now that the queries are done.
response = redshift.pause_cluster(
    ClusterIdentifier=cluster_id
)
