In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, LongType as Long
import pyspark.sql.functions as F
from datetime import datetime


In [2]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [3]:
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

In [4]:
songSchema = R([
    Fld('artist_id',Str()),
    Fld('artist_latitude',Dbl()),
    Fld('artist_location',Str()),
    Fld('artist_longitude',Dbl()),
    Fld('artist_name',Str()),
    Fld('duration',Dbl()),
    Fld('num_songs',Int()),
    Fld('song_id',Str()),
    Fld('title',Str()),
    Fld('year',Int())
])

In [5]:
df_song_small = spark.read.json("./data/song_data/A/A/*/*.json",schema=songSchema)

In [7]:
df_song_small.count()

11

In [8]:
df_song_small.createOrReplaceTempView('songs')

In [9]:
spark.sql("""
SELECT song_id from songs
""").show()

+------------------+
|           song_id|
+------------------+
|SOYMRWW12A6D4FAB14|
|SOHKNRJ12A6701D1F8|
|SOQHXMF12AB0182363|
|SOCIWDW12A8C13D406|
|SONHOTT12A8C13493C|
|SOMZWCG12A8C13C480|
|SOUDSGM12AC9618304|
|SOMJBYD12A6D4F8557|
|SOXVLOJ12AB0189215|
|SOIAZJW12AB01853F1|
|SOFSOCN12A8C143F5D|
+------------------+



In [10]:
songs_table = df_song_small.select('song_id', 'title', 'artist_id','year', 'duration').dropDuplicates()

In [11]:
artists_table = df_song_small.select('artist_id','artist_name','artist_location','artist_latitude','artist_longitude').dropDuplicates()

In [12]:
artists_table.createOrReplaceTempView('artists')

In [13]:
spark.sql("""
select * from artists
""").show(5)

+------------------+----------------+---------------+---------------+----------------+
|         artist_id|     artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+----------------+---------------+---------------+----------------+
|ARNTLGG11E2835DDB9|             Clp|               |           null|            null|
|ARMJAGH1187FB546F3|    The Box Tops|    Memphis, TN|       35.14968|       -90.04892|
|ARKRRTF1187B9984DA|Sonora Santanera|               |           null|            null|
|ARXR32B1187FB57099|             Gob|               |           null|            null|
|AR8ZCNI1187B9A069B|Planet P Project|               |           null|            null|
+------------------+----------------+---------------+---------------+----------------+
only showing top 5 rows



In [14]:
artists_table = artists_table.withColumnRenamed('artist_name','name')\
                             .withColumnRenamed('artist_location','location')\
                             .withColumnRenamed('artist_latitude','latitude')\
                             .withColumnRenamed('artist_longitude','longitude')

In [15]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [16]:
artists_table.createOrReplaceTempView('artists')

In [17]:
spark.sql("""
select * from artists
""").show(5)

+------------------+----------------+-----------+--------+---------+
|         artist_id|            name|   location|latitude|longitude|
+------------------+----------------+-----------+--------+---------+
|ARNTLGG11E2835DDB9|             Clp|           |    null|     null|
|ARMJAGH1187FB546F3|    The Box Tops|Memphis, TN|35.14968|-90.04892|
|ARKRRTF1187B9984DA|Sonora Santanera|           |    null|     null|
|ARXR32B1187FB57099|             Gob|           |    null|     null|
|AR8ZCNI1187B9A069B|Planet P Project|           |    null|     null|
+------------------+----------------+-----------+--------+---------+
only showing top 5 rows



In [18]:
df_log_small = spark.read.json("./data/log_data/2018/11/2018-11-2*.json")

In [28]:
df_log_small.count()

3082

In [29]:
df_log_small.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [19]:
%%time
df_log_small = spark.read.json("./data/log_data/2018/11/*.json")

CPU times: user 1.85 ms, sys: 418 µs, total: 2.27 ms
Wall time: 647 ms


In [20]:
df_log_small.count()

8056

In [21]:
df_log_filtered = df_log_small.filter(df_log_small.page == "NextSong")

In [39]:
df_log_small.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [22]:
df_log_filtered.createOrReplaceTempView('log')

In [23]:
spark.sql("""
select * from log
""").show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [24]:
spark.sql("""
select Distinct page,count(*) as cnt from log
group by page
order by cnt
""").show()

+--------+----+
|    page| cnt|
+--------+----+
|NextSong|6820|
+--------+----+



In [25]:
get_timestamp = F.udf(lambda x : datetime.fromtimestamp(x/1000.0).strftime('%Y-%m-%d %H:%M:%S'))

In [26]:
df_log_filtered = df_log_filtered.withColumn('start_time', get_timestamp(df_log_filtered.ts)) 

In [27]:
df_log_filtered.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|         start_time|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smi

In [28]:
time_table = df_log_filtered.select(df_log_filtered.start_time.alias('start_time'), \
                F.hour(df_log_filtered.start_time).alias('hour'),\
                F.dayofmonth(df_log_filtered.start_time).alias('day'),\
                F.weekofyear(df_log_filtered.start_time).alias('week'),\
                F.month(df_log_filtered.start_time).alias('month'),\
                F.year(df_log_filtered.start_time).alias('year'),
                F.dayofweek(df_log_filtered.start_time).alias('weekday')).dropDuplicates()

In [29]:
time_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [30]:
time_table.show(5)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 12:38:03|  12| 15|  46|   11|2018|      5|
|2018-11-15 22:00:58|  22| 15|  46|   11|2018|      5|
|2018-11-21 19:00:45|  19| 21|  47|   11|2018|      4|
|2018-11-21 20:22:17|  20| 21|  47|   11|2018|      4|
|2018-11-21 22:26:57|  22| 21|  47|   11|2018|      4|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [66]:
df_log_filtered.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: string (nullable = true)



In [32]:
users_table = df_log_filtered.select('userId','firstName','lastName','gender','level').dropDuplicates()

In [33]:
users_table = users_table.withColumnRenamed('userId','user_id')\
                             .withColumnRenamed('firstName','first_name')\
                             .withColumnRenamed('lastName','last_name')\


In [34]:
users_table.count()

104

In [35]:
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [36]:
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     57| Katherine|      Gay|     F| free|
|     84|   Shakira|     Hunt|     F| free|
|     22|      Sean|   Wilson|     F| free|
|     52|  Theodore|    Smith|     M| free|
|     80|     Tegan|   Levine|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows

