In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, dayofweek, weekofyear, date_format
import pyspark.sql.functions as F
import sql_queries as sq

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get("AWS","AWS_ACCESS_KEY_ID")
os.environ['AWS_SECRET_ACCESS_KEY']=config.get("AWS","AWS_SECRET_ACCESS_KEY")

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

Ivy Default Cache set to: /Users/subashprakash/.ivy2/cache
The jars for the packages stored in: /Users/subashprakash/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-28f1aace-ce41-48d6-8f2b-ffe177753e94;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;2.7.0 in central


:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/3.2.1/libexec/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.hadoop#hadoop-common;2.7.0 in central
	found org.apache.hadoop#hadoop-annotations;2.7.0 in central
	found com.google.guava#guava;11.0.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found commons-cli#commons-cli;1.2 in central
	found org.apache.commons#commons-math3;3.1.1 in central
	found xmlenc#xmlenc;0.52 in central
	found commons-httpclient#commons-httpclient;3.1 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.4 in central
	found commons-io#commons-io;2.4 in central
	found commons-net#commons-net;3.1 in central
	found commons-collections#commons-collections;3.2.1 in central
	found javax.servlet#servlet-api;2.5 in central
	found org.mortbay.jetty#jetty;6.1.26 in central
	found org.mortbay.jetty#jetty-util;6.1.26 in central
	found com.sun.jersey#jersey-core;1.9 in central
	found com.sun.jersey#jersey-json;1.9 in central
	found org.codehaus.jettison#jettison;1.1 in central
	found com.sun.xm

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
input_data = "data/"
song_e_df = spark.read.json(input_data + "song_data/*/*/*/*.json")
song_e_df.createOrReplaceTempView("songs_db")

In [5]:
log_e_df = spark.read.json(input_data + "log-data/*.json")
log_e_df.createOrReplaceTempView("logs_db")

In [6]:
song_e_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [7]:
log_e_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [43]:
# There are some year which are 0
song_table = spark.sql(sq.songs_q)
song_table.write.partitionBy("year","artist_id").mode("overwrite").parquet("song_table.parquet")
song_table.show(10)



+------------------+------------------+--------------------+----+---------+
|           song_id|         artist_id|               title|year| duration|
+------------------+------------------+--------------------+----+---------+
|SOBBUGU12A8C13E95D|ARMAC4T1187FB3FA4C|Setting Fire to S...|2004|207.77751|
|SOAOIBZ12AB01815BE|ARPBNLO1187FB3D52F|I Hold Your Hand ...|2000| 43.36281|
|SONYPOM12A8C13B2D7|ARDNS031187B9924F0|I Think My Wife I...|2005|186.48771|
|SONWXQJ12A8C134D94|ARNF6401187FB57032|The Ballad Of Sle...|1994|  305.162|
|SOBZBAZ12A6D4F8742|AROUOZZ1187B9ABE51|      Spanish Grease|1997|168.25424|
|SOGXHEG12AB018653E|AR0RCMP1187FB3F427|It Makes No Diffe...|1992|133.32853|
|SOBLGCN12AB0183212|AR47JEX1187B995D81|James (Hold The L...|1985|124.86485|
|SOGNCJP12A58A80271|ARB29H41187B98F0EF|Do You Finally Ne...|1972|342.56934|
|SOFFKZS12AB017F194|ARBEBBY1187B9B43DB|A Higher Place (A...|1994|236.17261|
|SOBONFF12A6D4F84D8|ARIK43K1187B9AE54C|Tonight Will Be A...|1986| 307.3824|
+-----------


                                                                                

In [9]:
song_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [10]:
song_table.groupBy("year").agg(F.count("year")).show()

+----+-----------+
|year|count(year)|
+----+-----------+
|1994|          2|
|2004|          4|
|2005|          2|
|2000|          2|
|1972|          1|
|1997|          2|
|1992|          1|
|1985|          1|
|1986|          1|
|1987|          1|
|1999|          1|
|1961|          1|
|1969|          1|
|1964|          1|
|2003|          2|
|1982|          1|
|1984|          1|
|2008|          1|
|2007|          1|
|1993|          1|
+----+-----------+



In [11]:
# Partition by song_id is okay however there is one song which is listened very less.
song_table.groupBy("song_id").agg(F.avg("duration")).show()

+------------------+-------------+
|           song_id|avg(duration)|
+------------------+-------------+
|SOAOIBZ12AB01815BE|     43.36281|
|SONWXQJ12A8C134D94|      305.162|
|SOBBUGU12A8C13E95D|    207.77751|
|SONYPOM12A8C13B2D7|    186.48771|
|SOGXHEG12AB018653E|    133.32853|
|SOBZBAZ12A6D4F8742|    168.25424|
|SOBLGCN12AB0183212|    124.86485|
|SOGNCJP12A58A80271|    342.56934|
|SOBONFF12A6D4F84D8|     307.3824|
|SOPVXLX12A8C1402D5|    236.25098|
|SOFFKZS12AB017F194|    236.17261|
|SOHUOAP12A8AE488E9|    491.12771|
|SOVYKGO12AB0187199|    156.39465|
|SOGOSOV12AF72A285E|    313.12934|
|SOYTPEP12AB0180E7B|    164.80608|
|SOCIWDW12A8C13D406|    148.03546|
|SONSKXP12A8C13A2C9|    197.19791|
|SONHOTT12A8C13493C|    233.40363|
|SODAUVL12A8C13D184|    363.85914|
|SOXILUQ12A58A7C72A|    207.43791|
+------------------+-------------+
only showing top 20 rows



In [12]:
song_table.filter(song_table.song_id=="SOAOIBZ12AB01815BE").show()

+------------------+------------------+--------------------+----+--------+
|           song_id|         artist_id|               title|year|duration|
+------------------+------------------+--------------------+----+--------+
|SOAOIBZ12AB01815BE|ARPBNLO1187FB3D52F|I Hold Your Hand ...|2000|43.36281|
+------------------+------------------+--------------------+----+--------+



In [13]:
# Looking only from the part of the data it seems that it is fine.
song_table.groupBy("artist_id").agg(F.count("*")).show()

+------------------+--------+
|         artist_id|count(1)|
+------------------+--------+
|ARMAC4T1187FB3FA4C|       1|
|ARPBNLO1187FB3D52F|       1|
|ARNF6401187FB57032|       1|
|ARDNS031187B9924F0|       1|
|AR0RCMP1187FB3F427|       1|
|ARB29H41187B98F0EF|       1|
|AR47JEX1187B995D81|       1|
|AROUOZZ1187B9ABE51|       1|
|ARD842G1187B997376|       1|
|AR3JMC51187B9AE49D|       1|
|ARIK43K1187B9AE54C|       1|
|ARH4Z031187B9A71F2|       1|
|ARBEBBY1187B9B43DB|       1|
|ARAJPHH1187FB5566A|       1|
|ARGUVEV1187B98BA17|       1|
|ARMJAGH1187FB546F3|       1|
|AR0IAWL1187B9A96D0|       1|
|AR7G5I41187FB4CE6C|       1|
|ARP6N5A1187B99D1A3|       1|
|ARWB3G61187FB49404|       1|
+------------------+--------+
only showing top 20 rows



In [14]:
artist_table = spark.sql(sq.artists_q)
artist_table.show(10)

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |    null|     null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|    null|     null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624|-74.47995|
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|40.71455|-74.00712|
|ARDNS031187B9924F0|          Tim Wilson|             Georgia|32.67828|-83.22295|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|40.79086|-73.96644|
|ARLTWXK1187FB5A3F8|         King Curtis|      Fort Worth, TX|32.74863|-97.32925|
|ARPFHN61187FB575F6|         Lupe Fiasco|         Chicago, IL|41.88415|-87.63241|
|ARI2JSK1187FB496EF|Nick Ingman;Gavyn...|     London, England|51.50632| -0.12714|
|AROUOZZ1187B9AB

In [15]:
artist_table.describe()

DataFrame[summary: string, artist_id: string, name: string, location: string, latitude: string, longitude: string]

In [16]:
logs_df = spark.sql(sq.logs_q)

In [17]:
logs_df.show(10)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyva

In [18]:
logs_df.createOrReplaceTempView("nextPlay_logs_db")

In [19]:
# ctx handling the level changes to have completely unique user_ids
users_table = spark.sql(sq.users_q)

In [20]:
users_table.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     10|    Sylvie|     Cruz|     F| free|
|    100|     Adler|  Barrera|     M| free|
|    101|    Jayden|      Fox|     M| free|
|     11| Christian|   Porter|     F| free|
|     12|    Austin|  Rosales|     M| free|
|     13|       Ava| Robinson|     F| free|
|     14|  Theodore|   Harris|     M| free|
|     15|      Lily|     Koch|     F| paid|
|     16|     Rylan|   George|     M| paid|
|     17|  Makinley|    Jones|     F| free|
|     18|     Jacob|   Rogers|     M| free|
|     19|   Zachary|   Thomas|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     20|     Aiden|  Ramirez|     M| paid|
|     22|      Sean|   Wilson|     F| free|
|     23|    Morris|  Gilmore|     M| free|
|     24|     Layla|  Griffin|     F| paid|
|     25|    Jayden|   Graves|     M| paid|
|     26|      Ryan|    Smith|     M| free|
|     27|    Carlos|   Carter|  

In [42]:
users_table.groupBy("user_id").agg(F.count("user_id").alias('count')).orderBy(col('count').desc()).show(5)

+-------+-----+
|user_id|count|
+-------+-----+
|     10|    1|
|    100|    1|
|    101|    1|
|     11|    1|
|     12|    1|
+-------+-----+
only showing top 5 rows



In [22]:
from datetime import datetime

def format_ts(value):
    return datetime.fromtimestamp(value/1000).strftime("%Y-%m-%d %H:%M:%S")

In [23]:
logs_df.select("ts").show(3)

+-------------+
|           ts|
+-------------+
|1542241826796|
|1542242481796|
|1542242741796|
+-------------+
only showing top 3 rows



In [24]:
get_timestamp = udf(lambda x: format_ts(x))
logs_t_df = logs_df.withColumn("timestamp", get_timestamp(logs_df.ts))

In [25]:
logs_t_df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          timestamp|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 01:30:26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smi


[Stage 31:>                                                         (0 + 1) / 1]

                                                                                

In [26]:
logs_t_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [27]:
logs_t_df=logs_t_df.withColumn('timestamp', logs_t_df.timestamp.cast("timestamp"))

In [28]:
logs_t_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [29]:
logs_t_df = logs_t_df.withColumn("year", F.year("timestamp")) \
             .withColumn("month", F.month("timestamp")) \
             .withColumn("dayofmonth", F.dayofmonth("timestamp")) \
             .withColumn("hour", F.hour("timestamp")) \
             .withColumn("weekofyear", F.weekofyear("timestamp")) \
             .withColumn("weekday", F.dayofweek("timestamp"))

In [30]:
logs_t_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [31]:
logs_t_df.groupBy("year").agg(F.count("*")).show()

+----+--------+
|year|count(1)|
+----+--------+
|2018|    6820|
+----+--------+



In [32]:
logs_t_df.groupBy("month").agg(F.count("*")).show()

+-----+--------+
|month|count(1)|
+-----+--------+
|   11|    6820|
+-----+--------+



In [33]:
logs_t_df.createOrReplaceTempView("time_db")
time_table = spark.sql(sq.time_q)

In [34]:
time_table.show(10)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 01:30:26|   1| 15|  46|   11|2018|      5|
|2018-11-15 01:41:21|   1| 15|  46|   11|2018|      5|
|2018-11-15 01:45:41|   1| 15|  46|   11|2018|      5|
|2018-11-15 04:44:09|   4| 15|  46|   11|2018|      5|
|2018-11-15 06:48:55|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:53:44|   6| 15|  46|   11|2018|      5|
|2018-11-15 06:55:56|   6| 15|  46|   11|2018|      5|
|2018-11-15 07:01:02|   7| 15|  46|   11|2018|      5|
|2018-11-15 07:07:37|   7| 15|  46|   11|2018|      5|
|2018-11-15 07:10:33|   7| 15|  46|   11|2018|      5|
+-------------------+----+---+----+-----+----+-------+
only showing top 10 rows



In [35]:
song_table.createOrReplaceTempView("song_table")

In [36]:
spark.sql(sq.songplays_q).show(3)

+-----------+----------+-------+-----+-------+---------+----------+--------+----------+
|songplay_id|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+

