Here, we define the input S3 bucket to read the data from, as well as the output path on HDFS.

We write the output to HDFS first and then copy it to S3 with `s3-dist-cp`. Writing directly to S3 with Spark's `DataFrame.write` API was very slow, because Spark's output commit step copies and renames thousands of objects, and S3 has no real rename (each rename is a copy followed by a delete).

In [1]:
input_data = "s3a://udacity-dend/"
output_path = "hdfs:///user/sparkify_data/"

Starting Spark application

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1615401410759_0001,pyspark,idle,Link,Link,✔

SparkSession available as 'spark'.

In [2]:
import os

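Here we define the specific paths to the song data and the song-play log data. The glob `song_data/*/*/*/*` matches the files nested three directory levels below `song_data`, and `log_data/2018/11/*` matches every log file for November 2018.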

In [3]:
path_to_song_data = os.path.join(
    input_data,
    "song_data/*/*/*/*"
)
path_to_log_data = os.path.join(
    input_data,
    "log_data/2018/11/*"
)

Here, we define the HDFS paths where the output tables will be saved.

In [4]:
path_to_output_song_table = os.path.join(
    output_path,
    "songs"
)
path_to_output_artist_table = os.path.join(
    output_path,
    "artists"
)
path_to_output_user_table = os.path.join(
    output_path,
    "users"
)
path_to_output_time_table = os.path.join(
    output_path,
    "time"
)
path_to_output_songplay_table = os.path.join(
    output_path,
    "songplays"
)
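The five `os.path.join` calls above can also be written as one dictionary comprehension. This is a minimal sketch, not the notebook's code: it swaps in `posixpath.join`, which always joins with `/` regardless of the operating system (`os.path.join` would use `\` on Windows, which is wrong for `hdfs://` and `s3a://` URIs), and the names `TABLE_NAMES` and `output_paths` are hypothetical.

```python
import posixpath

output_path = "hdfs:///user/sparkify_data/"

# Hypothetical: one entry per output table, keyed by table name.
TABLE_NAMES = ["songs", "artists", "users", "time", "songplays"]

# posixpath.join always uses "/", which is what HDFS and S3 URIs expect.
output_paths = {name: posixpath.join(output_path, name) for name in TABLE_NAMES}

for name, path in output_paths.items():
    print(f"{name:>10}: {path}")
```

With this, `output_paths["songs"]` plays the role of `path_to_output_song_table`, and adding a table only means adding a name to the list.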

In [5]:
song_data = spark.read.json(path_to_song_data)

In [10]:
log_data = spark.read.json(path_to_log_data)

# Song Table
***

In [47]:
songs_table = song_data.select(
    "song_id",
    "title",
    "artist_id",
    "year",
    "duration"
).dropDuplicates(subset=['song_id'])
print("Number of unique songs found: {}".format(songs_table.count()))
songs_table.take(2)

Number of unique songs found: 14896
[Row(song_id='SOAACFC12A8C140567', title='Supernatural Pt. II', artist_id='ARNHTE41187B99289A', year=0, duration=343.09179), Row(song_id='SOABVPU12AB018AA22', title='Conquer Me', artist_id='ARZZRK91187B9A5CA5', year=2009, duration=180.53179)]

In [67]:
songs_table.write.mode("overwrite").parquet(
    path_to_output_song_table,
    partitionBy=["year", "artist_id"]
)
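With `partitionBy=["year", "artist_id"]`, Spark writes a Hive-style directory tree: one `year=<value>/artist_id=<value>/` directory per combination, each holding Parquet part files, and the partition columns are encoded in the path rather than stored inside the files. The helper below is a hypothetical illustration (not a Spark API) that computes the directory a given row lands in. Songs with an unknown release year, such as the `year=0` row in the sample above, all end up under `year=0`.

```python
import posixpath


def partition_dir(base: str, row: dict, partition_cols: list) -> str:
    """Return the Hive-style directory Spark's partitionBy would use for `row`.

    Hypothetical helper for illustration only; Spark also escapes special
    characters in partition values, which this sketch does not do.
    """
    parts = [f"{c}={row[c]}" for c in partition_cols]
    return posixpath.join(base, *parts)


row = {"song_id": "SOABVPU12AB018AA22", "title": "Conquer Me",
       "artist_id": "ARZZRK91187B9A5CA5", "year": 2009, "duration": 180.53179}
print(partition_dir("hdfs:///user/sparkify_data/songs", row, ["year", "artist_id"]))
```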

# Artist Table
***

In [68]:
artists_table = song_data.select(
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude"
).dropDuplicates(subset=['artist_id'])
print("Number of unique artists found: {}".format(artists_table.count()))
artists_table.take(2)

Number of unique artists found: 9553
[Row(artist_id='AR04BF811A348F050D', artist_name='Teenagers In Tokyo', artist_location='', artist_latitude=None, artist_longitude=None), Row(artist_id='AR04KY61187FB44E3A', artist_name='Fidel Nadal', artist_location='', artist_latitude=None, artist_longitude=None)]

In [69]:
artists_table.write.mode("overwrite").parquet(
    path_to_output_artist_table
)

# Users Table
***
Only log events with `page='NextSong'` are song plays, so we first filter the DataFrame down to those entries.

In [70]:
nextSong_log_df = log_data.filter("page='NextSong'")
print("Found {} log file rows having page='NextSong'".format(nextSong_log_df.count()))

Found 6820 log file rows having page='NextSong'

In [71]:
users_table = nextSong_log_df.select(
    "userId",
    "firstName",
    "lastName",
    "gender",
    "level"
).dropDuplicates(subset=['userId'])
print("Number of unique users found: {}".format(users_table.count()))
users_table.take(2)

Number of unique users found: 96
[Row(userId='18', firstName='Jacob', lastName='Rogers', gender='M', level='free'), Row(userId='27', firstName='Carlos', lastName='Carter', gender='M', level='free')]
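`dropDuplicates(subset=['userId'])` keeps an arbitrary row per user, so if a user's `level` changed during the month (for example from `free` to `paid`), the users table may record a stale value. One way to keep the most recent row is to rank each user's rows by `ts` descending, in PySpark with `row_number()` over `Window.partitionBy("userId").orderBy(col("ts").desc())`, and keep rank 1. The sketch below shows that "latest row per key" logic in plain Python; the rows are made up for illustration, not taken from the dataset.

```python
def latest_per_key(rows, key, order_by):
    """Keep, for each value of `key`, the row with the largest `order_by` value.

    Same result as row_number() over a window partitioned by `key` and ordered
    by `order_by` descending, filtered to row_number == 1 (ties: first row wins).
    """
    latest = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    return list(latest.values())


# Hypothetical log rows: user "15" upgrades from free to paid.
rows = [
    {"userId": "15", "level": "free", "ts": 1541000000000},
    {"userId": "15", "level": "paid", "ts": 1542000000000},
    {"userId": "18", "level": "free", "ts": 1541500000000},
]
users = latest_per_key(rows, key="userId", order_by="ts")
print(users)
```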

In [72]:
users_table.write.mode("overwrite").parquet(path_to_output_user_table)

# Time Table
***
Now we'll import the functions we need to do the time preprocessing

In [73]:
from pyspark.sql.functions import col, year, month, dayofmonth, hour, weekofyear, dayofweek, from_unixtime, to_timestamp

In [74]:
# ts is in epoch milliseconds; from_unixtime expects seconds and formats
# whole seconds, so the milliseconds are dropped from start_time.
nextSong_log_df = (
    nextSong_log_df
    .withColumn("start_time", to_timestamp(from_unixtime(col("ts") / 1000.0)))
    .withColumn("hour", hour(col("start_time")))
    .withColumn("day", dayofmonth(col("start_time")))
    .withColumn("week", weekofyear(col("start_time")))    # ISO 8601 week number
    .withColumn("month", month(col("start_time")))
    .withColumn("year", year(col("start_time")))
    .withColumn("weekday", dayofweek(col("start_time")))  # 1 = Sunday ... 7 = Saturday
)
time_table = nextSong_log_df.select(
    "start_time",
    "hour",
    "day",
    "week",
    "month",
    "year",
    "weekday"
)
print("Found {} rows in the time table".format(time_table.count()))
time_table.printSchema()

Found 6820 rows in the time table
root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

In [75]:
time_table.take(2)

[Row(start_time=datetime.datetime(2018, 11, 15, 0, 30, 26), hour=0, day=15, week=46, month=11, year=2018, weekday=5), Row(start_time=datetime.datetime(2018, 11, 15, 0, 41, 21), hour=0, day=15, week=46, month=11, year=2018, weekday=5)]
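To make the time columns concrete, the sketch below repeats the same arithmetic in plain Python for one `ts` value. It assumes the Spark session time zone is UTC (Spark evaluates `from_unixtime`, `hour`, and friends in the session time zone), and the `ts` value is an assumption chosen to correspond to the first row shown above, not one read from the data. Two conventions matter: `weekofyear` follows ISO 8601 (weeks start on Monday), while `dayofweek` numbers days from 1 = Sunday to 7 = Saturday, which is why Thursday 2018-11-15 shows `weekday=5`.

```python
from datetime import datetime, timezone


def time_columns(ts_ms: int) -> dict:
    """Plain-Python analogue of the withColumn chain, assuming a UTC session time zone."""
    # from_unixtime formats whole seconds, so integer-divide away the milliseconds.
    start = datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc)
    return {
        "start_time": start,
        "hour": start.hour,
        "day": start.day,
        "week": start.isocalendar()[1],          # ISO 8601 week, like weekofyear
        "month": start.month,
        "year": start.year,
        "weekday": start.isoweekday() % 7 + 1,   # 1 = Sunday ... 7 = Saturday, like dayofweek
    }


print(time_columns(1542241826796))
```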

In [76]:
time_table.write.mode("overwrite").parquet(
    path_to_output_time_table,
    partitionBy=["year", "month"]
)

When I wrote the time table to HDFS, only a single year partition and a single month partition were created. This cell verifies that the data really covers just one year/month combination.

In [77]:
time_table.select("year","month").distinct().take(2)

[Row(year=2018, month=11)]

For the songplays table, we need to join the song data with the log data to get complete rows.

We need the columns:
* songplay_id - generated with `monotonically_increasing_id`
* sessionId   - nextSong_log_df
* location    - nextSong_log_df
* user_agent  - nextSong_log_df (`userAgent`)
* user_id     - nextSong_log_df (`userId`)
* level       - nextSong_log_df
* start_time  - nextSong_log_df
* song_id     - songs_table
* artist_id   - artists_table

First, join `artists_table` and `songs_table` on `artist_id`.

Then join this composite table to `nextSong_log_df`, matching `artist_name` to `artist` and `title` to `song`. Because this is a left join, log rows with no matching song are kept, with null `song_id` and `artist_id`.
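The two-step join can be pictured with plain dictionaries. This is an illustrative sketch on hypothetical rows, not the Spark code: it builds the artist/song lookup keyed by `(artist_name, title)`, then left-joins each log row to it, so log rows without a match keep `None` for `song_id` and `artist_id`, as the Spark `how="left"` join leaves nulls. It assumes each `(artist_name, title)` pair identifies at most one song; in Spark, duplicate pairs would multiply the matching log rows instead.

```python
def build_song_lookup(artists, songs):
    """Join songs to artists on artist_id, keyed by (artist_name, title)."""
    artist_names = {a["artist_id"]: a["artist_name"] for a in artists}
    lookup = {}
    for s in songs:
        # Right join: keep every song even if its artist is missing (name is None).
        name = artist_names.get(s["artist_id"])
        lookup[(name, s["title"])] = {"song_id": s["song_id"], "artist_id": s["artist_id"]}
    return lookup


def left_join_logs(logs, lookup):
    """Attach song_id/artist_id to each log row; unmatched rows get None (like a left join)."""
    empty = {"song_id": None, "artist_id": None}
    return [{**log, **lookup.get((log["artist"], log["song"]), empty)} for log in logs]


# Hypothetical rows for illustration.
artists = [{"artist_id": "AR1", "artist_name": "Some Artist"}]
songs = [{"song_id": "SO1", "title": "Some Song", "artist_id": "AR1"}]
logs = [
    {"sessionId": 1, "itemInSession": 0, "artist": "Some Artist", "song": "Some Song"},
    {"sessionId": 1, "itemInSession": 1, "artist": "Unknown", "song": "Not In Catalog"},
]
result = left_join_logs(logs, build_song_lookup(artists, songs))
for row in result:
    print(row)
```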

In [78]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)

In [79]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

In [80]:
nextSong_log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

In [81]:
from pyspark.sql.functions import countDistinct, monotonically_increasing_id

In [82]:
print("Found {} rows in log data".format(nextSong_log_df.count()))
print("Found {} distinct sessions and session items in log data"\
      .format(nextSong_log_df.select("itemInSession","sessionId").distinct().count()))

Found 6820 rows in log data
Found 6820 distinct sessions and session items in log data

This shows that the combination of `itemInSession` and `sessionId` is unique in this data, so it could serve as a natural primary key for this table.

First, we'll join all the data we need, then select the final columns we want to have, and then we can assign a new primary key using the `monotonically_increasing_id` function.
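`monotonically_increasing_id` guarantees IDs that are unique and increasing, but not consecutive. According to the PySpark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below reproduces that documented layout to show why `songplay_id` values jump between partitions; it is an illustration, not Spark code, and the layout may differ between Spark versions.

```python
def mono_id(partition_id: int, record_number: int) -> int:
    """Illustrative re-creation of monotonically_increasing_id's documented bit layout."""
    return (partition_id << 33) + record_number


# First three rows of partition 0, then the first row of partition 1.
ids = [mono_id(0, 0), mono_id(0, 1), mono_id(0, 2), mono_id(1, 0)]
print(ids)  # [0, 1, 2, 8589934592]
```

If consecutive IDs were required, `row_number()` over a window is an alternative, at the cost of Spark moving all rows into a single partition when the window has no `partitionBy`.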

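The cell below was run out of order: `artists_and_songs_table` is only defined in In [87], so running the notebook from top to bottom raises a `NameError` here, and the schema shown still has the songs table's `year` column rather than `song_release_year`.

In [83]: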
artists_and_songs_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

In [87]:
artists_and_songs_table = artists_table.join(
    songs_table.withColumnRenamed("year", "song_release_year"),
    on="artist_id",
    how="right"
)
print("{} rows in artists table".format(artists_table.count()))
print("{} rows in songs table".format(songs_table.count()))
print("Retained {} rows after joining artists and songs together".format(artists_and_songs_table.count()))
log_data_with_artist_and_song_data = nextSong_log_df.join(
    artists_and_songs_table,
    (nextSong_log_df.artist==artists_and_songs_table.artist_name) & (nextSong_log_df.song==artists_and_songs_table.title),
    how="left")
print("log_data_with_artist_and_song_data retained {} rows".format(log_data_with_artist_and_song_data.count()))
log_data_with_artist_and_song_data.printSchema()

9553 rows in artists table
14896 rows in songs table
Retained 14896 rows after joining artists and songs together
log_data_with_artist_and_song_data retained 6820 rows
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer

In [88]:
songplays_table = log_data_with_artist_and_song_data.select(
    "start_time", 
    "userId",
    "level",
    "song_id",
    "artist_id",
    "sessionId",
    "location",
    "userAgent",
    "year",
    "month"
).withColumn("songplay_id", monotonically_increasing_id())
print(songplays_table.count())
songplays_table.printSchema()

6820
root
 |-- start_time: timestamp (nullable = true)
 |-- userId: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- songplay_id: long (nullable = false)

In [89]:
songplays_table.write.mode("overwrite").parquet(
    path_to_output_songplay_table,
    partitionBy=["year", "month"]
)

Now that all the processing is done and the tables have been written to HDFS, we use `s3-dist-cp` to copy that data into the target S3 bucket.

Here is the command to upload any of the created tables:  
`s3-dist-cp --src [path_to_data_on_hdfs] --dest [s3_bucket_folder_location]`  

So if the `path_to_data_on_hdfs` is `/user/sparkify_data/artists`, then the path to the S3 bucket could be something like `s3://udacity-data-engineer-data-lake-project/artists`

The 5 commands to upload all the data were:
1. `s3-dist-cp --src /user/sparkify_data/users --dest s3://udacity-data-engineer-data-lake-project/users`
2. `s3-dist-cp --src /user/sparkify_data/time --dest s3://udacity-data-engineer-data-lake-project/time`
3. `s3-dist-cp --src /user/sparkify_data/songplays --dest s3://udacity-data-engineer-data-lake-project/songplays`
4. `s3-dist-cp --src /user/sparkify_data/artists --dest s3://udacity-data-engineer-data-lake-project/artists`
5. `s3-dist-cp --src /user/sparkify_data/songs --dest s3://udacity-data-engineer-data-lake-project/songs`
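Rather than typing the five commands by hand, they can be generated from the table names and run from the cluster's master node. A minimal sketch, assuming `s3-dist-cp` is on the `PATH` (as it was on the EMR cluster used here) and reusing the bucket name from above; `build_commands` and `upload` are hypothetical helpers. By default it only prints the commands; pass `run=True` to execute them.

```python
import shlex
import subprocess

HDFS_BASE = "/user/sparkify_data"
S3_BASE = "s3://udacity-data-engineer-data-lake-project"
TABLES = ["users", "time", "songplays", "artists", "songs"]


def build_commands(tables, hdfs_base=HDFS_BASE, s3_base=S3_BASE):
    """Return one s3-dist-cp argument list per table (hypothetical helper)."""
    return [
        ["s3-dist-cp", "--src", f"{hdfs_base}/{t}", "--dest", f"{s3_base}/{t}"]
        for t in tables
    ]


def upload(tables, run=False):
    """Print each command; when run=True, also execute it."""
    for cmd in build_commands(tables):
        print(shlex.join(cmd))
        if run:
            # check=True raises CalledProcessError if a copy fails.
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    upload(TABLES)  # dry run: prints the five commands
```

Passing the arguments as a list (rather than one shell string) avoids quoting problems if a path ever contains spaces.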