# Redshift Connect & Load Data Demo

In [None]:
# Load the `sql` IPython extension; later cells rely on its %sql / %%sql magics.
%load_ext sql

In [None]:
import configparser
from time import time
from urllib.parse import quote
# import matplotlib.pyplot as plt
# import pandas as pd

# STEP 1: Get the parameters of the created Redshift cluster
- We need:
    - The Redshift cluster <font color='red'>endpoint</font>
    - The <font color='red'>IAM role ARN</font> that gives Redshift access to read from S3

In [None]:
import configparser

# Read AWS credentials and warehouse settings from the local dwh.cfg file.
config = configparser.ConfigParser()
# Open the file in a context manager so the handle is closed as soon as
# parsing finishes (the bare open() call previously leaked it).
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# AWS access key pair from the [AWS] section; used later to build the S3 resource.
KEY = config.get('AWS', 'key')
SECRET = config.get('AWS', 'secret')

# Database name, user, password and port from the [DWH] section.
# config.get returns strings, so DWH_PORT is a string as well.
DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")

# FILL IN THE REDSHIFT ENDPOINT HERE
DWH_ENDPOINT = "redshift-cluster-de-assignment.ceoqlgitrobq.us-east-1.redshift.amazonaws.com"

# FILL IN THE IAM ROLE ARN for S3 access
DWH_ROLE_ARN = "arn:aws:iam::206790211102:role/myRedshiftRole"

# STEP 2: Connect to the Redshift Cluster

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

In [None]:
import boto3

# S3 resource handle in us-east-1, authenticated with the key pair read from dwh.cfg.
s3 = boto3.resource(
    "s3",
    region_name="us-east-1",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

# Bucket that holds the demo data files.
sampleDbBucket = s3.Bucket("my-demo-data-redshift")

# Print every object summary in the bucket (filter() is called without criteria).
for obj in sampleDbBucket.objects.filter():
    print(obj)

# STEP 3: Create Tables

In [None]:

%%sql
DROP TABLE IF EXISTS "accounts";
CREATE TABLE "accounts" (
    -- batch, date and month keys
    batch_id                   BIGINT,
    date_id                    INTEGER,
    date_id_ps                 INTEGER,
    month_id                   INTEGER,
    month_id_ps                INTEGER,
    handle_batch_id            BIGINT,
    -- account attributes
    id                         VARCHAR(1024),
    code                       VARCHAR(65535),
    org_id                     BIGINT,
    parent_account_id          VARCHAR(65535),
    bill_to                    VARCHAR(65535),
    "state"                    VARCHAR(65535),
    username                   VARCHAR(65535),
    email                      VARCHAR(65535),
    email_id                   INTEGER,
    preferred_locale           VARCHAR(65535),
    first_name                 VARCHAR(65535),
    last_name                  VARCHAR(65535),
    company                    VARCHAR(65535),
    country_code               VARCHAR(2),
    -- no explicit length here, so the Redshift default VARCHAR length applies
    custom_fields              VARCHAR,
    -- boolean status flags
    has_live_subscription      BOOLEAN,
    has_active_subscription    BOOLEAN,
    has_future_subscription    BOOLEAN,
    has_canceled_subscription  BOOLEAN,
    has_paused_subscription    BOOLEAN,
    has_past_due_invoice       BOOLEAN,
    -- row timestamps, plain and _ps-suffixed variants
    created_at                 TIMESTAMP,
    updated_at                 TIMESTAMP,
    deleted_at                 TIMESTAMP,
    created_at_ps              TIMESTAMP,
    updated_at_ps              TIMESTAMP,
    deleted_at_ps              TIMESTAMP,
    -- sales attribution and credit sharing
    sales_channel              INTEGER,
    sales_rep                  VARCHAR(1024),
    renewed_by                 INTEGER,
    original_orgid             BIGINT,
    sale_credit_sharing        DECIMAL(18,4),
    cs_credit_sharing          DECIMAL(18,4),
    partner_credit_sharing     DECIMAL(18,4),
    ecom_credit_sharing        DECIMAL(18,4),
    -- no explicit length here either
    credit_sharing             VARCHAR
)
DISTSTYLE AUTO;

In [None]:
%sql select * from accounts limit 10

# STEP 4: Load Data into the cluster

In [None]:
%%time
qry = """
    copy accounts from 's3://my-demo-data-redshift/recurly_accounts.csv'
    credentials 'aws_iam_role={}'
    csv 
    IGNOREHEADER 1;
""".format(DWH_ROLE_ARN)

%sql $qry

In [None]:
%%sql
SELECT
    errors.tbl,
    *
    -- stl_load_errors is queried here to inspect rows rejected by the COPY above
FROM stl_load_errors AS errors

In [None]:
%%sql
create or replace view md_revenue.vi_fact_subscription_components as
-- The %%sql cell magic above is required. Without it the kernel tries to run this
-- statement as Python and the cell fails with a SyntaxError.
--
-- Shape of the view
--   rs    six branches joined with union all. Each branch reads one
--         md_revenue.vi_list_arr_*_by_subscription view, sums the measure that view
--         owns and emits 0.0 for every other measure, so all branches share the
--         same column layout.
--   final the outer select re-aggregates rs by month_id, org_id, sales_channel_id,
--         interval_length and plan_id. The bo_month_datetime, subscription_id,
--         sales_rep and renewed_by_id columns of rs are not carried through.
--
-- NOTE(review) the md_revenue schema is not created in the visible notebook cells,
-- presumably it already exists on the cluster. Please confirm.
with rs as 
(
	-- starting and ending amounts
	select 
	t.month_id ,
	t.bo_month_datetime ,
	t.org_id,
	t.subscription_id,
	t.sales_channel_id,
	t.sales_rep,
	t.renewed_by_id,
	t.interval_length,
	t.plan_id,
	sum(coalesce (t.arr_starting_amount,0.0)) as  arr_starting_amount,
	0.0 as  arr_new_amount,
	0.0 as  arr_reactivation_amount,
	0.0 as  arr_expansion_amount,
	0.0 as  arr_contraction_amount,
	0.0 as  arr_churn_amount,
	sum(coalesce (t.arr_ending_amount,0.0))  as  arr_ending_amount
	from md_revenue.vi_list_arr_starting_ending_by_subscription t
	group by 1,2,3,4,5,6,7,8,9
	union all 
	-- new amounts
	select 
	t.month_id ,
	t.bo_month_datetime ,
	t.org_id,
	t.subscription_id,
	t.sales_channel_id,
	t.sales_rep,
	t.renewed_by_id,
	t.interval_length,
	t.plan_id,
	0.0 as  arr_starting_amount,
	sum(coalesce (t.arr_new_amount,0.0)) as  arr_new_amount,
	0.0 as  arr_reactivation_amount,
	0.0 as  arr_expansion_amount,
	0.0 as  arr_contraction_amount,
	0.0 as  arr_churn_amount,
	0.0 as  arr_ending_amount
	from md_revenue.vi_list_arr_new_by_subscription t
	group by 1,2,3,4,5,6,7,8,9
	union all 
	-- reactivation amounts
	select 
	t.month_id ,
	t.bo_month_datetime ,
	t.org_id,
	t.subscription_id,
	t.sales_channel_id,
	t.sales_rep,
	t.renewed_by_id,
	t.interval_length,
	t.plan_id,
	0.0 as  arr_starting_amount,
	0.0 as  arr_new_amount,
	sum(coalesce (t.arr_reactivation_amount,0.0)) as  arr_reactivation_amount,
	0.0 as  arr_expansion_amount,
	0.0 as  arr_contraction_amount,
	0.0 as  arr_churn_amount,
	0.0 as  arr_ending_amount
	from md_revenue.vi_list_arr_reactivation_by_subscription t
	group by 1,2,3,4,5,6,7,8,9
	union all 
	-- expansion amounts
	select 
	t.month_id ,
	t.bo_month_datetime ,
	t.org_id,
	t.subscription_id,
	t.sales_channel_id,
	t.sales_rep,
	t.renewed_by_id,
	t.interval_length,
	t.plan_id,
	0.0 as  arr_starting_amount,
	0.0 as  arr_new_amount,
	0.0 as  arr_reactivation_amount,
	sum(coalesce (t.arr_expansion_amount,0.0)) as  arr_expansion_amount,
	0.0 as  arr_contraction_amount,
	0.0 as  arr_churn_amount,
	0.0 as  arr_ending_amount
	from md_revenue.vi_list_arr_expansion_by_subscription t
	group by 1,2,3,4,5,6,7,8,9
	union all 
	-- contraction amounts
	select 
	t.month_id ,
	t.bo_month_datetime ,
	t.org_id,
	t.subscription_id,
	t.sales_channel_id,
	t.sales_rep,
	t.renewed_by_id,
	t.interval_length,
	t.plan_id,
	0.0 as  arr_starting_amount,
	0.0 as  arr_new_amount,
	0.0 as  arr_reactivation_amount,
	0.0 as  arr_expansion_amount,
	sum(coalesce (t.arr_contraction_amount,0.0)) as  arr_contraction_amount,
	0.0 as  arr_churn_amount,
	0.0 as  arr_ending_amount
	from md_revenue.vi_list_arr_contraction_by_subscription t
	group by 1,2,3,4,5,6,7,8,9
	union all 
	-- churn amounts
	select 
	t.month_id ,
	t.bo_month_datetime ,
	t.org_id,
	t.subscription_id,
	t.sales_channel_id,
	t.sales_rep,
	t.renewed_by_id,
	t.interval_length,
	t.plan_id,
	0.0 as  arr_starting_amount,
	0.0 as  arr_new_amount,
	0.0 as  arr_reactivation_amount,
	0.0 as  arr_expansion_amount,
	0.0 as  arr_contraction_amount,
	sum(coalesce (t.arr_churn_amount,0.0)) as  arr_churn_amount,
	0.0 as  arr_ending_amount
	from md_revenue.vi_list_arr_churn_by_subscription t
	group by 1,2,3,4,5,6,7,8,9
)
-- Collapse the per-subscription rows of rs to one row per
-- month_id, org_id, sales_channel_id, interval_length, plan_id.
select 
	t.month_id ,
	t.org_id,
	t.sales_channel_id,
	t.interval_length,
	t.plan_id ,
	sum(coalesce (t.arr_starting_amount,0.0)) as  arr_starting_amount,
	sum(coalesce (t.arr_new_amount ,0.0)) as  arr_new_amount,
	sum(coalesce (t.arr_reactivation_amount ,0.0)) as  arr_reactivation_amount,
	sum(coalesce (t.arr_expansion_amount  ,0.0)) as  arr_expansion_amount,
	sum(coalesce (t.arr_contraction_amount  ,0.0)) as  arr_contraction_amount,
	sum(coalesce (t.arr_churn_amount  ,0.0)) as  arr_churn_amount,
	sum(coalesce (t.arr_ending_amount,0.0))  as  arr_ending_amount
from rs as t
group by 1,2,3,4,5
-- late-binding view, the referenced objects are resolved when the view is queried
with no schema binding;