In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import DateType, TimestampType

In [2]:
# Configure environment
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# create Spark session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
# get filepath to local input and output data file
input_data = "data/"
output_data = "output/"

In [5]:
# get filepath to local song data file
song_data = input_data + 'song_data/*/*/*/*.json'

In [6]:
 # read song data file
df = spark.read.json(song_data)

In [7]:
# create a temporary view against which SQL queries can be run
df.createOrReplaceTempView('songs_table')

In [8]:
print(df.count())

71


In [9]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [10]:
df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


In [11]:
# extract columns to create songs table
songs_table = spark.sql("""
                        SELECT song_id, 
                               title, 
                               artist_id, 
                               year, 
                               duration, 
                               artist_name
                        FROM songs_table
                        WHERE song_id IS NOT NULL
                        """)

In [12]:
songs_table.show()

+------------------+--------------------+------------------+----+---------+--------------------+
|           song_id|               title|         artist_id|year| duration|         artist_name|
+------------------+--------------------+------------------+----+---------+--------------------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|Montserrat Caball...|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|Mike Jones (Featu...|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|The Dillinger Esc...|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|            Tiny Tim|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|          Tim Wilson|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|   Sophie B. Hawkins|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|ARLTWXK1187FB5A3F8|   0|326.00771|         King Curtis|
|SOWQTQZ12A58A7B63E|Streets On

In [13]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.parquet(os.path.join(output_data, 'songs_table/'), \
                              mode='overwrite', partitionBy=['year', 'artist_id'])

In [14]:
# extract columns to create artists table
artists_table = spark.sql("""
                              SELECT DISTINCT artist_id, 
                                              artist_name as name, 
                                              artist_location as location, 
                                              artist_latitude as latitude, 
                                              artist_longitude as longitude
                              FROM songs_table
                              WHERE artist_id IS NOT NULL
                              """)

In [15]:
artists_table.show()

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|40.71455| -74.00712|
|ARBEBBY1187B9B43DB|           Tom Petty|     Gainesville, FL|    null|      null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|  8.4177| -80.11278|
|ARMBR4Y1187B9990EB|        David Martin|     California - SF|37.77916|-122.42005|
|ARD0S291187B9B7BF5|             Rated R|                Ohio|    null|      null|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|30.08615| -94.10158|
|ARKRRTF1187B9984DA|    Sonora Santanera|                    |    null|      null|
|ARHHO3O1187B989413|           Bob Azzam|                    |    null|      null|
|ARJIE2Y1187B994AB7|         Line Renaud|                    |    null|      null|
|ARG

In [16]:
# write artists table to parquet files
artists_table.write.parquet(os.path.join(output_data, 'artists_table/'), mode='overwrite')

In [17]:
# get filepath to local log data file
log_data = input_data + 'log_data/*.json'

In [18]:
# read log data file
df = spark.read.json(log_data)

In [19]:
# filter by actions for song plays
df = df.filter(df.page == 'NextSong')

In [20]:
# create a temporary view against which SQL queries can be run
df.createOrReplaceTempView('log_table')

In [21]:
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [22]:
# extract columns for users table   
users_table = spark.sql("""
                        SELECT DISTINCT userId as user_id, 
                                        firstName as first_name, 
                                        lastName as last_name, 
                                        gender, 
                                        level
                        FROM log_table
                        WHERE userId IS NOT NULL
                        """)

In [23]:
# write users table to parquet files
users_table.write.parquet(os.path.join(output_data, 'users/'), mode='overwrite')

In [24]:
users_table.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
|     63|      Ayla|  Johnson|     F| free|
|     37|    Jordan|    Hicks|     F| free|
|      6|   Cecilia|    Owens|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     27|    Carlos|   Carter|     M| free|
|     89|   Kynnedi|  Sanchez|     F| free|
|     57| Katherine|      Gay|     F| free|
|     74|    Braden|   Parker|     M| free|
|     29|Jacqueline|    Lynch|     F| paid|
|     75|    Joseph|Gutierrez|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     88|  Mohammad|Rodriguez|     M| free|
|     64|    Hannah|  Calhoun|     F| free|
|     15|      Lily|     Koch|     F| free|
|     95|      Sara|  Johnson|  

In [25]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x:  datetime.fromtimestamp((x/1000)), TimestampType())
df = df.withColumn('timestamp', get_timestamp(df.ts))

In [26]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp((x/1000)), DateType())
df = df.withColumn('datetime', get_datetime(df.ts))

In [27]:
# create a temporary view against which SQL queries can be run
df.createOrReplaceTempView('time_table')

In [28]:
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp,datetime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796,2018-11-15
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796,2018-11-15
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796,2018-11-15
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796,2018-11-15
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796,2018-11-15


In [29]:
# extract columns to create time table
time_table = spark.sql("""
                       SELECT timestamp as start_time,  
                              hour(timestamp) as hour, 
                              dayofmonth(timestamp) as day, 
                              weekofyear(timestamp) as week, 
                              month(timestamp) as month, 
                              year(timestamp) as year, 
                              dayofweek(timestamp) as weekday 
                       FROM time_table
                       WHERE timestamp IS NOT NULL
                       ORDER BY timestamp
                       """)

In [30]:
time_table.show()

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-01 21:01:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:05:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:08:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:11:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:17:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:24:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:28:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:42:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:52:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:55:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 22:23:...|  22|  1|  44|   11|2018|      5|
|2018-11-02 01:25:...|   1|  2|  44|   11|2018|      6|
|2018-11-02 01:30:...|   1|  2|  44|   11|2018|      6|
|2018-11-02 01:34:...|   1|  2|  44|   11|2018|      6|
|2018-11-02 02:42:...|   2|  2|  44|   11|2018| 

In [31]:
# write time table to parquet files partitioned by year and month
time_table.write.parquet(os.path.join(output_data, 'time_table/'), mode='overwrite', partitionBy=['year', 'month'])

In [32]:
# read in song data to use for songplays table
song_df = spark.read.parquet(output_data + 'songs_table/')

In [33]:
# create a temporary view against which SQL queries can be run
song_df.createOrReplaceTempView('song_table')

In [34]:
song_df.show()

+------------------+--------------------+---------+--------------------+----+------------------+
|           song_id|               title| duration|         artist_name|year|         artist_id|
+------------------+--------------------+---------+--------------------+----+------------------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|511.16363|Montserrat Caball...|   0|ARDR4AC1187FB371A1|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|173.66159|Mike Jones (Featu...|   0|AREBBGV1187FB523D2|
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|Jeff And Sheri Ea...|   0|ARKFYS91187B98E58F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|          Tim Wilson|2005|ARDNS031187B9924F0|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|            Tiny Tim|2000|ARPBNLO1187FB3D52F|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|         King Curtis|   0|ARLTWXK1187FB5A3F8|
|SOBBUGU12A8C13E95D|Setting Fire to S...|207.77751|The Dillinger Esc...|2004|ARMAC4T1187FB3FA4C|
|SOWQTQZ12A58A7B63E|Streets On

In [35]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = df.join(song_df, (df.song == song_df.title) & (df.artist == song_df.artist_name) \
                          & (df.length == song_df.duration), 'left_outer').select(
                             df.timestamp,
                             col("userId").alias('user_id'),
                             df.level,
                             song_df.song_id,
                             song_df.artist_id,
                             col("sessionId").alias("session_id"),
                             df.location,
                             col("useragent").alias("user_agent"),
                             year('datetime').alias('year'),
                             month('datetime').alias('month')) 

In [36]:
songplays_table.show()

+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|           timestamp|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|
+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|2018-11-15 00:30:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|2018-11-15 00:41:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|2018-11-15 00:45:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|2018-11-15 03:44:...|     61| free|   null|     null|       597|Houston-The Woodl...|"Mozilla/5.0 (Mac...|2018|   11|
|2018-11-15 05:48:...|     80| paid|   null|     null|       602|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|
|2018-11-15 05:53:...|     80| paid|   null|    

In [37]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(os.path.join(output_data, 'songplays_table/'),\
                              mode='overwrite', partitionBy=['year', 'month'])