In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

# Song Data

In [4]:
songSchema = R([
            Fld("num_songs",Int()),
            Fld("artist_id",Str()),
            Fld("artist_latitude",Dbl()),
            Fld("artist_longitude",Dbl()),
            Fld("artist_location",Str()),
            Fld("artist_name",Str()),  
            Fld("title",Str()),  
            Fld("duration",Dbl()),  
            Fld("year",Int()),  
            ])

In [5]:
df = spark.read.json("s3a://udacity-dend/song_data/A/A/*/*.json", schema=songSchema)

In [6]:
df.printSchema()
df.show(5)
df.count()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)

+---------+------------------+---------------+----------------+--------------------+--------------------+--------------------+----------+----+
|num_songs|         artist_id|artist_latitude|artist_longitude|     artist_location|         artist_name|               title|  duration|year|
+---------+------------------+---------------+----------------+--------------------+--------------------+--------------------+----------+----+
|        1|ARSUVLW12454A4C8B8|       35.83073|       -85.97874|           Tennessee|Royal Philharmoni...|Faust: Ballet Mus...|  94.56281|   0|
|        1|ARXQC081187FB4AD42|       54.

604

In [7]:
song_field = ["title", "duration", "year", "artist_id"]
songs_table = df.select(song_field).dropDuplicates().withColumn("song_id", F.monotonically_increasing_id()).filter(~col("year").isin([0]) & col("year").isNotNull() & col("artist_id").isNotNull())

In [8]:
songs_table.printSchema()
songs_table.show(5)
songs_table.count()

root
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- song_id: long (nullable = false)

+--------------------+---------+----+------------------+-----------+
|               title| duration|year|         artist_id|    song_id|
+--------------------+---------+----+------------------+-----------+
|Always And Never ...|  229.642|2005|AR10USD1187B99F3F1| 8589934592|
|    Commercial Reign|283.76771|1990|AR9AM2N1187B9AD2F1|17179869184|
|Terapia De Amor I...|340.29669|1988|ARBJSO81187B9BA09B|17179869187|
|Walking With The ...|152.16281|1966|ARE5F2F1187B9AB7E9|25769803776|
|               Pills| 256.1824|2000|ARW63XP1187FB5AB99|34359738369|
+--------------------+---------+----+------------------+-----------+
only showing top 5 rows



406

In [9]:
artist_field = ["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]
artists_table = df.select(artist_field).dropDuplicates().dropna(subset=["artist_id","artist_name"])

In [10]:
artists_table.printSchema()
artists_table.show(5)
artists_table.count()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)

+------------------+-----------------+--------------------+---------------+----------------+
|         artist_id|      artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+-----------------+--------------------+---------------+----------------+
|AR1S3NH1187B98C2BC|        Anthony B|Clarks Town, Jamaica|           null|            null|
|ARPIKA31187FB4C233|       The Action|            New York|       40.71455|       -74.00712|
|ARYL56G11C8A41634E|    Mick Flannery|                    |           null|            null|
|AR1XL241187FB3F4AB|Nortec Collective|                    |           null|            null|
|ARMI4NV1187B99D55D|          Man Man|    Philadelphia, PA|       39.95227|       -75.16237|
+------------------+----

591

In [39]:
songs_table.write.partitionBy("year", "artist_id").parquet("s3a://sparkifytest/songs/", mode="overwrite")

In [40]:
artists_table.write.parquet("s3a://sparkifytest/artists/", mode="overwrite")

# Log Data

In [11]:
df = spark.read.json("s3a://udacity-dend/log_data/2018/11/*.json")

In [12]:
df.printSchema()
df.show(5)
df.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

8056

In [13]:
df = df.filter(df.page == 'NextSong')

In [14]:
df.printSchema()
df.show(5)
df.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|         

6820

In [15]:
user_field = [" userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level"]
users_table = df.selectExpr(user_field).dropDuplicates().dropna(how = "any")

In [16]:
users_table.printSchema()
users_table.show(5)
users_table.count()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



104

In [None]:
users_table.write.parquet("s3a://sparkifytest/users/", mode="overwrite")

In [17]:
df = df.withColumn(
    "start_time",
    F.to_timestamp(F.from_unixtime((col("ts") / 1000) , 'yyyy-MM-dd HH:mm:ss.SSS')).cast("Timestamp")
)

In [18]:
df.printSchema()
df.show(5)
df.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|

6820

In [19]:
df = df.withColumn("test_time", F.to_timestamp(col("ts") / 1000))

In [20]:
df.printSchema()
df.show(5)
df.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- test_time: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+

6820

In [21]:
time_table = df.select("start_time").dropDuplicates() \
.withColumn("hour", hour("start_time")).withColumn("day", dayofmonth("start_time")) \
.withColumn("week", weekofyear("start_time")).withColumn("month", month("start_time")) \
.withColumn("year", year("start_time")).withColumn("weekday", date_format("start_time", "E")).dropna(how="any")

In [22]:
time_table.printSchema()
time_table.show(5)
time_table.count()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: string (nullable = true)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-21 10:52:12|  10| 21|  47|   11|2018|    Wed|
|2018-11-21 19:46:29|  19| 21|  47|   11|2018|    Wed|
|2018-11-14 04:37:40|   4| 14|  46|   11|2018|    Wed|
|2018-11-14 12:14:41|  12| 14|  46|   11|2018|    Wed|
|2018-11-14 16:19:02|  16| 14|  46|   11|2018|    Wed|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



6813

In [None]:
time_table.write.partitionBy("year", "month").parquet("s3a://sparkifytest/time/", mode="overwrite")

In [23]:
song_df = spark.read.parquet("s3a://sparkifytest/songs/*/*/*")
artist_df = spark.read.parquet("s3a://sparkifytest/artists/*")

In [24]:
Join_song = df.join(song_df, ((song_df.title == df.song) & (song_df.duration == df.length)))
artists_songs_logs = Join_song.join(artist_df, (Join_song.artist == artist_df.artist_name))
songplays = artists_songs_logs.join(time_table, (artists_songs_logs.start_time == time_table.start_time), 'left').drop(artists_songs_logs.start_time)

In [25]:
songplays.printSchema()
songplays.show(5)
songplays.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- test_time: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- song_id: long (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_lat

0

In [26]:
songplays_field = ["start_time", "userId as user_id", "level", "song_id", "artist_id", "sessionid as session_id", "artist_location as location", "userAgent as user_agent", "year", "month"]

In [27]:
songplays_table = songplays.selectExpr(songplays_field).dropDuplicates().dropna(subset=["user_id", "artist_id", \
"song_id"]).withColumn("songplay_id", F.monotonically_increasing_id())

In [28]:
songplays_table.printSchema()
songplays_table.show(5)
songplays_table.count()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: long (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- songplay_id: long (nullable = false)

+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+-----------+
|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|year|month|songplay_id|
+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+-----------+
+----------+-------+-----+-------+---------+----------+--------+----------+----+-----+-----------+



0

In [None]:
songplays_table.write.partitionBy("year", "month").parquet(output_date + "songplays/", mode="overwrite")
 

In [None]:
songplays_table.createOrReplaceTempView("songplays")

spark.sql("""
    SELECT month, count(song_id) as song_num
    FROM songplays
    GROUP by month
    order by song_num desc
""").show()