In [2]:
import findspark
findspark.init()

In [179]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [6]:
spark = SparkSession.Builder().master('local').getOrCreate()

## Get song_data files

In [8]:
import os
import glob

In [38]:
os.getcwd()

'C:\\Users\\xingy\\workspace\\ipynb'

In [40]:
song_data_files = []
for root, dir, file in os.walk("data/song_data"):
    
    paths = os.path.abspath( os.path.join(root, "*.json") )
    files = glob.glob(paths)
    for file in files:
        song_data_files.append(file)
        
len(song_data_files)    

70

## Get log_data file

In [44]:
log_data_files = []
for root, dir, file in os.walk("data/log_data"):
    paths = os.path.abspath(os.path.join(root, "*.json"))
    files = glob.glob(paths)
    for file in files:
        log_data_files.append(file)
len(log_data_files)

30

## Get event_data file

In [46]:
event_data_files = []
for root, dir, file, in os.walk("event_data"):
    paths = os.path.abspath(os.path.join(root, "*.csv"))
    files = glob.glob(paths)
    for file in files:
        event_data_files.append(file)
len(event_data_files)

30

## Load song_data to spark sql Data Frame

In [69]:
song_df = spark.read.json(song_data_files)
song_df.count()

70

In [70]:
song_df.show(1, False)

+------------------+---------------+---------------+----------------+----------------------------------------------------------------------------------------------+---------+---------+------------------+-------------------------------+----+
|artist_id         |artist_latitude|artist_location|artist_longitude|artist_name                                                                                   |duration |num_songs|song_id           |title                          |year|
+------------------+---------------+---------------+----------------+----------------------------------------------------------------------------------------------+---------+---------+------------------+-------------------------------+----+
|ARDR4AC1187FB371A1|null           |               |null            |Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti|511.16363|1        |SOBAYLL12A8C138AF9|Sono andati? Fingevo di dormire|0   |
+------------------+---------------+

## Load log_data to Spark SQL Data Frame

In [65]:
log_df = spark.read.json(log_data_files)

In [66]:
log_df.count()

8056

In [89]:
log_df.show(2, True)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [68]:
log_df.head()

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')

## Load event_data to Spark SQL Data Frame

In [58]:
event_data = spark.read.csv(event_data_files)
event_data.count()

8086

In [61]:
event_data.show(2, False)

+--------+---------+---------+------+-------------+--------+---------+-----+----------------------------------+------+--------+------------+---------+-------------+------+-----------+------+
|_c0     |_c1      |_c2      |_c3   |_c4          |_c5     |_c6      |_c7  |_c8                               |_c9   |_c10    |_c11        |_c12     |_c13         |_c14  |_c15       |_c16  |
+--------+---------+---------+------+-------------+--------+---------+-----+----------------------------------+------+--------+------------+---------+-------------+------+-----------+------+
|artist  |auth     |firstName|gender|itemInSession|lastName|length   |level|location                          |method|page    |registration|sessionId|song         |status|ts         |userId|
|Harmonia|Logged In|Ryan     |M     |0            |Smith   |655.77751|free |San Jose-Sunnyvale-Santa Clara, CA|PUT   |NextSong|1.54102E+12 |583      |Sehr kosmisch|200   |1.54224E+12|26    |
+--------+---------+---------+------+--------

In [63]:
event_data.head()

Row(_c0='artist', _c1='auth', _c2='firstName', _c3='gender', _c4='itemInSession', _c5='lastName', _c6='length', _c7='level', _c8='location', _c9='method', _c10='page', _c11='registration', _c12='sessionId', _c13='song', _c14='status', _c15='ts', _c16='userId')

In [64]:
type(event_data)

pyspark.sql.dataframe.DataFrame

## Processing Song data

### Create song_table

In [74]:
song_df.columns

['artist_id',
 'artist_latitude',
 'artist_location',
 'artist_longitude',
 'artist_name',
 'duration',
 'num_songs',
 'song_id',
 'title',
 'year']

In [82]:
song_table = song_df.select(['song_id', 'title', 'artist_id', 'year', 'duration']).distinct()
print(song_table.count())
song_table.show(5, False)

70
+------------------+---------------------------------------------------+------------------+----+---------+
|song_id           |title                                              |artist_id         |year|duration |
+------------------+---------------------------------------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|¿Dónde va Chichi?                                  |ARGUVEV1187B98BA17|1997|313.12934|
|SOTTDKS12AB018D69B|It Wont Be Christmas                               |ARMBR4Y1187B9990EB|0   |241.47546|
|SOBBUGU12A8C13E95D|Setting Fire to Sleeping Giants                    |ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|Pink World                                         |AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife Is Running Around On Me (Taco Hell)|ARDNS031187B9924F0|2005|186.48771|
+------------------+---------------------------------------------------+------------------+----+---------+
only showing top 5 rows



In [None]:
artists_table = song_df.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]).distinct()
artists_table.show(5, truncate = False)

### Create artist_table

In [78]:
song_df.columns

['artist_id',
 'artist_latitude',
 'artist_location',
 'artist_longitude',
 'artist_name',
 'duration',
 'num_songs',
 'song_id',
 'title',
 'year']

In [86]:
artist_table = song_df.select(['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']).distinct()
print(artist_table.count())
artist_table.show(5, False)

68
+------------------+---------------+---------------+---------------+----------------+
|artist_id         |artist_name    |artist_location|artist_latitude|artist_longitude|
+------------------+---------------+---------------+---------------+----------------+
|AR3JMC51187B9AE49D|Backstreet Boys|Orlando, FL    |28.53823       |-81.37739       |
|AR0IAWL1187B9A96D0|Danilo Perez   |Panama         |8.4177         |-80.11278       |
|ARWB3G61187FB49404|Steve Morse    |Hamilton, Ohio |null           |null            |
|AR47JEX1187B995D81|SUE THOMPSON   |Nevada, MO     |37.83721       |-94.35868       |
|ARHHO3O1187B989413|Bob Azzam      |               |null           |null            |
+------------------+---------------+---------------+---------------+----------------+
only showing top 5 rows



In [87]:
song_df.count()

70

### filter by actions for song plays

In [90]:
log_df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

In [96]:
log_df.count()

8056

In [98]:
log_df = log_df.where('page == "NextSong" ')
log_df.count()

6820

In [99]:
log_df.show(2,True)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

### Create users table

In [None]:
users_table = log_df.select(["userId", "firstName", "lastName", "gender", "level"]).distinct()
users_table.show(5, truncate = False)

In [100]:
log_df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

In [108]:
users_table = log_df.select(['userId', 'firstName', 'lastName', 'gender', 'level']).distinct()
print(users_table.count())
users_table.show(5, False)

104
+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|26    |Ryan     |Smith   |M     |free |
|61    |Samuel   |Gonzalez|M     |free |
|80    |Tegan    |Levine  |F     |paid |
|15    |Lily     |Koch    |F     |paid |
|49    |Chloe    |Cuevas  |F     |paid |
+------+---------+--------+------+-----+
only showing top 5 rows



## Create time_table

In [109]:
log_df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

In [134]:
log_df = log_df.withColumn( 'timestamp', F.to_timestamp(log_df.ts/1000) )
log_df.select('ts', 'timestamp').show(5, False)

+-------------+-----------------------+
|ts           |timestamp              |
+-------------+-----------------------+
|1542241826796|2018-11-15 01:30:26.796|
|1542242481796|2018-11-15 01:41:21.796|
|1542242741796|2018-11-15 01:45:41.796|
|1542253449796|2018-11-15 04:44:09.796|
|1542260935796|2018-11-15 06:48:55.796|
+-------------+-----------------------+
only showing top 5 rows



In [133]:
log_df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId',
 'timestamp']

In [169]:
time_table = log_df.select(
'timestamp',
F.year(log_df.timestamp).alias('year'),
F.month('timestamp').alias('month'),
F.dayofmonth('timestamp').alias('day'),
F.hour('timestamp').alias('hour'),
F.minute('timestamp').alias('minute'),
F.second('timestamp').alias('second'),
F.weekofyear('timestamp').alias('week'),
F.date_format('timestamp','E').alias('weekday'), 
) 

In [170]:
time_table.show(5, False)

+-----------------------+----+-----+---+----+------+------+----+-------+
|timestamp              |year|month|day|hour|minute|second|week|weekday|
+-----------------------+----+-----+---+----+------+------+----+-------+
|2018-11-15 01:30:26.796|2018|11   |15 |1   |30    |26    |46  |Thu    |
|2018-11-15 01:41:21.796|2018|11   |15 |1   |41    |21    |46  |Thu    |
|2018-11-15 01:45:41.796|2018|11   |15 |1   |45    |41    |46  |Thu    |
|2018-11-15 04:44:09.796|2018|11   |15 |4   |44    |9     |46  |Thu    |
|2018-11-15 06:48:55.796|2018|11   |15 |6   |48    |55    |46  |Thu    |
+-----------------------+----+-----+---+----+------+------+----+-------+
only showing top 5 rows



## Create songplays_table

In [171]:
log_df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId',
 'timestamp']

In [172]:
song_df.columns

['artist_id',
 'artist_latitude',
 'artist_location',
 'artist_longitude',
 'artist_name',
 'duration',
 'num_songs',
 'song_id',
 'title',
 'year']

### Join song_df and log_df

In [194]:
song_log_joined_table = log_df.join(song_df, (log_df.song == song_df.title) & \
                                    (log_df.artist == song_df.artist_name) & \
                                    (log_df.length == song_df.duration), how='inner')

In [195]:
song_log_joined_table.count()

1

In [196]:
songplays_table = song_log_joined_table.distinct() \
            .select("userId", "timestamp", "song_id", "artist_id", "level", "sessionId", "location", "userAgent" ) \
            .withColumn("songplay_id", F.row_number().over( Window.partitionBy("timestamp").orderBy("timestamp"))) \
            .withColumnRenamed("userId","user_id")        \
            .withColumnRenamed("timestamp","start_time")  \
            .withColumnRenamed("sessionId","session_id")  \
            .withColumnRenamed("userAgent", "user_agent") \
                    

In [197]:
songplays_table.select(['songplay_id', 'user_id', 'start_time','song_id', 'artist_id', 'level', 'session_id']).show(5, False)

+-----------+-------+-----------------------+------------------+------------------+-----+----------+
|songplay_id|user_id|start_time             |song_id           |artist_id         |level|session_id|
+-----------+-------+-----------------------+------------------+------------------+-----+----------+
|1          |15     |2018-11-21 22:56:47.796|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|paid |818       |
+-----------+-------+-----------------------+------------------+------------------+-----+----------+

