In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import pyspark.sql.functions as f
from pyspark.sql import types as t

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')


['dl.cfg']

In [3]:
os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']


SONG_DATA_LOCAL = config['LOCAL']['INPUT_DATA_SD_LOCAL']
INPUT_DATA_LOCAL =config['LOCAL']['INPUT_DATA_LD_LOCAL']
OUTPUT_DATA_LOCAL = config['LOCAL']['OUTPUT_DATA_LOCAL']

#print(AWS_ACCESS_KEY_ID)
#print(AWS_SECRET_ACCESS_KEY)
#print (INPUT_DATA)
#print (OUTPUT_DATA)
print (SONG_DATA_LOCAL)
print(INPUT_DATA_LOCAL)
print (OUTPUT_DATA_LOCAL)

data/song_data/*/*/*/*.json
data/log_data/*/*/*.json
data/output_data/


In [4]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

### Load song_data

In [5]:
# Load songs
#Read local song

song_data_path = SONG_DATA_LOCAL
# Use this instead if you want to read song_data from S3.
#song_data_path = INPUT_DATA_SD

df_sd = spark.read.json(song_data_path)

In [6]:
df_sd.printSchema()
df_sd.show(5, truncate=False)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+------------------------+----------------+----------------------------------------------------------------------------------------------+---------+---------+------------------+----------------------------------------------------+----+
|artist_id         |artist_latitude|artist_location         |artist_longitude|artist_name                                                                                   |duration |num_songs|song_id           |title                                               |year|
+------------------+---------------+-

## Create Tables: Song_tables and artists_table  and write to a parquet file


### start with song table and write to a parquet file


In [7]:
df_sd.createOrReplaceTempView("songs_table_DF")
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM songs_table_DF
    ORDER BY song_id
""")
songs_table.printSchema()
songs_table.show()




root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|AREDBBQ1187B98AFF5|   0|138.63138|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|AR7SMBG1187B9B9066|   0|218.38322|
|SOBEBDG12A58A76D60|        Kassie Jones|ARI3BMM1187FB4255E|   0|220.78649|
|SOBKWDJ12A8C13B2F3|Wild Rose (Back 2...|AR36F9J1187FB406F1|   0|230.71302|
|SOBLGCN12AB0183212|James (Hold The L...|AR

In [8]:
# write DF to JSON files
# Ref: https://stackoverflow.com/questions/29908892/save-a-large-spark-dataframe-as-a-single-json-file-in-s3
#df_sd.write.mode('append').json(songs_table_path)
#---
## Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
# Partitioning: https://stackoverflow.com/questions/43731679/how-to-save-a-partitioned-parquet-file-in-spark-2-1
# for Local Output
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songs_table_path = OUTPUT_DATA_LOCAL + "songs_table.parquet" + "_" + now
print(songs_table_path)

# for Output on S3 bucket
#songs_table_path = OUTPUT_DATA + "songs_table.parquet" + "_" + now
#print(songs_table_path)

# NOTE: this command doesn't have partitioning!!
#songs_table.write.parquet(songs_table_path)

# Write DF to Spark parquet file (partitioned by year and artist_id)
songs_table.write.partitionBy("year", "artist_id").parquet(songs_table_path)

data/output_data/songs_table.parquet_2022-03-28-14-39-14-954662


### Create artist tableand write to parquet file

In [9]:
df_sd.createOrReplaceTempView("artists_table_DF")
artists_table = spark.sql("""
    SELECT  artist_id AS artist_id, 
            artist_name AS name,
            artist_location AS location,
            artist_latitude AS latitude,
            artist_longitude AS longitude
    FROM artists_table_DF
    ORDER BY artist_id desc
""")
artists_table.printSchema()
artists_table.show()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARYKCQI1187FB3B18F|               Tesla|                    |    null|      null|
|ARXR32B1187FB57099|                 Gob|                    |    null|      null|
|ARVBRGZ1187FB4675A|        Gwen Stefani|                    |    null|      null|
|ARULZCI1241B9C8611|  Luna Orbit Project|                    |    null|      null|
|ARQGYP71187FB44566|        Jimmy Wakely|         Mineola, AR|34.31109| -94.02978|
|ARQ9BO41187FB5CF1F|          John Davis|        Pennsylvania|40.99471| -77.60454|
|ARPFHN61187FB575F6|         Lupe Fiasco|        

In [10]:
# Write DF to parquet file
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
artists_table_path = OUTPUT_DATA_LOCAL + "artists_table.parquet" + "_" + now
#print(artists_table_path)

# Write DF to Spark parquet file (partitioned by year and artist_id)
artists_table.write.parquet(artists_table_path)

## Load log data into spark and write to parquet file

In [11]:
#Read local song

log_data_path = INPUT_DATA_LOCAL
# Use this instead if you want to read song_data from S3.
#song_data_path = INPUT_DATA_SD

df_ld = spark.read.json(log_data_path)

In [12]:
df_ld.printSchema()
df_ld.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+-------------------------------------+------+--------+-----------------+---------+---------------+------+-------------+---------------------------------------------------------------------------------------------------------

In [13]:
df_ld_filtered = df_ld.filter(df_ld.page == 'NextSong')
df_ld_filtered.show(20)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyva

In [14]:
df_ld_filtered.createOrReplaceTempView("users_table_DF")
users_table = spark.sql("""
    SELECT DISTINCT userId AS user_id,
                    firstName AS first_name,
                    lastName AS last_name,
                    gender,
                    level
    FROM users_table_DF
    ORDER BY last_name
    
""")
users_table.printSchema()
users_table.show()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     66|     Kevin| Arellano|     M| free|
|     34|    Evelin|    Ayala|     F| free|
|     99|       Ann|    Banks|     F| free|
|    100|     Adler|  Barrera|     M| free|
|     42|    Harper|  Barrett|     M| paid|
|     91|    Jayden|     Bell|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     58|     Emily|   Benson|     F| paid|
|     72|    Hayden|    Brock|     F| paid|
|     51|      Maia|    Burke|     F| free|
|     32|      Lily|    Burns|     F| free|
|     90|    Andrea|   Butler|     F| free|
|     64|    Hannah|  Calhoun|     F| free|
|     27|    Carlos|   Carter|     M| free|
|     94|      Noah|   Chavez|     M| free|
|    

In [15]:
# Write DF to parquet file
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
users_table_path = OUTPUT_DATA_LOCAL + "users_table.parquet" + "_" + now
print(users_table_path)

# Write DF to Spark parquet file (partitioned by year and artist_id)
users_table.write.parquet(users_table_path)

data/output_data/users_table.parquet_2022-03-28-14-39-23-505061


### Create time table and write to parquet file

In [16]:
# Create a new column with timestamp
# 
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql import types as t
#from datetime import datetime

@udf(t.TimestampType())
def get_timestamp(ts):
    return datetime.fromtimestamp(ts/1000.0)

df_ld_filtered = df_ld_filtered.withColumn("timestamp", get_timestamp("ts"))    
df_ld_filtered.printSchema()
df_ld_filtered.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+------------------------------------+------+--------+-----------------+---------+----------------------------------------------+------+-------------+-------------------------------

In [18]:
#  create a new column with datetime
# Create a new column with datetime
# Ref: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dateformat#pyspark.sql.functions.from_unixtime
@udf(t.StringType())
def get_datetime(ts):
    return datetime.fromtimestamp(ts/1000.0).strftime('%Y-%m-%d %H:%M:%S')

df_ld_filtered = df_ld_filtered.withColumn("datetime", get_datetime("ts"))
df_ld_filtered.printSchema()
df_ld_filtered.show(5, truncate=False)


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+------------------------------------+------+--------+-----------------+---------+----------------------------------------------+------+-----

In [19]:
df_ld_filtered.createOrReplaceTempView("time_table_DF")
time_table = spark.sql("""
    SELECT  DISTINCT datetime AS start_time, 
                     hour(timestamp) AS hour, 
                     day(timestamp)  AS day, 
                     weekofyear(timestamp) AS week,
                     month(timestamp) AS month,
                     year(timestamp) AS year,
                     dayofweek(timestamp) AS weekday
    FROM time_table_DF
    ORDER BY start_time
""")
time_table.printSchema()
time_table.show(5, truncate=False)

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+-------------------+----+---+----+-----+----+-------+
|start_time         |hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-01 16:01:46|16  |1  |44  |11   |2018|5      |
|2018-11-01 16:05:52|16  |1  |44  |11   |2018|5      |
|2018-11-01 16:08:16|16  |1  |44  |11   |2018|5      |
|2018-11-01 16:11:13|16  |1  |44  |11   |2018|5      |
|2018-11-01 16:17:33|16  |1  |44  |11   |2018|5      |
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [20]:
# Write DF to parquet file
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
time_table_path = OUTPUT_DATA_LOCAL + "time_table.parquet" + "_" + now
print(time_table_path)

# Write DF to Spark parquet file (partitioned by year and artist_id)
time_table.write.parquet(time_table_path)

data/output_data/time_table.parquet_2022-03-28-14-48-46-285065


## Create songplays table + write it to parquet file
### Join song_data and log_data

In [21]:
# Ref: https://stackoverflow.com/questions/33745964/how-to-join-on-multiple-columns-in-pyspark
df_ld_sd_joined = df_ld_filtered.join(df_sd, (df_ld_filtered.artist == df_sd.artist_name) & (df_ld_filtered.song == df_sd.title))
df_ld_sd_joined.printSchema()
df_ld_sd_joined.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = 

In [22]:
df_ld_sd_joined = df_ld_sd_joined.withColumn("songplay_id", f.monotonically_increasing_id())


df_ld_sd_joined.createOrReplaceTempView("songplays_table_DF")
songplays_table = spark.sql("""
    SELECT  songplay_id AS songplay_id, 
            timestamp   AS start_time, 
            userId      AS user_id, 
            level       AS level,
            song_id     AS song_id,
            artist_id   AS artist_id,
            sessionId   AS session_id,
            location    AS location,
            userAgent   AS user_agent
    FROM songplays_table_DF
    ORDER BY (user_id, session_id) 
""")

songplays_table.printSchema()
songplays_table.show(5, truncate=False)

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-----------+-----------------------+-------+-----+------------------+------------------+----------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
|songplay_id|start_time             |user_id|level|song_id           |artist_id         |session_id|location                          |user_agent                                                                                                                               |
+-----------+-----------------------+-------+-----+------------------+-----------

In [24]:
 #Write DF to Spark parquet file
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songplays_table_path = OUTPUT_DATA_LOCAL + "songplays_table.parquet" + "_" + now
print(songplays_table_path)
time_table.write.parquet(songplays_table_path)

data/output_data/songplays_table.parquet_2022-03-28-15-32-59-661809


### Check files in s3 bucket 


In [26]:
#import boto3

#s3 = boto3.resource('s3',
#                       region_name="us-west-2",
#                       aws_access_key_id=config['AWS']['AWS_ACCESS_KEY_ID'],
#                       aws_secret_access_key=config['AWS']['AWS_SECRET_ACCESS_KEY']
#                     )
#song_data_path = "udacity-dend"
#log_data_path = INPUT_DATA + "log_data/"
#print(song_data_path)
#print(log_data_path)

#input_data_bucket =  s3.Bucket(song_data_path)

In [27]:
#count_sd = 0
#for obj in input_data_bucket.objects.filter(Prefix="song_data"):
#    count_sd += 1
#    print(obj)
#print(count_sd)


In [28]:
#count_ld = 0
#for obj in input_data_bucket.objects.filter(Prefix="log_data"):
#    count_ld += 1
#    print(obj)
#print(count_ld)

### Read data from parquet files locally

In [30]:
# Read songs_table
#input_data_parquet = OUTPUT_DATA_LOCAL + "songplays_table.parquet*"   # the * to take care of the time appended to the file
#df_sd = spark.read.parquet(input_data_parquet)


In [31]:
df_sd.show()

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-25 07:56:40|   7| 25|  47|   11|2018|      1|
|2018-11-25 08:13:48|   8| 25|  47|   11|2018|      1|
|2018-11-25 08:17:10|   8| 25|  47|   11|2018|      1|
|2018-11-25 10:06:25|  10| 25|  47|   11|2018|      1|
|2018-11-25 10:10:39|  10| 25|  47|   11|2018|      1|
|2018-11-25 10:14:24|  10| 25|  47|   11|2018|      1|
|2018-11-25 10:18:23|  10| 25|  47|   11|2018|      1|
|2018-11-25 10:22:11|  10| 25|  47|   11|2018|      1|
|2018-11-25 11:29:05|  11| 25|  47|   11|2018|      1|
|2018-11-25 12:21:32|  12| 25|  47|   11|2018|      1|
|2018-11-25 12:24:18|  12| 25|  47|   11|2018|      1|
|2018-11-25 12:27:30|  12| 25|  47|   11|2018|      1|
|2018-11-25 12:31:25|  12| 25|  47|   11|2018|      1|
|2018-11-25 12:36:14|  12| 25|  47|   11|2018|      1|
|2018-11-25 17:48:10|  17| 25|  47|   11|2018|      1|
|2018-11-2