In [1]:
import configparser
from datetime import datetime
import os
import zipfile
import pyspark.sql.functions as f
from pyspark.sql import types as t
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from zipfile import ZipFile

In [2]:
# # extract song-data.zip
# with ZipFile('data/song-data.zip', 'r') as zip_ref:
#     zip_ref.extractall('data')

In [3]:
# # extract log-data.zip
# with ZipFile('data/log-data.zip', 'r') as zip_ref:
#     zip_ref.extractall('data/log_data')

In [4]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']
# Read config path file in s3
# SONG_DATA = config['AWS']['INPUT_DATA_SONG_DATA']
# LOG_DATA = config['AWS']['INPUT_DATA_LOG_DATA']
# OUTPUT_DATA = config['AWS']['OUTPUT_DATA']
# print(SONG_DATA)
# print(os.environ["AWS_ACCESS_KEY_ID"])

# Read config path file local
SONG_DATA = config['LOCAL']['INPUT_DATA_SONG_DATA_LOCAL']
LOG_DATA = config['LOCAL']['INPUT_DATA_LOG_DATA_LOCAL']
OUTPUT_DATA = config['LOCAL']['OUTPUT_DATA_LOCAL']

# Create spark session with hadoop-aws package

In [5]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [6]:
spark = create_spark_session()
spark

# Load song_data (from JSON to Spark)

In [7]:
# Read song_data
song_data_path = SONG_DATA
# song_data_path = SONG_DATA
# df_song_data = spark.read("json").load("s3://udacity-dend/song_data/*/*/*/*.json")
# df_song_data = spark.read.json("s3://udacity-dend/song_data/A/A/A/*.json")
df_song_data = spark.read.json(song_data_path).dropDuplicates()

In [8]:
df_song_data.printSchema()
df_song_data.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARPFHN61187FB575F6|       41.88415|         Chicago, IL|       -87.63241|         Lupe Fiasco|2

# Create (songs_table + artists_table) Tables

## Create Songs table + write it to parquet file

In [9]:
df_song_data.createOrReplaceTempView("songs_table_df")
songs_table = spark.sql("""
    SELECT
        song_id,
        title,
        artist_id,
        year,
        duration
    FROM songs_table_df
    ORDER BY song_id
""")
songs_table.printSchema()
songs_table.show(10)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|AREDBBQ1187B98AFF5|   0|138.63138|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|AR7SMBG1187B9B9066|   0|218.38322|
|SOBEBDG12A58A76D60|        Kassie Jones|ARI3BMM1187FB4255E|   0|220.78649|
|SOBKWDJ12A8C13B2F3|Wild Rose (Back 2...|AR36F9J1187FB406F1|   0|230.71302|
|SOBLGCN12AB0183212|James (Hold The L...|AR

In [10]:
# Write DF data to JSON file(s)
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songs_table_path = OUTPUT_DATA + "songs_table" + "_" + now
print(songs_table_path)
# Partitioning: https://stackoverflow.com/questions/43731679/how-to-save-a-partitioned-parquet-file-in-spark-2-1
# this command doesn't have partitioning
# songs_table.write.parquet(songs_table_path)
# Write DF to Spark parquet file (partitioned by year and artist_id)
songs_table.write.partitionBy("year", "artist_id").parquet(songs_table_path)

data/output_data/songs_table_2022-09-11-13-07-50-979773


## Create Artists table + write it to parquet file

In [11]:
df_song_data.createOrReplaceTempView("artists_table_df")
print(df_song_data)
artists_table = spark.sql("""
    SELECT
        artist_id as artist_id,
        artist_name as name,
        artist_location as location,
        artist_latitude as latitude,
        artist_longitude as longitude
    FROM artists_table_df
    ORDER BY artist_id DESC
""")
artists_table.printSchema()
artists_table.show(5, truncate=False)

DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]
root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------------------+------------------+--------------+--------+---------+
|artist_id         |name              |location      |latitude|longitude|
+------------------+------------------+--------------+--------+---------+
|ARYKCQI1187FB3B18F|Tesla             |              |null    |null     |
|ARXR32B1187FB57099|Gob               |              |null    |null     |
|ARWB3G61187FB49404|Steve Morse       |Hamilton, Ohio|null    |null     |
|ARVBRGZ1187FB4675A|Gwen Stefani      |              |null    |null     |
|ARULZCI1241B9C8611|Luna Orbit Project|              |null    |null  

In [12]:
# Write DF to Spark parquet file
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
artists_table_path = OUTPUT_DATA + "artists_table.parquet" + "_" + now
artists_table.write.parquet(artists_table_path)

# Load log_data(from Json to Spark)

In [13]:
# Read local log_data
log_data_path = LOG_DATA
df_log_data = spark.read.json(log_data_path).dropDuplicates()

In [14]:
df_log_data.printSchema()
df_log_data.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|     

In [15]:
# Filter record page = 'NextSong'
df_log_data_filtered = df_log_data.filter(df_log_data.page == 'NextSong')
df_log_data.show(20)

+--------------------+----------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|      auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+----------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|             Fat Joe| Logged In|     Kate|     F|           21|  Harrell|241.34485| paid|Lansing-East Lans...|   PUT|NextSong|1.540472624796E12|      605|Safe 2 Say [The I...|   200|1542296032796|"Mozilla/5.0 (X11...|    97|
|         Linkin Park| Logged In|     Kate|     F|           33|  Harrell|259.86567| paid|Lansin

# Create users_table, times_table, songplays_table table

## Create User Table + write it to parquet file

In [16]:
df_log_data_filtered.createOrReplaceTempView("users_table_df")
users_table = spark.sql("""
    SELECT DISTINCT 
        userId as user_id,
        firstName as first_name,
        lastName as last_name,
        gender,
        level
    FROM users_table_df
    ORDER BY last_name
""")
users_table.printSchema()
users_table.show(20)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     66|     Kevin| Arellano|     M| free|
|     34|    Evelin|    Ayala|     F| free|
|     99|       Ann|    Banks|     F| free|
|    100|     Adler|  Barrera|     M| free|
|     42|    Harper|  Barrett|     M| paid|
|     91|    Jayden|     Bell|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     58|     Emily|   Benson|     F| paid|
|     72|    Hayden|    Brock|     F| paid|
|     51|      Maia|    Burke|     F| free|
|     32|      Lily|    Burns|     F| free|
|     90|    Andrea|   Butler|     F| free|
|     64|    Hannah|  Calhoun|     F| free|
|     27|    Carlos|   Carter|     M| free|
|     94|      Noah|   Chavez|     M| free|
|    

In [17]:
# Write DF to Spark parquet file
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
users_table_path = OUTPUT_DATA + "user_table.parquet" + "_" + now
print(users_table_path)
users_table.write.parquet(users_table_path)

data/output_data/user_table.parquet_2022-09-11-13-08-25-949947


## Create Times Table + write it to parquet file

## Create timestamp column

In [18]:
# Create a new column with timestamp
@udf(t.TimestampType())
def get_timestamp (ts):
    return datetime.fromtimestamp(ts / 1000.0)
    
df_log_data_filtered = df_log_data_filtered.withColumn("timestamp", get_timestamp("ts"))
                    

df_log_data_filtered.printSchema()
df_log_data_filtered.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+------------------+---------+---------+------+-------------+---------+---------+-----+---------------------------------------+------+--------+-----------------+---------+-----------------------------------------------------+------+-------------+-------------

In [19]:
# df_log_data_filtered.select('timestamp',).limit(5).collect()

## Create datetime column

In [20]:
# Create a new column with datetime
@udf(t.StringType())
def get_datetime(ts):
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')

df_log_data_filtered = df_log_data_filtered.withColumn("datetime", get_datetime("ts"))
df_log_data_filtered.printSchema()
df_log_data_filtered.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)

+------------------+---------+---------+------+-------------+---------+---------+-----+---------------------------------------+------+--------+-----------------+---------+------------------------------------------------

In [21]:
df_log_data_filtered.createOrReplaceTempView("time_table_df")
time_table = spark.sql("""
    SELECT  DISTINCT 
        datetime AS start_time, 
        hour(timestamp) AS hour, 
        day(timestamp)  AS day, 
        weekofyear(timestamp) AS week,
        month(timestamp) AS month,
        year(timestamp) AS year,
        dayofweek(timestamp) AS weekday
    FROM time_table_df
    ORDER BY start_time
""")
time_table.printSchema()
time_table.show(5, truncate=False)

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------------------+----+---+----+-----+----+-------+
|start_time         |hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-01 21:01:46|21  |1  |44  |11   |2018|5      |
|2018-11-01 21:05:52|21  |1  |44  |11   |2018|5      |
|2018-11-01 21:08:16|21  |1  |44  |11   |2018|5      |
|2018-11-01 21:11:13|21  |1  |44  |11   |2018|5      |
|2018-11-01 21:17:33|21  |1  |44  |11   |2018|5      |
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [23]:
# Write DF to Spark parquet file
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
time_table_path = OUTPUT_DATA + "times_table.parquet" + "_" + now
print(time_table_path)
# time_table.write.parquet(time_table_path)
time_table.write.mode("overwrite").partitionBy("year", "month")\
            .parquet(time_table_path)

data/output_data/times_table.parquet_2022-09-11-13-18-34-141764


# Create songplays table + write it to parquet file

## Join song_data and log_data

In [24]:
df_ld_sd_joined = df_log_data_filtered.join(df_song_data, (df_log_data_filtered.artist == df_song_data.artist_name) & (df_log_data_filtered.song == df_song_data.title))
df_ld_sd_joined.printSchema()
df_ld_sd_joined.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = 

In [25]:
df_ld_sd_joined = df_ld_sd_joined.withColumn("songplay_id", f.monotonically_increasing_id())
df_ld_sd_joined.createOrReplaceTempView("songplays_table_df")
songplays_table = spark.sql("""
    SELECT
        songplay_id as songplay_id,
        timestamp as start_time,
        userId as user_id,
        level,
        song_id,
        artist_id,
        sessionId as session_id,
        location,
        userAgent as user_agent
    FROM songplays_table_df
    ORDER BY (user_id, session_id) 
""")

songplays_table.printSchema()
songplays_table.show(5, truncate=False)

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+------------+-----------------------+-------+-----+------------------+------------------+----------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
|songplay_id |start_time             |user_id|level|song_id           |artist_id         |session_id|location                          |user_agent                                                                                                                               |
+------------+-----------------------+-------+-----+------------------+--------

In [29]:
# Write DF to Spark parquet file
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songplays_table_path = OUTPUT_DATA + "songplays_table.parquet" + "_" + now
print(songplays_table_path)
# songplays_table.write.parquet(songplays_table_path)
time_table.write.mode("overwrite").partitionBy("year", "month").\
    parquet(songplays_table_path)

data/output_data/songplays_table.parquet_2022-09-11-13-23-27-330197


In [32]:
# # delete folder output_data in workspace
# !pip install pytest-shutil
# import shutil
# shutil.rmtree("data/output_data/", ignore_errors=False, onerror=None)



In [None]:
# !pip install pycodestyle
# %pycodestyle etl.py