In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


# Read the AWS credentials from the local "dl.cfg" file and export them as
# environment variables for this process.
config = configparser.ConfigParser()
config.read("dl.cfg")
for _aws_key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    os.environ[_aws_key] = config["AWS"][_aws_key]


# Create (or reuse) the Spark session, requesting the hadoop-aws package
# through spark.jars.packages.
_session_builder = SparkSession.builder.config(
    "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
)
spark = _session_builder.getOrCreate()

In [3]:
# S3 prefix of the song metadata subset read by this notebook.
song_data_path = "s3a://udacity-dend/song_data/A/A/A/"
# S3 URI of one event-log file (the 2018-11-01 file).
log_data_path = "s3a://udacity-dend/log_data/2018/11/2018-11-01-events.json"
# Alternative S3 destination for the output, currently disabled:
# output_path = "s3a://spark-data-udacity/sparkify_big_data/"
# Active destination: a local directory. The trailing slash matters because
# table names are appended to it directly.
output_path = "./sparkify_big_data/"

In [3]:
# Load the song metadata JSON files and print the schema Spark inferred.
df = spark.read.format("json").load(song_data_path)
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [4]:
# Songs dimension: the distinct combinations of the five song attributes.
song_columns = ("song_id", "title", "artist_id", "year", "duration")
songs_table = df.select(*song_columns).distinct()
songs_table.show(n=5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SODZYPO12A8C13A91E|Burn My Body (Alb...|AR1C2IX1187B99BF74|   0|177.99791|
|SOIGHOD12A8C13B5A1|        Indian Angel|ARY589G1187B9A9F4E|2004|171.57179|
|SOOVHYF12A8C134892|     I'll Be Waiting|ARCLYBR1187FB53913|1989|304.56118|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|   0|230.42567|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [5]:
# Write the songs table as Parquet under "<output_path>songs", partitioned by
# year and then artist_id, replacing any existing output.
(
    songs_table.write
    .partitionBy("year", "artist_id")
    .mode("overwrite")
    .parquet(f"{output_path}songs")
)

In [10]:
# Artists dimension: the distinct artist attributes found in the song metadata.
artist_columns = (
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude",
)
artists_table = df.select(*artist_columns).distinct()

In [11]:
# Preview the first five rows of the artists table.
artists_table.show(5)

+------------------+-------------+---------------+---------------+----------------+
|         artist_id|  artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+-------------+---------------+---------------+----------------+
|ARC1IHZ1187FB4E920| Jamie Cullum|               |           null|            null|
|ARZKCQM1257509D107|   Dataphiles|               |           null|            null|
|AREWD471187FB49873|     Son Kite|               |           null|            null|
|ARGE7G11187FB37E05| Cyndi Lauper|   Brooklyn, NY|           null|            null|
|ARSVTNL1187B992A91|Jonathan King|London, England|       51.50632|        -0.12714|
+------------------+-------------+---------------+---------------+----------------+
only showing top 5 rows



In [12]:
# Drop the "artist_" prefix from the descriptive columns; artist_id keeps its name.
artists_table = (
    artists_table
    .withColumnRenamed("artist_name", "name")
    .withColumnRenamed("artist_location", "location")
    .withColumnRenamed("artist_latitude", "latitude")
    .withColumnRenamed("artist_longitude", "longitude")
)
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
# Row count of whatever `df` currently refers to. The name `df` is reused in
# this notebook for both the song metadata and the event log.
df.count()

15

11

In [6]:
# Root of the source bucket. The trailing slash lets object keys be appended directly.
input_data = "s3a://udacity-dend/"
# Full URI of the event-log file read below.
log_data = input_data + "log_data/2018/11/2018-11-01-events.json"

In [17]:
# Load the event-log JSON from `log_data` and print the schema Spark inferred.
df = spark.read.format("json").load(log_data)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [18]:
# Keep only song-play events, i.e. rows whose page is "NextSong",
# then report how many remain.
df = df.where(col("page") == "NextSong")
df.count()

11

In [7]:
# Users dimension: the user attributes attached to each event row in `df`.
# A user who appears on several events repeats the same values, so exact
# duplicate rows are collapsed, the same way songs_table and artists_table
# are built.
users_table = df.select(
    ["userId", "firstName", "lastName", "gender", "level"]
).dropDuplicates()
users_table.printSchema()

root
 |-- userId: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [8]:
# Convert the camelCase log field names to snake_case; gender and level
# already have their final names.
users_table = (
    users_table
    .withColumnRenamed("userId", "user_id")
    .withColumnRenamed("firstName", "first_name")
    .withColumnRenamed("lastName", "last_name")
)
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [9]:
# Local output directory for the Parquet tables. It must be the same directory
# the songs table is written to ("./sparkify_big_data/"). The previous value
# "./sparkify_bigdata/" was missing an underscore and sent the users table to
# a different folder.
output_data = "./sparkify_big_data/"

In [28]:
# Write the users table as Parquet under "<output_data>/users", replacing any
# existing output.
users_path = os.path.join(output_data, "users")
users_table.write.parquet(users_path, mode="overwrite")

In [39]:
import datetime

In [60]:
# Helpers used by the timestamp and date cells that follow.
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import to_date, to_timestamp

# Reload the raw event log and keep only distinct song-play ("NextSong") rows.
df = spark.read.json(log_data)
df = df.filter(df.page == "NextSong").dropDuplicates()
# Show one row as a sanity check. A bare `df.count()` used to precede this
# line, but it was not the cell's last expression, so its result was never
# displayed and it only cost an extra Spark job.
df.head()

Row(artist='Infected Mushroom', auth='Logged In', firstName='Kaylee', gender='F', itemInSession=6, lastName='Summers', length=440.2673, level='free', location='Phoenix-Mesa-Scottsdale, AZ', method='PUT', page='NextSong', registration=1540344794796.0, sessionId=139, song='Becoming Insane', status=200, ts=1541107053796, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"', userId='8')

In [61]:
# Create the start_time timestamp column from the epoch-milliseconds `ts` column.
from pyspark.sql import types as T


def get_timestamp(ts_col):
    """Convert an epoch-milliseconds column into a Spark timestamp column.

    This uses a native cast instead of a Python UDF built on
    ``datetime.fromtimestamp``. The UDF broke on a fresh run because a later
    ``import datetime`` in this notebook rebinds ``datetime`` to the module,
    which has no ``fromtimestamp`` attribute. It also raised on null ``ts``
    values; the cast yields null for them.

    Args:
        ts_col: A Column, or a column name as a string, holding milliseconds
            since the Unix epoch.

    Returns:
        A Column of TimestampType. Dividing by 1000 gives fractional seconds,
        which Spark's numeric-to-timestamp cast reads as seconds since the epoch.
    """
    column = col(ts_col) if isinstance(ts_col, str) else ts_col
    return (column / 1000).cast(T.TimestampType())


df = df.withColumn("start_time", get_timestamp(df.ts))
df.head()

+-----------------+---------+---------+------+-------------+--------+--------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+
|           artist|     auth|firstName|gender|itemInSession|lastName|  length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|          start_time|
+-----------------+---------+---------+------+-------------+--------+--------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+
|Infected Mushroom|Logged In|   Kaylee|     F|            6| Summers|440.2673| free|Phoenix-Mesa-Scot...|   PUT|NextSong|1.540344794796E12|      139|Becoming Insane|   200|1541107053796|"Mozilla/5.0 (Win...|     8|2018-11-01 21:17:...|
+-----------------+---------+---------+------+----------

In [62]:
# get_timestamp = udf(lambda x: int((int(x)/1000))) # Conversion here
# df = df.withColumn('timestamp', get_timestamp(df.ts))
# df.select("timestamp").show(5)

In [63]:
# df = df.withColumn("tst",to_timestamp(col("ts")/1000)) \
#   .withColumn("date",to_date(col("tst")))

In [64]:
# Derive the calendar date from the start_time timestamp.
start_time_col = df["start_time"]
df = df.withColumn("date", to_date(start_time_col))

In [65]:
# Preview the first five values of the derived date column.
df.select("date").show(5)

+----------+
|      date|
+----------+
|2018-11-01|
|2018-11-01|
|2018-11-01|
|2018-11-01|
|2018-11-01|
+----------+
only showing top 5 rows



In [67]:
from pyspark.sql.functions import dayofweek

# Time dimension: each start_time with its calendar parts. The hour comes from
# the timestamp itself; the other parts come from the derived `date` column.
# Several events can share a start_time, so exact duplicate rows are dropped,
# the same way songs_table and artists_table are built.
timetable = df.select(
    "start_time",
    hour("start_time").alias("hour"),
    dayofmonth("date").alias("day"),
    weekofyear("date").alias("week"),
    month("date").alias("month"),
    year("date").alias("year"),
    dayofweek("date").alias("weekday"),
).dropDuplicates()

timetable.show(10)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-01 21:17:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:28:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 22:23:...|  22|  1|  44|   11|2018|      5|
|2018-11-01 21:42:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:11:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:01:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:08:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:55:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:24:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:52:...|  21|  1|  44|   11|2018|      5|
+--------------------+----+---+----+-----+----+-------+
only showing top 10 rows



In [68]:
# Location of the song metadata used to build the songplays table.
song_data = os.path.join(input_data, "song_data/A/A/A/")

# Load it into its own DataFrame so the event log in `df` stays available.
song_df = spark.read.format("json").load(song_data)

In [69]:
# Print the schema of the song metadata DataFrame.
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [71]:
# Print the schema of the event DataFrame.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- date: date (nullable = true)



In [93]:
# Songplays fact table: each play event matched to its song metadata.
# The match uses both the song title and the artist name. A title alone is not
# unique, so joining on it could attach a play to a same-titled song by a
# different artist.
song_match = (df.song == song_df.title) & (df.artist == song_df.artist_name)
songplays_table = df.join(song_df, song_match).select(
    df.start_time,
    col("userId").alias("user_id"),
    song_df.song_id,
    song_df.artist_id,
    df.level,
    col("sessionId").alias("session_id"),
    df.location,
    col("userAgent").alias("user_agent"),
    # year and month are taken from the event's `date` column.
    year("date").alias("year"),
    month("date").alias("month"),
)

In [94]:
# Preview one row of the songplays table.
songplays_table.show(1)

+----------+-------+-------+---------+-----+----------+--------+----------+----+-----+
|start_time|user_id|song_id|artist_id|level|session_id|location|user_agent|year|month|
+----------+-------+-------+---------+-----+----------+--------+----------+----+-----+
+----------+-------+-------+---------+-----+----------+--------+----------+----+-----+



In [81]:
# List the first three song titles in alphabetical order.
# NOTE(review): presumably a check on why the songplays join returned no rows; confirm.
song_df.select("title").sort("title").show(3)

+--------------------+
|               title|
+--------------------+
|A Poor Recipe For...|
|Burn My Body (Alb...|
|         Double Wide|
+--------------------+
only showing top 3 rows



In [79]:
# List the first three song names from the event log in alphabetical order,
# for comparison with the song metadata titles.
df.select("song").sort("song").show(3)

+---------------+
|           song|
+---------------+
|Becoming Insane|
|Congratulations|
|      Eriatarka|
+---------------+
only showing top 3 rows

