## Data Loading and Cleaning

In [None]:
# Import packages
import numpy as np
import pandas as pd
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col, explode, regexp_replace, split

In [None]:
# Build the Spark session used by every cell below.
# "local[2]" asks for a local master with two worker threads.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Spotify-Huge-Dataset")
    .getOrCreate()  # Hive support (.enableHiveSupport()) is not enabled here
)
spark

In [None]:
# Load the tracks CSV (first line is the header).
# inferSchema=True makes Spark take an extra pass over the file to type the
# columns. Without it every column is a string, so the SQL cells below depend on
# implicit casts for SUM/AVG and string comparison for MAX(popularity), where
# "9" sorts above "89".
df = spark.read.csv('/FileStore/tables/tracks.csv', header=True, inferSchema=True)

In [None]:
# Turn the stringified artist list into a plain comma-separated string, 'artists1':
#   1. strip every '[', ']' and single-quote character (this also removes
#      apostrophes that are part of a name)
#   2. drop any whitespace that directly follows a comma
# The raw 'artists' column is kept next to the cleaned one.
# NOTE(review): an artist name that itself contains a comma would presumably be
# split into two artists by the later explode step - confirm against the data.
without_brackets = regexp_replace(col("artists"), "[\\[\\]']", "")
without_comma_spaces = regexp_replace(without_brackets, ",\\s*", ",")
df1 = df.select("artists", without_comma_spaces.alias("artists1"))

In [None]:
# Split 'artists1' on commas and explode the resulting array so that each
# individual artist gets its own row, in a new column named 'artist'.
artist_names = split("artists1", ",")
df1 = df1.select("artists", "artists1", explode(artist_names).alias("artist"))

In [None]:
# Attach the exploded artist rows to the full track table.
# df1 holds one row per (track, artist), so an 'artists' value shared by N tracks
# appears N times on the right-hand side. Joining it as-is would emit N copies of
# each of those tracks and inflate every SUM/COUNT computed later.
# De-duplicating first leaves one row per distinct (artists, artist) pair, so the
# result has exactly one row per track per credited artist.
df_joined = df.join(df1.distinct(), on='artists', how='left')

In [None]:
# Replace 'release_date' with a date column parsed using the yyyy-MM-dd pattern.
# NOTE(review): values that do not match the full pattern (e.g. year-only
# strings) would presumably become null and drop out of the year/decade
# queries below - confirm against the data.
release_date_parsed = F.to_date("release_date", "yyyy-MM-dd")
df_joined = df_joined.withColumn("release_date", release_date_parsed)

## EDA

In [None]:
# Number of unique songs, counted as distinct (name, artist) pairs
song_artist_pairs = df_joined.select("name", "artist").distinct()
song_artist_pairs.count()

In [None]:
# Register the joined DataFrame as the temporary view "df_table" so the cells
# below can query it with spark.sql(...)
df_joined.createOrReplaceTempView(name="df_table")

# MAGIC %md
# MAGIC ## EDA

In [None]:
# Ten most popular artists over the last 50 release years, ranked by the SUM of
# the popularity of their songs.
# The ranking uses the same total that is displayed. Previously the rows were
# ordered by AVG(popularity) while SUM(popularity) was shown, so the top 10 did
# not correspond to the displayed metric.
query = """
SELECT
    artist AS Artist,
    ROUND(SUM(popularity), 2) AS Total_Popularity
FROM df_table
WHERE YEAR(release_date) >= YEAR(CURRENT_DATE) - 50
GROUP BY artist
ORDER BY Total_Popularity DESC
LIMIT 10
"""

spark.sql(query).display()

In [None]:
# Ten most popular songs with their artist and release date.
# Popularity is averaged per (artist, name, release_date) group and rounded to
# two decimals for display; ranking uses the unrounded average.
# The output column was previously misspelled "populartiy".
query = """
SELECT
    artist,
    name,
    release_date,
    ROUND(AVG(popularity), 2) AS avg_popularity
FROM df_table
GROUP BY artist, name, release_date
ORDER BY AVG(popularity) DESC
LIMIT 10
"""

spark.sql(query).display()

In [None]:
# Most popular song per decade, for releases from the last 50 years.
#
# Fixes relative to the previous version of this query:
#   * the duplicated WHERE keyword (a syntax error) is removed;
#   * decades are floored (1985 -> 1980); ROUND(year, -1) put 1985-1994 in "1990";
#   * popularity is cast to a number before MAX, so the comparison is numeric
#     even when the column was loaded as a string;
#   * popularity, title and artist are taken from ONE row via MAX over a struct
#     (structs compare field by field, popularity first). Separate MAX() calls
#     could return the title of one track and the artist of another on ties.
query = """
WITH top_per_decade AS (
    SELECT
        FLOOR(YEAR(release_date) / 10) * 10 AS Decade,
        MAX(NAMED_STRUCT(
            'popularity', CAST(popularity AS DOUBLE),
            'name', name,
            'artist', artist
        )) AS top_track
    FROM
        df_table
    WHERE
        YEAR(release_date) >= YEAR(CURRENT_DATE) - 50
    GROUP BY
        FLOOR(YEAR(release_date) / 10) * 10
)
SELECT
    Decade,
    ROUND(top_track.popularity, 2) AS Popularity,
    top_track.name AS Title,
    top_track.artist AS Artist
FROM
    top_per_decade
ORDER BY Decade ASC
"""

spark.sql(query).display()

In [None]:
# Ten most popular songs together with their danceability.
# The displayed value is the average popularity of each
# (danceability, name, artist) group, matching the ORDER BY. Previously
# COUNT(Popularity) - the number of rows in the group - was shown under a
# popularity label.
query = """
SELECT
    Name,
    Artist,
    Danceability,
    ROUND(AVG(popularity), 2) AS Avg_Popularity
FROM df_table
GROUP BY Danceability, Name, Artist
ORDER BY AVG(popularity) DESC
LIMIT 10
"""

spark.sql(query).display()

In [None]:
# How has music changed over the decades?
# Audio features of the single most popular track in each decade (last 50 years).
# These are NOT averages; the averages are computed in a separate query.
#
# Fixes relative to the previous version of this query:
#   * decades are floored (1985 -> 1980) instead of rounded to the nearest ten;
#   * all features come from ONE row via MAX over a struct whose first field is
#     the numeric popularity. The earlier per-column MAX(CONCAT(LPAD(...)))
#     trick could mix features of different tracks when popularity was tied;
#   * features are returned as numbers rather than substrings;
#   * the "Dancability" column name is spelled correctly.
query="""
WITH top_per_decade AS (
    SELECT
        FLOOR(YEAR(release_date) / 10) * 10 AS Decade,
        MAX(NAMED_STRUCT(
            'popularity', CAST(popularity AS DOUBLE),
            'instrumentalness', CAST(instrumentalness AS DOUBLE),
            'acousticness', CAST(acousticness AS DOUBLE),
            'energy', CAST(energy AS DOUBLE),
            'danceability', CAST(danceability AS DOUBLE),
            'valence', CAST(valence AS DOUBLE),
            'liveness', CAST(liveness AS DOUBLE),
            'loudness', CAST(loudness AS DOUBLE),
            'speechiness', CAST(speechiness AS DOUBLE)
        )) AS top_track
    FROM
        df_table
    WHERE
        YEAR(release_date) >= YEAR(CURRENT_DATE) - 50
    GROUP BY
        FLOOR(YEAR(release_date) / 10) * 10
)
SELECT
    Decade,
    top_track.instrumentalness AS Instrumentalness,
    top_track.acousticness AS Acousticness,
    top_track.energy AS Energy,
    top_track.danceability AS Danceability,
    top_track.valence AS Valence,
    top_track.liveness AS Liveness,
    top_track.loudness AS Loudness,
    top_track.speechiness AS Speechiness
FROM
    top_per_decade
ORDER BY Decade ASC
"""

spark.sql(query).display()

In [None]:
# How has music changed over the decades?
# Average value of each audio feature per decade, for releases from the last 50 years.
# Decades are floored (1985 -> 1980). ROUND(year, -1) rounds to the NEAREST ten,
# which labelled 1985-1994 as "1990" and shifted every bucket by five years.

query="""
SELECT
    FLOOR(YEAR(release_date) / 10) * 10 AS Decade,
    AVG(danceability) AS Danceability,
    AVG(energy) AS Energy,
    AVG(instrumentalness) AS Instrumentalness,
    AVG(valence) AS Valence,
    AVG(liveness) AS Liveness,
    AVG(speechiness) AS Speechiness,
    AVG(loudness) as Loudness,
    AVG(acousticness) as Acousticness
FROM
    df_table
WHERE
    YEAR(release_date) >= YEAR(CURRENT_DATE) - 50
GROUP BY
    FLOOR(YEAR(release_date) / 10) * 10
ORDER BY Decade ASC
"""

spark.sql(query).display()


# Row-level view for comparing danceability with the explicit flag:
# danceability is bucketed by flooring to one decimal place (0.87 -> 0.8);
# no aggregation is done here.
query="""
SELECT
    FLOOR(danceability * 10) / 10 AS Danceability,
    explicit AS Explicit
FROM
    df_table
"""
danceability_vs_explicit = spark.sql(query)
danceability_vs_explicit.display()


# Row-level view for comparing acousticness with popularity:
# acousticness is rounded to one decimal place; no aggregation is done here.
query="""
SELECT
    ROUND(acousticness, 1) AS Acousticness,
    popularity AS Popularity
FROM
    df_table
"""
acousticness_vs_popularity = spark.sql(query)
acousticness_vs_popularity.display()


# Approximate median track duration (duration_ms) per release year, 1970-2022.
# duration_ms is cast explicitly so the percentile does not depend on an implicit
# string-to-number conversion, and rows are ordered by year so the time series
# reads chronologically (without ORDER BY the row order is unspecified).
query="""
SELECT
    YEAR(release_date) AS Year,
    PERCENTILE_APPROX(CAST(duration_ms AS DOUBLE), 0.5) AS Duration
FROM
    df_table
WHERE
    YEAR(release_date) BETWEEN 1970 AND 2022
GROUP BY
    YEAR(release_date)
ORDER BY
    YEAR(release_date) ASC
"""
spark.sql(query).display()