In [1]:
import os
from datetime import datetime
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, DecimalType as Dec, StringType as Str, IntegerType as Int, DateType as Date, TimestampType as Ts
from pyspark.sql.functions import monotonically_increasing_id, from_unixtime

In [2]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
songsSchema = R([
    Fld("num_songs", Int()),
    Fld("artist_id", Str()),
    Fld("artist_latitude", Dec()),
    Fld("artist_longitude", Dec()),
    Fld("artist_location", Str()),
    Fld("artist_name", Str()),
    Fld("song_id", Str()),
    Fld("title", Str()),
    Fld("duration", Dbl()),
    Fld("year", Int()),
]) 

In [5]:
input_data = './data/'
song_data = f"{input_data}/song_data/*/*/*/*.json"
df = spark.read.json(song_data, schema=songsSchema)

In [6]:
df.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: decimal(10,0) (nullable = true)
 |-- artist_longitude: decimal(10,0) (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)



In [7]:
df.createOrReplaceTempView("songs_table")
songs_table = spark.sql('''
    SELECT DISTINCT song_id, title, artist_id, year, duration
    FROM songs_table
''')
songs_table = songs_table.dropDuplicates(['song_id'])

In [8]:
songs_table.head(5)

[Row(song_id='SOGOSOV12AF72A285E', title='¿Dónde va Chichi?', artist_id='ARGUVEV1187B98BA17', year=1997, duration=313.12934),
 Row(song_id='SOMZWCG12A8C13C480', title="I Didn't Mean To", artist_id='ARD7TVE1187B99BFB1', year=0, duration=218.93179),
 Row(song_id='SOUPIRU12A6D4FA1E1', title='Der Kleine Dompfaff', artist_id='ARJIE2Y1187B994AB7', year=0, duration=152.92036),
 Row(song_id='SOXVLOJ12AB0189215', title='Amor De Cabaret', artist_id='ARKRRTF1187B9984DA', year=0, duration=177.47546),
 Row(song_id='SOWTBJW12AC468AC6E', title='Broken-Down Merry-Go-Round', artist_id='ARQGYP71187FB44566', year=0, duration=151.84934)]

In [9]:
log_data = input_data + "log_data/*.json"

df = spark.read.json(log_data)

df = df.filter(df.page=='NextSong')

In [10]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [11]:
df.createOrReplaceTempView("logs_data_table")
# extract columns for users table    
users_table = spark.sql('''
    SELECT DISTINCT userId, firstName, lastName, gender, level
    FROM logs_data_table
''')
users_table = users_table.dropDuplicates(["userId"])

In [12]:
users_table.head(5)

[Row(userId='51', firstName='Maia', lastName='Burke', gender='F', level='free'),
 Row(userId='7', firstName='Adelyn', lastName='Jordan', gender='F', level='free'),
 Row(userId='15', firstName='Lily', lastName='Koch', gender='F', level='paid'),
 Row(userId='54', firstName='Kaleb', lastName='Cook', gender='M', level='free'),
 Row(userId='101', firstName='Jayden', lastName='Fox', gender='M', level='free')]

In [13]:
df = df.withColumn("ts",from_unixtime((df.ts.cast('bigint')/1000)).cast('timestamp'))
df.createOrReplaceTempView("time_table")
time_table = df.select(\
              df.ts.alias('start_time'),    
              hour(df.ts).alias('hour'), \
              dayofmonth(df.ts).alias('day'),\
              weekofyear(df.ts).alias('week'),\
              month(df.ts).alias('month'),\
              year(df.ts).alias('year'),\
              dayofweek(df.ts).alias('weekday'),\
            )

time_table = time_table.dropDuplicates(["start_time"])

In [14]:
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [15]:
time_table.head(5)

[Row(start_time=datetime.datetime(2018, 11, 21, 18, 52, 12), hour=18, day=21, week=47, month=11, year=2018, weekday=4),
 Row(start_time=datetime.datetime(2018, 11, 22, 3, 46, 29), hour=3, day=22, week=47, month=11, year=2018, weekday=5),
 Row(start_time=datetime.datetime(2018, 11, 14, 12, 37, 40), hour=12, day=14, week=46, month=11, year=2018, weekday=4),
 Row(start_time=datetime.datetime(2018, 11, 14, 20, 14, 41), hour=20, day=14, week=46, month=11, year=2018, weekday=4),
 Row(start_time=datetime.datetime(2018, 11, 15, 7, 3), hour=7, day=15, week=46, month=11, year=2018, weekday=5)]

In [16]:
song_data = input_data + "song_data/*/*/*/*.json"
song_df = spark.read.json(song_data, schema=songsSchema)
song_df.createOrReplaceTempView("songs_table") 

In [17]:
artists_table = spark.sql('''
    SELECT DISTINCT artist_id, artist_name AS name, artist_location AS location, artist_latitude AS latitude, artist_longitude AS longitude
    FROM songs_table
''')
artists_table.createOrReplaceTempView("artists_table") 

In [20]:
songplays_table = spark.sql('''
        SELECT 
            year(l.ts) AS year,
            month(l.ts) AS month,
            l.ts AS start_time,
            l.userId AS user_id,
            l.level,
            s.song_id,
            a.artist_id,
            l.sessionId AS session_id,
            l.location,
            l.userAgent AS user_agent
        FROM time_table AS l
        JOIN songs_table AS s 
        ON (l.song = s.title AND l.artist = s.artist_name)  
        JOIN artists_table AS a ON a.artist_id=s.artist_id
        LIMIT 5
    ''')
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())
songplays_table.show()

[Row(year=2018, month=11, start_time=datetime.datetime(2018, 11, 22, 5, 56, 47), user_id='15', level='paid', song_id='SOZCTXZ12AB0182364', artist_id='AR5KOSW1187FB35FF4', session_id=818, location='Chicago-Naperville-Elgin, IL-IN-WI', user_agent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', songplay_id=0)]