# ETL with Spark SQL (Local)

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

import pyspark.sql.functions as F

In [30]:
input_data = "./data"
output_data = "./output"

In [31]:
song_data = f"{input_data}/song_data/*/*/*/"
log_data = f"{input_data}/log_data/"

In [32]:
spark = SparkSession.builder \
    .appName("ETL") \
    .getOrCreate()

In [33]:
song_schema = StructType([
    StructField("artist_id", StringType()),
    StructField("artist_latitude", DoubleType()),
    StructField("artist_location", StringType()),
    StructField("artist_longitude", DoubleType()),
    StructField("artist_name", StringType()),
    StructField("duration", DoubleType()),
    StructField("num_songs", IntegerType()),
    StructField("song_id", StringType()),
    StructField("title", StringType()),
    StructField("year", IntegerType()),
])
songs = spark.read.json(song_data, schema=song_schema)
songs.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [34]:
songs.limit(3).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004


In [35]:
songs.createOrReplaceTempView("staging_songs")

In [36]:
log_schema = StructType([
    StructField("artist", StringType()),
    StructField("auth", StringType()),
    StructField("firstName", StringType()),
    StructField("gender", StringType()),
    StructField("itemInSession", IntegerType()),
    StructField("lastName", StringType()),
    StructField("length", DoubleType()),
    StructField("level", StringType()),
    StructField("location", StringType()),
    StructField("method", StringType()),
    StructField("page", StringType()),
    StructField("registration", DoubleType()),
    StructField("sessionId", IntegerType()),
    StructField("song", StringType()),
    StructField("status", IntegerType()),
    StructField("ts", DoubleType()),
    StructField("userAgent", StringType()),
    StructField("userId", IntegerType()),
])
logs = spark.read.json(log_data, schema=log_schema)
logs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: double (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: integer (nullable = true)



In [37]:
logs.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542242000000.0,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242000000.0,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542243000000.0,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",


In [38]:
logs = logs.withColumn("timestamp", F.to_timestamp(F.col("ts") / 1000))

In [39]:
logs.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542242000000.0,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242000000.0,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",,2018-11-15 00:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542243000000.0,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",,2018-11-15 00:45:41.796


In [40]:
logs.createOrReplaceTempView("staging_events")

## ETL

In [41]:
songs_table = spark.sql("""
    SELECT
      DISTINCT song_id AS song_id,
      title,
      artist_id,
      year,
      duration
    FROM
      staging_songs
""")

In [46]:
# songs_table.write \
#     .partitionBy("year", "artist_id") \
#     .mode("overwrite") \
#     .parquet(f"{output_data}/songs")

In [47]:
songs_table.write \
    .partitionBy("year", "artist_id") \
    .mode("overwrite") \
    .csv(f"{output_data}/songs")

In [48]:
artists_table = spark.sql("""
    SELECT
      DISTINCT artist_id,
      artist_name,
      artist_location,
      artist_latitude,
      artist_longitude
    FROM
      staging_songs
""")

In [49]:
# artists_table.write \
#     .mode("overwrite") \
#     .parquet(f"{output_data}/artists")

In [50]:
artists_table.write \
    .mode("overwrite") \
    .csv(f"{output_data}/artists")

In [51]:
users_table = spark.sql("""
    SELECT
      DISTINCT userid AS user_id,
      firstname AS first_name,
      lastname AS last_name,
      gender,
      level
    FROM
      staging_events
    WHERE
      page = 'NextSong'
""")

In [52]:
# users_table.write \
#     .mode("overwrite") \
#     .parquet(f"{output_data}/users")

In [53]:
users_table.write \
    .mode("overwrite") \
    .csv(f"{output_data}/users")

In [54]:
time_table = spark.sql("""
    SELECT
      DISTINCT timestamp AS start_time,
      EXTRACT(hour FROM timestamp) AS hour,
      EXTRACT(day FROM timestamp) AS day,
      EXTRACT(week FROM timestamp) AS week,
      EXTRACT(month FROM timestamp) AS month,
      EXTRACT(year FROM timestamp) AS year,
      EXTRACT(dayofweek FROM timestamp) AS weekday
    FROM
      staging_events
""")

In [55]:
# time_table.write \
#     .partitionBy("year", "month") \
#     .mode("overwrite") \
#     .parquet(f"{output_data}/time")

In [56]:
time_table.write \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .csv(f"{output_data}/time")

In [57]:
songplays_table = spark.sql("""
    SELECT
      timestamp AS start_time,
      e.userid AS user_id,
      e.level AS level,
      s.song_id AS song_id,
      s.artist_id AS artist_id,
      e.sessionId AS session_id,
      e.location AS location,
      e.userAgent AS user_agent,
      EXTRACT(month FROM timestamp) AS month,
      EXTRACT(year FROM timestamp) AS year
    FROM
      staging_events e
    JOIN
      staging_songs s
    ON
      e.artist = s.artist_name
      AND e.song = s.title
    WHERE
      e.page = 'NextSong'
""")

In [58]:
# songplays_table.write \
#     .partitionBy("year", "month") \
#     .mode("overwrite") \
#     .parquet(f"{output_data}/songplays")

In [59]:
songplays_table.write \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .csv(f"{output_data}/songplays")