In [1]:
# load all libraries
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
# create spark session
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [3]:
# print information about spark session
spark

## Need to create following tables

<strong>Fact Table</strong>

<strong>songplays</strong> - records in log data associated with song plays i.e. records with page NextSong
<li>songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent</li>
<p></p>

<strong>Dimension Tables</strong>
<p></p>
<strong>users</strong> - users in the app
<li>user_id, first_name, last_name, gender, level</li>
<strong>songs</strong> - songs in music database
<li>song_id, title, artist_id, year, duration</li>
<strong>artists</strong> - artists in music database
<li>artist_id, name, location, lattitude, longitude</li>
<strong>time</strong> - timestamps of records in songplays broken down into specific units
<li>start_time, hour, day, week, month, year, weekday</li>

In [4]:
# create schema and load song_data to data frame
schema =  StructType([
                        StructField("num_songs", ShortType(), True),
                        StructField("artist_latitude", StringType(), True),
                        StructField("artist_location", StringType(), True),
                        StructField("artist_longitude", StringType(), True),
                        StructField("artist_name", StringType(), True),
                        StructField("song_id", StringType(), False),
                        StructField("title", StringType(), True),
                        StructField("artist_id", StringType(), False),
                        StructField("year", ShortType(), True),
                        StructField("duration", DoubleType(), True)
                    ])

df_song = spark.read.schema(schema).json("data/song-data/song_data/A/*/*/*.json")

In [5]:
# show number or rows 
df_song.count()

71

In [6]:
# print out schema
df_song.printSchema()

root
 |-- num_songs: short (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: short (nullable = true)
 |-- duration: double (nullable = true)



In [7]:
# check first row
df_song.take(1)

[Row(num_songs=1, artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', artist_id='ARDR4AC1187FB371A1', year=0, duration=511.16363)]

In [8]:
# create data frame for songs_table and count number of rows 
songs_table = df_song.select("song_id", "title", "artist_id", "year", "duration")
songs_table.count()

71

In [9]:
# display songs_table data frame
songs_table.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|ARLTWXK1187FB5A3F8|   0|326.00771|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|ARPFHN61187FB575F6|   0|279.97995|
|SODUJBS12A8C132150|Wessex Loses a Bride|ARI2JSK1187FB496EF|   0|111.62077|
|SOGXHEG12AB018653E|It Makes No Diffe...|AR0RCMP1187FB3F427|1992|133.32853|
|SOBZBAZ12A6

In [10]:
# save data frame in parquet format and partition it by year and artist_id
songs_table.write.mode('overwrite').partitionBy("year", "artist_id").parquet("data/parquet/songs_table")

In [11]:
# create data frame for artists_table and count number of rows 
artists_table = df_song.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude").distinct()
artists_table.count()

69

In [12]:
# display artists_table data frame
artists_table.show()

+------------------+--------------------+---------------+---------------+----------------+
|         artist_id|         artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+---------------+---------------+----------------+
|ARAJPHH1187FB5566A|     The Shangri-Las|     Queens, NY|        40.7038|       -73.83168|
|AR0IAWL1187B9A96D0|        Danilo Perez|         Panama|         8.4177|       -80.11278|
|ARWB3G61187FB49404|         Steve Morse| Hamilton, Ohio|           null|            null|
|ARHHO3O1187B989413|           Bob Azzam|               |           null|            null|
|ARAGB2O1187FB3A161|Pucho & His Latin...|               |           null|            null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|    Houston, TX|           null|            null|
|ARGSAFR1269FB35070|          Blingtones|               |           null|            null|
|ARJIE2Y1187B994AB7|         Line Renaud|               |           null|            null|

In [13]:
# save data frame in parquet format and we don't need partition since each artist_id is unique
artists_table.write.mode('overwrite').parquet("data/parquet/artists_table")

In [14]:
# create schema and load data to data frame

schema =  StructType([
                        StructField("artist", StringType(), True),
                        StructField("auth", StringType(), True),
                        StructField("firstName", StringType(), True),
                        StructField("gender", StringType(), True),
                        StructField("itemInSession", ShortType(), True),
                        StructField("lastName", StringType(), True),
                        StructField("length", StringType(), True),
                        StructField("level", StringType(), True),
                        StructField("location", StringType(), True),
                        StructField("method", StringType(), True),
                        StructField("page", StringType(), True),
                        StructField("registration", StringType(), True),
                        StructField("sessionId", ShortType(), False),
                        StructField("song", StringType(), True),
                        StructField("status", StringType(), True),
                        StructField("ts", DoubleType(), True),
                        StructField("userAgent", StringType(), True),
                        StructField("userId", StringType(), False)
                    ])

df_log = spark.read.schema(schema).json("data/log-data/*.json")

In [15]:
# check first row
df_log.take(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length='655.77751', level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration='1.541016707796E12', sessionId=583, song='Sehr kosmisch', status='200', ts=1542241826796.0, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')]

In [16]:
# print number of rows 
df_log.count()

8056

In [17]:
# select only row with NextSong this will create a data frame with with active users
df_log = df_log.where(df_log.page == "NextSong")

In [18]:
# print number of rows after filtering only active users
df_log.count()

6820

In [19]:
# create udf to convert ts column into TimestampType format
get_timestamp = udf(lambda x: datetime.fromtimestamp((int(x)/1000.0)), TimestampType())
df_log = df_log.withColumn('timestamp', get_timestamp(df_log.ts))

In [20]:
# create start_time, hour, day, week, month, year, weekday columns from timestamp column
df_log = df_log.withColumn('start_time', date_format("timestamp", 'HH:mm:ss'))\
    .withColumn('hour', hour("timestamp"))\
    .withColumn('day', dayofmonth("timestamp"))\
    .withColumn('week', weekofyear("timestamp"))\
    .withColumn('month', month("timestamp"))\
    .withColumn('year', year("timestamp"))\
    .withColumn('weekday', date_format('timestamp', 'E'))

In [21]:
# display first row of the df_log data frame
df_log.take(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length='655.77751', level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration='1.541016707796E12', sessionId=583, song='Sehr kosmisch', status='200', ts=1542241826796.0, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', timestamp=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000), start_time='00:30:26', hour=0, day=15, week=46, month=11, year=2018, weekday='Thu')]

In [43]:
# create users_table data frame and count number of rows 
users_table = df_log.select("start_time", "userId", "firstName", "lastName", "gender", "level").distinct()
users_table.count()

6808

In [44]:
# show users_table data frame as a table
users_table.show()

+----------+------+----------+--------+------+-----+
|start_time|userId| firstName|lastName|gender|level|
+----------+------+----------+--------+------+-----+
|  16:36:47|    51|      Maia|   Burke|     F| free|
|  20:16:04|    44|    Aleena|   Kirby|     F| paid|
|  10:42:53|    15|      Lily|    Koch|     F| paid|
|  11:57:37|    15|      Lily|    Koch|     F| paid|
|  13:57:44|    49|     Chloe|  Cuevas|     F| paid|
|  12:03:49|    29|Jacqueline|   Lynch|     F| paid|
|  16:27:47|    61|    Samuel|Gonzalez|     M| free|
|  07:52:27|    58|     Emily|  Benson|     F| paid|
|  14:24:16|    26|      Ryan|   Smith|     M| free|
|  17:07:54|    97|      Kate| Harrell|     F| paid|
|  23:49:48|    12|    Austin| Rosales|     M| free|
|  08:01:53|    25|    Jayden|  Graves|     M| paid|
|  21:10:36|    95|      Sara| Johnson|     F| paid|
|  21:18:37|    95|      Sara| Johnson|     F| paid|
|  23:22:55|    95|      Sara| Johnson|     F| paid|
|  19:54:24|     5|    Elijah|   Davis|     M|

In [47]:
# create a new view table to use SQL
users_table.createOrReplaceTempView("users_table")

In [48]:
# select unique users with the most recent level status
table = spark.sql("""
    SELECT userId, firstName, lastName, gender, level FROM (SELECT *, 
    ROW_NUMBER() OVER(PARTITION BY userId ORDER BY start_time DESC) AS num
    FROM users_table) AS tmp
    WHERE num = 1
""")

table.show()

+------+----------+--------+------+-----+
|userId| firstName|lastName|gender|level|
+------+----------+--------+------+-----+
|    51|      Maia|   Burke|     F| free|
|     7|    Adelyn|  Jordan|     F| free|
|    15|      Lily|    Koch|     F| paid|
|    54|     Kaleb|    Cook|     M| free|
|   101|    Jayden|     Fox|     M| free|
|    11| Christian|  Porter|     F| free|
|    29|Jacqueline|   Lynch|     F| paid|
|    69|  Anabelle| Simpson|     F| free|
|    42|    Harper| Barrett|     M| paid|
|    73|     Jacob|   Klein|     M| paid|
|    87|    Dustin|     Lee|     M| free|
|    64|    Hannah| Calhoun|     F| free|
|     3|     Isaac|  Valdez|     M| free|
|    30|     Avery| Watkins|     F| paid|
|    34|    Evelin|   Ayala|     F| free|
|    59|      Lily|  Cooper|     F| free|
|     8|    Kaylee| Summers|     F| free|
|    22|      Sean|  Wilson|     F| free|
|    28|  Brantley|    West|     M| free|
|    85|   Kinsley|   Young|     F| paid|
+------+----------+--------+------

In [49]:
# count rows
table.count()

96

In [50]:
table = spark.sql("""
    SELECT gender, level, COUNT(*) AS users
    FROM users_table
    GROUP BY gender, level
""")

table.show()

+------+-----+-----+
|gender|level|users|
+------+-----+-----+
|     M| free|  636|
|     F| free|  593|
|     F| paid| 4284|
|     M| paid| 1295|
+------+-----+-----+



In [51]:
# save data frame in parquet format and partition it by gender and level
users_table.write.mode('overwrite').partitionBy("gender", "level").parquet("data/parquet/users_table")

In [52]:
# select start_time, hour, day, week, month, year, weekday columns to create time table
time_table = df_log.select("start_time", "hour", "day", "week", "month", "year", "weekday").distinct()

In [53]:
# display time_table data frame
time_table.show(20)

+----------+----+---+----+-----+----+-------+
|start_time|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|  11:44:01|  11| 15|  46|   11|2018|    Thu|
|  14:09:23|  14| 15|  46|   11|2018|    Thu|
|  16:54:20|  16| 15|  46|   11|2018|    Thu|
|  19:37:26|  19| 15|  46|   11|2018|    Thu|
|  06:29:28|   6| 21|  47|   11|2018|    Wed|
|  17:41:13|  17| 21|  47|   11|2018|    Wed|
|  12:53:05|  12| 14|  46|   11|2018|    Wed|
|  16:26:21|  16| 14|  46|   11|2018|    Wed|
|  09:28:29|   9| 28|  48|   11|2018|    Wed|
|  09:40:04|   9| 28|  48|   11|2018|    Wed|
|  12:06:59|  12| 28|  48|   11|2018|    Wed|
|  19:06:42|  19| 28|  48|   11|2018|    Wed|
|  05:51:57|   5|  5|  45|   11|2018|    Mon|
|  09:14:07|   9|  5|  45|   11|2018|    Mon|
|  11:21:24|  11|  5|  45|   11|2018|    Mon|
|  15:09:31|  15|  5|  45|   11|2018|    Mon|
|  17:23:52|  17|  5|  45|   11|2018|    Mon|
|  13:59:17|  13| 13|  46|   11|2018|    Tue|
|  15:33:06|  15| 30|  48|   11|20

In [54]:
# create a new view table to use SQL
time_table.createOrReplaceTempView("time_table")

In [55]:
table = spark.sql("""
    SELECT hour, COUNT(*) AS sessions
    FROM time_table
    GROUP BY hour
    ORDER BY hour
""")

table.show(24)

+----+--------+
|hour|sessions|
+----+--------+
|   0|     155|
|   1|     154|
|   2|     117|
|   3|     109|
|   4|     135|
|   5|     162|
|   6|     183|
|   7|     179|
|   8|     207|
|   9|     270|
|  10|     312|
|  11|     336|
|  12|     308|
|  13|     324|
|  14|     430|
|  15|     477|
|  16|     541|
|  17|     493|
|  18|     497|
|  19|     366|
|  20|     360|
|  21|     280|
|  22|     217|
|  23|     201|
+----+--------+



In [56]:
# save data frame in parquet format and partition it by year and month
time_table.write.mode('overwrite').partitionBy("year", "month").parquet("data/parquet/time_table")

In [64]:
df_song_small = df_song.select("num_songs", "title", "artist_id", "duration", "artist_name", "song_id")

In [65]:
df_song_small.show()

+---------+--------------------+------------------+---------+--------------------+------------------+
|num_songs|               title|         artist_id| duration|         artist_name|           song_id|
+---------+--------------------+------------------+---------+--------------------+------------------+
|        1|Sono andati? Fing...|ARDR4AC1187FB371A1|511.16363|Montserrat Caball...|SOBAYLL12A8C138AF9|
|        1|Laws Patrolling (...|AREBBGV1187FB523D2|173.66159|Mike Jones (Featu...|SOOLYAZ12A6701F4A6|
|        1|Setting Fire to S...|ARMAC4T1187FB3FA4C|207.77751|The Dillinger Esc...|SOBBUGU12A8C13E95D|
|        1|I Hold Your Hand ...|ARPBNLO1187FB3D52F| 43.36281|            Tiny Tim|SOAOIBZ12AB01815BE|
|        1|I Think My Wife I...|ARDNS031187B9924F0|186.48771|          Tim Wilson|SONYPOM12A8C13B2D7|
|        1|The Ballad Of Sle...|ARNF6401187FB57032|  305.162|   Sophie B. Hawkins|SONWXQJ12A8C134D94|
|        1|A Whiter Shade Of...|ARLTWXK1187FB5A3F8|326.00771|         King Curtis|

In [66]:
# join df_join and df_song data frames and use it to create songplays_table
df_join = df_log.join(df_song_small, df_log.song == df_song_small.title)

In [67]:
# display first row
df_join.take(1)

[Row(artist='Elena', auth='Logged In', firstName='Lily', gender='F', itemInSession=5, lastName='Koch', length='269.58322', level='paid', location='Chicago-Naperville-Elgin, IL-IN-WI', method='PUT', page='NextSong', registration='1.541048010796E12', sessionId=818, song='Setanta matins', status='200', ts=1542837407796.0, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='15', timestamp=datetime.datetime(2018, 11, 21, 21, 56, 47, 796000), start_time='21:56:47', hour=21, day=21, week=47, month=11, year=2018, weekday='Wed', num_songs=1, title='Setanta matins', artist_id='AR5KOSW1187FB35FF4', duration=269.58322, artist_name='Elena', song_id='SOZCTXZ12AB0182364')]

In [68]:
# create songplays_table data frame
songplays_table = df_join.select("start_time",
                                 "userId",
                                 "level",
                                 "song_id",
                                 "artist_id",
                                 "sessionId",
                                 "location",
                                 "userAgent",
                                 "year",
                                 "month").distinct()

In [69]:
# display frist row
songplays_table.take(1)

[Row(start_time='05:06:03', userId='10', level='free', song_id='SOGDBUF12A8C140FAA', artist_id='AR558FS1187FB45658', sessionId=484, location='Washington-Arlington-Alexandria, DC-VA-MD-WV', userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4"', year=2018, month=11)]

In [70]:
# create songplay_id column with unique id
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())

In [71]:
# display songplays_table data frame as a table
songplays_table.show()

+----------+------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+-------------+
|start_time|userId|level|           song_id|         artist_id|sessionId|            location|           userAgent|year|month|  songplay_id|
+----------+------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+-------------+
|  05:06:03|    10| free|SOGDBUF12A8C140FAA|AR558FS1187FB45658|      484|Washington-Arling...|"Mozilla/5.0 (Mac...|2018|   11| 489626271744|
|  09:14:20|    24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|      672|Lake Havasu City-...|"Mozilla/5.0 (Win...|2018|   11|1374389534720|
|  21:56:47|    15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|      818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|1417339207680|
|  22:35:59|    80| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|      992|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|1709396983808|
+----------+-

In [72]:
# rearrange songplays_table data frame
songplays_table = songplays_table.select("songplay_id",
                                         "start_time",
                                         "userId",
                                         "level",
                                         "song_id",
                                         "artist_id",
                                         "sessionId",
                                         "location",
                                         "userAgent",
                                         "year",
                                         "month") 

In [73]:
# display first row of songplays_table data frame
songplays_table.take(1)

[Row(songplay_id=489626271744, start_time='05:06:03', userId='10', level='free', song_id='SOGDBUF12A8C140FAA', artist_id='AR558FS1187FB45658', sessionId=484, location='Washington-Arlington-Alexandria, DC-VA-MD-WV', userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4"', year=2018, month=11)]

In [74]:
# save songplays_table data frame in parquet format
songplays_table.write.mode('overwrite').partitionBy("year", "month").parquet("data/parquet/songplays_table/")

### Testing the pipleline with AWS S3 

#### to run this section you would need to restart kernel

In [1]:
# load all libraries
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS CREDS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS CREDS']['AWS_SECRET_ACCESS_KEY']

In [4]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://udacity-data-engineering-stan/data"

In [12]:
# create spark session

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID']) \
    .config("spark.hadoop.fs.s3a.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY']) \
    .getOrCreate()

sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
sc._jsc.hadoopConfiguration().set("spark.speculation","false")

In [13]:
spark

In [9]:
schema =  StructType([
                        StructField("num_songs", ShortType(), True),
                        StructField("artist_latitude", StringType(), True),
                        StructField("artist_location", StringType(), True),
                        StructField("artist_longitude", StringType(), True),
                        StructField("artist_name", StringType(), True),
                        StructField("song_id", StringType(), False),
                        StructField("title", StringType(), True),
                        StructField("artist_id", StringType(), False),
                        StructField("year", ShortType(), True),
                        StructField("duration", DoubleType(), True)
                    ])

df_song_data = spark.read.schema(schema).json("s3a://udacity-dend/song_data/A/A/*/*.json")

In [None]:
df_song_data.write.mode('overwrite').partitionBy("year", "artist_id").parquet(output_data + "/songs_data/")

In [None]:
songs_table = df_song.select("song_id", "title", "artist_id", "year", "duration")

In [None]:
songs_table.write.mode('overwrite').partitionBy("year", "artist_id").parquet(output_data + "/songs_table/")