In [1]:
# Parse app's configurations
import configparser

config = configparser.ConfigParser()
config.read('dl.cfg')

AWS_KEY = config.get("AWS", "KEY")
AWS_SECRET = config.get("AWS", "SECRET")

LOG_DATA = config.get("S3", "LOG_DATA")
SONGS_DATA = config.get("S3", "SONGS_DATA")

OUTPUT_PATH = config.get("S3", "PROCESSED_DATA_PATH")

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import functions as F, Window
from pyspark.sql import types as T

In [None]:
# Config & Initialize PySpark Session
spark_cfg = SparkConf()

# Set AWS Credentials, letting Spark access to S3 services
spark_cfg.set('fs.s3a.access.key', AWS_KEY)
spark_cfg.set('fs.s3a.secret.key', AWS_SECRET)

# Config Spark's underlying Hadoop, letting Spark read/write to S3
spark_cfg.set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
spark_cfg.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
spark_cfg.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
spark_cfg.set("fs.s3a.connection.ssl.enabled", "true")

spark = SparkSession\
        .builder\
        .config(conf=spark_cfg)\
        .appName('Dataset Explorer')\
        .getOrCreate()

In [2]:
# Extract log data from S3
log_schema = T.StructType([
    T.StructField('artist', T.StringType()),
    T.StructField('auth', T.StringType()),
    T.StructField('firstName', T.StringType()),
    T.StructField('gender', T.StringType()),
    T.StructField('itemInSession', T.IntegerType()),
    T.StructField('lastName', T.StringType()),
    T.StructField('length', T.FloatType()),
    T.StructField('level', T.StringType()),
    T.StructField('location', T.StringType()),
    T.StructField('method', T.StringType()),
    T.StructField('page', T.StringType()),
    T.StructField('registration', T.DoubleType()),
    T.StructField('sessionId', T.StringType()),
    T.StructField('song', T.StringType()),
    T.StructField('status', T.IntegerType()),
    T.StructField('ts', T.LongType()),
    T.StructField('userAgent', T.StringType()),
    T.StructField('userId', T.StringType())
])

df_log_staging = spark.read.options(header=True, recursiveFileLookup=True).json(LOG_DATA, schema=log_schema)

print('Total rows: ' + str(df_log_staging.count()))
df_log_staging.printSchema()
df_log_staging.limit(5).toPandas()

Total rows: 8056
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: string (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.777527,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.074646,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.452606,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [3]:
# Extract songs data from S3
song_schema = T.StructType([
    T.StructField('artist_id', T.StringType()),
    T.StructField('artist_latitude', T.DoubleType()),
    T.StructField('artist_location', T.StringType()),
    T.StructField('artist_longitude', T.DoubleType()),
    T.StructField('artist_name', T.StringType()),
    T.StructField('duration', T.DoubleType()),
    T.StructField('num_songs', T.IntegerType()),
    T.StructField('song_id', T.StringType()),
    T.StructField('title', T.StringType()),
    T.StructField('year', T.IntegerType())
])

df_songs_staging = spark.read.options(header=True, recursiveFileLookup=True).json(SONGS_DATA, schema=song_schema)

print('Total rows: ' + str(df_songs_staging.count()))
df_songs_staging.printSchema()
df_songs_staging.limit(5).toPandas()

Total rows: 604
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARSUVLW12454A4C8B8,35.83073,Tennessee,-85.97874,Royal Philharmonic Orchestra/Sir Thomas Beecham,94.56281,1,SOBTCUI12A8AE48B70,Faust: Ballet Music (1959 Digital Remaster): V...,0
1,ARXQC081187FB4AD42,54.31407,UK,-2.23001,William Shatner_ David Itkin_ The Arkansas Sym...,1047.71873,1,SOXRPUH12AB017F769,Exodus: Part I: Moses and Pharaoh,0
2,ARWUNH81187FB4A3E0,,"Miami , Florida",,Trick Daddy,227.10812,1,SOVNKJI12A8C13CB0D,Take It To Da House (Featuring The Slip N' Sli...,2001
3,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972
4,ARA23XO1187B9AF18F,40.57885,"Carteret, New Jersey",-74.21956,The Smithereens,192.522,1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,0


In [4]:
# Extract & Transform 'songplays' fact table
join_cond = [df_log_staging.artist == df_songs_staging.artist_name, df_log_staging.song == df_songs_staging.title]
df_songplays = df_log_staging.where(df_log_staging.page == 'NextSong').join(df_songs_staging, join_cond)\
.select(
    F.from_unixtime(df_log_staging.ts / 1000).alias('start_time'),
    df_log_staging.userId.alias('user_id'),
    df_log_staging.level.alias('level'),
    df_songs_staging.song_id.alias('song_id'),
    df_songs_staging.artist_id.alias('artist_id'),
    df_log_staging.sessionId.alias('session_id'),
    df_log_staging.location.alias('location'),
    df_log_staging.userAgent.alias('user_agent'),
    F.month(F.from_unixtime(df_log_staging.ts / 1000)).alias('month'),
    F.year(F.from_unixtime(df_log_staging.ts / 1000)).alias('year')
)\
.withColumn('songplay_id', F.row_number().over(Window.orderBy('start_time')))

df_songplays.limit(5).toPandas()

Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,month,year,songplay_id
0,1541279668796,49,free,SOFVOQL12A6D4F7456,ARPN0Y61187B9ABAA0,195,"San Francisco-Oakland-Hayward, CA",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,11,2018,1
1,1541440182796,73,paid,SOHDWWH12A6D4F7F6A,ARC0IOF1187FB3F6E6,255,"Tampa-St. Petersburg-Clearwater, FL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018,2
2,1541689317796,29,paid,SOFVOQL12A6D4F7456,ARPN0Y61187B9ABAA0,372,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018,3
3,1541786100796,80,paid,SOAOJYY12A58A7B2F9,ARFVYJI1187B9B8E13,416,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018,4
4,1541793477796,36,paid,SODWXQV12A6310F10D,AR6892W1187B9AC71B,392,"Janesville-Beloit, WI","""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",11,2018,5


In [5]:
# Extract & Transform 'users' dimension table
win_spec = Window.partitionBy('userId').orderBy(F.desc('ts'))
df_users = df_log_staging\
    .filter((df_log_staging.userId.isNotNull()) & (df_log_staging.userId != ""))\
    .select(
        df_log_staging.userId.alias('user_id'),
        df_log_staging.firstName.alias('first_name'),
        df_log_staging.lastName.alias('last_name'),
        df_log_staging.gender.alias('gender'),
        df_log_staging.level.alias('level'),
        F.row_number().over(win_spec).alias('ts_order')
    )\
    .filter('ts_order==1')\
    .drop('ts_order')


print(df_users.count())
df_users.limit(5).toPandas()

97


Unnamed: 0,user_id,first_name,last_name,gender,level
0,51,Maia,Burke,F,free
1,7,Adelyn,Jordan,F,free
2,15,Lily,Koch,F,paid
3,54,Kaleb,Cook,M,free
4,101,Jayden,Fox,M,free


In [6]:
# Extract & Transform 'songs' dimension table
df_songs = df_songs_staging\
    .filter((df_songs_staging.song_id.isNotNull()) & (df_songs_staging.song_id != ""))\
    .select(
        F.col('song_id'),
        F.col('title'),
        F.col('artist_id'),
        F.col('year'),
        F.col('duration')
    )\
    .dropDuplicates()

print(df_songs.count())
df_songs.limit(5).toPandas()

604


Unnamed: 0,song_id,title,artist_id,year,duration
0,SOGQTRZ12A8C13C8B0,I Can't Stop Loving You,ARF3GX71187FB3EB66,2005,261.95546
1,SOJRMKC12A8C13AD5E,Sick,AR7FMHB1187FB443E9,2007,189.88363
2,SODORIU12A6D4F84BB,Lifeline,ARYAUMZ1187B9A2A40,1983,199.13098
3,SOVXMTN12A8C135A18,OUTE ENA EFHARISTO,ARNQAVF11F4C844C04,0,303.09832
4,SONJKPC12A8C143404,Cihangir,AR4RY4E1187B9912E5,2008,333.84444


In [7]:
# Extract & Transform 'artists' dimension table
df_artists = df_songs_staging\
    .filter((df_songs_staging.artist_id.isNotNull()) & (df_songs_staging.artist_id != ""))\
    .select(
        F.col('artist_id'),
        F.col('artist_name').alias('name'),
        F.col('artist_location').alias('location'),
        F.col('artist_latitude').alias('latitude'),
        F.col('artist_longitude').alias('longitude')
    )\
    .dropDuplicates()

print(df_artists.count())
df_artists.limit(5).toPandas()

591


Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARSVTNL1187B992A91,Jonathan King,"London, England",51.50632,-0.12714
1,ARY0RQP1187FB48B93,Mickey 3D,,,
2,ARPIKA31187FB4C233,The Action,New York,40.71455,-74.00712
3,ARX16TQ1187B9899C9,Oysterhead,"New Orleans, LA",29.95369,-90.07771
4,ARE50SC1187B98C04A,The 69 Eyes,"Helsinki, Finland",60.17116,24.93258


In [8]:
# Extract & Transform 'time' dimension table
df_time = df_log_staging\
    .distinct()\
    .select(
        F.from_unixtime(F.col('ts') / 1000).alias('start_time'),
        F.hour(F.from_unixtime(F.col('ts') / 1000)).alias('hour'),
        F.dayofmonth(F.from_unixtime(F.col('ts') / 1000)).alias('day'),
        F.weekofyear(F.from_unixtime(F.col('ts') / 1000)).alias('week'),
        F.month(F.from_unixtime(F.col('ts') / 1000)).alias('month'),
        F.year(F.from_unixtime(F.col('ts') / 1000)).alias('year'),
        F.dayofweek(F.from_unixtime(F.col('ts') / 1000)).alias('weekday')
    )

print(df_time.count())
df_time.printSchema()
df_time.limit(5).toPandas()

8056
root
 |-- timestamp: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



Unnamed: 0,timestamp,hour,day,week,month,year,weekday
0,2018-11-15 04:37:47,4,15,46,11,2018,5
1,2018-11-15 08:51:56,8,15,46,11,2018,5
2,2018-11-15 09:49:22,9,15,46,11,2018,5
3,2018-11-15 13:58:03,13,15,46,11,2018,5
4,2018-11-20 18:18:40,18,20,47,11,2018,3


In [14]:
df_songs.groupBy('year').count().orderBy(F.desc('count')).show()

+----+-----+
|year|count|
+----+-----+
|   0|  198|
|2005|   34|
|2007|   32|
|2006|   32|
|2008|   30|
|2001|   24|
|2004|   24|
|2009|   22|
|2003|   20|
|1997|   18|
|2000|   18|
|1995|   17|
|2002|   16|
|1994|   10|
|1999|    9|
|2010|    8|
|1996|    7|
|1986|    7|
|1987|    7|
|1983|    6|
+----+-----+
only showing top 20 rows



In [None]:
# Load tables back to S3 (Partitioned)
df_songplays.write.partitionBy('year', 'month').option('header', True).parquet(OUTPUT_PATH + 'songplays', mode='overwrite')
df_songs.write.partitionBy('year', 'artist_id').option('header', True).parquet(OUTPUT_PATH + 'songs', mode='overwrite')
df_time.write.partitionBy('year', 'month').option('header', True).parquet(OUTPUT_PATH + 'time', mode='overwrite')
df_users.write.option('header', True).parquet(OUTPUT_PATH + 'users', mode='overwrite')
df_artists.write.option('header', True).parquet(OUTPUT_PATH + 'artists', mode='overwrite')