In [None]:
import boto3

In [None]:
import pandas as pd
import psycopg2
import json

In [None]:
import configparser
# Parse the local 'cluster.config' INI file into `config`.
# The file is opened in a context manager so its handle is closed as soon as
# parsing finishes instead of being left open for the life of the kernel.
config = configparser.ConfigParser()
with open('cluster.config') as config_file:
    config.read_file(config_file)

## The config file has two sections: 'AWS' (access credentials) and 'DWH' (data-warehouse / cluster settings)

In [None]:
# AWS access credentials ([AWS] section) used to build the boto3 clients/resources.
KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

# Data-warehouse settings ([DWH] section). ConfigParser.get returns strings,
# so numeric settings must be converted with int() where a number is required.
DWH_CLUSTER_TYPE = config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH', 'DWH_NUM_NODES')
DWH_NODE_TYPE = config.get('DWH', 'DWH_NODE_TYPE')

DWH_CLUSTER_IDENTIFIER = config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
DWH_DB = config.get('DWH', 'DWH_DB')
DWH_DB_USER = config.get('DWH', 'DWH_DB_USER')
DWH_DB_PASSWORD = config.get('DWH', 'DWH_DB_PASSWORD')
DWH_PORT = config.get('DWH', 'DWH_PORT')

DWH_IAM_ROLE_NAME = config.get('DWH', 'DWH_IAM_ROLE_NAME')

# Echo the non-secret settings as a sanity check. The password is deliberately
# left out so it is not stored in the notebook's saved cell output.
(DWH_DB_USER, DWH_DB)

# CREATING OBJECTS

In [None]:
# EC2 service resource in us-east-1, authenticated with the key pair from the config file.
ec2 = boto3.resource(
    'ec2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name='us-east-1',
)


In [None]:
# IAM client (us-east-1), authenticated with the key pair from the config file.
iam = boto3.client(
    'iam',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name='us-east-1',
)

# S3 service resource (us-east-1), built with the same credentials.
s3 = boto3.resource(
    's3',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name='us-east-1',
)



In [None]:
# Redshift client (us-east-1), authenticated with the key pair from the config file.
redshift = boto3.client(
    'redshift',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name='us-east-1',
)

## List the objects stored in the S3 bucket

In [None]:
# Collect the key of every object in the bucket (an empty prefix filters nothing out).
bucket = s3.Bucket("james-de-dev-bucket")
log_data_files = [
    s3_object.key
    for s3_object in bucket.objects.filter(Prefix='')
]
log_data_files

## access to s3 with the iam role

In [None]:
# Look up the IAM role named in the config and keep its ARN for cluster creation.
role_description = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)
roleArn = role_description['Role']['Arn']
roleArn

## create cluster

In [None]:
# Launch the Redshift cluster described by the DWH_* settings.
try:
    cluster_kwargs = dict(
        # Hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,

        # Identifiers and master credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,

        # Role the cluster assumes for S3 access
        IamRoles=[roleArn],
    )

    # The node count only applies to multi-node clusters, where the API requires it.
    # It is left out for single-node clusters. The config value is a string,
    # hence the int() conversion.
    if DWH_CLUSTER_TYPE.strip().lower() == 'multi-node':
        cluster_kwargs['NumberOfNodes'] = int(DWH_NUM_NODES)

    response = redshift.create_cluster(**cluster_kwargs)

except Exception as e:
    # Best effort: report the problem (e.g. the cluster already exists) and let
    # the notebook continue.
    print(e)

In [None]:
# Show the full description of our cluster (first entry of the 'Clusters' list).
redshift.describe_clusters(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER
)["Clusters"][0]

## Grab all the redshift info as a df

In [None]:
def prettyRedshiftProps(props):
    """Summarise selected cluster properties as a two-column DataFrame.

    Args:
        props: mapping of cluster property names to values, e.g. one entry of
            the 'Clusters' list returned by ``describe_clusters``.

    Returns:
        pandas.DataFrame with columns "Key" and "Value", holding one row per
        property listed in ``keysToShow`` that is present in ``props``, in the
        iteration order of ``props``.

    Side effects:
        Sets the global pandas option ``display.max_colwidth`` to ``None`` so
        long values are not truncated when the frame is displayed.
    """
    # None means "no truncation". The older -1 sentinel is rejected by current pandas.
    pd.set_option('display.max_colwidth', None)
    # Every entry needs its own comma: adjacent string literals are silently
    # concatenated, which would hide both 'MasterUsername' and 'DBName'.
    keysToShow = ['ClusterIdentifier', 'NodeType', 'ClusterStatus', 'MasterUsername',
                  'DBName', 'Endpoint', 'VpcId']
    x = [(k, v) for k, v in props.items() if k in keysToShow]

    return pd.DataFrame(data=x, columns=["Key", "Value"])
    
# Snapshot the cluster description and render its headline properties as a table.
cluster_description = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = cluster_description['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
# Cluster status as stored in the myClusterProps snapshot (not a fresh query to AWS).
myClusterProps['ClusterStatus']

## Open the cluster's port on its VPC security group (note: the rule below uses 0.0.0.0/0, i.e. every IP address, not only ours)

In [None]:
# Add an inbound TCP rule for the warehouse port to the cluster's security group.
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])

    # Prefer the security group the cluster itself reports. The first group
    # listed for the VPC may not be the one that guards the cluster, so it is
    # used only as a fallback when the description names no group.
    attachedSgIds = [sg['VpcSecurityGroupId']
                     for sg in myClusterProps.get('VpcSecurityGroups', [])]
    if attachedSgIds:
        defaultSg = ec2.SecurityGroup(attachedSgIds[0])
    else:
        defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)

    # NOTE(review): 0.0.0.0/0 admits every IPv4 address. Narrow this to your own
    # address (e.g. '<your-ip>/32') if access should be limited to one machine.
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),  # config values are strings
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    # Best effort: report the problem (e.g. the rule already exists) and continue.
    print(e)

## Extract the endpoint address, role ARN, database name and master user from myClusterProps into variables

In [None]:
# Database name and master user as reported in the cluster description.
DB_NAME = myClusterProps['DBName']
DB_USER = myClusterProps['MasterUsername']

# Host name clients connect to.
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']

# 'IamRoles' is a list of role entries; index 0 selects the first one and
# 'IamRoleArn' is that entry's ARN.
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']

In [None]:
# Display the role ARN taken from the cluster description.
DWH_ROLE_ARN

## connect redshift cluster with psycopg2

In [None]:
# Open a psycopg2 connection to the cluster's database.
try:
    conn = psycopg2.connect(
        host=DWH_ENDPOINT,
        dbname=DB_NAME,
        user=DB_USER,
        # Taken from the config file (the same value the cluster was created
        # with) instead of a plaintext literal stored in the notebook.
        password=DWH_DB_PASSWORD,
        # Port the cluster actually reports, rather than a hard-coded number.
        port=myClusterProps['Endpoint']['Port'],
    )
except psycopg2.Error as e:
    print("error: could not make connection")
    print(e)
else:
    # Only configure the session when the connection was really established.
    # Otherwise `conn` is undefined, or is a stale object from an earlier run.
    conn.set_session(autocommit=True)  # commit all transactions

## create cursor, execute query on db

In [None]:
# Obtain a cursor on the open connection; every later statement runs through it.
try:
    cur = conn.cursor()
except psycopg2.Error as cursor_error:
    print("error:could not get cursor to the DB")
    print(cursor_error)

## creating the data structure, DDLs from S3 to redshift.


## The data needs to be observed and analyzed to derive columns, PKs, FKs, data types... and later to create the data model for future users

In [None]:
# DDL for the users table: userid is both the distribution key and the sort key.
create_users_sql = """ create table users(
    userid integer not null distkey sortkey,
    username char(8),
    firstname varchar(30),
    lastname varchar(30),
    city varchar(30),
    state char(2),
    email varchar(100),
    phone char(14),
    likesports boolean,
    liketheathre boolean,
    likeconcerts boolean,
    likejazz boolean,
    likeclassical boolean,
    likeopera boolean,
    likerock boolean,
    likevegas boolean,
    likebroadway boolean,
    likemusicals boolean);"""

try:
    cur.execute(create_users_sql)
except psycopg2.Error as ddl_error:
    # Report the database error and let the notebook continue.
    print('Error:Issue creating table')
    print(ddl_error)

In [None]:
# DDL for the venue table: venueid is both the distribution key and the sort key.
create_venue_sql = """create table venue(
    venueid smallint not null distkey sortkey,
    venuename varchar(100),
    venuecity varchar(30),
    venuestate char(2),
    venueseats integer);"""

try:
    cur.execute(create_venue_sql)
except psycopg2.Error as ddl_error:
    # Report the database error and let the notebook continue.
    print('Error')
    print(ddl_error)

In [None]:
# DDL for the category, date, event and listing tables, sent to the server as a
# single multi-statement string.
create_remaining_tables_sql = """create table category(
    catid smallint not null distkey sortkey,
    catgroup varchar(10),
    catname varchar(10),
    catdesc varchar(50));
    
create table date(
    dateid smallint not null distkey sortkey,
    caldate date not null,
    day character(3) not null,
    week smallint not null,
    month character(50) not null,
    qtr character (5) not null,
    year smallint not null,
    holiday boolean default ('N'));

create table event(
    eventid smallint not null distkey,
    venueid smallint not null,
    catid smallint not null,
    dateid smallint not null sortkey,
    eventname varchar(200),
    startime timestamp);
    
    
create table listing(
    listid integer not null distkey,
    sellerid integer not null,
    eventid integer not null,
    dateid smallint not null,
    numtickets smallint not null,
    priceperticket decimal (8,2),
    totalprice decimal(8,2),
    listime timestamp);
    
    """

try:
    cur.execute(create_remaining_tables_sql)
except psycopg2.Error as ddl_error:
    # Report the database error and let the notebook continue.
    print('Error')
    print(ddl_error)

## start copying data
## The COPY command must be given the ARN of an IAM role that allows Redshift to read from S3

## Start with the users table; the other tables need to be copied as well


In [None]:
# Bulk-load the pipe-delimited users file from S3 into the users table.
try:
    # The role ARN comes from the cluster description (DWH_ROLE_ARN) rather than
    # a hard-coded, account-specific literal. psycopg2 substitutes the %s
    # placeholder with a properly quoted SQL string before sending the statement.
    cur.execute("""
    copy users from 's3://james-de-dev-bucket/allusers_pipe.txt'
    credentials %s
    delimiter '|'
    region 'us-east-1'
    
    """, ('aws_iam_role=' + DWH_ROLE_ARN,))

except psycopg2.Error as e:
    # Report the database error and let the notebook continue.
    print('Error')
    print(e)

In [None]:
# Load the users file only when the table is still empty: running the COPY a
# second time against the same file would insert every row again.
try:
    cur.execute("select count(*) from users")
    users_row_count = cur.fetchone()[0]

    if users_row_count == 0:
        # The role ARN comes from the cluster description (DWH_ROLE_ARN) rather
        # than a hard-coded literal. psycopg2 quotes the %s value client-side.
        cur.execute("""
    copy users from 's3://james-de-dev-bucket/allusers_pipe.txt'
    credentials %s
    delimiter '|'
    region 'us-east-1'
    
    """, ('aws_iam_role=' + DWH_ROLE_ARN,))
    else:
        print('users already holds {} rows; skipping repeated copy'.format(users_row_count))

except psycopg2.Error as e:
    # Report the database error and let the notebook continue.
    print('Error')
    print(e)

In [None]:
# Release the database connection; a psycopg2 error is printed instead of raised.
try:
    conn.close()
except psycopg2.Error as close_error:
    print(close_error)

In [None]:
# Tear the cluster down without taking a final snapshot.
redshift.delete_cluster(
    SkipFinalClusterSnapshot=True,
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
)