The purpose of this project is to implement an ETL pipeline to a cloud data warehouse for a fictitious music-streaming startup called Sparkify that is interested in understanding which songs its users are listening to. Sparkify has song and event-log datasets stored on S3 at s3://udacity-dend/song_data and s3://udacity-dend/log_data, respectively. This project implements an ETL pipeline in Python that copies each dataset from S3 into staging tables in Amazon Redshift and then inserts data from the staging tables into star schema tables used for analytical queries.
The files found in s3://udacity-dend/song_data are a subset of real data from the Million Song Dataset. Each file is in JSON format and contains metadata about a song and the artist of that song. The files are partitioned by the first three letters of each song's track ID. For example, here are the paths to two files in this dataset:
s3://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json
s3://udacity-dend/song_data/A/A/B/TRAABJL12903CDCF1A.json
Below is an example of what a single song file, TRAABJL12903CDCF1A.json, looks like:
{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}
The second dataset, found in s3://udacity-dend/log_data, consists of JSON log files generated by an event simulator based on the songs in the dataset above. These simulate activity logs from a music streaming app based on specified configurations.
The log files in this dataset are partitioned by year and month. For example, here are filepaths to two files in this dataset:
s3://udacity-dend/log_data/2018/11/2018-11-12-events.json
s3://udacity-dend/log_data/2018/11/2018-11-13-events.json
Each line in a log file such as 2018-11-12-events.json is a JSON event record with fields including userId, firstName, lastName, gender, level, page, song, ts, sessionId, location, and userAgent.
The staging tables' columns correspond directly to those of the files in the datasets. No constraints are specified on any column except for a distkey on the song title column in both the song and event staging tables; this is done to optimize the join performed when inserting into the star schema.
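As a minimal sketch (assuming the staging columns mirror the JSON fields shown above and used in the transformation queries below; the exact definitions live in sql_queries.py), the staging tables could be declared along these lines:
-- Sketch only: column names mirror the JSON fields in the datasets.
CREATE TABLE IF NOT EXISTS staging_songs
(
    num_songs        int,
    artist_id        varchar,
    artist_latitude  float,
    artist_longitude float,
    artist_location  varchar,
    artist_name      varchar,
    song_id          varchar,
    title            varchar distkey,  -- song title distribution key
    duration         float,
    year             int
);
-- Sketch only: additional event fields are omitted for brevity.
CREATE TABLE IF NOT EXISTS staging_events
(
    firstName varchar,
    gender    varchar,
    lastName  varchar,
    level     varchar,
    location  varchar,
    page      varchar,
    sessionId int,
    song      varchar distkey,  -- matches the distkey on staging_songs.title for the join
    ts        bigint,
    userAgent varchar,
    userId    varchar
);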
The star schema is modeled as follows:
- songplays - records in log data associated with song plays, i.e. records with page set to NextSong
  - songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
- users - users in the app
  - user_id, first_name, last_name, gender, level
- songs - songs in the music database
  - song_id, title, artist_id, year, duration
- artists - artists in the music database
  - artist_id, name, location, latitude, longitude
- time - timestamps of records in songplays broken down into specific units
  - start_time, hour, day, week, month, year, weekday
The first column in each table is a primary key. The songplays table has foreign keys referencing the name-matched primary key columns in each dimension table. The primary key of each dimension table is set as a sortkey, and songplay_id in the fact table is set as a distkey because its values are evenly distributed over their range (its data type is int IDENTITY(0,1)). start_time in the fact table is set as a sortkey as well.
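For example, a dimension table that follows this scheme could be sketched as below (the column types are illustrative assumptions; the exact definitions live in sql_queries.py):
-- Sketch only: the primary key doubles as the sort key, mirroring the description above.
CREATE TABLE IF NOT EXISTS users
(
    user_id    varchar PRIMARY KEY sortkey,
    first_name varchar,
    last_name  varchar,
    gender     varchar,
    level      varchar
);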
For more details on how this data is modeled as it pertains to analytical queries, see this project.
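As an illustration of the kind of query this schema supports, the following hypothetical example counts the ten most-played songs in November 2018 using the tables defined above:
-- Hypothetical analytical query: top ten songs by play count for November 2018.
SELECT s.title, a.name AS artist_name, COUNT(*) AS play_count
FROM songplays sp
JOIN songs   s ON sp.song_id = s.song_id
JOIN artists a ON sp.artist_id = a.artist_id
JOIN time    t ON sp.start_time = t.start_time
WHERE t.year = 2018 AND t.month = 11
GROUP BY s.title, a.name
ORDER BY play_count DESC
LIMIT 10;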
The ETL pipeline begins by extracting data from S3 and directly copying it into the staging tables in Redshift using the following commands:
COPY staging_events FROM 's3://udacity-dend/log_data' iam_role {ARN} json {jsonpaths_file};
COPY staging_songs FROM 's3://udacity-dend/song_data' iam_role {ARN} json 'auto';
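For illustration, the first command with its placeholders filled in might look like the following; the IAM role ARN and JSONPaths file location here are hypothetical, and the real values are read from dwh.cfg:
COPY staging_events FROM 's3://udacity-dend/log_data'
iam_role 'arn:aws:iam::123456789012:role/my-redshift-role'  -- hypothetical ARN
json 's3://my-bucket/log_jsonpaths.json';                   -- hypothetical JSONPaths file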
Then, data is transformed from its state in the staging tables and inserted into the star schema using commands that include the following:
INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
SELECT
timestamp 'epoch' + se.ts/1000 * interval '1 second',
se.userId,
se.level,
ss.song_id,
ss.artist_id,
se.sessionId,
se.location,
se.userAgent
FROM staging_events se
JOIN staging_songs ss
ON se.song=ss.title
WHERE se.page='NextSong';
INSERT INTO users (user_id, first_name, last_name, gender, level)
SELECT DISTINCT
userId,
firstName,
lastName,
gender,
level
FROM staging_events
WHERE page='NextSong' AND userId NOT IN (SELECT DISTINCT user_id FROM users);
The SELECT DISTINCT clauses are used to avoid duplicate entries in some of the star schema tables, since Redshift does not have an ON CONFLICT clause like PostgreSQL.
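As another instance of this pattern, the time dimension could be populated along the lines of the following sketch (the actual query in sql_queries.py may instead derive these values directly from staging_events.ts):
-- Sketch only: derive the time dimension from the fact table's start_time.
INSERT INTO time (start_time, hour, day, week, month, year, weekday)
SELECT DISTINCT
    start_time,
    EXTRACT(hour    FROM start_time),
    EXTRACT(day     FROM start_time),
    EXTRACT(week    FROM start_time),
    EXTRACT(month   FROM start_time),
    EXTRACT(year    FROM start_time),
    EXTRACT(weekday FROM start_time)
FROM songplays;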
All SQL queries for the entire project are centralized in the sql_queries.py file for modularity and ease of maintenance. This file includes the queries that drop tables, create tables, and insert records into tables using data from both S3 and other tables.
Before executing the ETL, the create_tables.py script must be run (with the python3 create_tables.py terminal command). This script drops all tables in Redshift (if they exist) and then creates the staging tables and the star schema tables. Once the tables have been created, the etl.py script can be run (with the python3 etl.py terminal command); it extracts, transforms, and loads all the data from S3 into the staging tables and then from the staging tables into the star schema tables. The dwh.cfg file stores the necessary Redshift credentials and the paths to the data in S3.
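A possible layout for dwh.cfg is sketched below; the section and key names are assumptions for illustration, and every value is a placeholder:
# Sketch only: placeholder values, not real credentials.
[CLUSTER]
HOST=your-cluster.abc123xyz.us-west-2.redshift.amazonaws.com
DB_NAME=dev
DB_USER=awsuser
DB_PASSWORD=your-password
DB_PORT=5439

[IAM_ROLE]
ARN=arn:aws:iam::123456789012:role/my-redshift-role

[S3]
LOG_DATA=s3://udacity-dend/log_data
LOG_JSONPATH=s3://path/to/log_jsonpaths.json
SONG_DATA=s3://udacity-dend/song_data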
Drop Table:
DROP TABLE IF EXISTS songplays CASCADE;
Create Table:
CREATE TABLE IF NOT EXISTS songplays
(
songplay_id int IDENTITY(0,1) PRIMARY KEY distkey,
start_time timestamp NOT NULL sortkey,
user_id varchar NOT NULL,
level varchar,
song_id varchar NOT NULL,
artist_id varchar NOT NULL,
session_id int,
location varchar,
user_agent varchar
);