In [None]:
import boto3
import pandas as pd
import psycopg2
import json

In [None]:
import configparser
# Parse the local INI file that holds the AWS credentials and the cluster
# settings. The file is opened in a `with` block so the handle is closed as
# soon as parsing finishes instead of being left to the garbage collector.
config = configparser.ConfigParser()
with open('cluster.config') as config_file:
    config.read_file(config_file)

In [None]:
# AWS credentials, read from the [AWS] section of the config file.
KEY = config.get('AWS','KEY')
SECRET = config.get('AWS','SECRET')

# Cluster settings, read from the [DWH] section. configparser returns every
# value as a string; numeric settings are converted where they are used.
DWH_CLUSTER_TYPE = config.get('DWH','DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH','DWH_NUM_NODES')
DWH_NODE_TYPE = config.get('DWH','DWH_NODE_TYPE')
DWH_CLUSTER_IDENTIFIER = config.get('DWH','DWH_CLUSTER_IDENTIFIER')
DWH_DB = config.get('DWH','DWH_DB')
DWH_DB_USER = config.get('DWH','DWH_DB_USER')
DWH_DB_PASSWORD = config.get('DWH','DWH_DB_PASSWORD')
DWH_DB_PORT = config.get('DWH','DWH_DB_PORT')
DWH_IAM_ROLE_NAME = config.get('DWH','DWH_IAM_ROLE_NAME')

# Sanity-check echo of the parsed values. The password is deliberately left
# out so it is not stored in the notebook's saved cell output.
(DWH_DB, DWH_DB_USER)

In [None]:
# Tabular overview of the cluster settings. The password row shows a fixed
# mask rather than the real value so the secret never lands in the rendered
# (and saved) output of this cell.
pd.DataFrame({
    "Param": ["DWH_CLUSTER_TYPE","DWH_NUM_NODES","DWH_NODE_TYPE","DWH_CLUSTER_IDENTIFIER","DWH_DB","DWH_DB_USER","DWH_DB_PASSWORD","DWH_DB_PORT","DWH_IAM_ROLE_NAME"],
    "Value": [DWH_CLUSTER_TYPE,DWH_NUM_NODES,DWH_NODE_TYPE,DWH_CLUSTER_IDENTIFIER,DWH_DB,DWH_DB_USER,"********",DWH_DB_PORT,DWH_IAM_ROLE_NAME],
})

In [None]:
# Region used by every AWS handle below; change it here only.
AWS_REGION = "ap-south-1"

# Keyword arguments shared by all four handles, so the region and the
# credentials are spelled out once instead of once per service.
_aws_common_kwargs = dict(
    region_name=AWS_REGION,
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

# Resource-style handles (object-oriented API) for EC2 and S3.
ec2 = boto3.resource('ec2', **_aws_common_kwargs)
s3 = boto3.resource('s3', **_aws_common_kwargs)

# Low-level clients for IAM and Redshift.
iam = boto3.client('iam', **_aws_common_kwargs)
redshift = boto3.client('redshift', **_aws_common_kwargs)

In [None]:
# Collect the key of every object in the source bucket. The empty prefix
# means the filter does not narrow the listing.
bucket = s3.Bucket("dexter-redshift")
log_data_files = [
    s3_object.key
    for s3_object in bucket.objects.filter(Prefix='')
]
log_data_files

In [None]:
# Look up the IAM role named in the config and keep its ARN; the ARN is
# attached to the cluster when it is created.
role_description = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)
roleArn = role_description['Role']['Arn']

In [None]:
# Request creation of the Redshift cluster described by the config.
#
# Two settings that were read from the config but never sent are now passed:
#   * Port          - so the cluster listens on DWH_DB_PORT, the same port the
#                     security-group rule and the psycopg2 connection use.
#   * NumberOfNodes - only for multi-node clusters, where the API requires it;
#                     it is omitted for single-node clusters.
# Any failure (including a malformed numeric setting) is reported, not raised,
# matching the best-effort style of the rest of the notebook.
try:
    cluster_settings = dict(
        # hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,

        # identifiers and credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        Port=int(DWH_DB_PORT),

        # role the cluster assumes (used later by COPY to read from S3)
        IamRoles=[roleArn],
    )
    if DWH_CLUSTER_TYPE == 'multi-node':
        cluster_settings['NumberOfNodes'] = int(DWH_NUM_NODES)

    response = redshift.create_cluster(**cluster_settings)
except Exception as e:
    print(e)

In [None]:
# Display the raw describe_clusters response for the cluster named in the
# config (the full response dict is rendered as the cell output).
redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)

In [None]:
def redshiftProps(props):
    """Summarise selected cluster properties as a two-column DataFrame.

    Parameters
    ----------
    props : dict
        Mapping of property names to values (for example one entry of the
        ``'Clusters'`` list returned by ``describe_clusters``).

    Returns
    -------
    pandas.DataFrame
        Columns ``Key`` and ``Value``, one row per property in ``props``
        whose name is in the fixed list below, in the iteration order of
        ``props``.

    Notes
    -----
    As a side effect this sets the global pandas option
    ``display.max_colwidth`` to ``None`` (no truncation) so long values are
    shown in full. ``None`` replaces the former ``-1``, which pandas
    deprecated in 1.0 and rejects in 2.x.
    """
    pd.set_option('display.max_colwidth', None)
    keys = ["ClusterIdentifier","NodeType","ClusterStatus","MasterUsername","DBName","Endpoint","VpcId"]
    x = [(k, v) for k, v in props.items() if k in keys]
    return pd.DataFrame(data=x, columns=["Key","Value"])

# Fetch the current description of the cluster and show the selected
# properties; the first entry of 'Clusters' is the one used from here on.
cluster_listing = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
clusterProp = cluster_listing['Clusters'][0]
redshiftProps(clusterProp)

In [None]:
# The values below are read from the cluster description, and the 'Endpoint'
# entry is expected to be missing while the cluster is still being created.
# Block on the boto3 `cluster_available` waiter first, then refresh
# `clusterProp`, so this cell also works on a fresh "Run All" straight after
# create_cluster.
redshift.get_waiter('cluster_available').wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
clusterProp = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

# Connection details taken from the live cluster description.
DWH_ENDPOINT = clusterProp['Endpoint']['Address']
DWH_ROLE_ARN = clusterProp['IamRoles'][0]['IamRoleArn']
DBNAME = clusterProp['DBName']
DB_USER = clusterProp['MasterUsername']

In [None]:
# Open the database port on the security group that is attached to the
# cluster.
#
# The group is taken from the cluster description ('VpcSecurityGroups')
# instead of "whichever group the VPC lists first", which could be a group
# the cluster does not use. GroupName is no longer passed: the SecurityGroup
# resource already identifies the target group by its id.
#
# NOTE(review): CidrIp '0.0.0.0/0' exposes the port to any address. Narrow it
# to a known IP range for anything beyond a short-lived experiment.
#
# Errors (for example the rule already existing on a re-run) are printed,
# not raised.
try:
    vpc = ec2.Vpc(id=clusterProp['VpcId'])  # retained for interactive inspection
    cluster_sg_id = clusterProp['VpcSecurityGroups'][0]['VpcSecurityGroupId']
    defaultSG = ec2.SecurityGroup(cluster_sg_id)
    print(defaultSG)

    defaultSG.authorize_ingress(
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_DB_PORT),
        ToPort=int(DWH_DB_PORT),
    )
except Exception as e:
    print(e)

In [None]:
# Open a psycopg2 connection to the cluster and create the cursor used by the
# following cells. Autocommit is switched on, so each statement is committed
# as it runs. Connection errors are printed, not raised.
try:
    connection_settings = dict(
        host=DWH_ENDPOINT,
        dbname=DBNAME,
        user=DB_USER,
        password=DWH_DB_PASSWORD,
        port=DWH_DB_PORT,
    )
    conn = psycopg2.connect(**connection_settings)
    conn.set_session(autocommit=True)
    cur = conn.cursor()
except psycopg2.Error as connect_error:
    print(connect_error)

In [None]:
# DDL for the `users` table: userid is both distribution key and sort key,
# followed by contact fields and one boolean flag per interest.
users_ddl = """create table users(
    userid integer not null distkey sortkey,
    username char(8),
    firstname varchar(30),
    lastname varchar(30),
    city varchar(30),
    state char(2),
    email varchar(100),
    phone char(14),
    likesports boolean,
    liketheatre boolean,
    likeconcerts boolean,
    likejazz boolean,
    likeclassical boolean,
    likeopera boolean,
    likerock boolean,
    likevegas boolean,
    likebroadway boolean,
    likemusicals boolean
    );
    """

# Create the table; database errors are printed, not raised.
try:
    cur.execute(users_ddl)
except psycopg2.Error as ddl_error:
    print(ddl_error)

In [None]:
# Create the `venue` table (venueid is both distribution key and sort key).
# The state column is spelled `venuestate`; it was previously created with
# the typo `veneustate`, so any query written against the intended name
# would have failed. Database errors are printed, not raised.
try:
    cur.execute("""create table venue(
    venueid smallint not null distkey sortkey,
    venuename varchar(100),
    venuecity varchar(30),
    venuestate char(2),
    venueseats integer
    );
    """)
except psycopg2.Error as e:
    print(e)

In [None]:
# Create the `category` and `date` tables in one round trip (both statements
# are sent in a single execute call). Database errors are printed, not raised.
#
# catdesc is varchar(50) rather than varchar(20).
# NOTE(review): this assumes category_pipe.txt is the standard TICKIT sample
# file, whose descriptions run past 20 characters and would be rejected by
# COPY with the narrower column - confirm against the actual file.
try:
    cur.execute("""create table category(
    catid smallint not null distkey sortkey,
    catgroup varchar(10),
    catname varchar(10),
    catdesc varchar(50)
    );
    
    create table date(
    dateid smallint not null distkey sortkey,
    caldate date not null,
    day char(3) not null,
    week smallint not null,
    month char(5) not null,
    qtr char(5) not null,
    year smallint not null,
    holiday boolean default('N')
    );
    """)
except psycopg2.Error as e:
    print(e)

In [None]:
# Bulk-load `users` from the pipe-delimited file in S3.
# The IAM role is the one read from the cluster description (DWH_ROLE_ARN),
# not a hard-coded ARN, so the notebook carries no account id and follows the
# configured role. It is passed as a bound parameter; psycopg2 quotes it into
# the statement. Database errors are printed, not raised.
try:
    cur.execute("""
    copy users from 's3://dexter-redshift/allusers_pipe.txt'
    credentials %s
    delimiter '|'
    region 'ap-south-1'
    """, ('aws_iam_role=' + DWH_ROLE_ARN,))
except psycopg2.Error as e:
    print(e)

In [None]:
# Bulk-load `venue` from the pipe-delimited file in S3.
# The IAM role is the one read from the cluster description (DWH_ROLE_ARN),
# not a hard-coded ARN, so the notebook carries no account id and follows the
# configured role. It is passed as a bound parameter; psycopg2 quotes it into
# the statement. Database errors are printed, not raised.
try:
    cur.execute("""
    copy venue from 's3://dexter-redshift/venue_pipe.txt'
    credentials %s
    delimiter '|'
    region 'ap-south-1'
    """, ('aws_iam_role=' + DWH_ROLE_ARN,))
except psycopg2.Error as e:
    print(e)

In [None]:
# Bulk-load `category` from the pipe-delimited file in S3.
# The IAM role is the one read from the cluster description (DWH_ROLE_ARN),
# not a hard-coded ARN, so the notebook carries no account id and follows the
# configured role. It is passed as a bound parameter; psycopg2 quotes it into
# the statement. Database errors are printed, not raised.
try:
    cur.execute("""
    copy category from 's3://dexter-redshift/category_pipe.txt'
    credentials %s
    delimiter '|'
    region 'ap-south-1'
    """, ('aws_iam_role=' + DWH_ROLE_ARN,))
except psycopg2.Error as e:
    print(e)

In [None]:
# Bulk-load `date` from the pipe-delimited file in S3.
# The IAM role is the one read from the cluster description (DWH_ROLE_ARN),
# not a hard-coded ARN, so the notebook carries no account id and follows the
# configured role. It is passed as a bound parameter; psycopg2 quotes it into
# the statement. Database errors are printed, not raised.
try:
    cur.execute("""
    copy date from 's3://dexter-redshift/date2008_pipe.txt'
    credentials %s
    delimiter '|'
    region 'ap-south-1'
    """, ('aws_iam_role=' + DWH_ROLE_ARN,))
except psycopg2.Error as e:
    print(e)

In [None]:
# Query every row of `users`; results stay on the cursor until fetched.
users_query = """
    select * from users;
    """

# Run the query; database errors are printed, not raised.
try:
    cur.execute(users_query)
except psycopg2.Error as query_error:
    print(query_error)

In [None]:
# Fetch a single row from the cursor's pending result set and print it as a
# quick check that the load produced data.
row = cur.fetchone()
print(row)

In [None]:
# Close the database connection now that the queries are done; a failure
# while closing is printed, not raised.
try:
    conn.close()
except psycopg2.Error as close_error:
    print(close_error)

In [None]:
# Tear down the cluster named in the config. SkipFinalClusterSnapshot=True
# requests deletion without a final snapshot, so nothing is kept for restore.
redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)