In [1]:
import configparser
from datetime import datetime, date
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format,to_date
from zipfile import ZipFile
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

In [2]:
#config = configparser.ConfigParser()
#config.read('dl.cfg')

#os.environ['AWS_ACCESS_KEY_ID']=config['AWS CREDS']['AWS_ACCESS_KEY_ID']
#os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS CREDS']['AWS_SECRET_ACCESS_KEY']

In [3]:
 spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
#with ZipFile('data/song-data.zip') as myzip:
   # myzip.extractall('data/song_data')

In [5]:
#with ZipFile('data/log-data.zip') as myzip:
    #myzip.extractall('data/log_data')

In [6]:
dfSong=spark.read.json('data/song_data/*/*/*/*.json')

In [7]:
dfSong.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
dfSong.show(10)
dfSong.count()

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

71

In [9]:
dfSong.createOrReplaceTempView('data_song_table')
song_table=spark.sql(""" SELECT DISTINCT (song_id), title, artist_id, year, duration 
                          FROM data_song_table """)
artist_table=spark.sql("""SELECT DISTINCT (artist_id), artist_name AS name, artist_location AS location, artist_latitude AS lattitude, artist_longitude AS longitude
                           FROM data_song_table""")
song_table.show(10)
artist_table.show(10)
artist_table

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGNCJP12A58A80271|Do You Finally Ne...|ARB29H41187B98F0EF|1972|342.56934|
|SOOJPRH12A8C141995|   Loaded Like A Gun|ARBGXIG122988F409D|   0|173.19138|
|SOFCHDR12AB01866EF|         Living Hell|AREVWGE1187B9B890A|   0|282.43546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOTUKVB12AB0181477|   Blessed Assurance|AR7ZKHQ1187B98DD73|1993|  270.602|
|SOMVWWT12A58A7AE05|Knocked Out Of Th...|ARQ9BO41187FB5CF1F|   0|183.17016|
|SOBEBDG12A58A76D60|        Kassie Jones|ARI3BMM1187FB4255E|   0|220.78649|
|SOILPQQ12AB017E82A|Sohna Nee Sohna Data|AR1ZHYZ1187FB3C717|   0|599.24853|
|SOYMRWW12A6D4FAB14|The Moon And I (O...|ARKFYS91187B98E58F|   0| 267.7024|
+-----------

DataFrame[artist_id: string, name: string, location: string, lattitude: double, longitude: double]

In [10]:
dfLog=spark.read.json('data/log_data/')
dfLog = dfLog.filter(dfLog.page=='NextSong')
dfLog.printSchema()
dfLog

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [11]:
dfLog.count()
dfLog.take(5)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='Train', auth='Logged In'

In [12]:
dfLog.createOrReplaceTempView('dat_log_table')
users_table= spark.sql("""SELECT DISTINCT(userId) AS user_id, firstName AS first_name, lastName as last_name, gender, level
                          FROM dat_log_table""")

users_table.show(10)
users_table.count()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
|     63|      Ayla|  Johnson|     F| free|
|     37|    Jordan|    Hicks|     F| free|
|      6|   Cecilia|    Owens|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     27|    Carlos|   Carter|     M| free|
+-------+----------+---------+------+-----+
only showing top 10 rows



104

In [13]:
#get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000.0).strftime("%Y-%m-%d %H:%M:%S"))
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000.0), TimestampType())
dfLog=dfLog.withColumn("timestamp", get_timestamp(dfLog.ts))

get_datetime = udf(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))
dfLog=dfLog.withColumn("datetime", get_datetime(dfLog.timestamp)) \
                    .withColumn("hour", hour('timestamp')) \
                    .withColumn("day", dayofmonth('timestamp')) \
                    .withColumn("week", weekofyear('timestamp')) \
                    .withColumn("month", month('timestamp')) \
                    .withColumn("year", year('timestamp')) \
                    .withColumn("Weekday", date_format('timestamp', 'E'))
dfLog.printSchema()
dfLog.take(2)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- Weekday: string (nullable = tr

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', timestamp=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000), datetime='2018-11-15 00:30:26', hour=0, day=15, week=46, month=11, year=2018, Weekday='Thu'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) App

In [14]:
dfLog.createOrReplaceTempView('date_log_table')
time_table = spark.sql("""SELECT DISTINCT(datetime) AS start_time, hour, day, month,year, weekday
                        FROM date_log_table""")
time_table.show(5)
time_table.count()

+-------------------+----+---+-----+----+-------+
|         start_time|hour|day|month|year|weekday|
+-------------------+----+---+-----+----+-------+
|2018-11-21 05:18:03|   5| 21|   11|2018|    Wed|
|2018-11-21 05:59:29|   5| 21|   11|2018|    Wed|
|2018-11-21 11:54:20|  11| 21|   11|2018|    Wed|
|2018-11-21 23:28:20|  23| 21|   11|2018|    Wed|
|2018-11-14 16:26:21|  16| 14|   11|2018|    Wed|
+-------------------+----+---+-----+----+-------+
only showing top 5 rows



6813

In [15]:
#song_log_join= spark.sql("""SELECT * FROM data_song_table""")
#get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000.0), TimestampType())
#get_datetime = udf(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))

songplays_table= spark.sql(""" SELECT monotonically_increasing_id() AS  songplay_id, dat_log_table.userId AS user_id, dat_log_table.level, 
                                     data_song_table.song_id, data_song_table.artist_id, dat_log_table.sessionId AS session_id,
                                     data_song_table.artist_location AS location, dat_log_table.userAgent AS user_agent
                                FROM dat_log_table 
                                JOIN data_song_table ON dat_log_table.artist==data_song_table.artist_name
                                                      AND dat_log_table.song == data_song_table.title
                                """) 
songplays_table.show(50)
songplays_table.count()

+-----------+-------+-----+------------------+------------------+----------+---------+--------------------+
|songplay_id|user_id|level|           song_id|         artist_id|session_id| location|          user_agent|
+-----------+-------+-----+------------------+------------------+----------+---------+--------------------+
|          0|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Dubai UAE|"Mozilla/5.0 (X11...|
+-----------+-------+-----+------------------+------------------+----------+---------+--------------------+



1

In [16]:
df_songplays_table= dfLog.join(dfSong, (dfLog.artist==dfSong.artist_name) \
                              & (dfLog.song==dfSong.title )) \
                             .select( monotonically_increasing_id().alias('songplay_id'), col('userId').alias('user_id'), col('level'), \
                                     col('song_id'), col('artist_id'), col('sessionId').alias('session_id'), col('artist_location').alias('location'), \
                                     col('userAgent').alias('user_agent'), col('datetime').alias('start_time'),col('month'), dfLog.year).dropDuplicates()
df_songplays_table.printSchema()
df_songplays_table.show(3)

root
 |-- songplay_id: long (nullable = false)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)

+-----------+-------+-----+------------------+------------------+----------+---------+--------------------+-------------------+-----+----+
|songplay_id|user_id|level|           song_id|         artist_id|session_id| location|          user_agent|         start_time|month|year|
+-----------+-------+-----+------------------+------------------+----------+---------+--------------------+-------------------+-----+----+
|          0|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Dubai UAE|"Mozilla/5.0 (X11...|2018-11-21 21:56:47|   11|2018|
+

In [18]:
input_data ="data/"
song_data=os.path.join(input_data, 'song_data', '*', '*', '*', '*.json')
song_df=spark.read.json(song_data)
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [19]:
df_songplays_table= dfLog.join(song_df, dfLog.song==song_df.title) \
                                    .select( monotonically_increasing_id().alias('songplay_id'), col('userId').alias('user_id'), col('level'), \
                                     col('song_id'), col('artist_id'), col('sessionId').alias('session_id'), (song_df.artist_location).alias('location'), \
                                     col('userAgent').alias('user_agent'), col('datetime').alias('start_time'),col('month'), dfLog.year).dropDuplicates()
df_songplays_table.printSchema()
df_songplays_table.show(3)

root
 |-- songplay_id: long (nullable = false)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)

+-----------+-------+-----+------------------+------------------+----------+---------+--------------------+-------------------+-----+----+
|songplay_id|user_id|level|           song_id|         artist_id|session_id| location|          user_agent|         start_time|month|year|
+-----------+-------+-----+------------------+------------------+----------+---------+--------------------+-------------------+-----+----+
|          0|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Dubai UAE|"Mozilla/5.0 (X11...|2018-11-21 21:56:47|   11|2018|
|

In [21]:
output_data = "spark-warehouse/"
songs_df_ = spark.read.parquet('{}songs/'.format(output_data))
print("Read songs from parquet")
artists_df_ = spark.read.parquet('{}artists/'.format(output_data))
print("Read artists from parquet")

Read songs from parquet
Read artists from parquet


In [22]:
songs_df_.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [23]:
artists_df_.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lattitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [24]:
songs_artists_df = songs_df_.join(artists_df_, artists_df_.artist_id==songs_df_.artist_id).drop(songs_df_.artist_id)
songs_artists_df.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lattitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [27]:
df_songplays_table_= dfLog.join(songs_artists_df, (dfLog.song==songs_artists_df.title) \
                                & (songs_artists_df.name==dfLog.artist)) \
                                    .select( monotonically_increasing_id().alias('songplay_id'), col('userId').alias('user_id'), col('level'), \
                                     col('song_id'), col('artist_id'), col('sessionId').alias('session_id'), (songs_artists_df.location), \
                                     col('userAgent').alias('user_agent'), col('datetime').alias('start_time'),col('month'), dfLog.year).dropDuplicates()
df_songplays_table_.printSchema()
df_songplays_table_.show(3)

root
 |-- songplay_id: long (nullable = false)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)

+-----------+-------+-----+------------------+------------------+----------+---------+--------------------+-------------------+-----+----+
|songplay_id|user_id|level|           song_id|         artist_id|session_id| location|          user_agent|         start_time|month|year|
+-----------+-------+-----+------------------+------------------+----------+---------+--------------------+-------------------+-----+----+
| 8589934592|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Dubai UAE|"Mozilla/5.0 (X11...|2018-11-21 21:56:47|   11|2018|
+