In [66]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg
from pyspark.sql.functions import sum as Fsum, from_unixtime, monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, TimestampType

In [2]:
spark = SparkSession \
    .builder \
    .appName("ETL") \
    .getOrCreate()

In [None]:
input_data = "/home/workspace/data/"
output_data = "/home/workspace/data/"

In [34]:
song_data = "/home/workspace/data/song_data/*/*/*/*.json"

# read song data file
df = spark.read.json(song_data)

# extract columns to create songs table
songs_table = df.select("song_id", "title", "artist_id", "year", "duration").drop_duplicates()

# # write songs table to parquet files partitioned by year and artist
print(songs_table.count())
songs_table.printSchema()
songs_table.show(5, False)
songs_table.write \
               .mode("overwrite") \
               .partitionBy("year", "artist_id") \
               .parquet(output_data + "songs")

71
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+------------------+---------------------------------------------------+------------------+----+---------+
|song_id           |title                                              |artist_id         |year|duration |
+------------------+---------------------------------------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|¿Dónde va Chichi?                                  |ARGUVEV1187B98BA17|1997|313.12934|
|SOTTDKS12AB018D69B|It Wont Be Christmas                               |ARMBR4Y1187B9990EB|0   |241.47546|
|SOBBUGU12A8C13E95D|Setting Fire to Sleeping Giants                    |ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|Pink World                                         |AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife Is Running Arou

In [38]:
%%time
# extract columns to create artists table
artists_table = df.selectExpr("artist_id", 
                              "artist_name as name", 
                              "artist_location as location", 
                              "artist_latitude as latitude", 
                              "artist_longitude as longitude")\
                  .drop_duplicates()

print(artists_table.count())
artists_table.printSchema()
artists_table.show(5, False)

# write artists table to parquet files
artists_table.write \
             .mode("overwrite") \
             .parquet(output_data + "artists")

69
root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------------------+---------------+---------------+--------+----------+
|artist_id         |name           |location       |latitude|longitude |
+------------------+---------------+---------------+--------+----------+
|ARPBNLO1187FB3D52F|Tiny Tim       |New York, NY   |40.71455|-74.00712 |
|ARXR32B1187FB57099|Gob            |               |null    |null      |
|AROGWRA122988FEE45|Christos Dantis|               |null    |null      |
|ARBGXIG122988F409D|Steel Rain     |California - SF|37.77916|-122.42005|
|AREVWGE1187B9B890A|Bitter End     |Noci (BA)      |-13.442 |-41.9952  |
+------------------+---------------+---------------+--------+----------+
only showing top 5 rows

CPU times: user 7.47 ms, sys: 0 ns, total: 7.47 ms
Wall time: 8.2 s


In [51]:
# get filepath to log data file
log_data = input_data + "log_data/*.json"

# read log data file
df = spark.read.json(log_data)
print(df.count())

# filter by actions for song plays
df = df.filter(df.page == "NextSong")
print(df.count())
df.printSchema()

8056
6820
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [52]:
# extract columns for users table    
users_table = df.selectExpr("userId as user_id",
                            "firstName as first_name",
                            "lastName as last_name",
                            "gender",
                            "level")

# write users table to parquet files
users_table.printSchema()
users_table.count()
users_table.show(5, False)
users_table.write \
           .mode("overwrite") \
           .parquet(output_data + "users")

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|26     |Ryan      |Smith    |M     |free |
|26     |Ryan      |Smith    |M     |free |
|26     |Ryan      |Smith    |M     |free |
|61     |Samuel    |Gonzalez |M     |free |
|80     |Tegan     |Levine   |F     |paid |
+-------+----------+---------+------+-----+
only showing top 5 rows



In [53]:
from pyspark.sql.types import TimestampType
from datetime import datetime

# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: int(x / 1000), IntegerType())
df = df.withColumn("start_time", get_timestamp('ts'))
df.select("ts", "start_time").show()

# # create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.utcfromtimestamp(x), TimestampType()) 
df = df.withColumn('start_time', get_datetime('start_time'))
df.select("ts", "start_time").show()

+-------------+----------+
|           ts|start_time|
+-------------+----------+
|1542241826796|1542241826|
|1542242481796|1542242481|
|1542242741796|1542242741|
|1542253449796|1542253449|
|1542260935796|1542260935|
|1542261224796|1542261224|
|1542261356796|1542261356|
|1542261662796|1542261662|
|1542262057796|1542262057|
|1542262233796|1542262233|
|1542262434796|1542262434|
|1542262456796|1542262456|
|1542262679796|1542262679|
|1542262728796|1542262728|
|1542262893796|1542262893|
|1542263158796|1542263158|
|1542263378796|1542263378|
|1542265716796|1542265716|
|1542265929796|1542265929|
|1542266927796|1542266927|
+-------------+----------+
only showing top 20 rows

+-------------+-------------------+
|           ts|         start_time|
+-------------+-------------------+
|1542241826796|2018-11-15 00:30:26|
|1542242481796|2018-11-15 00:41:21|
|1542242741796|2018-11-15 00:45:41|
|1542253449796|2018-11-15 03:44:09|
|1542260935796|2018-11-15 05:48:55|
|1542261224796|2018-11-15 05:53:44|
|1

In [54]:
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
# extract columns to create time table
time_table = df.withColumn("hour", hour("start_time")) \
               .withColumn("day", dayofmonth("start_time")) \
               .withColumn("week", weekofyear("start_time")) \
               .withColumn("month", month("start_time")) \
               .withColumn("year", year("start_time")) \
               .withColumn("weekday", dayofweek("start_time")) \
               .select("start_time", "hour", "day", "week", "month", "year", "weekday").drop_duplicates()

# write time table to parquet files partitioned by year and month
time_table.printSchema()
time_table.show(5, False)
time_table.write \
          .mode("overwrite") \
          .partitionBy("year", "month") \
          .parquet(output_data + "time")

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------------------+----+---+----+-----+----+-------+
|start_time         |hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 17:02:30|17  |15 |46  |11   |2018|5      |
|2018-11-15 21:18:34|21  |15 |46  |11   |2018|5      |
|2018-11-21 13:46:13|13  |21 |47  |11   |2018|4      |
|2018-11-14 09:19:37|9   |14 |46  |11   |2018|4      |
|2018-11-14 12:18:35|12  |14 |46  |11   |2018|4      |
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [56]:
song_df.count()

71

In [65]:
# read in song data to use for songplays table
song_df = spark.read.json(song_data)
print(song_df.count())
song_df.printSchema()

print(df.count())
df.printSchema()

71
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

6820
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nul

In [61]:
display(df.limit(5).toPandas().head())
display(song_df.limit(5).toPandas().head())

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55


Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


In [74]:
# read in song data to use for songplays table
song_df = spark.read.json(song_data)

# extract columns from joined song and log datasets to create songplays table 
songplays_table = df.join(song_df, (df.song==song_df.title) & (df.artist==song_df.artist_name), how='left')\
                    .selectExpr("start_time", 
                                "userId as user_id",
                                "level",
                                "song_id",
                                "artist_id",
                                "sessionId as session_id",
                                "artist_location as location",
                                "userAgent as user_agent")\
                    .withColumn("year", year("start_time"))\
                    .withColumn("month", month("start_time"))\
                    .drop_duplicates()\
                    .withColumn("songplay_id", monotonically_increasing_id())

In [75]:
songplays_table.count()
display(songplays_table.limit(5).toPandas().head())

Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month,songplay_id
0,2018-11-15 09:50:22,30,paid,,,324,,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,2018,11,0
1,2018-11-21 23:53:48,15,paid,,,818,,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11,1
2,2018-11-14 06:35:14,80,paid,,,548,,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11,2
3,2018-11-14 09:14:48,58,paid,,,522,,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",2018,11,3
4,2018-11-14 09:35:50,58,paid,,,522,,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",2018,11,4


In [76]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write \
               .mode("overwrite") \
               .partitionBy("year", "month") \
               .parquet(output_data + "songplays")