## Infrastructure-as-code set-up using AWS's SDK for Python

In [1]:
# load libraries
import pandas as pd
import boto3
import json
import configparser
import sql
import s3fs
from botocore.exceptions import ClientError

### Configuration file

In [2]:
# load parameters from a local configuration file
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')
REGION                 = config.get('AWS','REGION')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")

DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES",
                   "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER",
                    "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD",
                    "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES,
                   DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, 
                   DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, 
                   DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,project-cluster-1
4,DWH_DB,sparkify
5,DWH_DB_USER,projectuser
6,DWH_DB_PASSWORD,PRojectpassword01!!
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,neidhpa-dwh-project


### AWS Clients

In [3]:
# create clients for IAM, EC2, S3 and Redshift
ec2 = boto3.resource('ec2',
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name=REGION
                  )

redshift = boto3.client('redshift',
                       region_name=REGION,
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

s3_client = boto3.client('s3',
                        region_name=REGION,
                        aws_access_key_id=KEY,
                        aws_secret_access_key=SECRET
                        )

### Exploratory Data Analysis

In [4]:
# List all buckets in the account
response = s3_client.list_buckets()
buckets = response['Buckets']

# For each bucket, try to list its objects
for bucket in buckets:
    bucket_name = bucket['Name']
    try:
        s3_client.list_objects_v2(Bucket=bucket_name)
        print(f'Access to bucket {bucket_name} is successful.')
    except ClientError as e:
        print(f'Access to bucket {bucket_name} is denied.')

Access to bucket aws-glue-assets-302422927713-us-west-2 is denied.
Access to bucket neidhpa-airflow is successful.
Access to bucket neidhpa-airflow-project is successful.
Access to bucket neidhpa-dwh-project is successful.
Access to bucket neidhpa-lake-house is successful.
Access to bucket neidhpa-lakehouse-project is successful.


In [5]:
# Set the name of the S3 bucket for the project
bucket_name = 'neidhpa-dwh-project'

fs = s3fs.S3FileSystem(
    key=KEY, 
    secret=SECRET,
    client_kwargs={'region_name': REGION}
)

# Use the 'fs' object to interact with your S3 bucket.
# For instance, to list all files in a bucket:

files = fs.ls(f'{bucket_name}/')

for file in files:
    print(file)

neidhpa-dwh-project/log-data
neidhpa-dwh-project/log_json_path.json
neidhpa-dwh-project/song-data


In [6]:
# Create a function to combine all JSONs in a given prefix into a single dataframe
def jsons_to_df(bucket_name: str, prefix: str) -> pd.DataFrame:
    """
    Converts all of the JSONs within a given prefix into a single Pandas dataframe.

    Args:
        bucket_name (str): The name of the S3 bucket.
        prefix (str): The prefix in the S3 bucket to get the JSON files from.

    Returns:
        pd.DataFrame: A dataframe containing the data from all the JSON files in the given prefix.
    """
    
    # Initialize s3fs object
    fs = s3fs.S3FileSystem(
        key=KEY,
        secret=SECRET,
        client_kwargs={'region_name': REGION})

    # Initialize a list to hold all dataframes
    dfs = []

    # Walk through the directories and files in the prefix
    for root, dirs, files in fs.walk(f'{bucket_name}/{prefix}'):
        for file in files:
            # Create the full file path
            file_path = f"{root}/{file}"

            # Open the file and load it into a pandas dataframe
            with fs.open(file_path, 'r') as f:
                df = pd.read_json(f, lines=True)

            # Append the dataframe to the list
            dfs.append(df)

    # Concatenate all the dataframes in the list into a single dataframe
    final_df = pd.concat(dfs, ignore_index=True)

    return final_df

In [7]:
# Create a dataframe for the log data
log_df = jsons_to_df(bucket_name, 'log-data')

In [8]:
# Show the first 3 rows of the dataframe
log_df.head(n=3)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919000000.0,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540345000000.0,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540345000000.0,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [9]:
# Show the shape and datatypes of the dataframe
log_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8056 entries, 0 to 8055
Data columns (total 18 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   artist         6820 non-null   object 
 1   auth           8056 non-null   object 
 2   firstName      7770 non-null   object 
 3   gender         7770 non-null   object 
 4   itemInSession  8056 non-null   int64  
 5   lastName       7770 non-null   object 
 6   length         6820 non-null   float64
 7   level          8056 non-null   object 
 8   location       7770 non-null   object 
 9   method         8056 non-null   object 
 10  page           8056 non-null   object 
 11  registration   7770 non-null   float64
 12  sessionId      8056 non-null   int64  
 13  song           6820 non-null   object 
 14  status         8056 non-null   int64  
 15  ts             8056 non-null   int64  
 16  userAgent      7770 non-null   object 
 17  userId         8056 non-null   object 
dtypes: float

In [10]:
def count_json_files(bucket_name: str, prefix: str) -> int:
    """
    Counts the number of JSON files within a given prefix in an S3 bucket.

    Args:
        bucket_name (str): The name of the S3 bucket.
        prefix (str): The prefix in the S3 bucket to count the JSON files in.
        key (str): The AWS access key ID.
        secret (str): The AWS secret access key.

    Returns:
        int: The number of JSON files in the given prefix.
    """
    # Initialize s3fs object
    fs = s3fs.S3FileSystem(
        key=KEY,
        secret=SECRET,
        client_kwargs={'region_name': REGION})

    # Initialize a counter
    count = 0

    # Walk through the directories and files in the prefix
    for root, dirs, files in fs.walk(f'{bucket_name}/{prefix}'):
        for file in files:
            # Check if the file is a JSON file
            if file.lower().endswith('.json'):
                count += 1

    return count

- The data are JSON files stored within the *song-data* bucket
- Within this bucket there are 8 subfolders labelled alphabetically A-H, each of these subfolders has 26 subfolders labelled alphabetically A-Z, and each of these subfolders has 26 subfolders labeled alphabetically A-Z.
- Each individual JSON file is stored within one of the lowest level subfolders

In [11]:
lowest_prefix = 'song-data/A/A/A'
num_json_files = count_json_files(bucket_name=bucket_name, prefix=lowest_prefix)
print(f"There are {num_json_files} JSON files in the prefix {lowest_prefix}")

There are 24 JSON files in the prefix song-data/A/A/A


In [12]:
# Create a dataframe for the a subset of the song data
example_songs_df = jsons_to_df(bucket_name, lowest_prefix)

In [13]:
# Show the first 3 rows of the dataframe
example_songs_df.head(n=3)

Unnamed: 0,song_id,num_songs,title,artist_name,artist_latitude,year,duration,artist_id,artist_longitude,artist_location
0,SOBLFFE12AF72AA5BA,1,Scream,Adelitas Way,,2009,213.9424,ARJNIUY12298900C91,,
1,SOQPWCR12A6D4FB2A3,1,A Poor Recipe For Civic Cohesion,Western Addiction,37.77916,2005,118.07302,AR73AIO1187B9AD57B,-122.42005,"San Francisco, CA"
2,SOCIWDW12A8C13D406,1,Soul Deep,The Box Tops,35.14968,1969,148.03546,ARMJAGH1187FB546F3,-90.04892,"Memphis, TN"


In [14]:
# Show the shape and datatypes of the dataframe
example_songs_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 24 entries, 0 to 23
Data columns (total 10 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   song_id           24 non-null     object 
 1   num_songs         24 non-null     int64  
 2   title             24 non-null     object 
 3   artist_name       24 non-null     object 
 4   artist_latitude   12 non-null     float64
 5   year              24 non-null     int64  
 6   duration          24 non-null     float64
 7   artist_id         24 non-null     object 
 8   artist_longitude  12 non-null     float64
 9   artist_location   24 non-null     object 
dtypes: float64(3), int64(2), object(5)
memory usage: 2.0+ KB


### IAM Role

In [15]:
#1.1 Create the role, 
try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonRedshiftFullAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

1.1 Creating a new IAM Role
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::302422927713:role/neidhpa-dwh-project


### Create Redshift cluster

In [16]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )

    # Wait for cluster creation to complete
    redshift.get_waiter('cluster_available').\
        wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
    
    # Retrieve the endpoint of the cluster
    cluster_props = redshift.\
        describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    DWH_ENDPOINT = cluster_props['Endpoint']['Address']
    
    print("Cluster created successfully!")
    print("DWH_ENDPOINT:", DWH_ENDPOINT)

except Exception as e:
    print(e)

Cluster created successfully!
DWH_ENDPOINT: project-cluster-1.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com


In [17]:
# Function to neatly display select properties of the Redshift cluster
def RedshiftProps(props):
    # Set the maximum width of a column to display the entire content of a cell
    pd.set_option('display.max_colwidth', None)
    
    # List of keys (properties) to display
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus",
                "MasterUsername", "DBName", "Endpoint",
                "NumberOfNodes", 'VpcId']
    
    # Create a list of tuples where each tuple is (key, value)
    # Include only the keys that are present in keysToShow list
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    
    # Create and return a dataframe from the list of tuples
    # with column names as "Key" and "Value"
    return pd.DataFrame(data=x, columns=["Key", "Value"])

# Get the properties of the Redshift cluster
# This method returns a response object from which we extract the cluster details
# For this project, we assume there's only one cluster and we access this by index [0]
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

# Call the RedshiftProps function defined above to display select properties of the cluster
RedshiftProps(myClusterProps)


Unnamed: 0,Key,Value
0,ClusterIdentifier,project-cluster-1
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,projectuser
4,DBName,sparkify
5,Endpoint,"{'Address': 'project-cluster-1.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0478d8d5f8fd4539b
7,NumberOfNodes,4


In [18]:
# Get the cluster endpoint and role ARN from the properties
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
# Print the values
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  project-cluster-1.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::302422927713:role/neidhpa-dwh-project


In [19]:
try:
    # Use the VPC ID property of the Redshift cluster to get the VPC
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    
    # From all the security groups associated with this VPC, get the first one.
    # This assumes the first security group is the default one.
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    # Get the existing inbound (ingress) rules for this security group
    existing_rules = defaultSg.ip_permissions
    print("Existing rules:")
    
    # Check each existing rule to see if the rule we want to add already exists
    for rule in existing_rules:
        print(rule)
        # The rule we want to add is for inbound TCP traffic on a certain port (DWH_PORT),
        # allowing any IP to connect (0.0.0.0/0)
        if (
            rule['IpProtocol'].lower() == 'tcp' and
            rule['FromPort'] == int(DWH_PORT) and
            rule['ToPort'] == int(DWH_PORT) and
            any(x['CidrIp'] == '0.0.0.0/0' for x in rule['IpRanges'])
        ):
            print("Rule already exists")
            break
    else:
        # If the loop doesn't break, the rule does not exist and we the rule
        defaultSg.authorize_ingress(
            GroupName=defaultSg.group_name,
            CidrIp='0.0.0.0/0',
            IpProtocol='TCP',
            FromPort=int(DWH_PORT),
            ToPort=int(DWH_PORT)
        )
        
# If there's an error (e.g. permissions error when trying to modify security group)
# print the error message.
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-05ad93b78bd502d9e')
Existing rules:
{'FromPort': 5439, 'IpProtocol': 'tcp', 'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'Allow traffic from anywhere in the world'}], 'Ipv6Ranges': [], 'PrefixListIds': [], 'ToPort': 5439, 'UserIdGroupPairs': []}
Rule already exists


In [20]:
# Load the SQL extension
%load_ext sql

In [21]:
# Confirm the connection to the Redshift cluster
conn_string="postgresql://{}:{}@{}:{}/{}".\
    format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://projectuser:PRojectpassword01!!@project-cluster-1.cavkzsgzmk8j.us-west-2.redshift.amazonaws.com:5439/sparkify


'Connected: projectuser@sparkify'