In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

input_data = config.get('IO', 'INPUT_DATA')
output_data = config.get('IO', 'OUTPUT_DATA')

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.5') \
        .getOrCreate()
    return spark


In [4]:
spark = create_spark_session()

# Process song data

In [5]:
# get filepath to song data file
song_data = input_data + 'song-data/*/*/*/*.json' 

In [6]:
# read song data file
df = spark.read.json(song_data)

In [7]:
print('Song Data Schema:')
df.printSchema()

Song Data Schema:
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
# extract columns to create songs table
# songs_table: song_id, title, artist_id, year, duration
songs_table = df.select(df.song_id.alias('song_id'), \
                        df.title.alias('title'), \
                        df.artist_id.alias('artist_id'), \
                        df.year.alias('year'), \
                        df.duration.alias('duration')).dropDuplicates()

In [9]:
songs_table.printSchema()
songs_table.head(1)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



[Row(song_id='SOGOSOV12AF72A285E', title='¿Dónde va Chichi?', artist_id='ARGUVEV1187B98BA17', year=1997, duration=313.12934)]

In [10]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy(['year', 'artist_id']).mode('append').parquet(output_data + '/songs/')

In [11]:
# extract columns to create artists table
# artists_table: artist_id, name, location, lattitude, longitude
artists_table = df.select(df.artist_id.alias('artist_id'), \
                          df.artist_name.alias('artist_name'), \
                          df.artist_location.alias('artist_location'), \
                          df.artist_latitude.alias('artist_latitude'), \
                          df.artist_longitude.alias('artist_longitude')).dropDuplicates()

In [12]:
artists_table.printSchema()
artists_table.head(1)

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



[Row(artist_id='AR3JMC51187B9AE49D', artist_name='Backstreet Boys', artist_location='Orlando, FL', artist_latitude=28.53823, artist_longitude=-81.37739)]

In [13]:
# write artists table to parquet files
artists_table.write.mode('append').parquet(output_data + '/artists/')

# Process log data

In [14]:
# get filepath to log data file
log_data = input_data + 'log-data'

In [15]:
# read log data file
df = spark.read.json(log_data)

In [16]:
# filter by actions for song plays
df = df[df['page'] == 'NextSong']

In [17]:
# extract columns for users table    
users_table = df.select(df.userId.alias('user_id'), \
                        df.firstName.alias('first_name'), \
                        df.lastName.alias('last_name'), \
                        df.gender.alias('gender'), \
                        df.level.alias('level')).dropDuplicates()

In [18]:
# write users table to parquet files
users_table.write.mode('append').parquet(output_data + '/users/')

In [19]:
users_table.printSchema()
df.head(1)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')]

In [20]:
# create timestamp column from original timestamp column
from pyspark.sql.types import DateType, TimestampType
get_timestamp = udf(lambda d: datetime.fromtimestamp(d/1000.0), TimestampType())

In [21]:
df = df.withColumn('timestamp', get_timestamp('ts'))

In [22]:
# create datetime column from original timestamp column
get_datetime = udf(lambda d: datetime.fromtimestamp(d/1000.0), DateType())
df = df.withColumn('date', get_datetime('ts'))

In [23]:
# extract columns to create time table
# time_table: start_time, hour, day, week, month, year, weekday
time_table = df.select(df.ts.alias('start_time'), \
                       hour(df.timestamp).alias('hour'), \
                       dayofmonth(df.timestamp).alias('day'), \
                       weekofyear(df.timestamp).alias('week'), \
                       month(df.timestamp).alias('month'), \
                       year(df.timestamp).alias('year'), \
                       dayofweek(df.timestamp).alias('weekday')).dropDuplicates()

In [24]:
time_table.printSchema()
time_table.head(1)

root
 |-- start_time: long (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



[Row(start_time=1542767375796, hour=0, day=21, week=47, month=11, year=2018, weekday=4)]

In [26]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy(['year', 'month']).mode('append').parquet(output_data + '/time/')

In [27]:
# read in song data to use for songplays table
song_df = spark.read.parquet(output_data + '/songs/')

In [28]:
# extract columns from joined song and log datasets to create songplays table
# songplays_table: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplays_table_join = df.join(song_df, (df.song == song_df.title) & \
                                   (df.length == song_df.duration)).dropDuplicates()

In [29]:
songplays_table = songplays_table_join.select(songplays_table_join.ts.alias('start_time'), \
                                              songplays_table_join.userId.alias('user_id'), \
                                              songplays_table_join.level.alias('level'), \
                                              songplays_table_join.song_id.alias('song_id'), \
                                              songplays_table_join.artist_id.alias('artist_id'), \
                                              songplays_table_join.sessionId.alias('session_id'), \
                                              songplays_table_join.location.alias('location'), \
                                              songplays_table_join.userAgent.alias('user_agent'), \
                                              year(songplays_table_join.timestamp).alias('year'), \
                                              month(songplays_table_join.timestamp).alias('month')).dropDuplicates()

songplays_table.withColumn('songplay_id', monotonically_increasing_id()) 

DataFrame[start_time: bigint, user_id: string, level: string, song_id: string, artist_id: string, session_id: bigint, location: string, user_agent: string, year: int, month: int, songplay_id: bigint]

In [30]:
songplays_table.printSchema()
songplays_table.head(1)

root
 |-- start_time: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



[Row(start_time=1542837407796, user_id='15', level='paid', song_id='SOZCTXZ12AB0182364', artist_id='AR5KOSW1187FB35FF4', session_id=818, location='Chicago-Naperville-Elgin, IL-IN-WI', user_agent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', year=2018, month=11)]

In [31]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy(['year', 'month']).mode('append').parquet(output_data + '/songplays/')

In [32]:
print('Songs table schema:')
songs_table.printSchema()
print('Users table schema:')
users_table.printSchema()
print('Artists table schema:')
artists_table.printSchema()
print('Time table schema:')
time_table.printSchema()
print('Songplays table schema:')
songplays_table.printSchema()

Songs table schema:
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

Users table schema:
root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

Artists table schema:
root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)

Time table schema:
root
 |-- start_time: long (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

Songplays table 