### Delete Existing Resources

In [1]:
%%time
!python scripts/delete_resources.py

ERROR:root:Cluster 'dwhCluster' does not exist.
ERROR:root:Another error occurred: CusterProps must not be None. Skipping Ingress revocation.
ERROR:root:Cluster 'dwhCluster' does not exist. Skipping cluster deletion.
ERROR:root:IAM role 'dwhRole' does not exist. Skipping IAM role deletion.
CPU times: user 8.95 ms, sys: 11 ms, total: 20 ms
Wall time: 1.62 s


### Create Required Resources
This includes the IAM role, the Redshift cluster, and the ingress rules.

In [2]:
%%time
!python scripts/create_resources.py

INFO:root:IAM Role Created: ARN arn:aws:iam::736720720705:role/dwhRole
INFO:root:Cluster is creating...2.384185791015625e-07 seconds elapsed.
INFO:root:Cluster is creating...10.238889217376709 seconds elapsed.
INFO:root:Cluster is creating...20.997263431549072 seconds elapsed.
INFO:root:Cluster is creating...31.69753861427307 seconds elapsed.
INFO:root:Cluster is creating...42.36496829986572 seconds elapsed.
INFO:root:Cluster is creating...53.10138392448425 seconds elapsed.
INFO:root:Cluster is creating...63.818397998809814 seconds elapsed.
INFO:root:Cluster is creating...74.51712822914124 seconds elapsed.
CPU times: user 671 ms, sys: 239 ms, total: 911 ms
Wall time: 1min 29s
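The log above shows the creation script polling the cluster status roughly every 10 seconds until the cluster is ready. Below is a minimal sketch of such a polling loop. `wait_for_cluster` and `redshift_status` are hypothetical names, not taken from `scripts/create_resources.py`. The status lookup is injected so the loop can be exercised without AWS. With a real boto3 Redshift client, `describe_clusters` reports `ClusterStatus`, and boto3's built-in `cluster_available` waiter (`client.get_waiter("cluster_available")`) is an alternative to a hand-rolled loop.

```python
import logging
import time
from typing import Callable


def redshift_status(client, cluster_id: str) -> str:
    """Return the ClusterStatus of one cluster via a boto3 Redshift client (assumed)."""
    response = client.describe_clusters(ClusterIdentifier=cluster_id)
    return response["Clusters"][0]["ClusterStatus"]


def wait_for_cluster(
    get_status: Callable[[], str],
    poll_seconds: float = 10.0,
    timeout_seconds: float = 1800.0,
    sleep: Callable[[float], None] = time.sleep,
) -> float:
    """Poll get_status() until it returns 'available'; return the seconds elapsed.

    Raises TimeoutError if the cluster is not available within timeout_seconds.
    """
    start = time.monotonic()
    while True:
        elapsed = time.monotonic() - start
        status = get_status()
        if status == "available":
            return elapsed
        if elapsed > timeout_seconds:
            raise TimeoutError(f"Cluster still '{status}' after {elapsed:.0f} seconds")
        logging.info("Cluster is %s...%s seconds elapsed.", status, elapsed)
        sleep(poll_seconds)
```

Hypothetical usage against AWS: `wait_for_cluster(lambda: redshift_status(boto3.client("redshift"), "dwhCluster"))`.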


### Create Tables
Staging tables will first be created, followed by fact and dimension tables.

In [5]:
%%time
!python scripts/create_tables.py

CPU times: user 45 ms, sys: 24.5 ms, total: 69.6 ms
Wall time: 6.27 s
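Creating the staging tables first and the star-schema tables afterwards is essentially an ordering problem. Below is a minimal sketch of how a script like `scripts/create_tables.py` might apply that order. It assumes each table's `CREATE TABLE` statement is kept in a dictionary. `recreate_tables` is a hypothetical helper, and the placeholder DDL in the test is not the project's real schema. Tables are dropped in reverse creation order, so any table that references another is removed before the table it references.

```python
from typing import Dict, List, Sequence


def recreate_tables(
    cursor, create_statements: Dict[str, str], table_order: Sequence[str]
) -> List[str]:
    """Drop, then create, the given tables; return the SQL executed, in order.

    cursor is any DB-API cursor (e.g. from psycopg2). table_order lists tables
    in creation order, so referenced tables come before the tables using them.
    """
    executed = []
    # Drop in reverse creation order so dependent tables are removed first
    for table in reversed(table_order):
        sql = f"DROP TABLE IF EXISTS {table};"
        cursor.execute(sql)
        executed.append(sql)
    # Create in the given order, e.g. staging tables first, then the star schema
    for table in table_order:
        sql = create_statements[table]
        cursor.execute(sql)
        executed.append(sql)
    return executed
```

An order such as `["staging_events", "staging_songs", "d_user", "d_song", "d_artist", "d_time", "f_songplay"]` (dimensions before the fact table) is an assumption. With psycopg2, call `conn.commit()` afterwards, since connections do not autocommit by default.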


**ERD**

Outlined below is the entity relationship diagram for the generated tables:

![erd](images/erd.png)

The fact and dimension tables are arranged in a star schema, while the staging tables are isolated.

### ELT
Data is loaded from S3 into the staging tables, and the data is then transformed into the star schema.

In [7]:
%%time
!python scripts/etl.py load

INFO:root:Loading staging data.
INFO:root:Staging tables loaded.
CPU times: user 37.9 s, sys: 12.3 s, total: 50.2 s
Wall time: 1h 54min 11s
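The load step copies JSON data from S3 into the staging tables. In Redshift this is typically done with the `COPY` command. The helper below is a hypothetical sketch that assembles such a command. The bucket path and role ARN in the test are placeholders, not the values in `dwh.cfg`. `FORMAT AS JSON 'auto'` matches JSON field names to column names. When the fields do not line up, the S3 URI of a JSONPaths file can be passed instead.

```python
from typing import Optional


def build_copy_statement(
    table: str,
    s3_path: str,
    iam_role_arn: str,
    json_option: str = "auto",
    region: Optional[str] = None,
) -> str:
    """Return a Redshift COPY command loading JSON files under s3_path into table.

    json_option is either 'auto' or the s3:// URI of a JSONPaths file.
    """
    if not s3_path.startswith("s3://"):
        raise ValueError(f"Expected an s3:// path, got {s3_path!r}")
    lines = [
        f"COPY {table}",
        f"FROM '{s3_path}'",
        f"IAM_ROLE '{iam_role_arn}'",
        f"FORMAT AS JSON '{json_option}'",
    ]
    # REGION is only needed when the bucket is in a different region from the cluster
    if region:
        lines.append(f"REGION '{region}'")
    return "\n".join(lines) + ";"
```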


In [9]:
%%time
!python scripts/etl.py etl

INFO:root:Performing ETL.
INFO:root:ETL completed.
CPU times: user 37.6 ms, sys: 20.1 ms, total: 57.6 ms
Wall time: 5.45 s


### Sample Queries
Simple queries are run to gather information about the database and to derive some basic analytics insights.

In [12]:
import psycopg2
import pandas as pd
from scripts.helpers import LoadConfig

In [13]:
# Load parameters from dwh.cfg
config = LoadConfig(autoload=True)

# Connection parameters are read from the CLUSTER section of the config
connection_params = {
    "dbname": config.get("CLUSTER", "DB_NAME"),
    "user": config.get("CLUSTER", "DB_USER"),
    "password": config.get("CLUSTER", "DB_PASSWORD"),
    "host": config.get("CLUSTER", "HOST"),
    "port": config.get("CLUSTER", "DB_PORT")
}

# Function to execute a query and fetch the results into a pandas DataFrame
def execute_query(query, connection_params):
    """
    Executes a query using psycopg2 and returns the results.

    Parameters:
    - query (str): Query string.
    - connection_params (dict): Keyword arguments passed to psycopg2.connect.

    Returns:
    - DataFrame: pandas DataFrame containing the query results.
    """
    conn = psycopg2.connect(**connection_params)
    try:
        # "with conn" commits or rolls back the transaction but does not
        # close the connection, so it is closed explicitly in "finally"
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(query)
                columns = [desc[0] for desc in cursor.description]
                data = cursor.fetchall()
    finally:
        conn.close()
    return pd.DataFrame(data, columns=columns)


**Table Row Counts**

A simple query that counts the rows in each table and orders the results by row count in descending order.

In [17]:
# Table row counts
query = """
select 'staging_events' as table_name, count(*) as row_count from staging_events
union all select 'staging_songs' as table_name, count(*) as row_count from staging_songs
union all select 'f_songplay' as table_name, count(*) as row_count from f_songplay
union all select 'd_artist' as table_name, count(*) as row_count from d_artist
union all select 'd_song' as table_name, count(*) as row_count from d_song
union all select 'd_user' as table_name, count(*) as row_count from d_user
union all select 'd_time' as table_name, count(*) as row_count from d_time
order by 2 desc;
"""

execute_query(query, connection_params)

| | table_name | row_count |
|---|---|---|
| 0 | staging_songs | 385252 |
| 1 | d_song | 384995 |
| 2 | d_artist | 45266 |
| 3 | staging_events | 8056 |
| 4 | f_songplay | 6964 |
| 5 | d_time | 6487 |
| 6 | d_user | 105 |
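The row-count query repeats the same `select` for every table, so it can also be generated from a list of table names. The sketch below is a hypothetical helper, not part of `scripts.helpers`. It joins the branches with `union all`, which skips the duplicate-elimination step that plain `union` performs. That is safe here because each branch returns a distinct `table_name`. Table names are interpolated directly, so the list must come from trusted code rather than user input.

```python
from typing import Sequence


def row_count_query(tables: Sequence[str]) -> str:
    """Build one query returning (table_name, row_count) per table, largest first."""
    if not tables:
        raise ValueError("At least one table name is required")
    # Table names are interpolated as-is, so they must come from trusted code
    branches = [
        f"select '{t}' as table_name, count(*) as row_count from {t}" for t in tables
    ]
    return "\nunion all ".join(branches) + "\norder by 2 desc;"
```

The result can be passed straight to `execute_query(row_count_query([...]), connection_params)`.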


**Top Played Artists**

- Results indicate that Coldplay is the most played artist in the songplay data.
- The aggregation runs quickly, even though it covers all of the songplay data.
- However, caching may be a factor, as the subquery had already been run and tested separately.

In [18]:
# Top played artists
# Note: we perform a subquery to return a single artist per artist_id
# There is a one-to-many relationship on the artist_id and name columns
# e.g. ABCD -> Some Artist; ABCD -> Some Artist; Some Backing Artist
query = """
select 
	da.name,
	count(fs.song_id) as song_play_count
from f_songplay fs
join (
	select
		artist_id,
		name,
		row_number() over (partition by artist_id order by name) as row_number
	from d_artist
) da on da.artist_id = fs.artist_id and da.row_number = 1
group by da.name
order by 2 desc
limit 10;
"""

execute_query(query, connection_params)

| | name | song_play_count |
|---|---|---|
| 0 | Coldplay | 58 |
| 1 | Killers | 57 |
| 2 | Kings Of Leon | 56 |
| 3 | 3 Drives On A Vinyl | 55 |
| 4 | John Mayer | 47 |
| 5 | Big Tuck : DJ Smallz | 42 |
| 6 | Ace Karaoke Productions | 41 |
| 7 | Kanye West | 41 |
| 8 | Jack Johnson | 38 |
| 9 | Dwight Yoakam | 38 |
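The `row_number() ... = 1` subquery keeps one name per `artist_id` (the alphabetically first) before plays are counted. To make that logic concrete, here is a pandas analogue run on a few made-up rows. The column names follow the query, but the data and function names are illustrative only. Sorting by `artist_id` and `name` and keeping the first row per `artist_id` matches the window-function filter, and the inner `merge` mirrors the `join`.

```python
import pandas as pd


def first_name_per_artist(d_artist: pd.DataFrame) -> pd.DataFrame:
    """Keep the alphabetically first name per artist_id, like row_number() = 1."""
    return (
        d_artist.sort_values(["artist_id", "name"])
        .drop_duplicates(subset="artist_id", keep="first")
        .reset_index(drop=True)
    )


def top_artists(f_songplay: pd.DataFrame, d_artist: pd.DataFrame, n: int = 10) -> pd.Series:
    """Count plays per deduplicated artist name, most played first."""
    artists = first_name_per_artist(d_artist)
    merged = f_songplay.merge(artists, on="artist_id", how="inner")
    # count() skips nulls, matching count(fs.song_id) in SQL
    counts = merged.groupby("name")["song_id"].count()
    return counts.sort_values(ascending=False).head(n)


# Made-up rows: artist A1 appears under two names
d_artist = pd.DataFrame({
    "artist_id": ["A1", "A1", "A2"],
    "name": ["Some Artist; Some Backing Artist", "Some Artist", "Other Artist"],
})
f_songplay = pd.DataFrame({
    "song_id": ["S1", "S2", "S3"],
    "artist_id": ["A1", "A1", "A2"],
})
print(top_artists(f_songplay, d_artist))
```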


**Songs by Greatest Listen Time**

- Results indicate that Greece 2000 has the greatest total listen time.
- The query takes slightly longer than the previous one, which is again assumed to be due to differences in caching.
- Overall, given that the aggregation covers all of the songplay data, the query still runs quickly, taking approximately 15 seconds.

In [19]:
# Top played songs by listen time
# We perform a similar window function on the d_artist table here
query = """
select
	ds.title as song_title,
	da.name as artist_name,
	sum(ds.duration) as song_play_time
from f_songplay fs
join d_song ds on ds.song_id = fs.song_id
join (
	select
		artist_id,
		name,
		row_number() over (partition by artist_id order by name) as row_number
	from d_artist
) da on da.artist_id = fs.artist_id and da.row_number = 1
group by 1,2
order by 3 desc
limit 10;
"""

execute_query(query, connection_params)

| | song_title | artist_name | song_play_time |
|---|---|---|---|
| 0 | Greece 2000 | 3 Drives On A Vinyl | 24762.8812 |
| 1 | Sehr kosmisch | Harmonia | 13771.32771 |
| 2 | You're The One | Dwight Yoakam | 8854.3701 |
| 3 | Stronger | Kanye West | 8737.26728 |
| 4 | What Goes Around...Comes Around | Justin Timberlake | 6554.84754 |
| 5 | Yellow | Coldplay | 6440.21772 |
| 6 | Revelry | Kings Of Leon | 5448.47742 |
| 7 | Bring Me To Life | Evanescence | 5299.00763 |
| 8 | Horn Concerto No. 4 in E flat K495: II. Romanc... | Barry Tuckwell/Academy of St Martin-in-the-Fie... | 5266.01587 |
| 9 | Just Dance | Lady GaGa | 5222.5238 |
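Listen time in this query is the song's `duration` summed over every play. Assuming each `song_id` appears once in `d_song`, a song's total is therefore its duration multiplied by its play count. Below is a small pandas analogue of the aggregation, run on invented rows. It assumes `d_artist` has already been reduced to one name per `artist_id`, as the subquery does. The function name is hypothetical. Only the needed columns are selected before merging, so that an `artist_id` column in `d_song` cannot collide with the one in `f_songplay`.

```python
import pandas as pd


def listen_time_by_song(
    f_songplay: pd.DataFrame, d_song: pd.DataFrame, d_artist: pd.DataFrame, n: int = 10
) -> pd.Series:
    """Sum song duration over every play, grouped by song title and artist name.

    Assumes d_artist already holds one row per artist_id.
    """
    merged = (
        f_songplay.merge(d_song[["song_id", "title", "duration"]], on="song_id", how="inner")
        .merge(d_artist[["artist_id", "name"]], on="artist_id", how="inner")
    )
    totals = merged.groupby(["title", "name"])["duration"].sum()
    return totals.sort_values(ascending=False).head(n)


# Invented rows: S1 is played twice, S2 once
f_songplay = pd.DataFrame({"song_id": ["S1", "S1", "S2"], "artist_id": ["A1", "A1", "A2"]})
d_song = pd.DataFrame({
    "song_id": ["S1", "S2"],
    "title": ["Song One", "Song Two"],
    "duration": [200.5, 300.25],
})
d_artist = pd.DataFrame({"artist_id": ["A1", "A2"], "name": ["Some Artist", "Other Artist"]})
print(listen_time_by_song(f_songplay, d_song, d_artist))
```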


### Cleanup

Now that we have confirmed that the scripts run and that queries execute in a timely manner, the resources are deleted to avoid incurring excess costs.

In [20]:
%%time
!python scripts/delete_resources.py

CPU times: user 625 ms, sys: 122 ms, total: 747 ms
Wall time: 1min 19s
