In [None]:
import configparser
import psycopg2
import boto3
import json
import pandas as pd

In [None]:
# Load AWS credentials and cluster settings from dwh.cfg. The context manager
# closes the file handle once parsing is done.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# AWS credentials used to build the boto3 clients.
aws_key = config.get('AWS', 'KEY')
aws_secret = config.get('AWS', 'SECRET')

# NOTE: despite the variable name, this is the IAM role *name* (IAM_ROLE.role_name),
# not an ARN.
arn_role = config.get('IAM_ROLE', 'role_name')

# Redshift cluster hardware / identity settings. configparser returns every
# value as a string, so numeric settings must be converted where they are used.
dwh_cluster_type = config.get('CLUSTER', 'dwh_cluster_type')
dwh_num_nodes = config.get('CLUSTER', 'dwh_num_nodes')
dwh_node_type = config.get('CLUSTER', 'dwh_node_type')
dwh_cluster = config.get('CLUSTER', 'dwh_cluster')
db_name = config.get('CLUSTER', 'db_name')
db_user = config.get('CLUSTER', 'db_user')
db_password = config.get('CLUSTER', 'db_password')
db_port = config.get('CLUSTER', 'db_port')

### Create Redshift and IAM clients

In [None]:
# Region and credentials shared by every boto3 client created in this cell.
aws_client_kwargs = dict(
    region_name='us-west-2',
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)

redshift = boto3.client('redshift', **aws_client_kwargs)  # cluster management
iam = boto3.client('iam', **aws_client_kwargs)            # role / policy management

### Create New IAM Role & Attach Policy

In [None]:
# Create the IAM role that Redshift assumes to read from S3, then attach the
# AWS-managed S3 read-only policy to it. Re-running is safe: an existing role is
# reported and reused. Any other failure (e.g. missing IAM permissions or a bad
# policy document) is raised here rather than printed and ignored, which would
# only resurface later as a confusing error from attach_role_policy.
try:
    print('1.1 Creating a new IAM Role')
    dwhRole = iam.create_role(
        RoleName=arn_role,
        Description='IAM Role for udacity dwh project, allowing Redshift access to S3 Bucket (ReadOnly)',
        # Trust policy: only the Redshift service may assume this role.
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                            'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'}),
    )
except iam.exceptions.EntityAlreadyExistsException as e:
    print(e)

print("1.2 Attaching Policy")

# Last expression: the HTTP status code of the attach call is displayed as the
# cell output.
iam.attach_role_policy(RoleName=arn_role,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

### Get the ARN of the new IAM Role & write it to the config file

In [None]:
# Look up the ARN of the role by name and persist it under [IAM_ROLE] ARN in
# dwh.cfg, so the COPY statements can reference it.
role_arn = iam.get_role(RoleName=arn_role)['Role']['Arn']
config['IAM_ROLE']['ARN'] = role_arn

# NOTE(review): ConfigParser.write() regenerates the whole file from the parsed
# state, so any comments in dwh.cfg are not preserved.
with open('dwh.cfg', 'w') as cfg_out:
    config.write(cfg_out)

### Create Redshift Cluster
- Using the boto3 clients:
    - create the cluster (this cell)
    - create a cluster security group
    - create the IAM role (done above)
    - attach the S3 read-only policy to the IAM role (done above)

In [None]:
# Request a new Redshift cluster using the settings read from dwh.cfg.
# Re-running is safe: if the cluster already exists the error is reported and
# ignored. Any other failure (invalid parameters, quota, permissions) is raised
# instead of being silently swallowed.
try:
    response = redshift.create_cluster(
        # Hardware
        ClusterType=dwh_cluster_type,
        NodeType=dwh_node_type,
        NumberOfNodes=int(dwh_num_nodes),

        # Identifiers & credentials
        DBName=db_name,
        ClusterIdentifier=dwh_cluster,
        MasterUsername=db_user,
        MasterUserPassword=db_password,
        # Listen on the port that the psycopg2 connection reads from
        # CLUSTER.db_port; otherwise Redshift uses its default port (5439)
        # regardless of the config.
        Port=int(db_port),

        # Role Redshift assumes for COPY from S3
        IamRoles=[role_arn],
    )
except redshift.exceptions.ClusterAlreadyExistsFault as e:
    print(e)

In [None]:
# Runs create_cluster.py in the notebook's namespace.
# NOTE(review): the cell above already calls redshift.create_cluster(); presumably
# this script provisions the cluster too (and may write CLUSTER.dwh_host, which the
# psycopg2 connection cell reads) — confirm and keep only one of the two paths.
%run create_cluster.py

In [None]:
# Runs delete_cluster.py, which calls the Redshift DeleteCluster API (see the
# ClusterNotFound output recorded later in this notebook).
# NOTE(review): executing this here, before the staging/load cells below, removes
# the cluster they connect to, so Restart & Run All fails. Run it only when finished.
%run delete_cluster.py

### Create Staging Table & Load Data from the S3 Bucket

- Create the cluster
- Connect with psycopg2 using the credentials in the config file
- Write the `CREATE TABLE` statement and execute it with the psycopg2 cursor


#### S3 Song Dataset

- s3://udacity-dend/song_data
- Example record:

```json
{ "num_songs": 1
 , "artist_id": "ARJIE2Y1187B994AB7"
 , "artist_latitude": null
 , "artist_longitude": null
 , "artist_location": ""
 , "artist_name": "Line Renaud"
 , "song_id": "SOUPIRU12A6D4FA1E1"
 , "title": "Der Kleine Dompfaff"
 , "duration": 152.92036
 , "year": 0
 }
```


#### S3 Log Dataset
- Log data: s3://udacity-dend/log_data

In [None]:
import configparser

import psycopg2

# Re-read dwh.cfg from disk so values written by earlier cells (e.g. IAM_ROLE.arn)
# are picked up. The context manager closes the file handle after parsing.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# NOTE(review): CLUSTER.dwh_host is not written by any cell in this notebook;
# presumably create_cluster.py records the cluster endpoint there — confirm.
conn = psycopg2.connect(user=config.get('CLUSTER', 'db_user'),
                        password=config.get('CLUSTER', 'db_password'),
                        host=config.get('CLUSTER', 'dwh_host'),
                        port=config.get('CLUSTER', 'db_port'),
                        database=config.get('CLUSTER', 'db_name'))

# Autocommit: each DDL / COPY statement executed below takes effect immediately.
conn.set_session(autocommit=True)
cur = conn.cursor()

In [None]:
# DDL for the staging table loaded by COPY from the S3 song JSON files; the
# columns mirror the keys of the example song record.
# IF NOT EXISTS keeps the cell re-runnable instead of failing with
# "relation already exists" on a second execution.
query_create_staging_table_song_data = """
    CREATE TABLE IF NOT EXISTS staging_songs (
        artist_id VARCHAR,
        artist_location VARCHAR,
        artist_latitude FLOAT,
        artist_longitude FLOAT,
        artist_name VARCHAR,
        duration FLOAT,
        num_songs INT,
        song_id VARCHAR,
        title VARCHAR,
        year INT
    );
"""


cur.execute(query_create_staging_table_song_data)

In [None]:
# Bulk-load the song JSON files into staging_songs, authorising S3 access with
# the IAM role ARN stored in dwh.cfg.
# NOTE(review): the S3 path is interpolated without quotes, so S3.song_data in
# dwh.cfg presumably already contains the quotes COPY requires — confirm.
song_data_path = config.get('S3', 'song_data')
iam_role_arn = config.get('IAM_ROLE', 'arn')

query_staging_songs_copy = f"""
    COPY staging_songs
    FROM {song_data_path}
    credentials 'aws_iam_role={iam_role_arn}'
    FORMAT AS JSON 'auto';
"""
cur.execute(query_staging_songs_copy)

In [None]:
# Bulk-load the event log JSON files into staging_events, mapping fields with
# the JSONPaths file named in S3.log_jsonpath.
# NOTE(review): staging_events is never created in this notebook (only
# staging_songs is); presumably create_tables.py creates it — confirm before
# relying on Run All.
log_data_path, log_jsonpath, events_role_arn = (
    config.get('S3', 'log_data'),
    config.get('S3', 'log_jsonpath'),
    config.get('IAM_ROLE', 'arn'),
)

query_staging_events_copy = f"""
    COPY staging_events
    FROM {log_data_path}
    credentials 'aws_iam_role={events_role_arn}'
    json {log_jsonpath};
"""
cur.execute(query_staging_events_copy)

In [None]:
# Show the most recent COPY/load failures (newest first) instead of dumping the
# cluster's entire load-error history.
pd.read_sql_query(
    "select * from stl_load_errors order by starttime desc limit 10", conn
)

In [None]:
import boto3

# S3 resource for browsing the public "udacity-dend" bucket, which holds both
# the song and the log datasets.
s3 = boto3.resource(
    's3',
    region_name='us-west-2',
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)
s3_bucket_songs = s3.Bucket("udacity-dend")

In [None]:
# Pull the staging table into pandas and summarise its numeric columns.
songs_staging_query = "select * from staging_songs"
df_songs_staging = pd.read_sql_query(songs_staging_query, conn)
df_songs_staging.describe()

In [None]:
# Inspect the contents of the artists table.
artists_preview = pd.read_sql_query("select * from artists", conn)
artists_preview

In [7]:
# Runs create_cluster.py in the notebook's namespace.
# NOTE(review): execution counts in this section ([7], [8], [10], then [1]) show
# these cells were run out of order; re-check that the sequence works on a fresh kernel.
%run create_cluster.py

In [8]:
# Runs create_tables.py (presumably (re)creates the staging and analytics
# tables — confirm against the script).
%run create_tables.py

In [10]:
# Runs etl.py (presumably loads the staging tables from S3 and populates the
# analytics tables — confirm against the script).
%run etl.py

In [1]:
# Runs delete_cluster.py, which calls the Redshift DeleteCluster API (its output
# below reports ClusterNotFound when the cluster is already gone).
# NOTE(review): the cells after this one still execute queries through `cur` and
# `conn`; with the cluster deleted they cannot succeed under Restart & Run All.
%run delete_cluster.py

An error occurred (ClusterNotFound) when calling the DeleteCluster operation: Cluster dwhhostudacityproject not found.


In [None]:
from sql_queries import staging_events_copy, staging_songs_copy

# Reload both staging tables from S3 using the project's COPY statements
# (events first, then songs).
for copy_statement in (staging_events_copy, staging_songs_copy):
    cur.execute(copy_statement)

In [None]:
# Pull both staging tables into pandas for inspection: dfe = events, dfs = songs.
dfe, dfs = [
    pd.read_sql_query(staging_sql, conn)
    for staging_sql in ('select * from staging_events', 'select * from staging_songs')
]

In [None]:
# Preview the songplays table. LIMIT applies the row cap in Redshift, so the
# whole table is not transferred just to display five rows.
pd.read_sql_query('select * from songplays limit 5', conn)

In [None]:
from sql_queries import songplay_table_drop, songplay_table_create, songplay_table_insert

# Rebuild the songplays table from scratch: drop it, recreate it, repopulate it.
songplay_rebuild_steps = (songplay_table_drop, songplay_table_create, songplay_table_insert)
for step_sql in songplay_rebuild_steps:
    cur.execute(step_sql)

In [None]:
# Load the songplays, songs and users tables into pandas, in that order.
table_queries = (
    'select * from songplays',
    'select * from songs',
    'select * from users',
)
df_songplays, df_songs, df_user = (pd.read_sql_query(q, conn) for q in table_queries)

In [None]:
# First ten rows of the songs table.
df_songs.iloc[:10]