In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql import types as T
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID']=''
os.environ['AWS_SECRET_ACCESS_KEY']=''

def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
        .getOrCreate()
    return spark

spark = create_spark_session()

In [3]:
input_data = "s3a://atharva-data-eng/data-lake/small/"
output_data = "s3a://atharva-data-eng/data-lake/small/"
# get filepath to song data file
song_data = input_data + "song_data/*/*/*/*.json"

In [4]:
# read song data file
df = spark.read.json(song_data)

In [5]:
# extract columns to create songs table
songs_table = df.select(["song_id", "title", "artist_id", "year", "duration"])

In [6]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.parquet(output_data + "songs.parquet", partitionBy=["year", "artist_id"], mode="overwrite")

In [7]:
# extract columns to create artists table
artists_table = df.selectExpr("artist_id", "artist_name as name", "artist_location as location", "artist_latitude as lattitude", "artist_longitude as longitude")

In [8]:
# write artists table to parquet files
artists_table.write.parquet(output_data + "artists.parquet", mode="overwrite")

In [9]:
log_data = input_data + "log_data"

In [10]:
# read log data file
df = spark.read.json(log_data)

In [11]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [12]:
# filter by actions for song plays
df = df.filter(df.page == "NextSong")

In [13]:
# extract columns for users table    
users_table = df.selectExpr("userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level")

In [14]:
users_table = users_table.dropDuplicates(["user_id"])

In [15]:
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [16]:
# write users table to parquet files
users_table.write.parquet(output_data + "users.parquet", mode="overwrite")

In [17]:
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     51|      Maia|    Burke|     F| free|
|      7|    Adelyn|   Jordan|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     54|     Kaleb|     Cook|     M| free|
|    101|    Jayden|      Fox|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [18]:
# create timestamp column from original timestamp column
get_timestamp = F.udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), T.TimestampType()) 
df = df.withColumn("timestamp", get_timestamp(df.ts))

In [19]:
# create datetime column from original timestamp column
get_timestamp = F.udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), T.TimestampType()) 
df = df.withColumn("datetime", get_timestamp(df.ts))    

In [20]:
# extract columns to create time table
time_table = df.selectExpr("timestamp as start_time", "hour(timestamp) as hour", "day(timestamp) as day", "weekofyear(timestamp) as week", "month(timestamp) as month", "year(timestamp) as year", "dayofweek(timestamp) as weekday").dropDuplicates(["start_time"])

In [21]:
# write time table to parquet files partitioned by year and month
time_table.write.parquet(output_data + "time.parquet", partitionBy=["year", "month"], mode="overwrite")

In [22]:
# read in song data to use for songplays table
song_data = input_data + "song_data/*/*/*/*.json"
song_df = spark.read.json(song_data)
song_df.createOrReplaceTempView("songs")
df.createOrReplaceTempView("logs")

In [23]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = spark.sql("""
    SELECT  l.timestamp as start_time, 
            l.userId as user_id,
            l.level as level,
            s.song_id as song_id,
            s.artist_id as artist_id,
            l.sessionId as session_id,
            l.location as location,
            l.userAgent as user_agent,
            year(l.timestamp) as year,
            month(l.timestamp) as month
    FROM logs l
     JOIN songs s ON (l.song=s.title AND l.length=s.duration AND l.artist=s.artist_name)
""").withColumn("songplay_id", F.monotonically_increasing_id())

In [24]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(output_data + "songplays.parquet", partitionBy=["year", "month"], mode="overwrite")

In [25]:
user_df = spark.read.parquet(output_data + 'users.parquet')
user_df.registerTempTable("user_data")

gender_paid_counts = spark.sql(
"""
SELECT 
    COUNT(*) GENDER_COUNT,
    gender, 
    ROUND(AVG(CASE WHEN level = 'paid' THEN 1 ELSE 0 END), 2) PAID_FRACTION
FROM user_data
GROUP BY 2
""").collect()

In [54]:
from prettytable import PrettyTable
table = PrettyTable(['Gender', 'Count', 'Paid Fraction'])
for row in gender_paid_counts:
    table.add_row([row.gender, row.GENDER_COUNT, row.PAID_FRACTION])
print(table)

+--------+-------+---------------+
| Gender | Count | Paid Fraction |
+--------+-------+---------------+
|   F    |   55  |      0.27     |
|   M    |   41  |      0.15     |
+--------+-------+---------------+
