In [1]:
import pandas as pd
import boto3
import json

In [2]:
import configparser
config = configparser.ConfigParser()
# template cfg file is in repo; don't post your api key and secret online
config.read_file(open('~/.aws_creds/dwh.cfg'))

# API credentials for admin account
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# cluster specifications
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("CLUSTER","DB_NAME")
DWH_DB_USER            = config.get("CLUSTER","DB_USER")
DWH_DB_PASSWORD        = config.get("CLUSTER","DB_PASSWORD")
DWH_PORT               = config.get("CLUSTER","DB_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,songplay_dwh
5,DWH_DB_USER,master_user
6,DWH_DB_PASSWORD,Sparkify_1987!!salt
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


In [3]:
# create API connections to AWS EC2, IAM and redshift
ec2 = boto3.resource('ec2',
                   region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                  region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

iam = boto3.client('iam',
                   region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

redshift = boto3.client('redshift',
                        region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

# Examine data in s3 buckets

In [4]:
sampleDbBucket =  s3.Bucket("udacity-dend")

# Iterate over bucket objects starting with "log_data" and print
for obj in sampleDbBucket.objects.filter(Prefix='log_data'):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-09-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-10-events.json')
s3.ObjectSummary(b

In [5]:
body = obj.get()['Body'].read().decode("utf-8") 

In [6]:
body.split('\n')[-1]

'{"artist":"Deas Vail","auth":"Logged In","firstName":"Elijah","gender":"M","itemInSession":0,"lastName":"Davis","length":237.68771,"level":"free","location":"Detroit-Warren-Dearborn, MI","method":"PUT","page":"NextSong","registration":1540772343796.0,"sessionId":985,"song":"Anything You Say (Unreleased Version)","status":200,"ts":1543607664796,"userAgent":"\\"Mozilla\\/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit\\/537.77.4 (KHTML, like Gecko) Version\\/7.0.5 Safari\\/537.77.4\\"","userId":"5"}'

In [7]:
json.loads(body.split('\n')[0])

{'artist': 'Stephen Lynch',
 'auth': 'Logged In',
 'firstName': 'Jayden',
 'gender': 'M',
 'itemInSession': 0,
 'lastName': 'Bell',
 'length': 182.85669,
 'level': 'free',
 'location': 'Dallas-Fort Worth-Arlington, TX',
 'method': 'PUT',
 'page': 'NextSong',
 'registration': 1540991795796.0,
 'sessionId': 829,
 'song': "Jim Henson's Dead",
 'status': 200,
 'ts': 1543537327796,
 'userAgent': 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)',
 'userId': '91'}

In [8]:
json_data = [json.loads(b) for b in body.split('\n')]

In [9]:
df = pd.io.json.json_normalize(json_data)

In [10]:
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Stephen Lynch,Logged In,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",PUT,NextSong,1540992000000.0,829,Jim Henson's Dead,200,1543537327796,Mozilla/5.0 (compatible; MSIE 10.0; Windows NT...,91
1,Manowar,Logged In,Jacob,M,0,Klein,247.562,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Shell Shock,200,1543540121796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
2,Morcheeba,Logged In,Jacob,M,1,Klein,257.41016,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Women Lose Weight (Feat: Slick Rick),200,1543540368796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
3,Maroon 5,Logged In,Jacob,M,2,Klein,231.23546,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Won't Go Home Without You,200,1543540625796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
4,Train,Logged In,Jacob,M,3,Klein,216.76363,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Hey_ Soul Sister,200,1543540856796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73


In [11]:
cnt = 0
for obj in sampleDbBucket.objects.filter(Prefix='song_data'):
    print(obj)
    cnt += 1
    if cnt > 10:
        break

s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEA128F935A30D.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAED128E0783FAB.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEM128F93347B9.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEW128F42930C0.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAFD128F92F423A.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAGR128F425B14B.json')


In [12]:
data = obj.get()['Body'].read().decode('utf-8')

In [13]:
json_data = [json.loads(d) for d in data.split('\n')]

In [14]:
df = pd.io.json.json_normalize(json_data)

In [15]:
df.head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARGE7G11187FB37E05,,"Brooklyn, NY",,Cyndi Lauper,240.63955,1,SONRWUU12AF72A4283,Into The Nightlife,2008


In [16]:
# Create the IAM role
try:
    print('1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole', # sts = security token status
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )
except Exception as e:
    print(e)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.


In [17]:
# Give IAM read-only access to S3
print('1.2 Attaching Policy')
iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

1.2 Attaching Policy


200

In [18]:
# get IAM ARN for creating cluster
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

In [19]:
try:
    response = redshift.create_cluster(        
        # add parameters for hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        # add parameters for identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        # add parameter for role (to allow s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

In [22]:
# view cluster status
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,master_user
4,DBName,songplay_dwh
5,Endpoint,"{'Address': 'dwhcluster.cvxtjfgdtfwc.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-db58d3bf
7,NumberOfNodes,4


# Get DB endpoint connection address and IAM role ARN for connecting and writing to the DB

In [23]:
# get host address for DB and role ARN
# Only the host address is needed for connecting to the DB and querying through python
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

# write 
with open('db_config.cfg', 'w') as f:
    f.write('[DB]\n')
    f.write('DB_ENDPOINT={}\n'.format(DWH_ENDPOINT))
    f.write('DB_IAM_ARN={}\n'.format(DWH_ROLE_ARN))
    
# need to copy-paste these from db_config.cfg to dwh.cfg

DWH_ENDPOINT ::  dwhcluster.cvxtjfgdtfwc.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::078353780680:role/dwhRole


In [24]:
# open TCP port for access to cluster endpoint
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        # knowledge Q about timeout: https://knowledge.udacity.com/questions/43807
        GroupName='default',  # solution says defaultSg.group_name, but this is incorrect
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-009833244f0de941a')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


In [25]:
# reload config file with ARN and endpoint updated
config.read_file(open('dwh.cfg'))

# Test connection to the DB and run some EDA/data quality check queries

In [26]:
import psycopg2

conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

In [27]:
query = """
SELECT * FROM staging_events WHERE song = 'En Mi Mundo' LIMIT 5;"""
cur.execute(query)
for c in cur.fetchall():
    print(c)

('Gary Hobbs', 'Logged In', 'Jahiem', 'M', 1, 'Miles', Decimal('245'), 'free', 'San Antonio-New Braunfels, TX', 'PUT', 'NextSong', Decimal('1540817347796'), 42, 'En Mi Mundo', 200, 1541300092796, '"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', 43)


In [28]:
# ideally this testing would be in a separate notebook
# check database table data
cur.execute('SELECT COUNT(*) FROM staging_events;')
cur.fetchone()

(8056,)

In [29]:
cur.execute('SELECT * FROM staging_events WHERE userId IS NULL LIMIT 5;')
for c in cur.fetchall():
    print(c)

(None, 'Logged Out', None, None, 0, None, None, 'paid', None, 'PUT', 'Login', None, 130, None, 307, 1541519496796, None, None)
(None, 'Logged Out', None, None, 8, None, None, 'free', None, 'GET', 'Home', None, 126, None, 200, 1541522448796, None, None)
(None, 'Logged Out', None, None, 9, None, None, 'free', None, 'GET', 'About', None, 126, None, 200, 1541522507796, None, None)
(None, 'Logged Out', None, None, 10, None, None, 'free', None, 'PUT', 'Login', None, 126, None, 307, 1541522508796, None, None)
(None, 'Logged Out', None, None, 3, None, None, 'paid', None, 'GET', 'Home', None, 128, None, 200, 1541310732796, None, None)


In [30]:
cur.execute("SELECT * FROM staging_events WHERE userId IS NULL AND song='NextSong';")
for c in cur.fetchall():
    print(c)

In [31]:
# for checking errors when loading
cur.execute('SELECT * FROM stl_load_errors;')
for c in cur.fetchall():
    print(c)

In [32]:
query = """
SELECT *
FROM staging_events se
JOIN staging_songs ss
ON (se.artist = ss.artist_name AND se.song = ss.title)
WHERE se.page = 'NextSong' AND se.userId is NULL;
"""

In [33]:
# for checking errors when loading
cur.execute(query)
for c in cur.fetchmany():
    print(c)

In [34]:
cur.execute('SELECT * FROM staging_events LIMIT 1;')

In [35]:
for c in cur.fetchmany():
    print(c)

(None, 'Logged In', 'Adler', 'M', 0, 'Barrera', None, 'free', 'New York-Newark-Jersey City, NY-NJ-PA', 'GET', 'Home', Decimal('1540835983796'), 248, None, 200, 1541470364796, '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2"', 100)


In [36]:
# need to close connection to allow for importing data
conn.close()

# Deletes cluster and IAM role

In [37]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

{'Cluster': {'ClusterIdentifier': 'dwhcluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'master_user',
  'DBName': 'songplay_dwh',
  'Endpoint': {'Address': 'dwhcluster.cvxtjfgdtfwc.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2020, 2, 14, 4, 27, 46, 234000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-2bd71f4d',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-db58d3bf',
  'AvailabilityZone': 'us-west-2a',
  'PreferredMaintenanceWindow': 'fri:12:30-fri:13:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': [],
  'EnhancedVpcRouting': 

In [38]:
# check status
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,deleting
3,MasterUsername,master_user
4,DBName,songplay_dwh
5,Endpoint,"{'Address': 'dwhcluster.cvxtjfgdtfwc.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-db58d3bf
7,NumberOfNodes,4


In [39]:
# delete IAM role
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
#### CAREFUL!!

{'ResponseMetadata': {'RequestId': 'e3740e2b-ec73-4798-8d3f-bffa3395148d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e3740e2b-ec73-4798-8d3f-bffa3395148d',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Fri, 14 Feb 2020 04:33:06 GMT'},
  'RetryAttempts': 0}}