In [90]:
import configparser
from datetime import datetime
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, TimestampType

In [54]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [55]:
song_data = 'song_data/*/*/*/*.json'

In [56]:
songSchema = R([
        fld("artist_id", Str()),
        fld("artist_latitude", Dbl()),
        fld("artist_location", Str()),
        fld("artist_longitude", Dbl()),
        fld("artist_name", Str()),
        fld("duration", Dbl()),
        fld("num_songs", Int()),
        fld("song_id", Str()),
        fld("title", Str()),
        fld("year", Int())
    ])

In [57]:
df = spark.read.json('song_data/*/*/*/*.json', schema=songSchema)
df.show()

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

In [58]:
fields=[("artist_id", "artist_id"),
              ("artist_latitude", "latitude"),
              ("artist_location", "location"),
              ("artist_longitude", "longitude"),
              ("artist_name", "name"),
              ("duration", "duration"),
              ("num_songs", "num_songs"),
              ("song_id", "song_id"),
              ("title", "title"),
              ("year",  "year")]
exprs = ["{} as {}".format(field[0], field[1]) for field in fields]
dfNamed = df.selectExpr(*exprs)
dfNamed.show()

+------------------+--------+--------------------+----------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|latitude|            location| longitude|                name| duration|num_songs|           song_id|               title|year|
+------------------+--------+--------------------+----------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|    null|                    |      null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|    null|         Houston, TX|      null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|40.82624|   Morris Plains, NJ| -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|40.71455|        New York, NY| -74.00712|            Tiny Tim| 43.36281|        1|SOAOIBZ12A

In [59]:
song_fields = ["title", "artist_id", "year", "duration"]
songs_table = dfNamed.select(song_fields) \
    .dropDuplicates() \
    .withColumn("song_id", monotonically_increasing_id())

In [60]:
songs_table.show()

+--------------------+------------------+----+---------+------------+
|               title|         artist_id|year| duration|     song_id|
+--------------------+------------------+----+---------+------------+
|               Intro|AR558FS1187FB45658|2003| 75.67628| 51539607552|
|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751| 68719476736|
|Kutt Free (DJ Vol...|ARNNKDK1187B98BBD5|   0|407.37914| 68719476737|
|Get Your Head Stu...|AREDL271187FB40F44|   0| 45.66159| 77309411328|
|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546| 94489280512|
|The Urgency (LP V...|ARC43071187B990240|   0|245.21098|103079215104|
|           Ten Tonne|AR62SOJ1187FB47BB5|2005|337.68444|103079215105|
|      Spanish Grease|AROUOZZ1187B9ABE51|1997|168.25424|146028888064|
|       City Slickers|AR8IEZO1187B99055E|2008|149.86404|154618822656|
|A Higher Place (A...|ARBEBBY1187B9B43DB|1994|236.17261|180388626432|
|Wessex Loses a Bride|ARI2JSK1187FB496EF|   0|111.62077|223338299392|
|Made Like This (L..

In [61]:
artists_table = ['artist_id', 'name', 'location', 'latitude', 'longitude']
artists_table = dfNamed.select(artists_table).dropDuplicates()
artists_table.show()

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|40.71455| -74.00712|
|ARXR32B1187FB57099|                 Gob|                    |    null|      null|
|AROGWRA122988FEE45|     Christos Dantis|                    |    null|      null|
|ARBGXIG122988F409D|          Steel Rain|     California - SF|37.77916|-122.42005|
|AREVWGE1187B9B890A|          Bitter End|           Noci (BA)| -13.442|  -41.9952|
|AREDL271187FB40F44|        Soul Mekanik|                    |    null|      null|
|ARGSAFR1269FB35070|          Blingtones|                    |    null|      null|
|ARH4Z031187B9A71F2|          Faye Adams|          Newark, NJ|40.73197| -74.17418|
|ARGSJW91187B9B1D6B|        JennyAnyKind|      North Carolina|35.21962| -80.01955|
|ARJ

In [62]:
log_data = 'log_data/*.json'

In [63]:
ldf = spark.read.json(log_data)
ldf.show()

+--------------------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|      auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia| Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy| Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Su

In [65]:
ldf = ldf.filter(ldf.page=='NextSong')
ldf.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyva

In [66]:
fields = [("artist", "artist"),
          ("auth", "auth"),
          ("firstName", "first_name"),
          ("gender", "gender"),
          ("itemInSession", "itemInSession"),
          ("lastName", "last_name"),
          ("length", "length"),
          ("level", "level"),
          ("location", "location"),
          ("method", "method"),
          ("page", "page"),
          ("registration", "registration"),
          ("sessionId", "session_id"),
          ("song", "song"),
          ("status", "status"),
          ("ts", "ts"),
          ("userAgent", "user_agent"),
          ("userId", "user_id")
          ]
exprs = [ "{} as {}".format(field[0],field[1]) for field in fields]
ldf = ldf.selectExpr(*exprs)

In [67]:
user_fields = ['user_id', 'first_name', 'last_name', 'gender', 'level']
users_table = ldf.select(user_fields).dropDuplicates()
users_table.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
|     23|    Morris|  Gilmore|     M| free|
|     75|    Joseph|Gutierrez|     M| free|
|     16|     Rylan|   George|     M| paid|
|      2|   Jizelle| Benjamin|     F| free|
|      3|     Isaac|   Valdez|     M| free|
|     54|     Kaleb|     Cook|     M| free|
|     79|     James|   Martin|     M| free|
|     80|     Tegan|   Levine|     F| paid|
|     77| Magdalene|   Herman|     F| free|
|     47|    Kimber|   Norris|     F| free|
|     30|     Avery|  Watkins|     F| paid|
|     22|      Sean|   Wilson|     F| free|
|      4|    Alivia|  Terrell|     F| free|
|     55|    Martin|  Johnson|     M| free|
|     20|     Aiden|  Ramirez|  

In [68]:
get_timestamp = udf(lambda x: x/1000, Dbl())
ldf = ldf.withColumn('ts2', get_timestamp('ts'))
ldf.show()

+--------------------+---------+----------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+----------+--------------------+------+-------------+--------------------+-------+----------------+
|              artist|     auth|first_name|gender|itemInSession|last_name|   length|level|            location|method|    page|     registration|session_id|                song|status|           ts|          user_agent|user_id|             ts2|
+--------------------+---------+----------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+----------+--------------------+------+-------------+--------------------+-------+----------------+
|            Harmonia|Logged In|      Ryan|     M|            0|    Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|       583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|     26|1.542241826796E9|
|         The Prodig

In [79]:
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d %H:%M:%S'))
ldf = ldf.withColumn('timestamp', get_timestamp('ts'))
ldf.show()

+--------------------+---------+----------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+----------+--------------------+------+-------------+--------------------+-------+----------------+-------------------+
|              artist|     auth|first_name|gender|itemInSession|last_name|   length|level|            location|method|    page|     registration|session_id|                song|status|           ts|          user_agent|user_id|             ts2|          timestamp|
+--------------------+---------+----------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+----------+--------------------+------+-------------+--------------------+-------+----------------+-------------------+
|            Harmonia|Logged In|      Ryan|     M|            0|    Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|       583|       Sehr kosmisch|   200|1542241826796|"Mozill

In [80]:
get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d'))
ldf = ldf.withColumn('datetime', get_datetime('ts'))
ldf.show()

+--------------------+---------+----------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+----------+--------------------+------+-------------+--------------------+-------+----------------+-------------------+----------+
|              artist|     auth|first_name|gender|itemInSession|last_name|   length|level|            location|method|    page|     registration|session_id|                song|status|           ts|          user_agent|user_id|             ts2|          timestamp|  datetime|
+--------------------+---------+----------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+----------+--------------------+------+-------------+--------------------+-------+----------------+-------------------+----------+
|            Harmonia|Logged In|      Ryan|     M|            0|    Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|       583|       Sehr kosm

In [82]:
time_table = ldf.select(
    col('timestamp').alias('start_time'),
    hour('timestamp').alias('hour'),
    dayofmonth('timestamp').alias('day'),
    weekofyear('timestamp').alias('week'),
    month('timestamp').alias('month'),
    year('timestamp').alias('year'),
    date_format('timestamp', 'u').alias('weekday')).orderBy("start_time").drop_duplicates() 

time_table.show()

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-01 21:01:46|  21|  1|  44|   11|2018|      4|
|2018-11-01 21:05:52|  21|  1|  44|   11|2018|      4|
|2018-11-01 21:08:16|  21|  1|  44|   11|2018|      4|
|2018-11-01 21:11:13|  21|  1|  44|   11|2018|      4|
|2018-11-01 21:17:33|  21|  1|  44|   11|2018|      4|
|2018-11-01 21:24:53|  21|  1|  44|   11|2018|      4|
|2018-11-01 21:28:54|  21|  1|  44|   11|2018|      4|
|2018-11-01 21:42:00|  21|  1|  44|   11|2018|      4|
|2018-11-01 21:52:05|  21|  1|  44|   11|2018|      4|
|2018-11-01 21:55:25|  21|  1|  44|   11|2018|      4|
|2018-11-01 22:23:14|  22|  1|  44|   11|2018|      4|
|2018-11-02 01:25:34|   1|  2|  44|   11|2018|      5|
|2018-11-02 01:30:41|   1|  2|  44|   11|2018|      5|
|2018-11-02 01:34:17|   1|  2|  44|   11|2018|      5|
|2018-11-02 02:42:48|   2|  2|  44|   11|2018|      5|
|2018-11-0

In [83]:
song_df = spark.read.json('song_data/*/*/*/*.json').selectExpr(
    "song_id",
    "title",
    "artist_id",
    "artist_name",
    "year",
    "duration").drop_duplicates()

song_df

DataFrame[song_id: string, title: string, artist_id: string, artist_name: string, year: bigint, duration: double]

In [84]:
song_df.show()

+------------------+--------------------+------------------+--------------------+----+---------+
|           song_id|               title|         artist_id|         artist_name|year| duration|
+------------------+--------------------+------------------+--------------------+----+---------+
|SOKEJEJ12A8C13E0D0|The Urgency (LP V...|ARC43071187B990240|        Wayne Watson|   0|245.21098|
|SOZCTXZ12AB0182364|      Setanta matins|AR5KOSW1187FB35FF4|               Elena|   0|269.58322|
|SORRZGD12A6310DBC3|      Harajuku Girls|ARVBRGZ1187FB4675A|        Gwen Stefani|2004|290.55955|
|SOGVQGJ12AB017F169|           Ten Tonne|AR62SOJ1187FB47BB5|      Chase & Status|2005|337.68444|
|SOQOTLQ12AB01868D0|  Clementina Santafè|ARGCY1Y1187B9A4FA5|            Gloriana|   0|153.33832|
|SONHOTT12A8C13493C|     Something Girls|AR7G5I41187FB4CE6C|            Adam Ant|1982|233.40363|
|SOOJPRH12A8C141995|   Loaded Like A Gun|ARBGXIG122988F409D|          Steel Rain|   0|173.19138|
|SOLLHMX12AB01846DC|   The Emp

In [86]:
ldf.show()

+--------------------+---------+----------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+----------+--------------------+------+-------------+--------------------+-------+----------------+-------------------+----------+
|              artist|     auth|first_name|gender|itemInSession|last_name|   length|level|            location|method|    page|     registration|session_id|                song|status|           ts|          user_agent|user_id|             ts2|          timestamp|  datetime|
+--------------------+---------+----------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+----------+--------------------+------+-------------+--------------------+-------+----------------+-------------------+----------+
|            Harmonia|Logged In|      Ryan|     M|            0|    Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|       583|       Sehr kosm

In [91]:
songplays_table = ldf.join(song_df, 
                           (ldf.song == song_df.title) & 
                           (ldf.artist == song_df.artist_name) & 
                           (ldf.length == song_df.duration) & 
                           (year(ldf.timestamp) == song_df.year), 'left_outer').select(ldf.timestamp.alias("start_time"), 
                                                                                    ldf.user_id,
                                                                                    ldf.level,
                                                                                    song_df.song_id,
                                                                                    song_df.artist_id,
                                                                                    ldf.session_id, 
                                                                                    ldf.location, 
                                                                                    ldf.user_agent,
                                                                                    year(ldf.timestamp).alias('year'),
                                                                                    month(ldf.timestamp).alias('month')).orderBy("start_time", "user_id").withColumn("songplay_id", F.monotonically_increasing_id())

songplays_table.show()

+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+-----------+
|         start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|songplay_id|
+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+-----------+
|2018-11-01 21:01:46|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|2018|   11|          0|
|2018-11-01 21:05:52|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|2018|   11|          1|
|2018-11-01 21:08:16|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|2018|   11|          2|
|2018-11-01 21:11:13|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozilla/5.0 (Win...|2018|   11|          3|
|2018-11-01 21:17:33|      8| free|   null|     null|       139|Phoenix-Mesa-Scot...|"Mozi