In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from zipfile import ZipFile
import configparser
import os

import boto3
!pip3 install datetime
from datetime import datetime as dt



In [3]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

song_data = config.get('S3_Input', 'Song_Data')
song_data_test = config.get('S3_Input', 'Song_Data_Test')

log_data = config.get('S3_Input', 'Log_Data')

print(os.environ['AWS_ACCESS_KEY_ID'])
print(os.environ['AWS_SECRET_ACCESS_KEY'])
print(song_data)

AKIAV565JVOFR22OXGG5
70e/pw0lw1YwOy5njkRVsnqIC5F5EH25lWzRs2hk
s3a://udacity-dend/song_data/*/*/*/*.json


In [4]:
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('udacity-dend')

for file in my_bucket.objects.filter(Prefix='song_data/A/A/A/'):
    print(file)


s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEA128F935A30D.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAED128E0783FAB.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEM128F93347B9.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEW128F42930C0.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAFD128F92F423A.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAGR128F425B14B.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAHD128F426

In [5]:
# https://www.programiz.com/python-programming/datetime/strftime

date_created = dt.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
date_created

'2021-06-04-19-59-19-954342'

In [6]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [7]:
# song_data_test used for quick testing
# Use song_data for spark EMR
test_file = spark.read.json(song_data_test)

test_file.printSchema()
test_file.limit(5).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972
1,ARA23XO1187B9AF18F,40.57885,"Carteret, New Jersey",-74.21956,The Smithereens,192.522,1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,0
2,ARSVTNL1187B992A91,51.50632,"London, England",-0.12714,Jonathan King,129.85424,1,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),2001
3,AR73AIO1187B9AD57B,37.77916,"San Francisco, CA",-122.42005,Western Addiction,118.07302,1,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,2005
4,ARXQBR11187B98A2CC,,"Liverpool, England",,Frankie Goes To Hollywood,821.05424,1,SOBRKGM12A8C139EF6,Welcome to the Pleasuredome,1985


# Load Songs Data

In [None]:
with ZipFile('data/song-data.zip', 'r') as zipObj:
   # Extract all the contents of zip file in current directory
   zipObj.extractall('data/song-data')

In [8]:
song_data_df = spark.read.json(song_data_test)

In [None]:
song_data_df.printSchema()
song_data_df.limit(5).toPandas()

In [None]:
song_pandas = song_data_df.toPandas()
song_pandas['song_id'].nunique()

# Create Song and Artist Tables
## Output tables in Parquet Files

In [None]:
# Create Song Data Table parquet file
song_data_df.createOrReplaceTempView("song_data_DF")

song_table = spark.sql("""
    SELECT  DISTINCT song_id, 
            title, 
            artist_id, 
            year, 
            duration
    FROM song_data_DF
    ORDER BY song_id
    """)

song_table.printSchema()
song_table.limit(5).toPandas()

In [None]:
song_table.describe().show()

In [None]:
import os
if not os.path.exists('data/output_data'):
    os.makedirs('data/output_data')

In [None]:
#https://sparkbyexamples.com/pyspark/pyspark-read-and-write-parquet-file/
    

song_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet("/data/output_data/song_table.parquet")

In [None]:
song_table_read = spark.read.parquet("/data/output_data/song_table.parquet")

In [None]:
song_table_read.printSchema()
song_table_read.limit(5).toPandas()

In [None]:
song_table_read.describe().show()

In [None]:
song_data_df.createOrReplaceTempView("artist_data_DF")

artist_table = spark.sql("""
    SELECT
        DISTINCT artist_id,
                 artist_name as name,
                 artist_location as location,
                 artist_latitude as latitude,
                 artist_longitude as longitude
        FROM artist_data_DF
        ORDER BY artist_id
                 """)

artist_table.printSchema()
artist_table.limit(5).toPandas()

In [None]:
artist_table.write.mode("overwrite").parquet("/data/output_data/artist_table.parquet")

In [None]:
artist_table_df = spark.read.parquet("/data/output_data/artist_table.parquet")

artist_table_df.printSchema()
artist_table_df.limit(5).toPandas()

In [None]:
# Load Log Data

In [None]:
# https://thispointer.com/python-how-to-unzip-a-file-extract-single-multiple-or-all-files-from-a-zip-archive/
# Unzip the log_data file into new folder
with ZipFile('data/log-data.zip', 'r') as zipObj:
   # Extract all the contents of zip file in current directory
   zipObj.extractall('data/log-data')


In [None]:
log_data_df = spark.read.json("data/log-data")

In [None]:
log_data_df.printSchema()
log_data_df.limit(5).toPandas()

In [None]:
log_data_filtered_df = log_data_df.filter(log_data_df.page == 'NextSong')

log_data_filtered_df.limit(5).toPandas()

In [None]:
log_data_filtered_df.createOrReplaceTempView("user_table_DF")

user_table = spark.sql("""
    SELECT DISTINCT int(userId) as user_id,
                    firstName as first_name,
                    lastName as last_name,
                    gender,
                    level
    FROM user_table_DF
    ORDER BY user_id
                    """)
user_table.printSchema()
user_table.limit(5).toPandas()

In [None]:
user_table.write.mode("overwrite").partitionBy("user_id").parquet("/data/output_data/user_table.parquet")

In [None]:
user_table = spark.read.parquet('/data/output_data/user_table.parquet')

user_table.printSchema()
user_table.limit(5).toPandas()

In [None]:
#https://stackoverflow.com/questions/51983037/convert-from-timestamp-to-specific-date-in-pyspark

from pyspark.sql.functions import udf
from datetime import datetime
from pyspark.sql import types as t
# Create a function that returns the desired string from a timestamp

def format_timestamp(ts):
    return datetime.fromtimestamp(ts / 1000.0)

# Create the UDF
format_timestamp_udf = udf(lambda x: format_timestamp(x), t.TimestampType())

# Finally, apply the function to each element of the 'timestamp' column
log_data_filtered_df = log_data_filtered_df.withColumn('timestamp', format_timestamp_udf(log_data_filtered_df['ts']))

In [None]:
log_data_filtered_df.printSchema()
log_data_filtered_df.limit(5).toPandas()

In [None]:
#https://stackoverflow.com/questions/51983037/convert-from-timestamp-to-specific-date-in-pyspark

from pyspark.sql.functions import udf
from datetime import datetime
from pyspark.sql import types as t
# Create a function that returns the desired string from a timestamp

def format_datetime(ts):
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')

# Create the UDF
format_datetime_udf = udf(lambda x: format_datetime(x), t.StringType())

# Finally, apply the function to each element of the 'timestamp' column
log_data_filtered_df = log_data_filtered_df.withColumn('datetime', format_datetime_udf(log_data_filtered_df['ts']))

In [None]:
log_data_filtered_df.printSchema()
log_data_filtered_df.limit(5).toPandas()

In [None]:
log_data_filtered_df.createOrReplaceTempView("time_table_DF")

time_table = spark.sql("""
    SELECT DISTINCT datetime as start_time,
            hour(timestamp) as hour,
            day(timestamp) as day,
            weekofyear(timestamp) as week,
            month(timestamp) as month,
            year(timestamp) as year,
            dayofweek(timestamp) as weekday
    FROM time_table_DF
    ORDER BY start_time """)

time_table.printSchema()
time_table.limit(5).toPandas()

In [None]:
time_table.write.mode("overwrite").partitionBy("year", "month").parquet("/data/output_data/time_table.parquet")

In [None]:
time_table = spark.read.parquet("/data/output_data/time_table.parquet")

time_table.printSchema()
time_table.limit(5).toPandas()

In [None]:
# https://sparkbyexamples.com/pyspark/pyspark-join-explained-with-examples/

song_log_join_df = song_data_df.join(log_data_filtered_df, (log_data_filtered_df.artist == song_data_df.artist_name) & (log_data_filtered_df.song == song_data_df.title))

song_log_join_df.printSchema()
song_log_join_df.limit(5).toPandas()

In [None]:
# https://stackoverflow.com/questions/46213986/how-could-i-add-a-column-to-a-dataframe-in-pyspark-with-incremental-values

from pyspark.sql.functions import monotonically_increasing_id

song_log_join_df = song_log_join_df.withColumn("songplay_id", monotonically_increasing_id())

song_log_join_df.createOrReplaceTempView('songplay_table_df')

songplay_table = spark.sql("""
    SELECT  songplay_id,
            datetime as start_time,
            userId as user_id,
            level,
            song_id,
            artist_id,
            sessionId as session_id,
            location,
            userAgent as user_agent
    FROM songplay_table_df
    ORDER BY songplay_id
            """)

songplay_table.printSchema()
songplay_table.limit(5).toPandas()