In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName('etl') \
        .getOrCreate()

In [66]:
log_path = 'data/log-data/'
song_path = 'data/song_data/*/*/*/*.json'

In [67]:
log_data = spark.read.json(log_path)

In [68]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [69]:
log_data.describe().show(truncate=10)

+-------+----------+----------+---------+------+-------------+--------+----------+-----+----------+------+-------+------------+----------+----------+----------+----------+----------+----------+
|summary|    artist|      auth|firstName|gender|itemInSession|lastName|    length|level|  location|method|   page|registration| sessionId|      song|    status|        ts| userAgent|    userId|
+-------+----------+----------+---------+------+-------------+--------+----------+-----+----------+------+-------+------------+----------+----------+----------+----------+----------+----------+
|  count|      6820|      8056|     7770|  7770|         8056|    7770|      6820| 8056|      7770|  8056|   8056|        7770|      8056|      6820|      8056|      8056|      7770|      8056|
|   mean|     266.5|      null|     null|  null|   21.1988...|    null|247.032...| null|      null|  null|   null|  1.54077...|598.167...|1388.36...|202.897...|1.54248...|      null|54.4639...|
| stddev|109.002...|      null

In [70]:
log_data.count()

8056

In [71]:
#Check for blank values
from pyspark.sql.functions import col

# List of all df columns
columns = log_data.columns
null = []
# Loop
for column in columns:
    # Filter 
    count = log_data.filter(col(column) == '').count()
    null.append(count)

pd.DataFrame({'column': columns, 'nulls': null})

Unnamed: 0,column,nulls
0,artist,0
1,auth,0
2,firstName,0
3,gender,0
4,itemInSession,0
5,lastName,0
6,length,0
7,level,0
8,location,0
9,method,0


In [130]:
log_data.show(5, truncate=10)

+----------+---------+---------+------+-------------+--------+---------+-----+----------+------+--------+------------+---------+----------+------+----------+----------+------+----------+
|    artist|     auth|firstName|gender|itemInSession|lastName|   length|level|  location|method|    page|registration|sessionId|      song|status|        ts| userAgent|userId|start_time|
+----------+---------+---------+------+-------------+--------+---------+-----+----------+------+--------+------------+---------+----------+------+----------+----------+------+----------+
|  Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jos...|   PUT|NextSong|  1.54101...|      583|Sehr ko...|   200|1542241...|"Mozill...|    26|2018-11...|
|The Pro...|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jos...|   PUT|NextSong|  1.54101...|      583|The Big...|   200|1542242...|"Mozill...|    26|2018-11...|
|     Train|Logged In|     Ryan|     M|            2|   Smith|205

In [50]:
song_data = spark.read.json(song_path)

In [51]:
song_data.describe().show()

+-------+------------------+------------------+---------------+------------------+-----------+------------------+---------+------------------+--------------------+-----------------+
|summary|         artist_id|   artist_latitude|artist_location|  artist_longitude|artist_name|          duration|num_songs|           song_id|               title|             year|
+-------+------------------+------------------+---------------+------------------+-----------+------------------+---------+------------------+--------------------+-----------------+
|  count|                71|                31|             71|                31|         71|                71|       71|                71|                  71|               71|
|   mean|              null| 36.55297161290323|           null|-73.25123258064517|       null|239.72967605633804|      1.0|              null|                null|785.9577464788732|
| stddev|              null|12.431023413063542|           null| 36.05807592882608|       n

In [126]:
song_data.show(5)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

### Dimension Tables

#### USERS:
- user_id, first_name, last_name, gender, level

In [73]:
from pyspark.sql.types import *

users = log_data.select(col("userId").cast(IntegerType()).alias("user_id"),
                        col("firstName").alias("first_name"),
                        col("lastName").alias("last_name"),
                        col("gender"),
                        col("level")
                       ).where(col("user_id").isNotNull()).dropDuplicates(["user_id"])

In [74]:
users.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [75]:
users.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|      2|   Jizelle| Benjamin|     F| free|
|      3|     Isaac|   Valdez|     M| free|
|      4|    Alivia|  Terrell|     F| free|
|      5|    Elijah|    Davis|     M| free|
|      6|   Cecilia|    Owens|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



#### SONGS:
- song_id, title, artist_id, year, duration

In [101]:
songs = song_data.selectExpr('song_id',
                             'title as song_title',
                             'artist_id',
                             'year',
                             'duration').distinct()

In [102]:
songs.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- song_title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [103]:
songs.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|          song_title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



#### ARTISTS:
- artist_id, name, location, latitude, longitude

In [104]:
artists = song_data.selectExpr('artist_id',
                              'artist_name as name',
                              'artist_location as location',
                              'artist_latitude as latitude',
                              'artist_longitude as longitude').distinct()

In [105]:
artists.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [106]:
artists.show(5)

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|40.71455|-74.00712|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624|-74.47995|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|    null|     null|
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |    null|     null|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|40.79086|-73.96644|
+------------------+--------------------+--------------------+--------+---------+
only showing top 5 rows



#### TIME:
- start_time, hour, day, week, month, year, weekday

In [107]:
from pyspark.sql.functions import *

time = log_data.selectExpr(
    'CAST(ts/1000 AS TIMESTAMP) AS start_time',
    'date_format(from_unixtime(ts / 1000), "HH") as hour',
    'date_format(from_unixtime(ts / 1000), "dd") as day',
    'weekofyear(from_unixtime(ts / 1000)) as week',
    'month(from_unixtime(ts / 1000)) as month',
    'year(from_unixtime(ts / 1000)) as year',
    'dayofweek(from_unixtime(ts / 1000)) as weekday'
)

In [108]:
time.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: string (nullable = true)
 |-- day: string (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [109]:
time.show(5)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-14 22:30:...|  22| 14|  46|   11|2018|      4|
|2018-11-14 22:41:...|  22| 14|  46|   11|2018|      4|
|2018-11-14 22:45:...|  22| 14|  46|   11|2018|      4|
|2018-11-15 01:44:...|  01| 15|  46|   11|2018|      5|
|2018-11-15 03:48:...|  03| 15|  46|   11|2018|      5|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



### Fact Table
#### SONGPLAYS
- songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [131]:
log_data.show(5, truncate=10)

+----------+---------+---------+------+-------------+--------+---------+-----+----------+------+--------+------------+---------+----------+------+----------+----------+------+----------+
|    artist|     auth|firstName|gender|itemInSession|lastName|   length|level|  location|method|    page|registration|sessionId|      song|status|        ts| userAgent|userId|start_time|
+----------+---------+---------+------+-------------+--------+---------+-----+----------+------+--------+------------+---------+----------+------+----------+----------+------+----------+
|  Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jos...|   PUT|NextSong|  1.54101...|      583|Sehr ko...|   200|1542241...|"Mozill...|    26|2018-11...|
|The Pro...|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jos...|   PUT|NextSong|  1.54101...|      583|The Big...|   200|1542242...|"Mozill...|    26|2018-11...|
|     Train|Logged In|     Ryan|     M|            2|   Smith|205

In [123]:
# Fact table contains records associated with song plays (page = NextSong)
log_data = log_data.filter(log_data.page == "NextSong")
log_data = log_data.withColumn("start_time",to_timestamp(from_unixtime(col("ts") / 1000)))

songplays = log_data.join(song_data,
                         (log_data.song == song_data.title) & (log_data.artist == song_data.artist_name) & (log_data.length == song_data.duration), how='inner') \
                        .select(monotonically_increasing_id().alias("songplay_id"), \
                                col("start_time"), \
                                col("userId").alias("user_id"), \
                                "level","song_id","artist_id", \
                                col("sessionId").alias("session_id"), \
                                "location", \
                                col("userAgent").alias("user_agent")).drop_duplicates()

songplays = songplays.withColumn("month",month(col("start_time"))).withColumn("year", year(col("start_time")))

In [132]:
songplays.show(5, truncate=10)

+-----------+----------+-------+-----+----------+----------+----------+----------+----------+-----+----+
|songplay_id|start_time|user_id|level|   song_id| artist_id|session_id|  location|user_agent|month|year|
+-----------+----------+-------+-----+----------+----------+----------+----------+----------+-----+----+
|          0|2018-11...|     15| paid|SOZCTXZ...|AR5KOSW...|       818|Chicago...|"Mozill...|   11|2018|
+-----------+----------+-------+-----+----------+----------+----------+----------+----------+-----+----+



### Write Parquet Files (Local)

In [113]:
output_path = 'output/'

In [125]:
users.write.parquet((output_path + "users") , mode="overwrite")
songs.write.parquet((output_path + "songs") , mode="overwrite",  partitionBy=["year", "artist_id"])
artists.write.parquet((output_path + "artists") , mode="overwrite")
time.write.parquet((output_path + "time") , mode="overwrite",  partitionBy=["year", "month"])
songplays.write.parquet((output_path + "songplays") , mode="overwrite",  partitionBy=["year", "month"])