# This notebook is used to load data into RedShift

## Variables

In [None]:
secret_name='bankdemo_redshift_login' 

## Install and import libraries

In [None]:
!pip install -q SQLAlchemy==1.3.13
!pip install psycopg2-binary pyathena
!pip install -U pip
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from pyathena import connect
from botocore.exceptions import ClientError
import pandas as pd
import json
import boto3
import sagemaker

In [None]:
# Get region 
session = boto3.session.Session()
region_name = session.region_name

# Get SageMaker session & default S3 bucket
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()

s3 = boto3.client('s3')
redshift = boto3.client('redshift')
secretsmanager = boto3.client('secretsmanager')

## Get credentials & connection information from Secret Manager

In [None]:
session = boto3.session.Session()
region = session.region_name

client = session.client(
        service_name='secretsmanager',
        region_name=region
    )

try:
    get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    secret_arn=get_secret_value_response['ARN']

except ClientError as e:
    print("Error retrieving secret. Error: " + e.response['Error']['Message'])
    
else:
    # Depending on whether the secret is a string or binary, one of these fields will be populated.
    if 'SecretString' in get_secret_value_response:
        secret = get_secret_value_response['SecretString']
    else:
        secret = base64.b64decode(get_secret_value_response['SecretBinary'])
            
secret_json = json.loads(secret)
master_user_name = secret_json['username']
master_user_pw = secret_json['password']
redshift_port = secret_json['port']
redshift_cluster_identifier = secret_json['dbClusterIdentifier']
redshift_endpoint_address = secret_json['host']

database_name_redshift = secret_json['database_name_redshift']
database_name_athena = secret_json['database_name_athena']

schema_redshift = secret_json['schema_redshift']
schema_athena = secret_json['schema_athena']

table_name_glue = secret_json['table_name_glue']
table_name_redshift = secret_json['table_name_redshift']

# print(master_user_name)

## Copy data (bank-additional.csv) to S3

In [None]:
s3.upload_file('bank-additional/bank-additional-full.csv', bucket, 'bankdemo/bank-additional.csv')


# Athena

## Create Athena database

In [None]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = "s3://{0}/athena/staging".format(bucket)
conn = connect(region_name=region_name, s3_staging_dir=s3_staging_dir)
statement = "CREATE DATABASE IF NOT EXISTS {}".format(database_name_athena)
print(statement)

In [None]:
import pandas as pd
pd.read_sql(statement, conn)
statement = "SHOW DATABASES"

df_show = pd.read_sql(statement, conn)
df_show.head(5)

## Load CSV to Athena

The 'default' column name causes an error and I changed the name to 'defaulted' instead.

In [None]:
s3_bankdemo_path = "s3://{}/bankdemo/".format(bucket)
# SQL statement to execute
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
         age int,
         job string,
         marital string,
         education string,
         defaulted string,
         housing string,
         loan string,
         contact string,
         month string,
         day_of_week string,
         duration int,
         campaign int,
         pdays int,
         previous int,
         poutcome string,
         emp_var_rate float,
         cons_price_idx float,
         cons_conf_idx float,
         euribor3m float,
         nr_employed int,
         y string
) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' 
LINES TERMINATED BY '\n' 
LOCATION '{}'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')""".format(database_name_athena, table_name_glue, s3_bankdemo_path)

print(statement)

In [None]:
import pandas as pd

pd.read_sql(statement, conn)
statement = "SHOW TABLES in {}".format(database_name_athena)

df_show = pd.read_sql(statement, conn)
df_show.head(5)

## Test getting data from Athena

In [None]:
statement = """SELECT * FROM {}.{}
""".format(
    database_name_athena, table_name_glue
)

print(statement)

In [None]:
df = pd.read_sql(statement, conn)
df.head(5)

# RedShift

## Connect to RedShift

In [None]:
# Ensure that the cluster is available

import time

response = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)
cluster_status = response['Clusters'][0]['ClusterStatus']
print(cluster_status)

while cluster_status != 'available':
    time.sleep(10)
    response = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)
    cluster_status = response['Clusters'][0]['ClusterStatus']
    print(cluster_status)

In [None]:
# Ensure the ApplyStatus is in-sync

# redshift_endpoint_address = response['Clusters'][0]['Endpoint']['Address']
iam_role = response['Clusters'][0]['IamRoles'][0]['IamRoleArn']

response['Clusters'][0]['IamRoles']

In [None]:
print('Redshift endpoint: {}'.format(redshift_endpoint_address))
print('IAM Role: {}'.format(iam_role))

In [None]:
engine = create_engine('postgresql://{}:{}@{}:{}/{}'.format(master_user_name, master_user_pw, redshift_endpoint_address, redshift_port, database_name_redshift))
session = sessionmaker()
session.configure(bind=engine)


## Create RedShift schema

In [None]:
statement = """CREATE SCHEMA IF NOT EXISTS {}""".format(schema_redshift)

s = session()
s.execute(statement)
s.commit()

## Register Athena Database bankdemo with Redshift Spectrum to Access the Data Directly in S3 using Glue Data Catalog

With just one command, you can query the S3 data lake from Amazon Redshift without moving any data into our data warehouse. This is the power of Redshift Spectrum. 

Note the `FROM DATA CATALOG` below.  This is pulling the table and schema information from the Glue Data Catalog (ie. Hive Metastore).

In [None]:
statement = """
CREATE EXTERNAL SCHEMA IF NOT EXISTS {} FROM DATA CATALOG 
    DATABASE '{}' 
    IAM_ROLE '{}'
    REGION '{}'
    CREATE EXTERNAL DATABASE IF NOT EXISTS
""".format(schema_athena, database_name_athena, iam_role, region_name)

print(statement)

In [None]:
s = session()
s.execute(statement)
s.commit()

### Run Sample Query on S3 Data through Redshift Spectrum

In [None]:
statement = """
SELECT *
    FROM {}.{}
""".format(schema_athena, table_name_glue)

print(statement)

In [None]:
df = pd.read_sql_query(statement, engine)
df.head(5)

## Create table in RedShift

In [None]:
statement = """
CREATE TABLE IF NOT EXISTS {}.{}( 
     age integer,
     job text,
     marital text,
     education text,
     defaulted text,
     housing text,
     loan text,
     contact text,
     month text,
     day_of_week text,
     duration integer,
     campaign integer,
     pdays integer,
     previous integer,
     poutcome text,
     emp_var_rate decimal,
     cons_price_idx decimal,
     cons_conf_idx decimal,
     euribor3m decimal,
     nr_employed integer,
     y text
     )
""".format(schema_redshift, table_name_redshift)

print(statement)
s = session()
s.execute(statement)
s.commit()

print("Done.")

## Insert data from S3 to RedShift using Athena

In [None]:
statement = """
INSERT INTO {}.{}
    SELECT
        *
    FROM
        {}.{};             

""".format(schema_redshift, table_name_redshift, schema_athena, table_name_glue)
print(statement)
s = session()
s.execute(statement)
s.commit()        
print("Done.")

## Test getting data from RedShift

In [None]:
statement = """
SELECT *
    FROM {}.{}
""".format(schema_redshift, table_name_redshift)

print(statement)

In [None]:
df = pd.read_sql_query(statement, engine)
df.head(5)