In [75]:
import configparser

config = configparser.ConfigParser()
# read_file() only iterates the file object it is given and never closes it,
# so open the config inside a context manager to release the handle.
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# amazon aws
KEY = config.get('AWS', 'key')
SECRET = config.get('AWS', 'secret')

# Redshift
DWH_DB = config.get('DWH', 'DWH_DB')
DWH_DB_USER = config.get('DWH', 'DWH_DB_USER')
DWH_DB_PASSWORD = config.get('DWH', 'DWH_DB_PASSWORD')
DWH_PORT = config.get('DWH', 'DWH_PORT')
DWH_CLUSTER_TYPE = config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH', 'DWH_NUM_NODES')
DWH_NODE_TYPE = config.get('DWH', 'DWH_NODE_TYPE')
DWH_IAM_ROLE_NAME = config.get('DWH', 'DWH_IAM_ROLE_NAME')
DWH_CLUSTER_IDENTIFIER = config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
DWH_SCHEMA = config.get('DWH', 'DWH_SCHEMA')
DWH_LOG_STAGING_TABLE = config.get('DWH', 'DWH_LOG_STAGING_TABLE')
DWH_SONG_STAGING_TABLE = config.get('DWH', 'DWH_SONG_STAGING_TABLE')
DWH_REGION = config.get('DWH', 'DWH_REGION')

# s3
LOG_JSON_FORMAT = config.get('S3', 'LOG_JSON_FORMAT')
S3_BUCKET_LOG_JSON_PATH = config.get('S3', 'S3_BUCKET_LOG_JSON_PATH')
S3_BUCKET_SONG_JSON_PATH = config.get('S3', 'S3_BUCKET_SONG_JSON_PATH')

In [79]:
# Sanity check: display the log staging-table name read from dwh.cfg.
DWH_LOG_STAGING_TABLE

'log_staging'

In [3]:
import boto3
from config import *
import json
from botocore.exceptions import ClientError
import utils
from smart_open import open


def create_iam_role(region_name='us-west-2'):
    """Create (or reuse) the IAM role that Redshift assumes to read S3.

    Creates the role ``DWH_IAM_ROLE_NAME`` with a trust policy allowing
    ``redshift.amazonaws.com`` to assume it, then attaches the AWS managed
    ``AmazonS3ReadOnlyAccess`` policy. A ``ClientError`` from either step
    (for example EntityAlreadyExists) is printed and the function continues,
    so re-running it against an existing role does not abort.

    :param region_name: AWS region for the IAM client.
    :return: the ARN of the role.
    :raises botocore.exceptions.ClientError: if the role cannot be fetched.
    """
    iam = boto3.client('iam',
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET,
                       region_name=region_name)
    trust_policy = {
        'Statement': [{'Action': 'sts:AssumeRole',
                       'Effect': 'Allow',
                       'Principal': {'Service': 'redshift.amazonaws.com'}}],
        'Version': '2012-10-17'}

    print("1.1 creating role")
    try:
        iam.create_role(
            Path='/',
            RoleName=DWH_IAM_ROLE_NAME,
            Description="Allows Redshift to call AWS Services.",
            AssumeRolePolicyDocument=json.dumps(trust_policy))
    except ClientError as e:
        print(f'ERROR: {e}')

    print("1.2 Attaching Policy")
    try:
        # The API response is not needed, so it is not inspected.
        iam.attach_role_policy(
            RoleName=DWH_IAM_ROLE_NAME,
            PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
    except ClientError as e:
        print(f'ERROR: {e}')

    print("1.3 Get the IAM role ARN")
    return iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']


def create_redshift_cluster(roleArn, region_name="us-west-2"):
    """Create the configured Redshift cluster and wait until it is available.

    A ``ClientError`` from ``create_cluster`` (e.g. the cluster already
    exists) is printed, and the function still waits for the cluster named
    ``DWH_CLUSTER_IDENTIFIER`` to reach the ``available`` status.

    :param roleArn: ARN of the IAM role the cluster uses for S3 access.
    :param region_name: AWS region for the Redshift client.
    :return: the cluster endpoint address (hostname).
    """
    print("1.1 Client is created ...")
    redshift = boto3.client('redshift',
                            region_name=region_name,
                            aws_access_key_id=KEY,
                            aws_secret_access_key=SECRET)
    try:
        print("1.2 Cluster config is being created ...")
        redshift.create_cluster(
            # HW
            ClusterType=DWH_CLUSTER_TYPE,
            NodeType=DWH_NODE_TYPE,
            NumberOfNodes=int(DWH_NUM_NODES),

            # Identifiers & Credentials
            DBName=DWH_DB,
            ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
            MasterUsername=DWH_DB_USER,
            MasterUserPassword=DWH_DB_PASSWORD,

            # Roles (for s3 access)
            IamRoles=[roleArn])
    except ClientError as e:
        print(f'ERROR: {e}')

    print("1.3 Cluster is being created ...")
    while True:
        cluster = redshift.describe_clusters(
            ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
        if cluster['ClusterStatus'] == 'available':
            break
        utils.animate()

    print("\r1.4 Cluster is created successfully ...")
    # Reuse the final description instead of issuing one more API call.
    return cluster['Endpoint']['Address']


def delete_redshift_cluster(region_name="us-west-2"):
    """Delete the configured Redshift cluster and wait for it to disappear.

    The final snapshot is skipped. Deletion is reported as successful only
    when ``describe_clusters`` raises ``ClusterNotFoundFault``. Any other
    error propagates instead of being reported as success. If polling stops
    because the status is no longer ``deleting``, that status is printed.

    :param region_name: AWS region for the Redshift client.
    :return: None
    """
    print("1.1 Client is created ...")
    redshift = boto3.client('redshift',
                            region_name=region_name,
                            aws_access_key_id=KEY,
                            aws_secret_access_key=SECRET)
    print("1.2 Cluster is identified ...")
    try:
        redshift.delete_cluster(
            ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
            SkipFinalClusterSnapshot=True)
    except ClientError as e:
        print(f'ERROR: {e}')

    print("1.3 Cluster is being deleted ...")
    status = None
    try:
        while True:
            status = redshift.describe_clusters(
                ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)\
                ['Clusters'][0]['ClusterStatus']
            if status != 'deleting':
                break
            utils.animate()
    except redshift.exceptions.ClusterNotFoundFault:
        # The cluster no longer exists: deletion finished.
        print("\r1.4 Cluster is deleted successfully ...")
    else:
        print(f"\r1.4 Cluster was not deleted (status: {status})")
    return None


def create_bucket(bucket_name, region=None):
    """ Create an Amazon S3 bucket

    Outside ``us-east-1`` S3 requires the target region to be passed as a
    ``LocationConstraint``; pass ``region`` to create the bucket there.

    :param bucket_name: Unique string name
    :param region: Optional region to create the bucket in. When omitted,
                   the client's default configuration is used.
    :return: True if bucket is created, else False
    """
    try:
        if region is None:
            s3 = boto3.client('s3')
            s3.create_bucket(Bucket=bucket_name)
        else:
            s3 = boto3.client('s3', region_name=region)
            if region == 'us-east-1':
                # us-east-1 rejects an explicit LocationConstraint.
                s3.create_bucket(Bucket=bucket_name)
            else:
                s3.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={'LocationConstraint': region})
    except ClientError as e:
        print(f'ERROR: {e}')
        return False
    return True


def upload_bucket(bucket_name, key, output_name):
    """Upload a local file to an S3 bucket.

    :param bucket_name: name of the destination S3 bucket
    :param key: path of the local file to upload
    :param output_name: object key the file is stored under in the bucket
    :return: None
    """
    boto3.client('s3').upload_file(key, bucket_name, output_name)


def list_bucket(bucket_name, prefix, region_name="us-west-2"):
    """List the objects in an S3 bucket whose keys start with ``prefix``.

    :param bucket_name: Your S3 BucketName
    :param prefix: key prefix to filter on ('' lists the whole bucket)
    :param region_name: AWS region for the S3 resource
    :return: list of the object summaries yielded by
             ``Bucket.objects.filter(Prefix=prefix)``
    """
    s3 = boto3.resource('s3',
                        region_name=region_name,
                        aws_access_key_id=KEY,
                        aws_secret_access_key=SECRET)
    return list(s3.Bucket(bucket_name).objects.filter(Prefix=prefix))


def s3_read(s3_path):
    """
    Print a UTF-8 text file from an S3 source, one line at a time.

    Parameters
    ----------
    s3_path : str
        Path starting with s3://, e.g. 's3://bucket-name/key/foo.bar'

    Returns
    -------
    None
        Lines are written to stdout; nothing is returned.

    Raises
    ------
    botocore.exceptions.NoCredentialsError
        Presumably raised when botocore cannot find your credentials; add
        the environment variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
        and AWS_SESSION_TOKEN.
        See https://boto3.readthedocs.io/en/latest/guide/configuration.html
    """
    # Open in text mode. Combining 'rb' with encoding= is contradictory: the
    # builtin open() raises ValueError, and smart_open treats a non-None
    # encoding as text mode, so calling line.decode() on the str lines failed.
    # The context manager also closes the stream.
    with open(s3_path, 'r', encoding='utf-8') as stream:
        for line in stream:
            # Each line keeps its newline; don't let print() add another.
            print(line, end='')


In [4]:
# Keep the ARN returned by create_iam_role() instead of copying it by hand.
roleArn = create_iam_role()
roleArn

1.1 creating role
ERROR: An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwh-role already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN


'arn:aws:iam::764499268961:role/dwh-role'

In [7]:
# Hard-coded ARN of the dwh-role IAM role, copied from the create_iam_role()
# output above; it is account-specific and must be updated if the role changes.
roleArn = 'arn:aws:iam::764499268961:role/dwh-role'

In [8]:
# Keep the endpoint returned by create_redshift_cluster() so later cells can
# use it instead of a pasted hostname.
DWH_ENDPOINT = create_redshift_cluster(roleArn)
DWH_ENDPOINT

1.1 Client is created ...
1.2 Cluster config is being created ...
1.3 Cluster is being created ...
1.4 Cluster is created successfully ...


'dwh-cluster.cgjrwscs7tjx.us-west-2.redshift.amazonaws.com'

In [11]:
from sql_queries import create_table_queries, drop_table_queries
from config import *
import psycopg2
import argparse


def create_database(cur, conn, schema=None):
    """
    Create the project schema (if missing) and make it the session search path.

    :param cur: open DB-API cursor
    :param conn: connection owning ``cur``; committed after each statement
    :param schema: schema name; defaults to ``DWH_SCHEMA`` from the config
    :return: None
    """
    schema = DWH_SCHEMA if schema is None else schema
    for statement in ("CREATE SCHEMA IF NOT EXISTS {}",
                      "SET search_path to {}"):
        cur.execute(statement.format(schema))
        conn.commit()
    return None


def drop_tables(cur, conn):
    """
    Drop every table listed in ``drop_table_queries`` inside ``DWH_SCHEMA``.

    The search path is pointed at the configured schema first; every
    statement, including that one, is committed individually.

    :param cur: open DB-API cursor
    :param conn: connection that owns ``cur``
    :return: None
    """
    statements = ["SET search_path to {}".format(DWH_SCHEMA)]
    statements.extend(drop_table_queries)
    for sql in statements:
        cur.execute(sql)
        conn.commit()
    return None


def create_tables(cur, conn):
    """
    Create every table in ``create_table_queries`` inside ``DWH_SCHEMA``.

    Points the session's search path at the configured schema, then runs the
    CREATE statements in list order, committing after each statement.

    :param cur: open DB-API cursor
    :param conn: connection that owns ``cur``
    :return: None
    """
    set_path = "SET search_path to {}".format(DWH_SCHEMA)
    for sql in [set_path, *create_table_queries]:
        cur.execute(sql)
        conn.commit()
    return None

In [69]:
# DROP TABLES
# ----------------------------------------------------------------------
# Every statement is ';'-terminated for consistency. songplays declares
# REFERENCES to the dimension tables and is dropped with CASCADE.
log_staging_table_drop = "DROP TABLE IF EXISTS log_staging CASCADE;"
song_staging_table_drop = "DROP TABLE IF EXISTS song_staging CASCADE;"
songplay_table_drop = "DROP TABLE IF EXISTS songplays CASCADE;"
user_table_drop = "DROP TABLE IF EXISTS users;"
song_table_drop = "DROP TABLE IF EXISTS songs;"
artist_table_drop = "DROP TABLE IF EXISTS artists;"
time_table_drop = "DROP TABLE IF EXISTS time;"

# CREATE TABLES
# ----------------------------------------------------------------------
# Staging tables. In Redshift an unqualified NUMERIC means NUMERIC(18,0),
# i.e. no fractional digits, so measures (length, duration, coordinates)
# use DOUBLE PRECISION. song_id is VARCHAR to match songs.song_id.
# Free-text join/label columns are widened to reduce COPY length errors.
# Column order is unchanged because JSONPaths-based COPY maps by position.
log_staging_table_create = """
 CREATE TABLE IF NOT EXISTS log_staging (
    artist VARCHAR(256),
    auth VARCHAR(50),
    firstName VARCHAR(50),
    gender VARCHAR(10),
    iteminSession INTEGER,
    lastName VARCHAR(50),
    length DOUBLE PRECISION,
    level VARCHAR(10),
    location VARCHAR(100),
    method VARCHAR(10),
    page VARCHAR(50),
    registration NUMERIC,
    sessionId INTEGER,
    song VARCHAR(256),
    status INTEGER,
    ts TIMESTAMP,
    userAgent VARCHAR(256),
    userId INTEGER);
"""

song_staging_table_create = """
 CREATE TABLE IF NOT EXISTS song_staging (
    num_songs INTEGER,
    artist_id VARCHAR(50),
    artist_latitude DOUBLE PRECISION,
    artist_longitude DOUBLE PRECISION,
    artist_location VARCHAR(100),
    artist_name VARCHAR(100),
    song_id VARCHAR(50),
    title VARCHAR(100),
    duration DOUBLE PRECISION,
    year INTEGER);
"""

# facts ----------------------------------------------------------------
# user_agent is sized to hold the full log_staging.userAgent value; a
# narrower column makes the songplays INSERT fail on long user agents.
songplay_table_create = """
 CREATE TABLE IF NOT EXISTS songplays (
    songplay_id INTEGER IDENTITY(0,1) PRIMARY KEY,
    start_time TIMESTAMP NOT NULL REFERENCES time(start_time) sortkey,
    user_id VARCHAR(50) NOT NULL REFERENCES users(user_id),
    level VARCHAR(10) NOT NULL,
    song_id VARCHAR(50) NOT NULL REFERENCES songs(song_id) distkey,
    artist_id VARCHAR(50) NOT NULL REFERENCES artists(artist_id),
    session_id INTEGER NOT NULL,
    location VARCHAR(100) NOT NULL,
    user_agent VARCHAR(256) NOT NULL);
"""

# dimensions -----------------------------------------------------------
# Measures use DOUBLE PRECISION: an unqualified NUMERIC in Redshift is
# NUMERIC(18,0) and would drop the fractional part of durations/coordinates.
user_table_create = """
 CREATE TABLE IF NOT EXISTS users (
    user_id VARCHAR(50) PRIMARY KEY sortkey,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    gender VARCHAR(10),
    level VARCHAR(10) NOT NULL)
    diststyle ALL;
"""

song_table_create = """
 CREATE TABLE IF NOT EXISTS songs (
    song_id VARCHAR(50) PRIMARY KEY distkey,
    title VARCHAR(100) NOT NULL,
    artist_id VARCHAR(50) NOT NULL,
    year INTEGER NOT NULL,
    duration DOUBLE PRECISION NOT NULL);
"""

artist_table_create = """
 CREATE TABLE IF NOT EXISTS artists (
    artist_id VARCHAR(50) PRIMARY KEY sortkey,
    name VARCHAR(100) NOT NULL,
    location VARCHAR NOT NULL,
    latitude DOUBLE PRECISION NOT NULL,
    longitude DOUBLE PRECISION NOT NULL)
    diststyle ALL;
"""

time_table_create = """
 CREATE TABLE IF NOT EXISTS time (
    start_time TIMESTAMP UNIQUE NOT NULL sortkey,
    hour INTEGER NOT NULL,
    day INTEGER NOT NULL,
    week INTEGER NOT NULL,
    month INTEGER NOT NULL,
    year INTEGER NOT NULL,
    week_day VARCHAR)
    diststyle ALL;
"""

# INSERT RECORDS
# ----------------------------------------------------------------------
# songplays: one row per 'NextSong' event that matches a staged song. The
# log has no artist id column, so song_id and artist_id come from
# song_staging; matching on title AND artist name avoids attributing a play
# to a different song that merely shares the title.
songplay_table_insert = ("""
INSERT INTO songplays (start_time, user_id, level, song_id, artist_id,
 session_id, location, user_agent)
 SELECT DISTINCT lgs.ts,
                 lgs.userId,
                 nvl(lgs.level, 'empty'),
                 ssg.song_id,
                 ssg.artist_id,
                 lgs.sessionId,
                 nvl(lgs.location, 'empty'),
                 nvl(lgs.userAgent, 'empty')
 FROM log_staging lgs
 INNER JOIN song_staging ssg
    ON lgs.song = ssg.title
   AND lgs.artist = ssg.artist_name
 WHERE lgs.page = 'NextSong';
""")

# users: the trailing comma after the last select item (a syntax error) has
# been removed.
# NOTE(review): DISTINCT spans every column, so a user whose level changed
# yields one row per level value -- confirm that is acceptable.
user_table_insert = ("""
INSERT INTO users (user_id, first_name, last_name, gender, level)
  SELECT DISTINCT lgs.userId,
                  nvl(lgs.firstName, 'empty'),
                  nvl(lgs.lastName, 'empty'),
                  nvl(lgs.gender, 'empty'),
                  nvl(lgs.level, 'empty')
  FROM log_staging lgs
  WHERE lgs.userId IS NOT NULL;
""")

# songs: rows without a song_id are skipped (mirroring the artist_id filter
# in artist_table_insert), and the statement is ';'-terminated like the rest.
song_table_insert = ("""
INSERT INTO songs (song_id, title, artist_id, year, duration)
 SELECT DISTINCT ssg.song_id,
                 ssg.title,
                 ssg.artist_id,
                 ssg.year,
                 nvl(ssg.duration, 0.0)
  FROM song_staging ssg
  WHERE ssg.song_id IS NOT NULL;
""")

# artists: one row per distinct artist attribute combination in
# song_staging; missing location/coordinates are replaced by placeholders.
# NOTE(review): DISTINCT spans every column, so an artist whose location
# differs between songs yields several rows for the same artist_id.
artist_table_insert = """
INSERT INTO artists (artist_id, name, location, latitude, longitude) 
 SELECT DISTINCT ssg.artist_id, 
                 ssg.artist_name, 
                 nvl(ssg.artist_location, 'empty'), 
                 nvl(ssg.artist_latitude, 0.0), 
                 nvl(ssg.artist_longitude, 0.0)
 FROM song_staging ssg
 WHERE ssg.artist_id IS NOT NULL;
"""

# time: one row per distinct 'NextSong' timestamp, broken into calendar
# parts. All references use the table alias actually declared (lsg); the
# undefined alias "se" made the statement fail.
time_table_insert = ("""
INSERT INTO time (start_time, hour, day, week, month, year, week_day)
 SELECT DISTINCT lsg.ts,
                 DATE_PART(hour, lsg.ts) :: INTEGER,
                 DATE_PART(day, lsg.ts) :: INTEGER,
                 DATE_PART(week, lsg.ts) :: INTEGER,
                 DATE_PART(month, lsg.ts) :: INTEGER,
                 DATE_PART(year, lsg.ts) :: INTEGER,
                 DATE_PART(dow, lsg.ts) :: INTEGER
 FROM log_staging lsg
 WHERE lsg.page = 'NextSong';
""")

# FIND SONGS
# Look up (song_id, artist_id) for a played song by joining songs to
# artists. The three %s placeholders are bound, in order, to the song
# title, the artist name and the song duration.
song_select = """
SELECT s.song_id, s.artist_id FROM songs s
 JOIN artists a ON s.artist_id=a.artist_id
 WHERE s.title = %s AND a.name=%s AND s.duration=%s;
"""

# QUERY LISTS
# Statements run in list order. songplays REFERENCES the dimension tables,
# so it is created and filled after them and dropped before them.
create_table_queries = [
    log_staging_table_create,
    song_staging_table_create,
    user_table_create,
    song_table_create,
    artist_table_create,
    time_table_create,
    songplay_table_create,
]
insert_table_queries = [
    user_table_insert,
    song_table_insert,
    artist_table_insert,
    time_table_insert,
    songplay_table_insert,
]
drop_table_queries = [
    log_staging_table_drop,
    song_staging_table_drop,
    songplay_table_drop,
    user_table_drop,
    song_table_drop,
    artist_table_drop,
    time_table_drop,
]



In [70]:
DWH_ENDPOINT = 'dwh-cluster.cgjrwscs7tjx.us-west-2.redshift.amazonaws.com'

# Connect with keyword arguments rather than a URL, so a password that
# contains URL-reserved characters (@, :, /) is passed through intact.
conn = psycopg2.connect(host=DWH_ENDPOINT,
                        port=DWH_PORT,
                        dbname=DWH_DB,
                        user=DWH_DB_USER,
                        password=DWH_DB_PASSWORD)
cur = conn.cursor()
try:
    create_database(cur, conn)
    drop_tables(cur, conn)
    create_tables(cur, conn)
finally:
    # Release the Redshift session; later cells open their own connection.
    conn.close()

In [93]:
from config import *
import psycopg2
import argparse
from sql_queries import *


def process_data_staging(cur, conn, iam_role, region='us-west-2'):
    """
    Load the raw S3 JSON data into the log and song staging tables via COPY.

    Log files are parsed with the JSONPaths file named by ``LOG_JSON_FORMAT``
    (``format as json '<path>'``); song files use ``json 'auto'``, which
    matches JSON keys to column names. Each COPY is printed, executed and
    committed.

    :param cur: open cursor on the Redshift cluster
    :param conn: connection that owns ``cur``
    :param iam_role: ARN of the IAM role Redshift assumes to read from S3
    :param region: AWS region of the source S3 bucket
    :return: None
    """
    cur.execute("SET search_path to {}".format(DWH_SCHEMA))
    conn.commit()

    # NOTE(review): timeformat 'auto' may not parse epoch timestamps; if the
    # log ts field is epoch milliseconds, 'epochmillisecs' is needed -- confirm.
    copy_log_command = """
                        copy {}.{} from '{}' credentials 'aws_iam_role={}' emptyasnull blanksasnull format as json '{}' region '{}' timeformat 'auto';
                        """.format(DWH_SCHEMA, DWH_LOG_STAGING_TABLE, S3_BUCKET_LOG_JSON_PATH,
                                   iam_role, LOG_JSON_FORMAT, region)
    print(copy_log_command)
    cur.execute(copy_log_command)
    conn.commit()

    # Both COPY targets are schema-qualified, so the search path does not
    # need to be set again between the two loads.
    copy_song_command = """
                        copy {}.{} from '{}' credentials 'aws_iam_role={}' emptyasnull blanksasnull json 'auto' region '{}' timeformat 'auto';
                        """.format(DWH_SCHEMA, DWH_SONG_STAGING_TABLE, S3_BUCKET_SONG_JSON_PATH,
                                   iam_role, region)
    print(copy_song_command)
    cur.execute(copy_song_command)
    conn.commit()
    return None


def insert_data_into_tables(cur, conn):
    """
    Populate the analytics tables from the staging tables.

    Sets the search path to ``DWH_SCHEMA``, then runs every statement in
    ``insert_table_queries`` in list order, committing after each one.

    :param cur: open cursor on the Redshift cluster (passed first)
    :param conn: connection that owns ``cur``
    :return: None
    """
    cur.execute("SET search_path to {}".format(DWH_SCHEMA))
    conn.commit()
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()
    return None

In [None]:
DWH_ENDPOINT = 'dwh-cluster.cgjrwscs7tjx.us-west-2.redshift.amazonaws.com'
iam_role = 'arn:aws:iam::764499268961:role/dwh-role'

# Connect with keyword arguments so special characters in the password are
# not interpreted as URL syntax.
conn = psycopg2.connect(host=DWH_ENDPOINT,
                        port=DWH_PORT,
                        dbname=DWH_DB,
                        user=DWH_DB_USER,
                        password=DWH_DB_PASSWORD)
cur = conn.cursor()

# Reuse iam_role instead of repeating the ARN; the transaction is committed
# through psycopg2 rather than an embedded COMMIT statement.
copy_song_command = """
                    copy {}.{} from '{}' iam_role '{}' emptyasnull blanksasnull json 'auto' region 'us-west-2' timeformat 'auto';
                    """.format(DWH_SCHEMA, DWH_SONG_STAGING_TABLE, S3_BUCKET_SONG_JSON_PATH, iam_role)
print(copy_song_command)
cur.execute(copy_song_command)
conn.commit()

# Full pipeline (both helpers take the cursor first, then the connection):
# process_data_staging(cur, conn, iam_role)
# insert_data_into_tables(cur, conn)

In [None]:
%%sql
-- Load the raw song JSON files into the song staging table.
COPY sparkify.song_staging
FROM 's3://udacity-dend/song_data/'
CREDENTIALS 'aws_iam_role=arn:aws:iam::764499268961:role/dwh-role'
EMPTYASNULL
BLANKSASNULL
JSON 'auto'
REGION 'us-west-2'
TIMEFORMAT 'auto';

In [None]:
%%sql
-- Load the event logs, mapping fields with the JSONPaths file.
COPY sparkify.log_staging
FROM 's3://udacity-dend/log_data/2018'
CREDENTIALS 'aws_iam_role=arn:aws:iam::764499268961:role/dwh-role'
EMPTYASNULL
BLANKSASNULL
FORMAT AS JSON 's3://udacity-dend/log_json_path.json'
TIMEFORMAT 'auto';

In [None]:
%sql SELECT * FROM stl_load_errors LIMIT 5


In [13]:
import sql_queries

In [None]:
import boto3
from config import *
import json
from botocore.exceptions import ClientError
import time

iam = boto3.client('iam',
                   aws_access_key_id=KEY,
                   aws_secret_access_key=SECRET,
                   region_name='us-west-2')

print("1.1 creating role")
try:
    iam.create_role(Path='/',
                    RoleName=DWH_IAM_ROLE_NAME,
                    Description="Allows Redshift clusters to call AWS services on your behalf.",
                    AssumeRolePolicyDocument=json.dumps(
                        {'Statement': [{'Action': 'sts:AssumeRole',
                                        'Effect': 'Allow',
                                        'Principal': {'Service': 'redshift.amazonaws.com'}}],
                         'Version': '2012-10-17'}))
except ClientError as e:
    print(f'ERROR: {e}')

print("1.2 Attaching Policy")
# Report failures like create_role does instead of aborting the cell.
try:
    iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                           PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
except ClientError as e:
    print(f'ERROR: {e}')

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
print(roleArn)

In [64]:
print("1.1 Client is created ...")
redshift = boto3.client(
    'redshift',
    region_name="us-west-2",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)
try:
    print("1.2 Cluster config is being created ...")
    cluster_spec = {
        # HW
        'ClusterType': DWH_CLUSTER_TYPE,
        'NodeType': DWH_NODE_TYPE,
        'NumberOfNodes': int(DWH_NUM_NODES),
        # Identifiers & Credentials
        'DBName': DWH_DB,
        'ClusterIdentifier': DWH_CLUSTER_IDENTIFIER,
        'MasterUsername': DWH_DB_USER,
        'MasterUserPassword': DWH_DB_PASSWORD,
        # Roles (for s3 access)
        'IamRoles': [roleArn],
    }
    redshift.create_cluster(**cluster_spec)
except ClientError as e:
    print(f'ERROR: {e}')

print("1.3 Cluster is being created ...")
# Poll the cluster status, animating between checks, until it is available.
while True:
    cluster_status = redshift.describe_clusters(
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]['ClusterStatus']
    if cluster_status == 'available':
        break
    utils.animate()

print("\r1.4 Cluster is created successfully ...")

1.1 Client is created ...
1.2 Cluster config is being created ...
1.3 Cluster is being created ...
1.4 Cluster is created successfully ...


In [65]:
# Read the hostname of the configured cluster from its description.
cluster_description = redshift.describe_clusters(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
DWH_ENDPOINT = cluster_description['Endpoint']['Address']

In [38]:
# Load the `sql` IPython extension, which provides the %sql / %%sql magics
# used in the cells below.
%load_ext sql

In [39]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:***@dwh-cluster.cgjrwscs7tjx.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'

In [56]:
%%sql
-- Remove the log staging table so it can be recreated.
DROP TABLE IF EXISTS sparkify.log_staging

 * postgresql://dwhuser:***@dwh-cluster.cgjrwscs7tjx.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

In [63]:
%%sql
-- Exploratory log staging table with every column declared as plain VARCHAR.
CREATE TABLE IF NOT EXISTS sparkify.log_staging (
    artist         VARCHAR,
    auth           VARCHAR,
    firstName      VARCHAR,
    gender         VARCHAR,
    itemInSession  VARCHAR,
    lastName       VARCHAR,
    length         VARCHAR,
    level          VARCHAR,
    location       VARCHAR,
    method         VARCHAR,
    page           VARCHAR,
    registration   VARCHAR,
    sessionId      VARCHAR,
    song           VARCHAR,
    status         VARCHAR,
    ts             VARCHAR,
    userAgent      VARCHAR,
    userId         VARCHAR
);

 * postgresql://dwhuser:***@dwh-cluster.cgjrwscs7tjx.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

In [67]:
%sql DROP TABLE IF EXISTS sparkify.log_staging

 * postgresql://dwhuser:***@dwh-cluster.cgjrwscs7tjx.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

In [None]:
%%sql
-- The staging columns are declared camelCase (firstName, userId, ...) but
-- Redshift stores column names in lower case, and json auto matches JSON
-- keys to column names case-sensitively, so those fields would load as
-- NULL. Use the JSONPaths file, as the earlier log COPY does.
copy sparkify.log_staging
from 's3://udacity-dend/log_data/2018'
credentials 'aws_iam_role=arn:aws:iam::764499268961:role/dwh-role'
emptyasnull
blanksasnull
format as json 's3://udacity-dend/log_json_path.json'
region 'us-west-2'
timeformat 'auto';

In [85]:
%sql SELECT * FROM sparkify.log_staging LIMIT 5

 * postgresql://dwhuser:***@dwh-cluster.cgjrwscs7tjx.us-west-2.redshift.amazonaws.com:5439/dwh
0 rows affected.


artist,auth,firstname,gender,iteminsession,lastname,length,level,location,method,page,registration,sessionid,song,status,ts,useragent,userid


In [None]:
%sql COPY sparkify.log_staging FROM 's3://udacity-dend/log_data/2018' iam_role 'aws_iam_role=arn:aws:iam::764499268961:role/dwh-role' region 'us-west-2' FORMAT AS JSON 's3://udacity-dend/log_json_path.json' timeformat 'epochmillisecs'

In [48]:
%%sql
-- In Redshift an unqualified NUMERIC is NUMERIC(18,0) and would round song
-- lengths to integers, so length is DOUBLE PRECISION. Free-text columns are
-- widened to reduce COPY length overflows. Column order is unchanged.
CREATE TABLE IF NOT EXISTS sparkify.log_staging (
    artist VARCHAR(256),
    auth VARCHAR(50),
    firstname VARCHAR(50),
    gender VARCHAR(10),
    iteminsession INTEGER,
    lastname VARCHAR(50),
    length DOUBLE PRECISION,
    level VARCHAR(10),
    location VARCHAR(100),
    method VARCHAR(10),
    page VARCHAR(50),
    registration NUMERIC,
    sessionid INTEGER,
    song VARCHAR(256),
    status INTEGER,
    ts TIMESTAMP,
    useragent VARCHAR(256),
    userid INTEGER);


 * postgresql://dwhuser:***@dwh-cluster.cgjrwscs7tjx.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

In [None]:
%sql SELECT * FROM log_staging LIMIT 5

In [None]:
%%sql
-- Most recent COPY load errors first.
SELECT *
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 100

In [108]:
print("1.1 Client is created ...")
redshift = boto3.client('redshift',
                        region_name="us-west-2",
                        aws_access_key_id=KEY,
                        aws_secret_access_key=SECRET)
print("1.2 Cluster is identified ...")
try:
    redshift.delete_cluster(
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        SkipFinalClusterSnapshot=True)
except ClientError as e:
    print(f'ERROR: {e}')

print("1.3 Cluster is being deleted ...")
cluster_status = None
try:
    while True:
        cluster_status = redshift.describe_clusters(
            ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]['ClusterStatus']
        if cluster_status != 'deleting':
            break
        utils.animate()
except redshift.exceptions.ClusterNotFoundFault:
    # Only a "cluster not found" response means the deletion finished; any
    # other error (network, credentials, interrupt) now propagates.
    print("\r1.4 Cluster is deleted successfully ...")
else:
    print(f"\r1.4 Cluster was not deleted (status: {cluster_status})")

1.1 Client is created ...
1.2 Cluster is identified ...
1.3 Cluster is being deleted ...
1.4 Cluster is deleted successfully ...


In [None]:
# Tear down the IAM role. Errors (e.g. the policy or role is already gone)
# are printed rather than raised, matching the notebook's other AWS calls.
try:
    iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
except ClientError as e:
    print(f'ERROR: {e}')
try:
    iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)
except ClientError as e:
    print(f'ERROR: {e}')

In [105]:
%%sql
-- List the tables created in the sparkify schema.
SELECT t.table_name
FROM information_schema.tables AS t
WHERE t.table_schema = 'sparkify'
ORDER BY t.table_name;

 * postgresql://dwhuser:***@dwh-cluster.cgjrwscs7tjx.us-west-2.redshift.amazonaws.com:5439/dwh
7 rows affected.


table_name
artists
log_staging
song_staging
songplays
songs
time
users
