In [1]:
import boto3
from project3_iam_user import * #import ACCESS_KEY_ID and SECRET_ACCESS_KEY

In [2]:
DWH_CLUSTER_TYPE = 'single-node'
#DWH_NUM_NODES = 2
DWH_NODE_TYPE = 'dc2.large'
DWH_CLUSTER_IDENTIFIER = 'project3-cluster'
DWH_DB = 'project3_db'
DWH_DB_USER = 'p3user'
DWH_DB_PASSWORD = 'p3Passw0rd'
DWH_PORT = 5439
DWH_IAM_ROLE_NAME =  'project3_redshift_role'
REGION = 'us-west-2'

# Create clients for IAM and Redshift

In [3]:
iam = boto3.client('iam',aws_access_key_id=ACCESS_KEY_ID,
                     aws_secret_access_key=SECRET_ACCESS_KEY,
                     region_name=REGION
                  )

redshift = boto3.client('redshift',
                       region_name=REGION,
                       aws_access_key_id=ACCESS_KEY_ID,
                       aws_secret_access_key=SECRET_ACCESS_KEY
                       )

ec2 = boto3.resource('ec2',
                       region_name=REGION,
                       aws_access_key_id=ACCESS_KEY_ID,
                       aws_secret_access_key=SECRET_ACCESS_KEY
                    )

# Create IAM Role for Redshift
Make Redshift able to read s3

In [4]:
from botocore.exceptions import ClientError
import json

#Create the role, 
try: #it might exists already
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
dwh_role_arn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(dwh_role_arn)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name project3_redshift_role already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::929805762734:role/project3_redshift_role


# Create Redshift cluster
Create a Redshift cluster using the preset parameters and associate the created role to it

In [5]:
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType = DWH_CLUSTER_TYPE,
        NodeType = DWH_NODE_TYPE,
        #NumberOfNodes = DWH_NUM_NODES,

        #Identifiers & Credentials
        DBName = DWH_DB,
        ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
        MasterUsername = DWH_DB_USER,
        MasterUserPassword = DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[dwh_role_arn]  
    )
except Exception as e:
    print(e)



# Get cluster properties

In [6]:
# wait until the cluster is ready
waiter = redshift.get_waiter('cluster_available')
waiter.wait( 
    ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
    WaiterConfig={
        'Delay': 30,
        'MaxAttempts': 20
    }
)

#get properties dictionary
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

# Open a TCP port to access the cluster endpoint

In [7]:
try: #the rule might already exist
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-0006eb2f148068d87')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


# Write cluster props to cfg file

In [8]:
file_text = f"""[CLUSTER]
HOST='{myClusterProps['Endpoint']['Address']}'
DB_NAME='{DWH_DB}'
DB_USER='{DWH_DB_USER}'
DB_PASSWORD='{DWH_DB_PASSWORD}'
DB_PORT='{myClusterProps['Endpoint']['Port']}'

[IAM_ROLE]
ARN={dwh_role_arn}

[S3]
LOG_DATA='s3://udacity-dend/log_data'
LOG_JSONPATH='s3://udacity-dend/log_json_path.json'
SONG_DATA='s3://udacity-dend/song_data'

[REGION]
REGION='{REGION}'
"""

f = open('dwh.cfg', 'w')
f.write(file_text)
f.close()

# Run create_tables.py

In [9]:
! python3 create_tables.py

# Run etl.py

In [13]:
! python3 etl.py

starting load
starting insert

INSERT INTO users (id, first_name, last_name, gender, level)
SELECT userid,
       firstname,
       lastname,
       gender,
       level
FROM staging_events
WHERE page = 'NextSong'


INSERT INTO songs (id, title, artist_id, year, duration)
SELECT song_id,
       title,
       artist_id,
       year,
       duration
FROM staging_songs


INSERT INTO artists (id, name, location, latitude, longitude)
SELECT artist_id,
       artist_name,
       artist_location,
       artist_latitude,
       artist_longitude
FROM staging_songs


INSERT INTO time (start_time, hour, day, week, month, year, weekday)
SELECT TIMESTAMP 'epoch' + ts::BIGINT/1000 * INTERVAL '1 second' AS epoch_to_timestamp,
       DATE_PART('hour', epoch_to_timestamp),
       DATE_PART('day', epoch_to_timestamp),
       DATE_PART('week', epoch_to_timestamp),
       DATE_PART('month', epoch_to_timestamp),
       DATE_PART('year', epoch_to_timestamp),
       DATE_PART('weekday', epoch_to_timestamp)
F

# Delete cluster

In [14]:
redshift.delete_cluster( ClusterIdentifier='project3-cluster',  SkipFinalClusterSnapshot=True)

{'Cluster': {'ClusterIdentifier': 'project3-cluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'p3user',
  'DBName': 'project3_db',
  'Endpoint': {'Address': 'project3-cluster.cqh4kd6a34p7.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2021, 12, 27, 21, 10, 10, 326000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0006eb2f148068d87',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-0ab9503a980b66838',
  'AvailabilityZone': 'us-west-2c',
  'PreferredMaintenanceWindow': 'thu:08:30-thu:09:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'Nu