In [1]:
# !unzip ./data/song-data.zip -d ./data/song_data

In [2]:
# !unzip ./data/log-data.zip -d ./data/log_data

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [4]:
spark = create_spark_session()

In [5]:
spark

In [6]:
song_data = './data/song_data/*/*/*'
log_data = './data/log_data'

In [7]:
df_song = spark.read.json(song_data)
df_log = spark.read.json(log_data)

In [8]:
print(f"song record count: {df_song.count()}")
print(f"log record count: {df_log.count()}")

song record count: 71
log record count: 8056


In [9]:
df_song.limit(1).show(truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------
 artist_id        | ARDR4AC1187FB371A1                                                                             
 artist_latitude  | null                                                                                           
 artist_location  |                                                                                                
 artist_longitude | null                                                                                           
 artist_name      | Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti 
 duration         | 511.16363                                                                                      
 num_songs        | 1                                                                                              
 song_id          | SOBAYLL12A8C138AF9                                  

In [10]:
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [12]:
# create song_table
songs_table = df_song\
                .select(['song_id', 'title', 'artist_id', 'year', 'duration'])\
                .dropDuplicates(['song_id'])

In [13]:
songs_table.limit(1).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934


In [14]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [15]:
# write songs_table to parquet files 
songs_table.write.partitionBy('artist_id', 'year').parquet(path='./parquet/song.parquet', mode='overwrite')

In [17]:
# create artists_table
artists_table = df_song\
                .selectExpr('artist_id', 
                            'artist_name AS name', 
                            'artist_location AS location', 
                            'artist_latitude AS latitude', 
                            'artist_longitude AS longtitude')\
                .dropDuplicates(['artist_id'])

In [18]:
artists_table.limit(1).toPandas()

Unnamed: 0,artist_id,name,location,latitude,longtitude
0,AR9AWNF1187B9AB0B4,Kenny G featuring Daryl Hall,"Seattle, Washington USA",,


In [19]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longtitude: double (nullable = true)



In [20]:
# write artists_table to parquet files 
artists_table.write.parquet(path='./parquet/artist.parquet', mode='overwrite')

In [21]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [22]:
df_log.limit(1).show(truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------
 artist        | Harmonia                                                                                                                                  
 auth          | Logged In                                                                                                                                 
 firstName     | Ryan                                                                                                                                      
 gender        | M                                                                                                                                         
 itemInSession | 0                                                                                                                                         
 lastName      | Smith                                          

In [23]:
df_log.count()

8056

In [24]:
df_log.where("page = 'NextSong'").count()

6820

In [25]:
get_datetime = F.udf(lambda x: datetime.fromtimestamp(x/1000).isoformat())
spark.udf.register("get_datetime", get_datetime)

<function __main__.<lambda>(x)>

In [50]:
df_log = df_log.where("page = 'NextSong'").withColumn('songplay_id', F.monotonically_increasing_id())
df_log = df_log.withColumn('start_time', get_datetime('ts').cast('timestamp'))

In [51]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)



In [52]:
users_table = df_log\
                .selectExpr('userId AS user_id',
                            'firstName AS first_name',
                            'lastName AS last_name',
                            'gender',
                            'level')\
                .dropDuplicates(['user_id'])

In [53]:
users_table.limit(1).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,51,Maia,Burke,F,free


In [54]:
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [55]:
users_table.write.parquet('./parquet/user.parquet', mode='overwrite')

In [56]:
time_table = df_log\
                .select('start_time')\
                .withColumn('hour', F.hour('start_time'))\
                .withColumn('day', F.dayofmonth('start_time'))\
                .withColumn('week', F.weekofyear('start_time'))\
                .withColumn('month', F.month('start_time'))\
                .withColumn('year', F.year('start_time'))\
                .withColumn('weekday', F.dayofweek('start_time'))\
                .dropDuplicates(['start_time'])

In [57]:
time_table.show(5, truncate=False)

+-----------------------+----+---+----+-----+----+-------+
|start_time             |hour|day|week|month|year|weekday|
+-----------------------+----+---+----+-----+----+-------+
|2018-11-21 06:18:12.796|6   |21 |47  |11   |2018|4      |
|2018-11-14 15:20:15.796|15  |14 |46  |11   |2018|4      |
|2018-11-05 16:31:59.796|16  |5  |45  |11   |2018|2      |
|2018-11-13 18:00:26.796|18  |13 |46  |11   |2018|3      |
|2018-11-30 04:32:02.796|4   |30 |48  |11   |2018|6      |
+-----------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [58]:
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [59]:
time_table.write.partitionBy('year', 'month').parquet('./parquet/time.parquet', mode='overwrite')

In [60]:
df_song.limit(1).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0


In [61]:
df_log.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,songplay_id,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,0,2018-11-15 00:30:26.796


In [62]:
# create song_plays_table
df_song.createOrReplaceTempView('song')
df_log.createOrReplaceTempView('log')

song_play_table = spark.sql('''
                                SELECT 
                                log.songplay_id,
                                log.start_time,
                                log.userId AS user_id,
                                log.level,
                                song.song_id,
                                song.artist_id,
                                log.sessionId AS session_id,
                                log.location,
                                log.userAgent AS user_agent,
                                MONTH(log.start_time) AS month,
                                YEAR(log.start_time) AS year
                                FROM log
                                LEFT JOIN song 
                                 ON song.title = log.song
                                 AND song.artist_name = log.artist
                                WHERE log.page = 'NextSong'
                            ''')

In [63]:
song_play_table.limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,month,year
0,0,2018-11-15 00:30:26.796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",11,2018
1,1,2018-11-15 00:41:21.796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",11,2018
2,2,2018-11-15 00:45:41.796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",11,2018
3,3,2018-11-15 03:44:09.796,61,free,,,597,"Houston-The Woodlands-Sugar Land, TX","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018
4,4,2018-11-15 05:48:55.796,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018


In [64]:
song_play_table.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



In [65]:
song_play_table.write.partitionBy('year', 'month').parquet('./parquet/songPlay.parquet', mode='overwrite')