In [2]:
import boto3
import pandas as pd
import json
import psycopg2

In [3]:
#read config file

import configparser
config = configparser.ConfigParser()
config.read_file(open("F:\\Projects\\Redshift_Cluster\\details.config"))

In [4]:
#set values in variables by providing section name and desired value

key = config.get("AWS","key")
secret = config.get("AWS","secret")

DWH_CLUSTER_TYPE = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE = config.get("DWH","DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB = config.get("DWH","DWH_DB")
DWH_DB_USER = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD= config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH","DWH_PORT")
DWH_IAM_ROLE_NAME =config.get("DWH","DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

('awsuser', 'Passw0rd123', 'myfirstdb')

In [5]:
#create connection with aws

ec2 = boto3.resource("ec2",
                    region_name="us-east-1",
                    aws_access_key_id=key,
                    aws_secret_access_key=secret)

In [6]:
s3 = boto3.resource("s3",
                    region_name="us-east-1",
                    aws_access_key_id=key,
                    aws_secret_access_key=secret)

iam = boto3.client("iam",
                    region_name="us-east-1",
                    aws_access_key_id=key,
                    aws_secret_access_key=secret)

redshift = boto3.client("redshift",
                    region_name="us-east-1",
                    aws_access_key_id=key,
                    aws_secret_access_key=secret)

In [23]:
#get the value from s3 bucket
bucket = s3.Bucket("shehry-test")
log_data_files = [filename.key for filename in bucket.objects.filter(Prefix="")]
log_data_files

['allevents_pipe.txt',
 'allusers_pipe.txt',
 'category_pipe.txt',
 'date2008_pipe.txt',
 'listings_pipe.txt',
 'sales_tab.txt',
 'venue_pipe.txt']

In [8]:
#iam role access to redshift-s3 buckets

roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)["Role"]["Arn"]

In [None]:
#creating redshift cluster

try:
    response = redshift.create_cluster(
    ClusterType = DWH_CLUSTER_TYPE,
    NodeType = DWH_NODE_TYPE,
        #identifiers & credentials
    DBName = DWH_DB,
    ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
    MasterUsername = DWH_DB_USER,
    MasterUserPassword = DWH_DB_PASSWORD,
        #roles for s3 access
    IamRoles =[roleArn]
    )
except Exception as e:
    print(e)

In [9]:
# by using this method all the details of cluster appeared

redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)

{'Clusters': [{'ClusterIdentifier': 'my-first-redshift',
   'NodeType': 'dc2.large',
   'ClusterStatus': 'available',
   'ClusterAvailabilityStatus': 'Available',
   'MasterUsername': 'awsuser',
   'DBName': 'myfirstdb',
   'Endpoint': {'Address': 'my-first-redshift.czehrwblbatq.us-east-1.redshift.amazonaws.com',
    'Port': 5439},
   'ClusterCreateTime': datetime.datetime(2023, 3, 19, 22, 41, 42, 934000, tzinfo=tzutc()),
   'AutomatedSnapshotRetentionPeriod': 1,
   'ManualSnapshotRetentionPeriod': -1,
   'ClusterSecurityGroups': [],
   'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-00c9a3ca98f432845',
     'Status': 'active'}],
   'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
     'ParameterApplyStatus': 'in-sync'}],
   'ClusterSubnetGroupName': 'default',
   'VpcId': 'vpc-0446f2b750060a6e6',
   'AvailabilityZone': 'us-east-1e',
   'PreferredMaintenanceWindow': 'sun:04:30-sun:05:00',
   'PendingModifiedValues': {},
   'ClusterVersion': '1.0',
   'AllowVer

In [10]:
#this function is used to show the key and values of the above cluster in dataframe

def redshif(props):
    pd.set_option("display.max_colwidth", -1)
    keysToShow=["ClusterIdentifier","NodeType","ClusterStatus","MasterUsername","DBName","Endpoint","VpcId"]
    x= [(k,v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["key","values"])

mycluster=redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)["Clusters"][0]
redshif(mycluster)

  pd.set_option("display.max_colwidth", -1)


Unnamed: 0,key,values
0,ClusterIdentifier,my-first-redshift
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,awsuser
4,DBName,myfirstdb
5,Endpoint,"{'Address': 'my-first-redshift.czehrwblbatq.us-east-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0446f2b750060a6e6


In [37]:
#retrieves information from mycluster and store in a variable

DWH_ENDPOINT = mycluster["Endpoint"]["Address"]
DWH_ROLE_ARN = mycluster["IamRoles"][0]["IamRoleArn"]
DB_NAME = mycluster["DBName"]
DB_USER = mycluster["MasterUsername"]

In [36]:
try:
    vpc = ec2.Vpc(id=mycluster["VpcId"])  #get the vpc id
    defaultSg = list(vpc.security_groups.all())[0]  # get the security groups present in vpc and store in variable
    print(defaultSg)
    #then adds a new inbound rule to the selected security group using the authorize_ingress()
    defaultSg.authorize_ingress(
    GroupName = defaultSg.group_name,
    CidrIp = "0.0.0.0/0",
    IpProtocol = "TCP",
    FromPort = int(DWH_PORT),
    ToPort = int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-00c9a3ca98f432845')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


In [13]:
#creating connection with postgres

try:
    conn = psycopg2.connect(host=DWH_ENDPOINT,dbname=DB_NAME,user=DB_USER,password="Passw0rd123",port=5439)
except psycopg2.Error as e:
    print("connection error with postgres")
    print(e)

conn.set_session(autocommit=True)

In [14]:
#create cursor it is used for execution of queries

try:
    cur = conn.cursor()
except psycopg2.Error as e:
    print("error occurred while creating cursor")
    print(e)

In [39]:
#create table

try:
    cur.execute("""create table users(
userid integer not null distkey sortkey,
username char(20),
firstname varchar(20),
lastname varchar(20),
city varchar(20),
state varchar(20),
email varchar(50),
phone char(14),
likesports boolean,
liketheatre boolean,
likeconcerts boolean,
likejazz boolean,
likeclassical boolean,
likeopera boolean,
likerock boolean,
likevegas boolean,
likebroadway boolean,
likemusical boolean);""")
except psycopg2.Error as e:
    print("error occurred while creating table")
    print(e)

In [40]:
try:
    cur.execute("""create table venue(
    venueid smallint not null distkey sortkey,
    venuename varchar(100),
    venuecity varchar(50),
    venuestate char(2),
    venueseats integer);""")
except psycopg2.Error as e:
    print("error occurred while creating venue table")
    print(e)

In [42]:
try:
    cur.execute("""create table category(
    catid smallint not null distkey sortkey,
    catgroup varchar(10),
    catname varchar(10),
    catdesc varchar(50));
    
create table date(
dateid smallint not null distkey sortkey,
caldate date not null,
day character(3) not null,
week smallint not null,
month character(3) not null,
qtr character(5) not null,
year smallint not null,
holiday boolean default('N'));

create table event(
eventid integer not null distkey,
venueid smallint not null,
catid smallint not null,
dateid smallint not null sortkey,
eventname varchar(200),
startime timestamp);

create table listing(
listid integer not null distkey,
sellerid integer not null,
eventid integer not null,
dateid smallint not null sortkey,
numtickets smallint not null,
priceperticket decimal(8,2),
totalprice decimal(8,2),
listtime timestamp);
    """)
    
except psycopg2.Error as e:
    print("error occured")
    print(e)
    

In [38]:
#in this query copy data from s3-bucket and load into table

try:
    cur.execute("""
    COPY users FROM 's3://shehry-test/allusers_pipe.txt'
CREDENTIALS 'aws_iam_role=arn:aws:iam::509705555503:role/redshift-s3-access'
DELIMITER '|'
REGION 'ap-east-1'
MAXERROR 100;


    """)
except psycopg2.Error as e:
    print("error occured while copying")
    print(e)

error occured while copying
S3ServiceException:The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.,Status 400,Error IllegalLocationConstraintException,Rid 5E415CZXGQMCJHSJ,ExtRid HX8KWFxCe8R5Zl+91bYDaoduqardXzbhkGuloCQFP69dTh
DETAIL:  
  -----------------------------------------------
  error:  S3ServiceException:The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.,Status 400,Error IllegalLocationConstraintException,Rid 5E415CZXGQMCJHSJ,ExtRid HX8KWFxCe8R5Zl+91bYDaoduqardXzbhkGuloCQFP69dTh
  code:      8001
  context:   Listing bucket=shehry-test prefix=allusers_pipe.txt
  query:     26647
  location:  s3_utility.cpp:687
  process:   padbmaster [pid=21474]
  -----------------------------------------------




In [33]:
#run the query

try:
    cur.execute("""
   SELECT * FROM users;
    """)
except psycopg2.Error as e:
    print("error occured while copying")
    print(e)

error occured while copying
column "tablename" does not exist in stl_load_errors



In [32]:
#feth the rows from table

row = cur.fetchone()
while row:
    print(row)
    row=cur.fetchone()

(100, 0, 106395, datetime.datetime(2023, 3, 21, 16, 37, 11, 6284), 1073782902, 25774, 's3://shehry-test-bucket/allusers_pipe.txt                                                                                                                                                                                                                       ', 257, 'city                                                                                                                           ', 'varchar   ', '20        ', 27, '257|HLH30HFQ|Camille|Baker|Truth or Consequences|QC|Etiam@semperpretiumneque.com|(811) 323-6439||||TRUE|TRUE|TRUE||||                                                                                                                                                                                                                                                                                                                                                                                

In [39]:
#close the postgres connection

try:
    conn.close()
except Exception as e:
    print(e)

In [40]:
#delete the entire cluster

redshift.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'my-first-redshift',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'awsuser',
  'DBName': 'myfirstdb',
  'Endpoint': {'Address': 'my-first-redshift.czehrwblbatq.us-east-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2023, 3, 19, 22, 41, 42, 934000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-00c9a3ca98f432845',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-0446f2b750060a6e6',
  'AvailabilityZone': 'us-east-1e',
  'PreferredMaintenanceWindow': 'sun:04:30-sun:05:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'Nu