In [5]:
import boto3
import pandas as pd
import psycopg2
import json

In [4]:
#%pip install boto3
%pip install psycopg2

Collecting psycopg2
  Downloading psycopg2-2.9.5-cp38-cp38-win_amd64.whl (1.2 MB)Note: you may need to restart the kernel to use updated packages.
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.5



In [10]:
# Load DWH Params from config file

import configparser
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))

config.get('DWH','DWH_CLUSTER_IDENTIFIER')

'redshift-cluster-1'

In [9]:
#Assign config values to constants

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

('awsuser', 'Passw0rd123', 'testdb')

In [11]:
#Create dataframe
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,single-node
1,DWH_NUM_NODES,1
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,redshift-cluster1
4,DWH_DB,testdb
5,DWH_DB_USER,awsuser
6,DWH_DB_PASSWORD,Passw0rd123
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,redshift-s3-access


In [12]:
#connect to AWS instances

ec2 = boto3.resource('ec2',
                       region_name="ap-northeast-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="ap-northeast-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='ap-northeast-1'
                  )

redshift = boto3.client('redshift',
                       region_name="ap-northeast-1",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

In [13]:
#create object for s3 bucket

bucket = s3.Bucket("la-dev-test-bucket")

data_files = [filename.key for filename in bucket.objects.filter(Prefix=b)]

data_files

['allevents_pipe.txt',
 'allusers_pipe.txt',
 'category_pipe.txt',
 'date2008_pipe.txt',
 'listings_pipe.txt',
 'new_user.csv',
 'sales_tab.txt',
 'venue_pipe.txt']

In [14]:
#give redshift explicit access to the s3 bucket

role_arn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

role_arn

'arn:aws:iam::012925758763:role/redshift-s3-access'

In [15]:
#create redshift cluster

try:
    response = redshift.create_cluster(
        
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[role_arn]  
    )
except Exception as e:
    print(e)

In [24]:
#get cluster metadata

def pretty_cluster_info(json):
    x = [(k,v) for k,v in json.items()]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

cluster_info = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
pretty_cluster_info(cluster_info)

Unnamed: 0,Key,Value
0,ClusterIdentifier,redshift-cluster1
1,NodeType,dc2.large
2,ClusterStatus,available
3,ClusterAvailabilityStatus,Available
4,MasterUsername,awsuser
5,DBName,testdb
6,Endpoint,{'Address': 'redshift-cluster1.cwb12s4gufsc.ap...
7,ClusterCreateTime,2022-11-18 11:06:09.407000+00:00
8,AutomatedSnapshotRetentionPeriod,1
9,ManualSnapshotRetentionPeriod,-1


In [27]:
#attach vpc security group to the redshift cluster through ec2

DWH_ENDPOINT = cluster_info['Endpoint']['Address']
DWH_ROLE_ARN = cluster_info['IamRoles'][0]['IamRoleArn']
DB_NAME = cluster_info['DBName']
DB_USER = cluster_info['MasterUsername']

#opening TCP port for connecting to the end point of the cluster
try:
    vpc = ec2.Vpc(id=cluster_info['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    print(defaultSg.group_name)

    defaultSg.authorize_ingress(
        GroupName = defaultSg.group_name,  
        CidrIp ='0.0.0.0/0', 
        IpProtocol ='TCP', 
        FromPort = int(DWH_PORT),
        ToPort = int(DWH_PORT)
        )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-0f2f36a080130bb80')
default


In [39]:
#connect to redshift database

try:
    conn = psycopg2.connect(dbname=DB_NAME, host=DWH_ENDPOINT, port=DWH_PORT, user=DB_USER, password=DWH_DB_PASSWORD)
except Exception as e:
    print(e)

conn.set_session(autocommit=True)

#create cursor
try:
    cur = conn.cursor()
except Exception as e:
    print(e)

In [36]:
#create tables in the database

try:
    cur.execute(
    """
    create table users(
	userid integer not null distkey sortkey,
	username char(8),
	firstname varchar(30),
	lastname varchar(30),
	city varchar(30),
	state char(2),
	email varchar(100),
	phone char(14),
	likesports boolean,
	liketheatre boolean,
	likeconcerts boolean,
	likejazz boolean,
	likeclassical boolean,
	likeopera boolean,
	likerock boolean,
	likevegas boolean,
	likebroadway boolean,
	likemusicals boolean);
    """
    )
except Exception as e:
    print("Database table creation failed.")
    print(e)

Database table creation failed.
Relation "users" already exists



In [37]:
#create tables in the database

try:
    cur.execute(
    """
create table venue(
	venueid smallint not null distkey sortkey,
	venuename varchar(100),
	venuecity varchar(30),
	venuestate char(2),
	venueseats integer);
create table category(
	catid smallint not null distkey sortkey,
	catgroup varchar(10),
	catname varchar(10),
	catdesc varchar(50));
create table date(
	dateid smallint not null distkey sortkey,
	caldate date not null,
	day character(3) not null,
	week smallint not null,
	month character(5) not null,
	qtr character(5) not null,
	year smallint not null,
	holiday boolean default('N'));
create table event(
	eventid integer not null distkey,
	venueid smallint not null,
	catid smallint not null,
	dateid smallint not null sortkey,
	eventname varchar(200),
	starttime timestamp);
create table listing(
	listid integer not null distkey,
	sellerid integer not null,
	eventid integer not null,
	dateid smallint not null  sortkey,
	numtickets smallint not null,
	priceperticket decimal(8,2),
	totalprice decimal(8,2),
	listtime timestamp);
create table sales(
	salesid integer not null,
	listid integer not null distkey,
	sellerid integer not null,
	buyerid integer not null,
	eventid integer not null,
	dateid smallint not null sortkey,
	qtysold smallint not null,
	pricepaid decimal(8,2),
	commission decimal(8,2),
	saletime timestamp);
    """
    )
except Exception as e:
    print("Database table creation failed.")
    print(e)

In [41]:
#load data to the database tables created

try:
    cur.execute(
    """
    copy users from 's3://la-dev-test-bucket/allusers_pipe.txt' 
    credentials 'aws_iam_role=arn:aws:iam::012925758763:role/redshift-s3-access'
    delimiter '|' 
    region 'ap-northeast-1';
    """
    )
except Exception as e:
    print(e)

In [42]:
#query the table

try:
    cur.execute(
    """
    SELECT * FROM users
    """
    )
except Exception as e:
    print(e)
    
row = cur.fetchone()
print(row)

(1, 'JSG99FHE', 'Rafael', 'Taylor', 'Kent', 'WA', 'Etiam.laoreet.libero@sodalesMaurisblandit.edu', '(664) 602-4412', True, True, None, False, True, None, None, True, False, True)


In [43]:
#delete the cluster

redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'redshift-cluster1',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'awsuser',
  'DBName': 'testdb',
  'Endpoint': {'Address': 'redshift-cluster1.cwb12s4gufsc.ap-northeast-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2022, 11, 18, 11, 6, 9, 407000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0f2f36a080130bb80',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-0a63de8a7ba8840f1',
  'AvailabilityZone': 'ap-northeast-1c',
  'PreferredMaintenanceWindow': 'fri:14:30-fri:15:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,