In [35]:
# Lets try to run the ETL process locally, step by step

In [36]:
# Import
import os
import sys
import configparser
import boto3
from datetime import datetime
import os
import pyspark.sql as Spark
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [37]:
# Creates a spark session, or retrieve a matching one if it exists
spark = Spark.SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [38]:
input_data = "./data/"
output_data = "./output/"

In [39]:
# First up, songs data

In [40]:
# read song data file
song_data = input_data + "song_data/*/*/*/*.json"
# Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader
# columnNameOfCorruptRecord: allows renaming the new field having malformed string created by PERMISSIVE mode.
# (DEFAULT) PERMISSIVE: To keep corrupt records, an user can set a string type field named columnNameOfCorruptRecord in an user-defined schema
df = spark.read.json(song_data, columnNameOfCorruptRecord='corrupt_record').drop_duplicates()

In [41]:
# Lets display
df.show(n=5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARPFHN61187FB575F6|       41.88415|         Chicago, IL|       -87.63241|         Lupe Fiasco|279.97995|        1|SOWQTQZ12A58A7B63E|Streets On Fire (...|   0|
|AR1Y2PT1187FB5B9CE|       27.94017|             Brandon|       -82.32547|         John Wesley|484.62322|        1|SOLLHMX12AB01846DC|   The Emperor Falls|   0|
|AR7G5I41187FB4CE6C|           null|     London, England|            null|            Adam Ant|233.40363|        1|SONHOTT12A8C13493C|     Something Girls|1982|
|AR10USD1187B99F3F1|           nul

In [42]:
# extract columns to create songs table
# Udacity instructions: 
# songs - songs in music database
# song_id, title, artist_id, year, duration
songs_table = df.select(df.song_id,
                        df.title,
                        df.artist_id,
                        df.year,
                        df.duration).drop_duplicates()

In [43]:
# Lets display
songs_table.show(n=5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOTTDKS12AB018D69B|It Wont Be Christmas|ARMBR4Y1187B9990EB|   0|241.47546|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|          Pink World|AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [44]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.parquet(output_data + "songs/", 
                          mode="overwrite",
                          partitionBy=["year","artist_id"])

In [45]:
# extract columns to create artists table
# Udacity instructions: 
# artists - artists in music database
# artist_id, name, location, lattitude, longitude
artists_table = df.select(df.artist_id,
                          df.artist_name.alias("name"),
                          df.artist_location.alias("location"),
                          df.artist_latitude.alias("latitude"),
                          df.artist_longitude.alias("longitude")).drop_duplicates()

In [46]:
# Lets display
artists_table.show(n=5)

+------------------+---------------+---------------+--------+----------+
|         artist_id|           name|       location|latitude| longitude|
+------------------+---------------+---------------+--------+----------+
|ARPBNLO1187FB3D52F|       Tiny Tim|   New York, NY|40.71455| -74.00712|
|ARXR32B1187FB57099|            Gob|               |    null|      null|
|AROGWRA122988FEE45|Christos Dantis|               |    null|      null|
|AREVWGE1187B9B890A|     Bitter End|      Noci (BA)| -13.442|  -41.9952|
|ARBGXIG122988F409D|     Steel Rain|California - SF|37.77916|-122.42005|
+------------------+---------------+---------------+--------+----------+
only showing top 5 rows



In [47]:
# write artists table to parquet files
artists_table.write.parquet(output_data + "artists/", 
                            mode="overwrite")

In [48]:
# Next up, log data files

In [49]:
# get filepath to log data file
log_data = input_data + "log_data/*/*/*.json"
# read log data file
df = spark.read.json(log_data, 
                     columnNameOfCorruptRecord='corrupt_record').drop_duplicates()

In [50]:
# Lets display
df.show(n=5)

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|      Fat Joe|Logged In|     Kate|     F|           21| Harrell|241.34485| paid|Lansing-East Lans...|   PUT|NextSong|1.540472624796E12|      605|Safe 2 Say [The I...|   200|1542296032796|"Mozilla/5.0 (X11...|    97|
|  Linkin Park|Logged In|     Kate|     F|           33| Harrell|259.86567| paid|Lansing-East Lans...|   PUT|NextSong|1.540472624796

In [51]:
# filter by actions for song plays
df = df.filter(df.page == "NextSong")

In [52]:
# extract columns for users table
# Udacity instructions:
# users - users in the app
# user_id, first_name, last_name, gender, level
users_table = df.select(df.userId.alias("user_id"),
                        df.firstName.alias("first_name"),
                        df.lastName.alias("last_name"),
                        df.gender,
                        df.level).drop_duplicates()

In [53]:
# Lets display
users_table.show(n=5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [54]:
# write users table to parquet files
users_table.write.parquet(output_data + "users/", 
                          mode="overwrite")

In [55]:
# create timestamp column from original timestamp column
get_timestamp = F.udf(lambda x : datetime.utcfromtimestamp(int(x)/1000), T.TimestampType())
df = df.withColumn("start_time", get_timestamp('ts'))

In [56]:
# Lets display
df.show(n=5)

+------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|            artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          start_time|
+------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|           Fat Joe|Logged In|     Kate|     F|           21|  Harrell|241.34485| paid|Lansing-East Lans...|   PUT|NextSong|1.540472624796E12|      605|Safe 2 Say [The I...|   200|1542296032796|"Mozilla/5.0 (X11...|    97|2018-11-15 15:33:...|
|       Linkin Park|Logg

In [57]:
# extract columns to create time table
# Udacity instructions:
# time - timestamps of records in songplays broken down into specific units
# start_time, hour, day, week, month, year, weekday
# Example 1: https://sparkbyexamples.com/spark/spark-extract-hour-minute-and-second-from-timestamp/
# Example 2: https://stackoverflow.com/questions/30949202/spark-dataframe-timestamptype-how-to-get-year-month-day-values-from-field
time_table = df.withColumn("start_time", F.col("start_time")) \
               .withColumn("hour", F.hour(F.col("start_time"))) \
               .withColumn("day", F.dayofmonth(F.col("start_time"))) \
               .withColumn("week", F.weekofyear(F.col("start_time"))) \
               .withColumn("month", F.month(F.col("start_time"))) \
               .withColumn("year", F.year(F.col("start_time"))) \
               .withColumn("weekday", F.dayofweek(F.col("start_time"))) \
               .select("ts","start_time","hour", "day", "week", "month", "year", "weekday").drop_duplicates()

In [58]:
# Lets display
time_table.show(n=5)

+-------------+--------------------+----+---+----+-----+----+-------+
|           ts|          start_time|hour|day|week|month|year|weekday|
+-------------+--------------------+----+---+----+-----+----+-------+
|1543595890796|2018-11-30 16:38:...|  16| 30|  48|   11|2018|      6|
|1542605625796|2018-11-19 05:33:...|   5| 19|  47|   11|2018|      2|
|1541267645796|2018-11-03 17:54:...|  17|  3|  44|   11|2018|      7|
|1542986986796|2018-11-23 15:29:...|  15| 23|  47|   11|2018|      6|
|1543349224796|2018-11-27 20:07:...|  20| 27|  48|   11|2018|      3|
+-------------+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [59]:
# write time table to parquet files partitioned by year and month
time_table.write.parquet(output_data + "time/", 
                         mode="overwrite",
                         partitionBy=["year","month"])

In [60]:
# read in song data to use for songplays table
songs_parquet = output_data + 'songs/*/*/*.parquet'
song_df = spark.read.parquet(songs_parquet)

In [61]:
# extract columns from joined song and log datasets to create songplays table 
# Udacity instructions:
# songplays - records in log data associated with song plays i.e. records with page NextSong
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
# Example: https://dzone.com/articles/pyspark-join-explained-with-examples
# Objects: [userId, location, gender, start_time, title, duration, sessionId, registration, song, lastName, itemInSession, artist, ts, page, userAgent, level, length, method, auth, firstName, song_id, status]
songplays_table = df.join(song_df, [df.song == song_df.title], how='inner') \
                    .join(time_table, df.start_time == time_table.start_time, how="inner") \
                    .select(F.monotonically_increasing_id().alias("songplay_id"),
                            df.start_time,
                            df.userId.alias("user_id"),
                            df.level,
                            song_df.song_id,
                            df.artist.alias("artist_id"), 
                            df.sessionId.alias("session_id"), 
                            df.location, 
                            df.userAgent.alias("user_agent"),
                            time_table.year,
                            time_table.month) \
                    .repartition("year", "month") \
                    .drop_duplicates()

In [62]:
# Lets display
songplays_table.show(n=5)

+-------------+--------------------+-------+-----+------------------+-----------------+----------+--------------------+--------------------+----+-----+
|  songplay_id|          start_time|user_id|level|           song_id|        artist_id|session_id|            location|          user_agent|year|month|
+-------------+--------------------+-------+-----+------------------+-----------------+----------+--------------------+--------------------+----+-----+
| 188978561024|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|            Elena|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
| 584115552256|2018-11-19 09:14:...|     24| paid|SOGDBUF12A8C140FAA|Calvin Richardson|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|2018|   11|
| 944892805120|2018-11-27 22:35:...|     80| paid|SOGDBUF12A8C140FAA|      Samy Deluxe|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|
|1056561954816|2018-11-14 05:06:...|     10| free|SOGDBUF12A8C140FAA|        Percubaba| 

In [63]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(output_data + "songplays/", 
                              mode="overwrite",
                              partitionBy=["year","month"])

In [64]:
# End of all extractions and parquet savings. Lets have a look at the data we have produced.