# Create the sparkify redshift cluster #

This notebook is used to:
1. Set up the Redshift cluster
2. Test whether create_tables.py and etl.py ran successfully
3. Run analytical queries for testing/analysis to ensure the star schema is correct
4. Check for any errors caused by etl.py




In [1]:
import configparser
import boto3
import psycopg2
from pprint import pprint


### Creating Redshift Cluster ###

In [2]:
# Load AWS credentials and cluster settings from the local dwh.cfg file.
config = configparser.ConfigParser()
# Open the file in a context manager so the handle is closed as soon as
# parsing finishes (a bare open() would leave it to the garbage collector).
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# [AWS] section: access key pair passed to the boto3 clients below.
KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

# [IAM_ROLE] section: role ARN attached to the cluster at creation time.
ARN = config.get('IAM_ROLE', 'ARN')

# [CLUSTER] section: database connection settings.
DB_NAME = config.get('CLUSTER', 'DB_NAME')
DB_USER = config.get('CLUSTER', 'DB_USER')
DB_PWD = config.get('CLUSTER', 'DB_PASSWORD')
DB_PORT = config.get('CLUSTER', 'DB_PORT')
DB_HOST = config.get('CLUSTER', 'HOST')

# [DWH] section: cluster sizing and identifier.
# Values are read as strings; NODE_NUM is converted with int() where it is used.
NODE_TYPE = config.get('DWH', 'NODE_TYPE')
NODE_NUM = config.get('DWH', 'NUM_NODES')
CLUSTER_TYPE = config.get('DWH', 'CLUSTER_TYPE')
CLUSTER_ID = config.get('DWH', 'CLUSTER_ID')

In [3]:
# Redshift API client authenticated with the key pair read from dwh.cfg.
redshift = boto3.client(
    'redshift',
    region_name='us-west-2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

In [4]:
# Request a new cluster; any failure is printed rather than raised so the
# notebook keeps running.
try:
    # Built inside the try so a bad NUM_NODES value is reported the same way
    # as an API error.
    cluster_spec = {
        'ClusterType': CLUSTER_TYPE,
        'NodeType': NODE_TYPE,
        'NumberOfNodes': int(NODE_NUM),
        'DBName': DB_NAME,
        'ClusterIdentifier': CLUSTER_ID,
        'MasterUsername': DB_USER,
        'MasterUserPassword': DB_PWD,
        'IamRoles': [ARN],
    }
    redshift.create_cluster(**cluster_spec)
except Exception as err:
    print(err)

Run the cell below and confirm that the cluster status is "available". Once the cluster is available, run create_table.py to create the tables.

In [6]:
# Describe the cluster once and derive the status from that same response,
# so the status and the stored properties cannot come from different snapshots
# (and only one API call is made).
mycluster_prop = redshift.describe_clusters(ClusterIdentifier=CLUSTER_ID)['Clusters'][0]
mycluster_status = mycluster_prop['ClusterStatus']

mycluster_status

'available'

### Configuring Security Groups to Ensure Redshift ports can receive the S3 Data ###

In [None]:
# EC2 resource handle, used below to reach the cluster's VPC and security groups.
ec2 = boto3.resource(
    'ec2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name='us-west-2',
)

In [None]:
# Resource handle for the VPC named in the cluster's 'VpcId' property.
vpc = ec2.Vpc(mycluster_prop['VpcId'])


In [None]:
# Open the database port on the VPC's first security group; failures
# (for example a rule that already exists) are printed, not raised.
try:
    vpc = ec2.Vpc(id=mycluster_prop['VpcId'])
    vpc_security_groups = list(vpc.security_groups.all())
    defaultSg = vpc_security_groups[0]
    print(defaultSg)
    # NOTE(review): 0.0.0.0/0 admits traffic from any address on the DB port -
    # confirm this is acceptable outside a throwaway test cluster.
    ingress_rule = dict(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DB_PORT),
        ToPort=int(DB_PORT),
    )
    defaultSg.authorize_ingress(**ingress_rule)
except Exception as err:
    print(err)

### Test whether create_table.py ran properly ###

In [7]:
# Test whether the tables were created correctly by listing every column of
# the expected tables from information_schema.
qry = """
select table_name,ordinal_position ,column_name,data_type
from information_schema.columns where table_name in 
('staging_events','staging_songs','songplays','users','song','artist','artist','time')
order by table_name,ordinal_position"""

# Connect to the host configured in dwh.cfg instead of a hardcoded endpoint,
# so the check keeps working when the cluster endpoint changes.
conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_PWD, port=DB_PORT)
try:
    cur = conn.cursor()
    cur.execute(qry)
    pprint(cur.fetchall())
finally:
    # Always release the connection, even when the query raises.
    conn.close()

[('artist', 1, 'artist_id', 'character varying'),
 ('artist', 2, 'name', 'character varying'),
 ('artist', 3, 'location', 'character varying'),
 ('artist', 4, 'lattitude', 'numeric'),
 ('artist', 5, 'longitude', 'numeric'),
 ('song', 1, 'song_id', 'character varying'),
 ('song', 2, 'title', 'character varying'),
 ('song', 3, 'artist_id', 'character varying'),
 ('song', 4, 'year', 'integer'),
 ('song', 5, 'duration', 'numeric'),
 ('songplays', 1, 'songplay_id', 'integer'),
 ('songplays', 2, 'start_time', 'timestamp without time zone'),
 ('songplays', 3, 'user_id', 'character varying'),
 ('songplays', 4, 'level', 'character varying'),
 ('songplays', 5, 'song_id', 'character varying'),
 ('songplays', 6, 'artist_id', 'character varying'),
 ('songplays', 7, 'session_id', 'integer'),
 ('songplays', 8, 'location', 'character varying'),
 ('songplays', 9, 'user_agent', 'character varying'),
 ('staging_events', 1, 'artist', 'character varying'),
 ('staging_events', 2, 'auth', 'character varying'

Run etl.py

### Function to test table inserts ###

In [8]:
def check_inserts(table_list):
    """Print the row count and up to three sample rows for each table.

    Opens one connection using the DB_* settings loaded from dwh.cfg, runs a
    ``count(*)`` and a ``limit 3`` query per table, and prints the queries and
    their results.

    Args:
        table_list: iterable of table names. Names are interpolated directly
            into the SQL text, so they must come from trusted code, not from
            user input.

    Returns:
        None. All results are printed.

    Errors raised while querying are printed rather than propagated, and
    processing stops at the first failing table. The connection is closed
    exactly once on every path.
    """
    # Use the configured host rather than a hardcoded cluster endpoint.
    conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_PWD, port=DB_PORT)

    try:
        cur = conn.cursor()

        for table in table_list:
            qry_sample = """
            select * from {}
            limit 3
            """.format(table)

            print(qry_sample)

            qry_cnt = """
            select count(*) from {}
            """.format(table)

            print(qry_cnt)

            cur.execute(qry_cnt)
            print("count of rows in {} : ".format(table), cur.fetchall())

            print("sample rows in {} : ".format(table))
            cur.execute(qry_sample)
            pprint(cur.fetchall())

    except Exception as e:
        print(e)
        print('closing connection')
    finally:
        # Single close for both the success and the failure path.
        conn.close()

### Testing Stage table inserts ###

In [9]:
# Row counts and sample rows for the two staging tables.
stage_table_list = [
    "staging_events",
    "staging_songs",
]
check_inserts(table_list=stage_table_list)


            select * from staging_events
            limit 3
            

            select count(*) from staging_events
            
count of rows in staging_events :  [(8056,)]
sample rows in staging_events : 
[(None,
  'Logged In',
  'Adler',
  'M',
  0,
  'Barrera',
  None,
  'free',
  'New York-Newark-Jersey City, NY-NJ-PA',
  'GET',
  'Home',
  Decimal('1540835983796'),
  248,
  None,
  200,
  1541470364796,
  '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 '
  '(KHTML, like Gecko) Version/7.0.6 Safari/537.78.2"',
  '100'),
 ('Gustavo Cerati',
  'Logged In',
  'Adler',
  'M',
  1,
  'Barrera',
  Decimal('249'),
  'free',
  'New York-Newark-Jersey City, NY-NJ-PA',
  'PUT',
  'NextSong',
  Decimal('1540835983796'),
  248,
  'Uno Entre 1000',
  200,
  1541470383796,
  '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 '
  '(KHTML, like Gecko) Version/7.0.6 Safari/537.78.2"',
  '100'),
 ('Limp Bizkit',
  'Logged In',
  'Adler',
  'M',
  2,


### Testing star-schema inserts ###

In [12]:
# Row counts and sample rows for the fact table and the four dimension tables.
star_table_list = [
    "songplays",
    "song",
    "artist",
    "users",
    "time",
]
check_inserts(table_list=star_table_list)


            select * from songplays
            limit 3
            

            select count(*) from songplays
            
count of rows in songplays :  [(333,)]
sample rows in songplays : 
[(44,
  datetime.datetime(9999, 4, 7, 11, 49, 56),
  '54',
  'free',
  'SOTNHIP12AB0183131',
  'ARD46C811C8A414F3F',
  951,
  'Yuba City, CA',
  'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0'),
 (60,
  datetime.datetime(9999, 11, 12, 7, 9, 56),
  '12',
  'free',
  'SOARUPP12AB01842E0',
  'ARD46C811C8A414F3F',
  371,
  'New York-Newark-Jersey City, NY-NJ-PA',
  'Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20100101 Firefox/31.0'),
 (140,
  datetime.datetime(9999, 7, 14, 13, 6, 36),
  '95',
  'paid',
  'SOQYHVZ12A6D4F93CF',
  'ARRZUPG11F43A69EF7',
  222,
  'Winston-Salem, NC',
  '"Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) '
  'AppleWebKit/537.51.2 (KHTML, like Gecko) Version/7.0 Mobile/11D257 '
  'Safari/9537.53"')]

            select * from song
         

### Testing analytical queries ###

In [13]:
# Testing analytical queries

# Popular songs by location: songplays joined to song, counted per
# (location, title) pair, top five by count.
user_most_songs = """ 
select sp.location,s.title ,
count(sp.session_id) as number_of_sessions
from songplays sp
join song s on s.song_id=sp.song_id
group by sp.location,s.title 
order by number_of_sessions desc
limit 5
"""

# Top 5 most popular artists: songplays joined to artist, counted per name.
top_5_artists = """ 
select a.name ,
count(sp.session_id) as number_of_sessions
from songplays sp
join artist a on a.artist_id=sp.artist_id
group by a.name 
order by number_of_sessions desc
limit 5
"""

# Connect to the host configured in dwh.cfg instead of a hardcoded endpoint.
conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_PWD, port=DB_PORT)
try:
    cur = conn.cursor()

    print('popular songs by location')

    cur.execute(user_most_songs)
    pprint(cur.fetchall())

    print('Top 5 most popular artists')

    cur.execute(top_5_artists)
    pprint(cur.fetchall())
finally:
    # Release the connection even if one of the queries fails.
    conn.close()

popular songs by location
[('Lansing-East Lansing, MI', "You're The One", 5),
 ('San Francisco-Oakland-Hayward, CA', "You're The One", 4),
 ('San Francisco-Oakland-Hayward, CA', "I CAN'T GET STARTED", 4),
 ('Lake Havasu City-Kingman, AZ', "You're The One", 3),
 ('Chicago-Naperville-Elgin, IL-IN-WI', "You're The One", 3)]
Top 5 most popular artists
[('Dwight Yoakam', 37),
 ('Kid Cudi / Kanye West / Common', 10),
 ('Ron Carter', 9),
 ('Lonnie Gordon', 9),
 ('B.o.B', 8)]


### Run cell below if there are any issues when loading data ###

In [11]:
# Dump the contents of stl_load_errors to diagnose failed loads.
qry_e = """SELECT * FROM stl_load_errors;  """

# Connect to the host configured in dwh.cfg instead of a hardcoded endpoint.
conn = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_PWD, port=DB_PORT)
try:
    cur = conn.cursor()

    cur.execute(qry_e)
    pprint(cur.fetchall())

    print('\n')
finally:
    # Release the connection even if the query fails.
    conn.close()

[]




### Delete the Redshift Cluster ###

In [14]:
# Request deletion of the cluster without taking a final snapshot.
# delete_cluster only starts the deletion, so report the status returned by
# the API instead of claiming the cluster is already gone.
try:
    response = redshift.delete_cluster(ClusterIdentifier=CLUSTER_ID, SkipFinalClusterSnapshot=True)
    print('cluster deletion requested, current status:', response['Cluster']['ClusterStatus'])
except Exception as e:
    print(e)

cluster deleted
