In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


In [2]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
input_data = "./data/"

In [4]:
from pyspark.sql.types import StructType as R, StructField,DecimalType as Dec, DoubleType as Dbl,LongType as Long, StringType as Str, IntegerType as Int, DateType as Date 
songsSchema = R([
    StructField("num_songs",Int()),
    StructField("artist_id",Str()),
    StructField("artist_latitude",Dec()),
    StructField("artist_longitude",Dec()),
    StructField("artist_location",Str()),
    StructField("artist_name",Str()),
    StructField("song_id",Str()),
    StructField("title",Str()),
    StructField("duration",Dbl()),
    StructField("year",Int()),
])

In [5]:
song_data = input_data + "song_data/*/*/*/*.json"
df = spark.read.json(song_data, schema=songsSchema)

In [6]:
df.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: decimal(10,0) (nullable = true)
 |-- artist_longitude: decimal(10,0) (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)



In [7]:
df.createOrReplaceTempView("songs_data_table")

In [8]:
songs_table = spark.sql('''
    SELECT DISTINCT song_id, title, artist_id, year, duration
    FROM songs_data_table
''')

In [9]:
print("Songs table schema:\n")
songs_table.printSchema()

Songs table schema:

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)



In [10]:
songs_table.head(1)

[Row(song_id='SOBBXLX12A58A79DDA', title='Erica (2005 Digital Remaster)', artist_id='AREDBBQ1187B98AFF5', year=0, duration=138.63138)]

In [11]:
#songs_table.write.parquet(output_data + "songs_table.parquet",
#                          mode = "overwrite")

In [12]:
log_data = input_data + "log_data/*/*/*.json"
print("Reading song_data from {}\n".format(log_data))
# read log data file
df = spark.read.json(log_data)

# filter by actions for song plays
# If I specify the schema, all empties are None...
df = df.filter(df.page=='NextSong')

Reading song_data from ./data/log_data/*/*/*.json



In [13]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [14]:
df.createOrReplaceTempView("logs_data_table")
    # extract columns for users table    
users_table = spark.sql('''
        SELECT DISTINCT userId as user_id, firstName as first_name, lastName as last_name, gender, level
        FROM logs_data_table
    ''')
users_table = users_table.dropDuplicates(["user_id"])

In [15]:
users_table.head(1)

[Row(user_id='51', first_name='Maia', last_name='Burke', gender='F', level='free')]

In [16]:
users_table.createOrReplaceTempView("users")
count_users = spark.sql('''
    SELECT *
    FROM users
    WHERE user_id='16'
''')

In [17]:
count_users.head(5)

[Row(user_id='16', first_name='Rylan', last_name='George', gender='M', level='paid')]

In [18]:
# create timestamp column from original timestamp column
from datetime import datetime
from pyspark.sql.types import TimestampType

In [19]:
convert_ts = udf(lambda time: datetime.fromtimestamp((time/1000.0)), TimestampType())
get_datetime = udf(lambda time: datetime.fromtimestamp((time/1000.0)), Date())

In [20]:
clean_ts = df.withColumn("date",get_datetime("ts"))

clean_ts = clean_ts.withColumn("ts",convert_ts("ts"))
clean_ts.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- date: date (nullable = true)



In [22]:
clean_ts.head()

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000), userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', date=datetime.date(2018, 11, 15))

In [21]:
# extract columns to create time table
clean_ts.createOrReplaceTempView("clean")
time_table = spark.sql('''
    SELECT ts AS start_time, 
        date_format(date,'YYYY') AS year,
        date_format(date,'MM') AS month,
        date_format(date,'dd') AS day,
        date_format(date,'w') AS week,
        date_format(ts,'E') AS weekday,
        HOUR(ts) AS hour
    FROM clean
    LIMIT 5
''')
time_table.show()

+--------------------+----+-----+---+----+-------+----+
|          start_time|year|month|day|week|weekday|hour|
+--------------------+----+-----+---+----+-------+----+
|2018-11-15 00:30:...|2018|   11| 15|  46|    Thu|   0|
|2018-11-15 00:41:...|2018|   11| 15|  46|    Thu|   0|
|2018-11-15 00:45:...|2018|   11| 15|  46|    Thu|   0|
|2018-11-15 03:44:...|2018|   11| 15|  46|    Thu|   3|
|2018-11-15 05:48:...|2018|   11| 15|  46|    Thu|   5|
+--------------------+----+-----+---+----+-------+----+



In [43]:
# read in song data to use for songplays table
song_df = input_data + "song_data/*/*/*/*.json"
song_df = spark.read.json(song_data, schema=songsSchema)
song_df.createOrReplaceTempView("songs_data_table")

In [53]:
artists_table = spark.sql('''
    SELECT DISTINCT artist_id, artist_name AS name, artist_location AS location, artist_latitude AS latitude, artist_longitude AS longitude
    FROM songs_data_table
''')
artists_table.createOrReplaceTempView("artists")


In [63]:
from pyspark.sql.functions import monotonically_increasing_id

In [64]:
songplays_table = spark.sql('''
    SELECT 
        l.ts AS start_time,
        l.userId AS user_id,
        l.level,
        s.song_id,
        a.artist_id,
        l.sessionId AS session_id,
        l.location,
        l.userAgent AS user_agent
    FROM clean AS l
    JOIN songs_data_table AS s 
    ON (l.song = s.title AND l.artist = s.artist_name)  
    JOIN artists AS a ON a.artist_id=s.artist_id
    LIMIT 5
''')
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())
songplays_table.show()

+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+
|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|songplay_id|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+
|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|          0|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+

