In [87]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql import Window

In [88]:
# Create (or reuse) the Spark session for this notebook; the hadoop-aws
# package is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", 'org.apache.hadoop:hadoop-aws:2.7.0')
    .getOrCreate()
)

In [89]:
# Load every song JSON file matched by the glob below into one DataFrame.
songs = (
    spark.read
    .options(inferSchema='true')
    .json('data/song_data/*/*/*/*.json')
)

In [90]:
# Print the schema Spark inferred for the raw song data.
songs.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [91]:
# Root directory under which the output tables are written as parquet.
output_data = 'data/lake/'

### Songs Table

In [92]:
# Project the raw song records down to the song-level columns.
songs_table = songs.select(
    ['song_id', 'title', 'artist_id', 'year', 'duration']
)

In [93]:
# Preview the first five rows of the songs table.
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [94]:
# Show the column types of the songs table.
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [95]:
# Narrow `year` from long to a 32-bit integer column.
songs_table = songs_table.withColumn(
    'year', F.col('year').cast(IntegerType())
)

In [96]:
# Re-check the schema of the songs table after the `year` cast.
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)



In [97]:
# Persist the songs table as parquet, partitioned by year and then artist_id;
# existing output at this path is overwritten.
(
    songs_table.write
    .mode('overwrite')
    .partitionBy('year', 'artist_id')
    .parquet(f'{output_data}songs/')
)

                                                                                

### Artists Table

In [98]:
# Build the artists table from the raw song records, renaming the artist_*
# columns to their short names.
# Each `songs` row carries a song_id, so an artist with several songs shows up
# on several rows; distinct() removes those exact repeats.
# Rows that share an artist_id but differ in name/location are kept on
# purpose: the songplays lookup later in this notebook matches on artist name.
artists_table = (
    songs
    .select('artist_id',
            F.col('artist_name').alias('name'),
            F.col('artist_location').alias('location'),
            F.col('artist_latitude').alias('latitude'),
            F.col('artist_longitude').alias('longitude'))
    .distinct()
)

In [99]:
# Show the column names and types of the artists table.
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [100]:
# Write the artists table to parquet, replacing any previous output.
(
    artists_table.write
    .mode('overwrite')
    .parquet(f'{output_data}artists/')
)

## Load logs dataset

In [101]:
# Load all log JSON files that sit directly under data/log_data.
logs = (
    spark.read
    .options(inferSchema='true')
    .json('data/log_data/*.json')
)

In [102]:
# Print the schema Spark inferred for the raw log events.
logs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [103]:
# Keep only the events whose page is 'NextSong'.
logs = logs.where(F.col('page') == 'NextSong')

In [104]:
# Preview the filtered log events.
logs.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyva

### Users Table

In [105]:
# Take the user attributes plus the event timestamp from the log events.
users_table = logs.select(
    ['userId', 'firstName', 'lastName', 'gender', 'level', 'ts']
)

In [106]:
# Attach each user's latest event timestamp to every one of their rows, so the
# most recent record per user can be picked by comparing it with `ts`.
# The partition key is spelled exactly like the schema column ('userId');
# a differently-cased name only resolves while Spark's column resolution is
# case-insensitive (spark.sql.caseSensitive=false).
window_spec = Window.partitionBy('userId')
users_table = users_table.withColumn('max_ts_user', F.max(F.col('ts')).over(window_spec))

In [107]:
# Preview the user rows together with the per-user maximum timestamp.
users_table.show()

+------+---------+--------+------+-----+-------------+-------------+
|userId|firstName|lastName|gender|level|           ts|  max_ts_user|
+------+---------+--------+------+-----+-------------+-------------+
|    10|   Sylvie|    Cruz|     F| free|1542171216796|1543494067796|
|    10|   Sylvie|    Cruz|     F| free|1542171517796|1543494067796|
|    10|   Sylvie|    Cruz|     F| free|1542171840796|1543494067796|
|    10|   Sylvie|    Cruz|     F| free|1542171963796|1543494067796|
|    10|   Sylvie|    Cruz|     F| free|1542172087796|1543494067796|
|    10|   Sylvie|    Cruz|     F| free|1542172409796|1543494067796|
|    10|   Sylvie|    Cruz|     F| free|1542172712796|1543494067796|
|    10|   Sylvie|    Cruz|     F| free|1541433665796|1543494067796|
|    10|   Sylvie|    Cruz|     F| free|1542409085796|1543494067796|
|    10|   Sylvie|    Cruz|     F| free|1542737077796|1543494067796|
|    10|   Sylvie|    Cruz|     F| free|1543072762796|1543494067796|
|    10|   Sylvie|    Cruz|     F|

In [108]:
# Retain only the rows whose timestamp is the user's latest one.
users_table = users_table.where(F.col('ts') == F.col('max_ts_user'))

In [109]:
# Drop the helper timestamp columns and rename each camelCase column to
# snake_case exactly once.
# dropDuplicates guards against a user having more than one event at the
# latest timestamp, which would otherwise leave several rows for that user.
users_table = (
    users_table
    .drop('ts', 'max_ts_user')
    .withColumnRenamed('userId', 'user_id')
    .withColumnRenamed('firstName', 'first_name')
    .withColumnRenamed('lastName', 'last_name')
    .dropDuplicates(['user_id'])
)
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [110]:
# Preview the final users table.
users_table.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     10|    Sylvie|     Cruz|     F| free|
|    100|     Adler|  Barrera|     M| free|
|    101|    Jayden|      Fox|     M| free|
|     11| Christian|   Porter|     F| free|
|     12|    Austin|  Rosales|     M| free|
|     13|       Ava| Robinson|     F| free|
|     14|  Theodore|   Harris|     M| free|
|     15|      Lily|     Koch|     F| paid|
|     16|     Rylan|   George|     M| paid|
|     17|  Makinley|    Jones|     F| free|
|     18|     Jacob|   Rogers|     M| free|
|     19|   Zachary|   Thomas|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     20|     Aiden|  Ramirez|     M| paid|
|     22|      Sean|   Wilson|     F| free|
|     23|    Morris|  Gilmore|     M| free|
|     24|     Layla|  Griffin|     F| paid|
|     25|    Jayden|   Graves|     M| paid|
|     26|      Ryan|    Smith|     M| free|
|     27|    Carlos|   Carter|  

In [111]:
# Save the users table as parquet under the lake directory (overwrite mode).
(
    users_table.write
    .mode('overwrite')
    .parquet(output_data + 'users/')
)

### Time Table

In [112]:
from datetime import datetime  # not used by get_timestamp; kept in case other cells rely on it
from pyspark.sql.types import TimestampType


def get_timestamp(ts_col):
    """Convert an epoch-milliseconds column into a Spark timestamp column.

    Implemented as a native column expression rather than a Python UDF, so
    rows are not shipped to Python workers and a null input yields null
    instead of raising TypeError on the division.

    Args:
        ts_col: Column (or column name) holding milliseconds since the epoch.

    Returns:
        A Column of TimestampType.
    """
    if isinstance(ts_col, str):
        # A UDF can be called with a column name, so keep accepting one.
        ts_col = F.col(ts_col)
    # Casting a numeric value to timestamp treats it as seconds since the
    # epoch; the fractional part preserves the milliseconds.
    return (ts_col / 1000.0).cast(TimestampType())

In [113]:
# Add a start_time column converted from the event's `ts`, then preview it.
time_table = logs.withColumn(
    'start_time', get_timestamp(logs['ts'])
)
time_table.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          start_time|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 01:30:...|
|         The Prodig

In [114]:
# Break start_time into its calendar parts and keep one row per distinct
# combination of values.
time_table = (
    time_table
    .select(
        'start_time',
        F.hour('start_time').alias('hour'),
        F.dayofmonth('start_time').alias('day'),
        F.weekofyear('start_time').alias('week'),
        F.month('start_time').alias('month'),
        F.year('start_time').alias('year'),
        F.dayofweek('start_time').alias('weekday'),
    )
    .distinct()
)


In [115]:
# Preview the time table.
time_table.show()

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 10:54:...|  10| 15|  46|   11|2018|      5|
|2018-11-15 12:21:...|  12| 15|  46|   11|2018|      5|
|2018-11-15 19:42:...|  19| 15|  46|   11|2018|      5|
|2018-11-15 23:41:...|  23| 15|  46|   11|2018|      5|
|2018-11-21 08:11:...|   8| 21|  47|   11|2018|      4|
|2018-11-14 13:53:...|  13| 14|  46|   11|2018|      4|
|2018-11-14 17:42:...|  17| 14|  46|   11|2018|      4|
|2018-11-28 09:23:...|   9| 28|  48|   11|2018|      4|
|2018-11-28 14:12:...|  14| 28|  48|   11|2018|      4|
|2018-11-29 00:31:...|   0| 29|  48|   11|2018|      5|
|2018-11-15 18:08:...|  18| 15|  46|   11|2018|      5|
|2018-11-15 23:17:...|  23| 15|  46|   11|2018|      5|
|2018-11-21 05:39:...|   5| 21|  47|   11|2018|      4|
|2018-11-21 20:57:...|  20| 21|  47|   11|2018|      4|
|2018-11-22 00:18:...|   0| 22|  47|   11|2018| 

In [116]:
# Show the column names and types of the time table.
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [117]:
# Persist the time table as parquet, partitioned by year and then month;
# existing output at this path is overwritten.
(
    time_table.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet(output_data + 'time/')
)

### Songplays Table

In [118]:
# Read the songs table back from the lake and preview ten rows.
songs_df = spark.read.parquet(
    output_data + 'songs'
)
songs_df.show(n=10)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|2000|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|2005|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|   0|ARLTWXK1187FB5A3F8|
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|   0|ARKFYS91187B98E58F|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|   0|ARPFHN61187FB575F6|
|SOUDSGM12AC9618304|Insatiable (Instr...|266.39628|   0|ARNTLGG11E2835DDB9|
|SOPEGZN12AB0181B3D|Get Your Head Stu...| 45.66159|   0|AREDL271187FB40F44|
|SOBBUGU12A8C13E95D|Setting Fire to S...|207.77751|2004|ARMAC4T1187FB3FA4C|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|511.16363|   0|ARDR4AC1187FB371A1|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|173.66159|   0|AREBBGV1187FB523D2|
+-----------

In [119]:
# Read the artists table back from the lake and preview ten rows.
artists_df = spark.read.parquet(
    output_data + 'artists'
)
artists_df.show(n=10)

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |    null|     null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|    null|     null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624|-74.47995|
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|40.71455|-74.00712|
|ARDNS031187B9924F0|          Tim Wilson|             Georgia|32.67828|-83.22295|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|40.79086|-73.96644|
|ARLTWXK1187FB5A3F8|         King Curtis|      Fort Worth, TX|32.74863|-97.32925|
|ARPFHN61187FB575F6|         Lupe Fiasco|         Chicago, IL|41.88415|-87.63241|
|ARI2JSK1187FB496EF|Nick Ingman;Gavyn...|     London, England|51.50632| -0.12714|
|AROUOZZ1187B9AB

In [120]:
# Pair every song with the name(s) recorded for its artist.
# The artists data read back from the lake can contain the same
# (artist_id, name) pair many times; joining against it as-is would multiply
# song rows, so the pairs are de-duplicated first.
# The join is a left join from songs: an artist row without a song would have
# a null title and could never match a log event in the title/name lookup
# that consumes songs_details.
artist_names = artists_df.select('artist_id', 'name').distinct()
songs_details = (
    songs_df
    .join(artist_names, 'artist_id', 'left')
    .select('song_id', 'title', 'artist_id', 'name', 'year')
)
songs_details.show(10)

+------------------+--------------------+------------------+--------------------+----+
|           song_id|               title|         artist_id|                name|year|
+------------------+--------------------+------------------+--------------------+----+
|SOLYIBD12A8C135045|Music is what we ...|AR051KA1187B98B2FF|               Wilks|   0|
|SONSKXP12A8C13A2C9|         Native Soul|AR0IAWL1187B9A96D0|        Danilo Perez|2003|
|SOGXHEG12AB018653E|It Makes No Diffe...|AR0RCMP1187FB3F427|    Billie Jo Spears|1992|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|Tweeterfriendly M...|   0|
|SOLLHMX12AB01846DC|   The Emperor Falls|AR1Y2PT1187FB5B9CE|         John Wesley|   0|
|SOILPQQ12AB017E82A|Sohna Nee Sohna Data|AR1ZHYZ1187FB3C717|       Faiz Ali Faiz|   0|
|SOBKWDJ12A8C13B2F3|Wild Rose (Back 2...|AR36F9J1187FB406F1|      Bombay Rockers|   0|
|SOPVXLX12A8C1402D5|    Larger Than Life|AR3JMC51187B9AE49D|     Backstreet Boys|1999|
|SOBLGCN12AB0183212|James (Hold The L...|AR

In [121]:
# Look up song_id / artist_id for each log event.
# The log schema has no id columns, so events are matched to songs_details by
# song title and artist name; the left join keeps events without a match.
songplays_table = logs.join(
    songs_details,
    on=[logs.song == songs_details.title, logs.artist == songs_details.name],
    how='left',
)
songplays_table.show(10)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------+-----+---------+----+----+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|song_id|title|artist_id|name|year|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------+-----+---------+----+----+
|           Sean Paul|Logged In|    Tegan|     F|            9|  Levine|245.34159| paid|Portland-South Po...|   PUT|NextSong|1.540794356796E12|      602|Baby Boy [feat. B...|   200|1542262434796|"Mozilla/5.0 (Mac...|    80|   

In [122]:
# Show the combined log + song columns produced by the join.
songplays_table.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- year: integer (nullable = true)



In [123]:
# Add a start_time column converted from the event's `ts`.
songplays_table = songplays_table.withColumn(
    'start_time', get_timestamp(songplays_table['ts'])
)

In [124]:
# Preview ten rows, now including start_time.
songplays_table.show(10)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------+-----+---------+----+----+--------------------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|song_id|title|artist_id|name|year|          start_time|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------+-----+---------+----+----+--------------------+
|           Sean Paul|Logged In|    Tegan|     F|            9|  Levine|245.34159| paid|Portland-South Po...|   PUT|NextSong|1.540794356796E12|      602|Baby Boy [

In [125]:
# Shape the songplays columns: an integer user_id, month/year derived from
# start_time (this `year` overwrites the one that came from songs_details),
# and userAgent exposed as user_agent.
songplays_table = (
    songplays_table
    .withColumn('user_id', F.col('userId').cast(IntegerType()))
    .withColumn('month', F.month('start_time'))
    .withColumn('year', F.year('start_time'))
    .select(
        'start_time', 'user_id', 'level', 'song_id', 'artist_id',
        'sessionId', 'location', 'year', 'month',
        F.col('userAgent').alias('user_agent'),
    )
)

In [126]:
# Give every row an id generated by monotonically_increasing_id().
songplays_table = songplays_table.withColumn(
    'songplay_id', F.monotonically_increasing_id()
)


In [127]:
# Preview ten rows of the final songplays table.
songplays_table.show(10)

+--------------------+-------+-----+-------+---------+---------+--------------------+----+-----+--------------------+-----------+
|          start_time|user_id|level|song_id|artist_id|sessionId|            location|year|month|          user_agent|songplay_id|
+--------------------+-------+-----+-------+---------+---------+--------------------+----+-----+--------------------+-----------+
|2018-11-15 11:56:...|     80| paid|   null|     null|      611|Portland-South Po...|2018|   11|"Mozilla/5.0 (Mac...|          0|
|2018-11-15 12:27:...|     30| paid|   null|     null|      324|San Jose-Sunnyval...|2018|   11|Mozilla/5.0 (Wind...|          1|
|2018-11-15 15:26:...|     30| paid|   null|     null|      324|San Jose-Sunnyval...|2018|   11|Mozilla/5.0 (Wind...|          2|
|2018-11-15 17:28:...|     97| paid|   null|     null|      605|Lansing-East Lans...|2018|   11|"Mozilla/5.0 (X11...|          3|
|2018-11-15 23:13:...|     76| free|   null|     null|      561|Seattle-Tacoma-Be...|2018|

In [128]:
# Show the column names and types of the final songplays table.
songplays_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- location: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- songplay_id: long (nullable = false)

