In [39]:
import configparser
from datetime import datetime
import os

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import *

## Create Spark Session

In [5]:
spark = SparkSession.builder.appName('Sparkify').getOrCreate()

In [6]:
input_data = './data/'
output_data = './data/'

## Process Song Data

In [10]:
song_data = input_data + 'song_data/*/*/*/*.json'
    
song_schema = StructType([
    StructField("artist_id", StringType()),
    StructField("artist_latitude", DoubleType()),
    StructField("artist_location", StringType()),
    StructField("artist_longitude", DoubleType()),
    StructField("artist_name", StringType()),
    StructField("duration", DoubleType()),
    StructField("num_songs", IntegerType()),
    StructField("title", StringType()),
    StructField("year", IntegerType())
])
    
df = spark.read.json(song_data, schema=song_schema)
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [12]:
song_table = df.select('title', 'artist_id', 'year', 'duration').dropDuplicates()\
                .withColumn('song_id', monotonically_increasing_id())
song_table.show(5)

+--------------------+------------------+----+---------+-----------+
|               title|         artist_id|year| duration|    song_id|
+--------------------+------------------+----+---------+-----------+
|               Intro|AR558FS1187FB45658|2003| 75.67628|51539607552|
|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|68719476736|
|Kutt Free (DJ Vol...|ARNNKDK1187B98BBD5|   0|407.37914|68719476737|
|Get Your Head Stu...|AREDL271187FB40F44|   0| 45.66159|77309411328|
|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|94489280512|
+--------------------+------------------+----+---------+-----------+
only showing top 5 rows



In [13]:
song_table.write.parquet(output_data + 'songs/', mode='overwrite', partitionBy=['year', 'artist_id'])

In [14]:
artist_table = df.select("artist_id","artist_name","artist_location","artist_latitude","artist_longitude").dropDuplicates()
artist_table.show(5)

+------------------+------------+---------------+---------------+----------------+
|         artist_id| artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+------------+---------------+---------------+----------------+
|ARPBNLO1187FB3D52F|    Tiny Tim|   New York, NY|       40.71455|       -74.00712|
|ARBEBBY1187B9B43DB|   Tom Petty|Gainesville, FL|           null|            null|
|AR0IAWL1187B9A96D0|Danilo Perez|         Panama|         8.4177|       -80.11278|
|ARMBR4Y1187B9990EB|David Martin|California - SF|       37.77916|      -122.42005|
|ARD0S291187B9B7BF5|     Rated R|           Ohio|           null|            null|
+------------------+------------+---------------+---------------+----------------+
only showing top 5 rows



In [15]:
artist_table.write.parquet(output_data + 'artists/', mode='overwrite')

## Process Log Data

In [29]:
log_data = input_data + 'log-data/'

df = spark.read.json(log_data).drop_duplicates()

df = df.filter(df.page == 'NextSong')

#### User Table

In [30]:
users_fields = ["userId", "firstName", "lastName", "gender", "level"]
users_table = df.selectExpr(users_fields).drop_duplicates()
users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    98|   Jordyn|  Powell|     F| free|
|    34|   Evelin|   Ayala|     F| free|
|    85|  Kinsley|   Young|     F| paid|
|    38|   Gianna|   Jones|     F| free|
|    85|  Kinsley|   Young|     F| free|
+------+---------+--------+------+-----+
only showing top 5 rows



In [32]:
users_table.write.parquet(output_data + 'users/', mode='overwrite')

#### Time Table

In [40]:
get_timestamp = udf(lambda x: datetime.utcfromtimestamp(int(x) / 1000), TimestampType())
df = df.withColumn('start_time', get_timestamp('ts'))

time_table = df.withColumn("hour",hour("start_time"))\
                .withColumn("day",dayofmonth("start_time"))\
                .withColumn("week",weekofyear("start_time"))\
                .withColumn("month",month("start_time"))\
                .withColumn("year",year("start_time"))\
                .withColumn("weekday",dayofweek("start_time"))\
                .select("ts","start_time","hour", "day", "week", "month", "year", "weekday").drop_duplicates()

time_table.show(5)

+-------------+--------------------+----+---+----+-----+----+-------+
|           ts|          start_time|hour|day|week|month|year|weekday|
+-------------+--------------------+----+---+----+-----+----+-------+
|1542394784796|2018-11-16 18:59:...|  18| 16|  46|   11|2018|      6|
|1543219406796|2018-11-26 08:03:...|   8| 26|  48|   11|2018|      2|
|1542695612796|2018-11-20 06:33:...|   6| 20|  47|   11|2018|      3|
|1543026602796|2018-11-24 02:30:...|   2| 24|  47|   11|2018|      7|
|1542276456796|2018-11-15 10:07:...|  10| 15|  46|   11|2018|      5|
+-------------+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [41]:
time_table.write.parquet(output_data + 'time_table/', mode='overwrite', partitionBy=['year', 'month'])

#### Songplays Table

In [42]:
# read in song data to use for songplays table
song_df = spark.read\
                .format("parquet")\
                .option("basePath", os.path.join(output_data, "songs/"))\
                .load(os.path.join(output_data, "songs/*/*/"))

In [43]:
# extract columns from joined song and log datasets to create songplays table
songplays_table = df.join(song_df, df.song == song_df.title, how='inner')\
                    .select(monotonically_increasing_id().alias("songplay_id"), col("start_time"),
                            col("userId").alias("user_id"), "level", "song_id", "artist_id", 
                            col("sessionId").alias("session_id"), "location", col("userAgent").alias("user_agent"))

In [44]:
songplays_table = songplays_table.join(time_table, songplays_table.start_time == time_table.start_time, how="inner")\
                        .select("songplay_id", songplays_table.start_time, "user_id", "level", "song_id", "artist_id", 
                                "session_id", "location", "user_agent", "year", "month")

In [45]:
songplays_table.show(5)

+-------------+--------------------+-------+-----+-------------+------------------+----------+--------------------+--------------------+----+-----+
|  songplay_id|          start_time|user_id|level|      song_id|         artist_id|session_id|            location|          user_agent|year|month|
+-------------+--------------------+-------+-----+-------------+------------------+----------+--------------------+--------------------+----+-----+
| 601295421440|2018-11-27 22:35:...|     80| paid|  51539607552|AR558FS1187FB45658|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|
| 652835028992|2018-11-19 09:14:...|     24| paid|  51539607552|AR558FS1187FB45658|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|2018|   11|
| 790273982464|2018-11-21 21:56:...|     15| paid|1348619730944|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
|1254130450432|2018-11-14 05:06:...|     10| free|  51539607552|AR558FS1187FB45658|       484|Washington-Arling.

In [46]:
# write songplays table to parquet files partitioned by year and month
songplays_table.drop_duplicates().write.parquet(os.path.join(output_data, "songplays/"), mode="overwrite", partitionBy=["year","month"])