In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
os.environ['AWS_REGION']=config['AWS']['AWS_REGION']

In [4]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [6]:
# get filepath to song data file
song_data = "data/song_data/*/*/*/*.json"

In [7]:
# read song data file
df = spark.read.json(song_data)

In [8]:
df.show()

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

In [9]:
# extract columns to create songs table
df.createOrReplaceTempView("songs")
songs_table = spark.sql("select distinct song_id, title, artist_id, year, duration from songs")

In [10]:
songs_table.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGNCJP12A58A80271|Do You Finally Ne...|ARB29H41187B98F0EF|1972|342.56934|
|SOOJPRH12A8C141995|   Loaded Like A Gun|ARBGXIG122988F409D|   0|173.19138|
|SOFCHDR12AB01866EF|         Living Hell|AREVWGE1187B9B890A|   0|282.43546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOTUKVB12AB0181477|   Blessed Assurance|AR7ZKHQ1187B98DD73|1993|  270.602|
|SOMVWWT12A58A7AE05|Knocked Out Of Th...|ARQ9BO41187FB5CF1F|   0|183.17016|
|SOBEBDG12A58A76D60|        Kassie Jones|ARI3BMM1187FB4255E|   0|220.78649|
|SOILPQQ12AB017E82A|Sohna Nee Sohna Data|AR1ZHYZ1187FB3C717|   0|599.24853|
|SOYMRWW12A6D4FAB14|The Moon And I (O...|ARKFYS91187B98E58F|   0| 267.7024|
|SOBCOSW12A8

In [16]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy("year", "artist_id").parquet(path = "s3a://udacity-store/songs.parquet", mode = "overwrite")

In [11]:
# extract columns to create artists table
df.createOrReplaceTempView("artists")
artists_table = spark.sql("""select distinct artist_id, 
                            artist_name as name, 
                            artist_location as location, 
                            artist_latitude as latitude, 
                            artist_longitude as longitude 
                            from artists
                          """)

In [12]:
artists_table.show()

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|40.71455| -74.00712|
|ARBEBBY1187B9B43DB|           Tom Petty|     Gainesville, FL|    null|      null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|  8.4177| -80.11278|
|ARMBR4Y1187B9990EB|        David Martin|     California - SF|37.77916|-122.42005|
|ARD0S291187B9B7BF5|             Rated R|                Ohio|    null|      null|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|30.08615| -94.10158|
|ARKRRTF1187B9984DA|    Sonora Santanera|                    |    null|      null|
|ARHHO3O1187B989413|           Bob Azzam|                    |    null|      null|
|ARJIE2Y1187B994AB7|         Line Renaud|                    |    null|      null|
|ARG

In [19]:
# write artists table to parquet files
artists_table.write.parquet(path = "s3a://udacity-store/artists.parquet", mode = "overwrite")

In [13]:
# get filepath to log data file
log_data = "data/log_data/*.json"

In [14]:
# read log data file
df = spark.read.json(log_data)

In [15]:
df.show()

+--------------------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|      auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia| Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy| Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Su

In [16]:
# filter by actions for song plays
df.createOrReplaceTempView("stg_events")
songplays_table = spark.sql("select * from stg_events where page='NextSong'")

In [17]:
songplays_table.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyva

In [18]:
# extract columns for users table
df.createOrReplaceTempView("users")
users_table = spark.sql("select distinct userId, firstName, lastName, gender, level from users")

In [19]:
users_table.show()

+------+----------+---------+------+-----+
|userId| firstName| lastName|gender|level|
+------+----------+---------+------+-----+
|    98|    Jordyn|   Powell|     F| free|
|    34|    Evelin|    Ayala|     F| free|
|    85|   Kinsley|    Young|     F| paid|
|    38|    Gianna|    Jones|     F| free|
|    85|   Kinsley|    Young|     F| free|
|    63|      Ayla|  Johnson|     F| free|
|    37|    Jordan|    Hicks|     F| free|
|     6|   Cecilia|    Owens|     F| free|
|    15|      Lily|     Koch|     F| paid|
|    27|    Carlos|   Carter|     M| free|
|    89|   Kynnedi|  Sanchez|     F| free|
|    21|   Preston|  Sanders|     M| free|
|    57| Katherine|      Gay|     F| free|
|    74|    Braden|   Parker|     M| free|
|    29|Jacqueline|    Lynch|     F| paid|
|    75|    Joseph|Gutierrez|     M| free|
|    61|    Samuel| Gonzalez|     M| free|
|    88|  Mohammad|Rodriguez|     M| free|
|    64|    Hannah|  Calhoun|     F| free|
|    15|      Lily|     Koch|     F| free|
+------+---

In [27]:
# write users table to parquet files
users_table.write.parquet(path = "s3a://udacity-store/users.parquet", mode = "overwrite")

In [28]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: str(int(int(x)/1000)))
df = df.withColumn('time_stamp', get_timestamp(df.ts))

In [29]:
# create datetime column from original timestamp column
get_timestamp = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000.0)))
df = df.withColumn("start_time", get_timestamp("ts"))

In [30]:
df.createOrReplaceTempView("time")
time_table = spark.sql("""select distinct time_stamp as start_time, 
                                        hour(start_time) as hour, 
                                        day(start_time) as day, 
                                        weekofyear(start_time) as week, 
                                        month(start_time) as month, 
                                        year(start_time) as year, 
                                        dayofweek(start_time) as weekday 
                           from time
                       """)

In [31]:
time_table.show()

+----------+----+---+----+-----+----+-------+
|start_time|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|1542279082|  10| 15|  46|   11|2018|      5|
|1542314651|  20| 15|  46|   11|2018|      5|
|1542764683|   1| 21|  47|   11|2018|      4|
|1542770516|   3| 21|  47|   11|2018|      4|
|1542824412|  18| 21|  47|   11|2018|      4|
|1542170773|   4| 14|  46|   11|2018|      4|
|1542212114|  16| 14|  46|   11|2018|      4|
|1542221682|  18| 14|  46|   11|2018|      4|
|1542235409|  22| 14|  46|   11|2018|      4|
|1543414833|  14| 28|  48|   11|2018|      4|
|1543449016|  23| 28|  48|   11|2018|      4|
|1542101811|   9| 13|  46|   11|2018|      3|
|1542144812|  21| 13|  46|   11|2018|      3|
|1542386893|  16| 16|  46|   11|2018|      6|
|1542399989|  20| 16|  46|   11|2018|      6|
|1542408893|  22| 16|  46|   11|2018|      6|
|1542680132|   2| 20|  47|   11|2018|      3|
|1542699157|   7| 20|  47|   11|2018|      3|
|1543069209|  14| 24|  47|   11|20

In [30]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").parquet(path = "s3a://udacity-store/time.parquet", mode = "overwrite")

In [31]:
# read in song data to use for songplays table
song_df = spark.read.parquet("s3a://udacity-store/songs.parquet")
song_df.createOrReplaceTempView("stg_songs")

In [32]:
# extract columns from joined song and log datasets to create songplays table

songplays_table = spark.sql("""select
                    events.ts start_time,
                    events.userId user_id,
                    events.level,
                    songs.song_id,
                    songs.artist_id,
                    events.sessionId session_id,
                    events.location,
                    events.userAgent user_agent
                    from stg_events events inner join stg_songs songs
                    on events.song=songs.title
                    where events.page='NextSong'
"""
)

In [33]:
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())

In [34]:
songplays_table.show()

+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+
|   start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|songplay_id|
+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+
|1542837407796|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|          0|
|1542171963796|     10| free|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|          1|
|1542618860796|     24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|          2|
|1543358159796|     80| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|          3|
+-------------+-------+-----+------------------+------------------+--

In [35]:
songplays_joined_time = songplays_table.join(time_table, (songplays_table.start_time == time_table.start_time), how="inner") \
.select("songplay_id", songplays_table.start_time, "user_id", "level", "song_id", "artist_id", "session_id", "location", "user_agent", "year", "month")

In [36]:
songplays_joined_time.show()

+-----------+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|songplay_id|   start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+-----------+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|          0|1542837407796|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
|          1|1542171963796|     10| free|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|2018|   11|
|          2|1542618860796|     24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|2018|   11|
|          3|1543358159796|     80| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|2018

In [37]:
# write songplays table to parquet files partitioned by year and month
songplays_joined_time.write.partitionBy("year", "month").parquet(path = "s3a://udacity-store/songplays.parquet", mode = "overwrite")