In [76]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_timestamp, to_date
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, monotonically_increasing_id

In [9]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [10]:
songpath= "data/song_data/*/*/*/*.json"

songs_df= spark.read.json(songpath)

In [11]:
songs_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [12]:
songs_df.limit(10).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005
5,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994
6,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
7,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
8,ARI2JSK1187FB496EF,51.50632,"London, England",-0.12714,Nick Ingman;Gavyn Wright,111.62077,1,SODUJBS12A8C132150,Wessex Loses a Bride,0
9,AR0RCMP1187FB3F427,30.08615,"Beaumont, TX",-94.10158,Billie Jo Spears,133.32853,1,SOGXHEG12AB018653E,It Makes No Difference Now,1992


In [13]:
song_table_list=["title", "artist_id","year", "duration"]

In [14]:
songs_table= songs_df.select(song_table_list).dropDuplicates()

In [47]:
songs_table.printSchema()
songs_table.limit(10).toPandas()
songs_table.write.parquet('/output/songs/')
songs_table.write.partitionBy("year", "artist_id").parquet(output_data + 'songs/')

root
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- song_id: string (nullable = true)



In [16]:
artist_table_list=["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]

In [17]:
artist_table= songs_df.select(artist_table_list).dropDuplicates()

In [18]:
artist_table.printSchema()
artist_table.limit(10).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR3JMC51187B9AE49D,Backstreet Boys,"Orlando, FL",28.53823,-81.37739
1,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
2,ARWB3G61187FB49404,Steve Morse,"Hamilton, Ohio",,
3,AR47JEX1187B995D81,SUE THOMPSON,"Nevada, MO",37.83721,-94.35868
4,ARHHO3O1187B989413,Bob Azzam,,,
5,ARAGB2O1187FB3A161,Pucho & His Latin Soul Brothers,,,
6,AREBBGV1187FB523D2,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
7,ARGSAFR1269FB35070,Blingtones,,,
8,AROUOZZ1187B9ABE51,Willie Bobo,"New York, NY [Spanish Harlem]",40.79195,-73.94512
9,AR0RCMP1187FB3F427,Billie Jo Spears,"Beaumont, TX",30.08615,-94.10158


change_name_list=["artist_location", "artist_latitude", "artist_longitude"]
changed_list=["location", "latitude", "longitude"]
#artist_table.withColumnRenamed("'{}'".format(x) for x in change_name_list ,"'{}'".format(y) for y in changed_list).printSchema()
artist_table.withColumnRenamed("'{}'".format("artist_latitude"),"newlat").printSchema()

In [19]:
artist_table.write.parquet('/output/artists/artist.parquet')

In [45]:
artist_df = spark.read.parquet('/output/artists/artist.parquet')
artist_df.limit(5).show()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|       40.79086|       -73.96644|
|AROUOZZ1187B9ABE51|         Willie Bobo|New York, NY [Spa...|       40.79195|       -73.94512|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|           null|            null|
|ARD842G1187B997376|          Blue Rodeo|Toronto, Ontario,...|       43.64856|       -79.38533|
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |           null|            null|
+------------------+--------------------+--------------------+---------------+----------------+



In [34]:
logfilepath= "data/log_data/*.json"

In [35]:
logfile= spark.read.json(logfilepath)
logfile= logfile.filter(logfile.page== 'NextSong')

In [36]:
logfile.printSchema()
logfile.limit(5).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [53]:
users_fields = ["userdId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level"]


In [71]:
fields= ["userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level"]
new_df= logfile.selectExpr(fields).dropDuplicates()

In [72]:
new_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [41]:
logfile = logfile.withColumn("start_time", to_timestamp(logfile.ts/1000))

In [42]:
logfile.printSchema()
logfile.limit(5).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796


In [43]:
logfile = logfile.withColumn("Date", to_date(logfile.start_time))

In [44]:
logfile.printSchema()
logfile.limit(5).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- Date: date (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time,Date
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796,2018-11-15
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796,2018-11-15
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796,2018-11-15
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796,2018-11-15
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796,2018-11-15


In [52]:
time_table= logfile.selectExpr(['start_time as start_time',
                                'hour(Date) as hour',
                                'dayofmonth(Date) as day',
                                "weekofyear(Date) as week", 
                                "month(Date) as month", 
                                "year(Date) as year", 
                                "dayofweek(Date) as weekday"]).dropDuplicates()

In [53]:
time_table.printSchema()
time_table.limit(5).show()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 12:37:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 14:11:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 17:09:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 20:36:...|   0| 15|  46|   11|2018|      5|
|2018-11-21 07:25:...|   0| 21|  47|   11|2018|      4|
+--------------------+----+---+----+-----+----+-------+



In [49]:
songs_df= spark.read.parquet('/output/songs')
songs_df.limit(5).show()

+--------------------+------------------+----+---------+------------------+
|               title|         artist_id|year| duration|           song_id|
+--------------------+------------------+----+---------+------------------+
|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|SOAOIBZ12AB01815BE|
|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|SONYPOM12A8C13B2D7|
|A Whiter Shade Of...|ARLTWXK1187FB5A3F8|   0|326.00771|SODREIN12A58A7F2E5|
|The Moon And I (O...|ARKFYS91187B98E58F|   0| 267.7024|SOYMRWW12A6D4FAB14|
|Streets On Fire (...|ARPFHN61187FB575F6|   0|279.97995|SOWQTQZ12A58A7B63E|
+--------------------+------------------+----+---------+------------------+



In [59]:
artist_df = spark.read.parquet('/output/artists/artist.parquet')
artist_df= artist_df.withColumnRenamed('artist_id', 'artist_df_artist_id')
artist_df.limit(5).show()


+-------------------+--------------------+--------------------+---------------+----------------+
|artist_df_artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+-------------------+--------------------+--------------------+---------------+----------------+
| ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|       40.79086|       -73.96644|
| AROUOZZ1187B9ABE51|         Willie Bobo|New York, NY [Spa...|       40.79195|       -73.94512|
| AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|           null|            null|
| ARD842G1187B997376|          Blue Rodeo|Toronto, Ontario,...|       43.64856|       -79.38533|
| ARDR4AC1187FB371A1|Montserrat Caball...|                    |           null|            null|
+-------------------+--------------------+--------------------+---------------+----------------+



In [51]:
logfile.limit(5).show()

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+----------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          start_time|      Date|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+----------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|2018-11-15|
|The Prodigy

In [60]:
logsongstable= logfile.join(songs_df, logfile.song == songs_df.title)

In [61]:
logsongsartisttable= logsongstable.join(artist_df, logsongstable.artist_id == artist_df.artist_df_artist_id)

In [62]:
logsongsartisttable.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- Date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_df_artist_id: string (

In [64]:
logsongsartisttimetable= logsongsartisttable.join(time_table, logsongsartisttable.start_time == time_table.start_time, 'left')

In [65]:
logsongsartisttimetable.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- Date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_df_artist_id: string (

In [78]:
songs_playtable= logsongsartisttimetable.select(col('ts').alias('start_time'),
        col('userId').alias('user_id'),
        col('level').alias('level'),
        col('song_id').alias('song_id'),
        col('artist_id').alias('artist_id'),
        col('sessionId').alias('session_id'),
        col('location').alias('location'),
        col('userAgent').alias('user_agent'),
        col('month').alias('month'),
    ).withColumn('songplay_id', monotonically_increasing_id())

In [79]:
songs_playtable.printSchema()

root
 |-- start_time: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- songplay_id: long (nullable = false)



In [80]:
songs_playtable.limit(5).show()

+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----+-----------+
|   start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|month|songplay_id|
+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----+-----------+
|1542837407796|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|   11|          0|
|1542171963796|     10| free|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|   11|          1|
|1542618860796|     24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|   11|          2|
|1543358159796|     80| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|   11|          3|
+-------------+-------+----