In [1]:
import configparser
from datetime import datetime
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, to_timestamp
from pyspark.sql.types import *

In [2]:
# Get config information for Local/Cloud development

config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
# Get Credentials for AWS Cloud development
os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [None]:
# Get S3 path when working on AWS - Dont use for local developement

input_data = config.get('S3','INPUT_DATA')
output_data = config.get('S3','OUTPUT_DATA')

In [4]:
# get local path for Input/output data - Don't use for AWS developement

input_data = config.get('LOCAL','INPUT_DATA') 
output_data = config.get('LOCAL','OUTPUT_DATA')

In [5]:
#define a function o create a spark session

def create_spark_session():
    '''
    Initializes global spark session object with relevant packages and AWS configs
    '''
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [6]:
# create spark session
spark = create_spark_session()

# Schema for Song Play Analysis

### Dataset - Raw JSON data structures

* **log_data**: log_data contains data about what users have done (columns: event_id, artist, auth, firstName, gender, itemInSession, lastName, length, level, location, method, page, registration, sessionId, song, status, ts, userAgent, userId)
* **song_data**: song_data contains data about songs and artists (columns: num_songs, artist_id, artist_latitude, artist_longitude, artist_location, artist_name, song_id, title, duration, year)


Using the song and log datasets, we will create a star schema optimized for queries on song play analysis. This includes the following tables.

### Fact Table

* **songplays**: song play data together with user, artist, and song info (songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)

### Dimension Tables

* **users**: user info (columns: user_id, first_name, last_name, gender, level)
* **songs**: song info (columns: song_id, title, artist_id, year, duration)
* **artists**: artist info (columns: artist_id, name, location, latitude, longitude)
* **time**: detailed time info about song plays (columns: start_time, hour, day, week, month, year, weekday)

# 1. Process Song Data : Define **songs_table** and **artists_table** from **songs_data**
## load song_data

In [7]:
# get filepath to song data file
song_data = input_data + "song_data/*/*/*/"

In [8]:
# read song data file
df = spark.read.json(song_data)

In [9]:
# print df schema
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [10]:
# view firts data in df
df.head(5)

[Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0),
 Row(artist_id='AREBBGV1187FB523D2', artist_latitude=None, artist_location='Houston, TX', artist_longitude=None, artist_name="Mike Jones (Featuring CJ_ Mello & Lil' Bran)", duration=173.66159, num_songs=1, song_id='SOOLYAZ12A6701F4A6', title='Laws Patrolling (Album Version)', year=0),
 Row(artist_id='ARMAC4T1187FB3FA4C', artist_latitude=40.82624, artist_location='Morris Plains, NJ', artist_longitude=-74.47995, artist_name='The Dillinger Escape Plan', duration=207.77751, num_songs=1, song_id='SOBBUGU12A8C13E95D', title='Setting Fire to Sleeping Giants', year=2004),
 Row(artist_id='ARPBNLO1187FB3D52F', artist_latitude=40.71455, artist_location='New York, NY', artist_lo

In [11]:
# describe dataset
df.describe().show()

+-------+------------------+------------------+---------------+------------------+-----------+------------------+---------+------------------+--------------------+-----------------+
|summary|         artist_id|   artist_latitude|artist_location|  artist_longitude|artist_name|          duration|num_songs|           song_id|               title|             year|
+-------+------------------+------------------+---------------+------------------+-----------+------------------+---------+------------------+--------------------+-----------------+
|  count|                71|                31|             71|                31|         71|                71|       71|                71|                  71|               71|
|   mean|              null| 36.55297161290323|           null|-73.25123258064517|       null|239.72967605633804|      1.0|              null|                null|785.9577464788732|
| stddev|              null|12.431023413063544|           null| 36.05807592882607|       n

## 1.2 Define **songs** table
 SQL Query to define songs table
```sql
song_table_insert = ("""
INSERT INTO dim_song(song_id, title, artist_id, year, duration)
SELECT DISTINCT song_id as song_id,
                title as title,
                artist_id as artist_id,
                year as year,
                duration as duration
FROM staging_songs
WHERE song_id IS NOT NULL;
""")
```

In [12]:
# Songs table using Spark SQL API

df.createOrReplaceTempView("staging_songs")
songs_table = spark.sql("""
SELECT DISTINCT song_id as song_id,
                title as title,
                artist_id as artist_id,
                year as year,
                duration as duration
FROM staging_songs
WHERE song_id IS NOT NULL
""")

In [13]:
songs_table.printSchema()
songs_table.describe().show()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+-------+------------------+--------------------+------------------+-----------------+------------------+
|summary|           song_id|               title|         artist_id|             year|          duration|
+-------+------------------+--------------------+------------------+-----------------+------------------+
|  count|                71|                  71|                71|               71|                71|
|   mean|              null|                null|              null|785.9577464788732|239.72967605633815|
| stddev|              null|                null|              null| 980.957119153384|106.56277912134071|
|    min|SOAOIBZ12AB01815BE|A Higher Place (A...|AR051KA1187B98B2FF|                0|          29.54404|
|    max|SOZVMJI12AB01808AF|   ¿Dónde va Chichi?|ARYKCQI1187F

In [14]:
# extract columns to create songs table using Spark Dataframe API

songs_table = df.select(["song_id", "title", "artist_id", "year", "duration"]) \
                .dropna(how = "any", subset = ["song_id", "artist_id"]) \
                .where(df["song_id"].isNotNull()) \
                .dropDuplicates()

In [15]:
songs_table.printSchema()
songs_table.describe().show()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+-------+------------------+--------------------+------------------+-----------------+------------------+
|summary|           song_id|               title|         artist_id|             year|          duration|
+-------+------------------+--------------------+------------------+-----------------+------------------+
|  count|                71|                  71|                71|               71|                71|
|   mean|              null|                null|              null|785.9577464788732|239.72967605633815|
| stddev|              null|                null|              null| 980.957119153384|106.56277912134071|
|    min|SOAOIBZ12AB01815BE|A Higher Place (A...|AR051KA1187B98B2FF|                0|          29.54404|
|    max|SOZVMJI12AB01808AF|   ¿Dónde va Chichi?|ARYKCQI1187F

In [16]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.mode("overwrite") \
                .partitionBy("year", "artist_id") \
                .parquet((output_data + "songs_table.parquet"))

## 1.3 Define artists table

SQL Query to define artist table
```sql
artist_table_insert = ("""
INSERT INTO dim_artist(artist_id, name, location, latitude, longitude)
SELECT DISTINCT artist_id as artist_id,
                artist_name as name,
                artist_location as location,
                artist_latitude as latitude,
                artist_longitude as longitude
FROM staging_songs
where artist_id IS NOT NULL;
""")
```

In [17]:
# artist table using Spark SQL API

df.createOrReplaceTempView("staging_songs")
artists_table = spark.sql("""
SELECT DISTINCT artist_id as artist_id,
                artist_name as name,
                artist_location as location,
                artist_latitude as latitude,
                artist_longitude as longitude
FROM staging_songs
where artist_id IS NOT NULL
""")

In [18]:
artists_table.printSchema()
artists_table.describe().show()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+-------+------------------+---------+--------------+------------------+------------------+
|summary|         artist_id|     name|      location|          latitude|         longitude|
+-------+------------------+---------+--------------+------------------+------------------+
|  count|                69|       69|            69|                31|                31|
|   mean|              null|     null|          null| 36.55297161290323|-73.25123258064517|
| stddev|              null|     null|          null|12.431023413063542| 36.05807592882608|
|    min|AR051KA1187B98B2FF|  40 Grit|              |           -13.442|        -122.42005|
|    max|ARYKCQI1187FB3B18F|lextrical|Zagreb Croatia|          56.27609|           15.9676|
+-------+------------------+---------+--------------+-------

In [19]:
# artist table using Spark Dataframe API

artists_table = df.selectExpr("artist_id", "artist_name as name", "artist_location as location", "artist_latitude as latitude", "artist_longitude as longitude") \
                .dropna(how = "any", subset = ["artist_id"]) \
                .where(df["artist_id"].isNotNull()) \
                .dropDuplicates()

In [20]:
artists_table.printSchema()
artists_table.describe().show()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+-------+------------------+---------+--------------+------------------+------------------+
|summary|         artist_id|     name|      location|          latitude|         longitude|
+-------+------------------+---------+--------------+------------------+------------------+
|  count|                69|       69|            69|                31|                31|
|   mean|              null|     null|          null| 36.55297161290323|-73.25123258064515|
| stddev|              null|     null|          null|12.431023413063542| 36.05807592882608|
|    min|AR051KA1187B98B2FF|  40 Grit|              |           -13.442|        -122.42005|
|    max|ARYKCQI1187FB3B18F|lextrical|Zagreb Croatia|          56.27609|           15.9676|
+-------+------------------+---------+--------------+-------

In [21]:
# write artists table to parquet files
artists_table.write.mode("overwrite") \
                .parquet((output_data + "artists_table.parquet"))

# 2. Process Log Data : Define **songs_table** and **artists_table** from **songs_data**

In [22]:
# get filepath to log data file
log_data = input_data + "log_data/*/*/*.json"

In [23]:
# read log data file
df = spark.read.json(log_data)

In [24]:
df.printSchema()
df.describe().show()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-------+------------------+----------+---------+------+------------------+--------+------------------+-----+--------------------+------+-------+--------------------+------------------+--------------------+-----------------+--------------------+--------------------+-----------------+
|summary|         

In [25]:
# filter by actions for song plays
df = df.filter(df.page == "NextSong")
df.describe().show()

+-------+------------------+---------+---------+------+------------------+--------+------------------+-----+--------------------+------+--------+--------------------+-----------------+--------------------+------+--------------------+--------------------+------------------+
|summary|            artist|     auth|firstName|gender|     itemInSession|lastName|            length|level|            location|method|    page|        registration|        sessionId|                song|status|                  ts|           userAgent|            userId|
+-------+------------------+---------+---------+------+------------------+--------+------------------+-----+--------------------+------+--------+--------------------+-----------------+--------------------+------+--------------------+--------------------+------------------+
|  count|              6820|     6820|     6820|  6820|              6820|    6820|              6820| 6820|                6820|  6820|    6820|                6820|            

## 2.1 define users_table

SQL Query to define users table
```sql
user_table_insert = ("""
INSERT INTO dim_user(user_id, first_name, last_name, gender, level)
SELECT DISTINCT userId as user_id,
                firstName as first_name,
                lastName as last_name,
                gender as gender,
                level as level
FROM staging_events
where userId IS NOT NULL;
""")
```

In [26]:
# extract columns for users table using Spark SQL API

df.createOrReplaceTempView("staging_events")
users_table = spark.sql("""
    SELECT DISTINCT userId as user_id,
                    firstName as first_name,
                    lastName as last_name,
                    gender as gender,
                    level as level
    FROM staging_events
    where userId IS NOT NULL
""")

In [27]:
users_table.printSchema()
users_table.describe().show()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+------------------+----------+---------+------+-----+
|summary|           user_id|first_name|last_name|gender|level|
+-------+------------------+----------+---------+------+-----+
|  count|               104|       104|      104|   104|  104|
|   mean| 51.50961538461539|      null|     null|  null| null|
| stddev|29.035296629035347|      null|     null|  null| null|
|    min|                10|    Adelyn| Arellano|     F| free|
|    max|                99|   Zachary|    Young|     M| paid|
+-------+------------------+----------+---------+------+-----+



In [28]:
# extract columns for users table using Spark Dataframe API

users_table = df.selectExpr("userId as user_id", "firstName as first_name", "lastName as last_name", "gender as gender", "level as level") \
                .dropna(how = "any", subset = ["user_id"]) \
                .where(df["userId"].isNotNull()) \
                .dropDuplicates()

In [29]:
users_table.printSchema()
users_table.describe().show()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+------------------+----------+---------+------+-----+
|summary|           user_id|first_name|last_name|gender|level|
+-------+------------------+----------+---------+------+-----+
|  count|               104|       104|      104|   104|  104|
|   mean| 51.50961538461539|      null|     null|  null| null|
| stddev|29.035296629035354|      null|     null|  null| null|
|    min|                10|    Adelyn| Arellano|     F| free|
|    max|                99|   Zachary|    Young|     M| paid|
+-------+------------------+----------+---------+------+-----+



In [30]:
# write users table to parquet files
users_table.write.mode("overwrite") \
                .parquet((output_data + "users_table.parquet"))

## 2.2 define timestamp_table

SQL Query to define users table
```sql
time_table_insert = ("""
INSERT INTO dim_time(start_time, hour, day, week, month, year, weekday)
SELECT distinct ts,
                EXTRACT(hour from ts),
                EXTRACT(day from ts),
                EXTRACT(week from ts),
                EXTRACT(month from ts),
                EXTRACT(year from ts),
                EXTRACT(weekday from ts)
FROM staging_events
WHERE ts IS NOT NULL;
""")
```

In [31]:
# create timestamp column from original timestamp column ts
get_timestamp = udf(lambda x:datetime.fromtimestamp(int(int(x)/1000)), TimestampType())

df = df.withColumn("start_time", get_timestamp(df.ts))

In [32]:
# extract columns to create time table using Spark SQL API
df.createOrReplaceTempView("staging_events")

time_table =spark.sql("""
                        SELECT
                            start_time,
                            hour(start_time) as hour,
                            dayofmonth(start_time) as day,
                            weekofyear(start_time) as week,
                            month(start_time) as month,
                            year(start_time) as year,
                            dayofweek(start_time) as weekday      
                        FROM
                            (
                            SELECT DISTINCT start_time
                            FROM staging_events as log
                            WHERE log.ts IS NOT NULL
                            )
                        ORDER BY start_time
                        """)

In [33]:
time_table.printSchema()
time_table.describe().show()
time_table.show(5)

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------+------------------+------------------+------------------+-----+------+-----------------+
|summary|              hour|               day|              week|month|  year|          weekday|
+-------+------------------+------------------+------------------+-----+------+-----------------+
|  count|              6813|              6813|              6813| 6813|  6813|             6813|
|   mean|13.476442095992955|17.253192426243945|46.380742697783646| 11.0|2018.0|4.181124321150741|
| stddev| 5.892004556825478|  8.10917362423492|1.1835604099384778|  0.0|   0.0|1.727828792226424|
|    min|                 0|                 1|                44|   11|  2018|                1|
|    max|                23|          

In [34]:
# extract columns to create time table using Spark DAtaframe API

time_table = (df.withColumn("hour", hour(df.start_time))
                .withColumn("day", dayofmonth(df.start_time))
                .withColumn("week", weekofyear(df.start_time))
                .withColumn("month", month(df.start_time))
                .withColumn("year", year(df.start_time))
                .withColumn("weekday", dayofweek(df.start_time))
                .select(["start_time", "hour", "day", "week", "month", "year", "weekday"])
                .dropDuplicates()
                .orderBy("start_time"))

In [35]:
time_table.printSchema()
time_table.describe().show()
time_table.show(5)

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------+------------------+------------------+------------------+-----+------+-----------------+
|summary|              hour|               day|              week|month|  year|          weekday|
+-------+------------------+------------------+------------------+-----+------+-----------------+
|  count|              6813|              6813|              6813| 6813|  6813|             6813|
|   mean|13.476442095992955|17.253192426243945|46.380742697783646| 11.0|2018.0|4.181124321150741|
| stddev| 5.892004556825478|  8.10917362423492|1.1835604099384778|  0.0|   0.0|1.727828792226424|
|    min|                 0|                 1|                44|   11|  2018|                1|
|    max|                23|          

In [36]:
# write time table to parquet files partitioned by year and month
time_table.write.mode("overwrite") \
                .partitionBy("year", "month") \
                .parquet((output_data + "time_table.parquet"))

## 2.3 define songplays_table

SQL Query to define users table
```sql
songplay_table_insert = ("""
INSERT INTO fact_songplay(start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
SELECT DISTINCT to_timestamp(to_char(se.ts, '9999-99-99 99:99:99'),'YYYY-MM-DD HH24:MI:SS'),
                se.userId as user_id,
                se.level as level,
                ss.song_id as song_id,
                ss.artist_id as artist_id,
                se.sessionId as session_id,
                se.location as location,
                se.userAgent as user_agent
FROM staging_events se
JOIN staging_songs ss ON se.song = ss.title AND se.artist = ss.artist_name;
""")
```

In [37]:
# read in song data to use for songplays table
song_df = spark.read.parquet(output_data + "songs_table.parquet")

In [38]:
song_df.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [48]:
# define temp table

df.createOrReplaceTempView("staging_events")
song_df.createOrReplaceTempView("staging_songs")

In [51]:
spark.sql("""
        SELECT *
        FROM staging_songs
""").printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [52]:
spark.sql("""
        SELECT *
        FROM staging_events
""").printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)



In [57]:
# define songplays using Spark Dataframe API
songplays_table = (df.join(song_df, df.song == song_df.title, how="left")
                     .withColumn("songplay_id", monotonically_increasing_id())
                     .withColumn("year", year(df.start_time))
                     .withColumn("month", month(df.start_time))
                     .selectExpr("songplay_id", "start_time", "year", "month", "userId as user_id", "level", 
                                 "song_id", "artist_id", "sessionId as session_id", "location", 
                                 "userAgent as user_agent"))


In [58]:
songplays_table.head()
songplays_table.describe().show()

+-------+--------------------+--------------------+-----+------------------+-----+------------------+------------------+-----------------+--------------------+--------------------+
|summary|         songplay_id|                year|month|           user_id|level|           song_id|         artist_id|       session_id|            location|          user_agent|
+-------+--------------------+--------------------+-----+------------------+-----+------------------+------------------+-----------------+--------------------+--------------------+
|  count|                6820|                6820| 6820|              6820| 6820|                 4|                 4|             6820|                6820|                6820|
|   mean|1.833988878894428...|              2018.0| 11.0| 54.68123167155425| null|              null|              null|599.1818181818181|                null|                null|
| stddev|1.590451413093264...|1.714303188838417...|  0.0|28.162734412152382| null|             

In [None]:
# write songplays table to parquet files partitioned by year and month

songplays_table.write.mode("overwrite") \
                    .partitionBy("year", "month") \
                    .parquet(output_data + "songplays_table.parquet")