In [32]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id   
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, LongType
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [4]:
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

In [5]:
log_path = "data/log-data/*.json"
song_path = "data/song_data/*.json"
log = spark.read.json(log_path)
song = spark.read.json(song_path)

In [7]:
filtered_user_log = log.filter(log.page == "NextSong")
filtered_user_log.head()

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')

In [9]:
def get_date_unit_function(unit):
    switcher={
        'start_time': udf(lambda x: datetime.fromtimestamp(x/ 1000).strftime("%H:%M:%S")),
        'hour': udf(lambda x: datetime.fromtimestamp(x/ 1000).hour),
        'day': udf(lambda x: datetime.fromtimestamp(x/ 1000).day),
        'week': udf(lambda x: datetime.fromtimestamp(x/ 1000).isocalendar()[1]),
        'month': udf(lambda x: datetime.fromtimestamp(x/ 1000).month),
        'year': udf(lambda x: datetime.fromtimestamp(x/ 1000).year),
        'weekday': udf(lambda x: datetime.fromtimestamp(x/ 1000).weekday())
    }
    return switcher.get(unit)


In [10]:
time_schema = StructType([ \
        StructField("start_time",StringType(),True), \
        StructField("hour",IntegerType(),True), \
        StructField("day",IntegerType(),True), \
        StructField("week", IntegerType(), True), \
        StructField("month", IntegerType(), True), \
        StructField("year", IntegerType(), True), \
        StructField("weekday", IntegerType(), True)
      ])

In [11]:
filtered_user_log = filtered_user_log.withColumn("start_time",get_date_unit_function('start_time')(filtered_user_log.ts)). \
                            withColumn("hour",get_date_unit_function('hour')(filtered_user_log.ts)). \
                            withColumn("day",get_date_unit_function('day')(filtered_user_log.ts)). \
                            withColumn("week",get_date_unit_function('week')(filtered_user_log.ts)). \
                            withColumn("month",get_date_unit_function('month')(filtered_user_log.ts)). \
                            withColumn("year",get_date_unit_function('year')(filtered_user_log.ts)). \
                            withColumn("weekday",get_date_unit_function('weekday')(filtered_user_log.ts))


In [12]:
time_data = filtered_user_log.select("start_time", col("hour").cast('int').alias("hour"), col("day").cast('int').alias("day"), \
                                     col("week").cast('int').alias("week"), col("month").cast('int').alias("month"), \
                                     col("year").cast('int').alias("year"), col("weekday").cast('int').alias("weekday")).collect()

In [13]:
time_df = spark.createDataFrame(data=time_data, schema=time_schema)
#time_df.write.partitionBy("year","month").mode("overwrite").parquet("/tmp/output/time.parquet")

In [14]:
user_schema = StructType([ \
        StructField("user_id",StringType(),False), \
        StructField("first_name",StringType(),True), \
        StructField("last_name",StringType(),True), \
        StructField("gender", StringType(), True), \
        StructField("level", StringType(), True)
      ])

In [15]:
user_data = filtered_user_log.select("userId", "firstName", "lastName", "gender", "level").where(col("userId").isNotNull()).dropDuplicates().collect()

In [16]:
user_df = spark.createDataFrame(data=user_data, schema=user_schema)

In [17]:
song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [18]:
song_schema = StructType([ \
        StructField("song_id",StringType(),False), \
        StructField("title",StringType(),True), \
        StructField("artist_id",StringType(),True), \
        StructField("year", LongType(), True), \
        StructField("duration", DoubleType(), True)
      ])

In [20]:
song_data = song.select("song_id", "title", "artist_id", col("year").cast('int').alias("year"), \
                        col("duration").cast('double').alias("duration")).where(col("song_id").isNotNull()).dropDuplicates().collect()

In [21]:
song_df = spark.createDataFrame(data=song_data, schema=song_schema)

In [22]:
song_df.printSchema()
#song_df.write.partitionBy("year","artist_id").mode("overwrite").parquet("/tmp/output/song.parquet")

root
 |-- song_id: string (nullable = false)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [23]:
artist_schema = StructType([ \
        StructField("artist_id",StringType(),False), \
        StructField("name",StringType(),True), \
        StructField("location",StringType(),True), \
        StructField("latitude", DoubleType(), True), \
        StructField("longitude", DoubleType(), True)
      ])

In [24]:
artist_data = song.select("artist_id", col("artist_name").alias("name"), col("artist_location").alias("location"), \
                          col("artist_latitude").alias("latitude"), col("artist_longitude").alias("longitude")).where(col("artist_id").isNotNull()).dropDuplicates().collect()

In [25]:
artist_df = spark.createDataFrame(data=artist_data, schema=artist_schema)

In [26]:
artist_df.head()

Row(artist_id='ARXR32B1187FB57099', name='Gob', location='', latitude=None, longitude=None)

In [56]:
songplay_schema = StructType([ \
        StructField("songplay_id",LongType(),False), \
        StructField("start_time",StringType(),False), \
        StructField("user_id",StringType(),False), \
        StructField("level", StringType(), True), \
        StructField("song_id", StringType(), True), \
        StructField("artist_id", StringType(), True), \
        StructField("session_id", StringType(), True), \
        StructField("location", StringType(), True), \
        StructField("user_agent", StringType(), True)            
      ])

In [48]:
joined_df = filtered_user_log.join(artist_df, filtered_user_log.artist == artist_df.name, how='left').\
                join(song_df, filtered_user_log.song == song_df.title, how='left'). \
                drop(artist_df.name).drop(song_df.title).drop(artist_df.location).drop(artist_df.artist_id). \
                withColumn("start_time",get_date_unit_function('start_time')(filtered_user_log.ts)). \
                withColumn("songplay_id",monotonically_increasing_id())
joined_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- day: string (nullable = true)
 |-- week: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-

In [54]:
songplays_data = joined_df.select("songplay_id", "start_time", col("userId").alias("user_id"), "level", "song_id", \
                                     "artist_id", col("sessionId").alias("session_id"), \
                                     "location", col("userAgent").alias("user_agent")).dropDuplicates().collect()

In [57]:
songplays_df = spark.createDataFrame(data=songplays_data, schema=songplay_schema)

In [58]:
songplays_df.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: string (nullable = false)
 |-- user_id: string (nullable = false)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [59]:
songplays_df.head()

Row(songplay_id=42949672979, start_time='23:21:20', user_id='30', level='paid', song_id=None, artist_id=None, session_id='324', location='San Jose-Sunnyvale-Santa Clara, CA', user_agent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0')