In [32]:
import boto3 as b3
import pandas as pd
import psycopg2
import json
import configparser
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))

In [33]:
# pulling params from the config file into variables

KEY = config.get("AWS", "KEY")
SECRET = config.get("AWS", "SECRET")

DWH_CLUSTER_TYPE = config.get("DWH", "DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH", "DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH", "DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH", "DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH", "DWH_DB")
DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH", "DWH_PORT")
DWH_IAM_ROLE_NAME = config.get("DWH", "DWH_IAM_ROLE_NAME")

In [34]:
# configuring the params/variables into a dataframe

pd.DataFrame(
    {
        "Param":
            ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
        "Value":
            [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
    }
)

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,single-node
1,DWH_NUM_NODES,1
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,joes-first-redshift
4,DWH_DB,joes-first-db
5,DWH_DB_USER,awsuser
6,DWH_DB_PASSWORD,CoolBeans!1234
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,redshift-s3-access


In [35]:
# creates the ec2 object

ec2 = b3.resource(
    'ec2',
    region_name="us-east-1",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

In [36]:
# creates the rest of the aws objects

s3 = b3.resource(
    's3',
    region_name="us-east-1",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

# notice that these use "client" instead of "resource"

iam = b3.client(
    'iam',
    region_name="us-east-1",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

redshift = b3.client(
    'redshift',
    region_name="us-east-1",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET
)

In [37]:
# connecting to s3 bucket object

bucket = s3.Bucket("redshift4joe")

# checking to make sure that we can see the filenames in the bucket

log_data_files = [filename.key for filename in bucket.objects.filter(Prefix='')]
#log_data_files = [filename.key for filename in bucket.objects.filter(Prefix='all')]
log_data_files 

['allevents_pipe.txt',
 'allusers_pipe.txt',
 'category_pipe.txt',
 'date2008_pipe.txt',
 'listings_pipe.txt',
 'sales_tab.txt',
 'venue_pipe.txt']

In [38]:
# giving permissions to the bucket via the IAM role

roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

# notice that it basically pulled the ARN role from the IAM object

In [40]:
# now creating redshift cluster
# boto3 + redshift commands documentation
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html

try:
    response = redshift.create_cluster(
        # cluster + node params
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,

        # identifiers + credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,

        # roles for s3 access
        IamRoles=[roleArn]
    )
except Exception as e:
    print(e)

In [41]:
# return info about the newly created redshift cluster
# wait for the cluster to be fully available before running

redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0] # getting just the cluster info

{'ClusterIdentifier': 'joes-first-redshift',
 'NodeType': 'dc2.large',
 'ClusterStatus': 'available',
 'ClusterAvailabilityStatus': 'Available',
 'MasterUsername': 'awsuser',
 'DBName': 'joes-first-db',
 'Endpoint': {'Address': 'joes-first-redshift.caiyquvcunwp.us-east-1.redshift.amazonaws.com',
  'Port': 5439},
 'ClusterCreateTime': datetime.datetime(2024, 6, 6, 13, 44, 35, 685000, tzinfo=tzutc()),
 'AutomatedSnapshotRetentionPeriod': 1,
 'ManualSnapshotRetentionPeriod': -1,
 'ClusterSecurityGroups': [],
 'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0d7d30122d0774bed',
   'Status': 'active'}],
 'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
   'ParameterApplyStatus': 'in-sync'}],
 'ClusterSubnetGroupName': 'default',
 'VpcId': 'vpc-0b003b84caf9ee659',
 'AvailabilityZone': 'us-east-1e',
 'PreferredMaintenanceWindow': 'wed:05:00-wed:05:30',
 'PendingModifiedValues': {},
 'ClusterVersion': '1.0',
 'AllowVersionUpgrade': True,
 'NumberOfNodes': 1,
 'Publicly

In [53]:
# creating a format to make the previous code more readable

def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None) # pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "VpcId"]
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key","Value"])

# using the format/function

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0] # getting the json value of the "describe_clusters" command
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,joes-first-redshift
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,awsuser
4,DBName,joes-first-db
5,Endpoint,"{'Address': 'joes-first-redshift.caiyquvcunwp.us-east-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0b003b84caf9ee659


In [54]:
# fetching the cluster properties into variables

DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
DB_NAME = myClusterProps['DBName']
DB_USER = myClusterProps['MasterUsername']

In [61]:
# attaching the vpc security group to the ec2 instance
# next part won't run if the security group from this part isn't correct

try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[1] # had to change this from [0] after I recreated the cluster 
    print(defaultSg)

    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-0d7d30122d0774bed')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


In [64]:
####### starting to load data into redshift cluster from s3 data
# using the psycopg2 lib to connect to redshift

try:
    conn = psycopg2.connect(host=DWH_ENDPOINT, dbname=DB_NAME, user=DB_USER, password=DWH_DB_PASSWORD, port=DWH_PORT)
except psycopg2.Error as e:
    print("Error: Couldn't make connection to the DB")
    print(e)

# making connection with the DB
# autocommit is to commit all transactions to the DB

conn.set_session(autocommit=True)

In [65]:
# creating a cursor for the DB connection

try:
    cur = conn.cursor()
except psycopg2.Error as e:
    print("Error: Couldn't make a cursor to the DB")
    print(e)

In [70]:
# creating tables in redshift for the data from s3

try:
    cur.execute("""create table users(
    userid integer not null distkey sortkey,
    username char(8),
    firstname varchar(30),
    lastname varchar(30),
    city varchar(30),
    state char(2),
    email varchar(100),
    phone char(14),
    likesports boolean,
    liketheatre boolean,
    likeconcerts boolean,
    likejazz boolean,
    likeclassical boolean,
    likeopera boolean,
    likerock boolean,
    likevegas boolean,
    likebroadway boolean,
    likemusicals boolean);""")
except psycopg2.Error as e:
    print("Error: Table creation issue")
    print(e)

In [71]:
try:
    cur.execute("""create table venue(
    venueid smallint not null distkey sortkey,
    venuename varchar(100),
    venuecity varchar(30),
    venuestate char(2),
    venueseats integer);""")
except psycopg2.Error as e:
    print("Error: Table creation issue")
    print(e)

In [72]:
try:
    cur.execute("""create table category(
    catid smallint not null distkey sortkey,
    catgroup varchar(10),
    catname varchar(10),
    catdesc varchar(50));

    create table date(
    dateid smallint not null distkey sortkey,
    caldate date not null,
    day character(3) not null,
    week smallint not null,
    month character(5) not null,
    qtr character(5) not null,
    year smallint not null,
    holiday boolean default('N'));
    
    create table event(
    eventid integer not null distkey,
    venueid smallint not null,
    catid smallint not null,
    dateid smallint not null sortkey,
    eventname varchar(200),
    starttime timestamp);

    create table listing(
    listid integer not null distkey,
    sellerid integer not null,
    eventid integer not null,
    dateid smallint not null sortkey,
    numtickets smallint not null,
    priceperticket decimal(8,2),
    totalprice decimal(8,2),
    listtime timestamp);

    """)
except psycopg2.Error as e:
    print("Error: Table creation issue")
    print(e)

In [74]:
# loading data from s3 > redshift using the copy command
# use the files' URIs for the path + use IAM role (credentials)

try:
    cur.execute("""
    copy users from 's3://.../allusers_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::...:role/...'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Table creation issue")
    print(e)

In [83]:
# testing to see if data load worked below

"""
# pulling data into the cusor with a select qry

try:
    cur.execute("""
    select * from users limit 10;
    """)
except psycopg2.Error as e:
    print("Error: Table creation issue")
    print(e)

# displaying/printing the data out to view in Jupyter

row = cur.fetchone()
while row:
    print(row)
    row = cur.fetchone()
"""

IndentationError: unexpected indent (1203029168.py, line 8)

In [84]:
try:
    cur.execute("""
    copy category from 's3://.../category_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::...:role/...'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Table creation issue")
    print(e)

In [85]:
try:
    cur.execute("""
    copy date from 's3://.../date2008_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::...:role/...'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Table creation issue")
    print(e)

In [86]:
try:
    cur.execute("""
    copy event from 's3://.../allevents_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::...:role/...'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Table creation issue")
    print(e)

In [88]:
try:
    cur.execute("""
    copy venue from 's3://.../venue_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::...:role/...'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Table creation issue")
    print(e)

In [89]:
try:
    cur.execute("""
    copy listing from 's3://.../listings_pipe.txt'
    credentials 'aws_iam_role=arn:aws:iam::...:role/...'
    delimiter '|'
    region 'us-east-1'
    """)
except psycopg2.Error as e:
    print("Error: Table creation issue")
    print(e)

In [90]:
# MAKE SURE TO CLOSE CONNECTION TO THE DB

try:
    conn.close()
except psycopg2.Error as e:
    print(e)

In [91]:
# deleting the redshift cluster

redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'joes-first-redshift',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'awsuser',
  'DBName': 'joes-first-db',
  'Endpoint': {'Address': 'joes-first-redshift.caiyquvcunwp.us-east-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2024, 6, 6, 13, 44, 35, 685000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0d7d30122d0774bed',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-0b003b84caf9ee659',
  'AvailabilityZone': 'us-east-1e',
  'PreferredMaintenanceWindow': 'wed:05:00-wed:05:30',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True