## Assignment #3: Music Data Analysis with Apache Spark

### 1. Data Loading and Schema Understanding

In [1]:
# Importing the SparkSession module from pyspark.sql
from pyspark.sql import SparkSession

# Creating a SparkSession for our application - "spark-spotifySongs"
spark = SparkSession.builder.appName("spark-spotifySongs").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/10 17:10:30 INFO SparkEnv: Registering MapOutputTracker
23/12/10 17:10:30 INFO SparkEnv: Registering BlockManagerMaster
23/12/10 17:10:30 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/12/10 17:10:30 INFO SparkEnv: Registering OutputCommitCoordinator


#### Load the dataset into a Spark DataFrame

In [3]:
# Specifying the path to the dataset stored in Google Cloud Storage
path=r"gs://hive-storage2/spotify_songs.csv"

# Loading the dataset into a Spark DataFrame.
# 'inferSchema' is set to True to automatically infer the data types of each column
# 'header' is set to True to use the first line of the file as column names
songs_df = spark.read.csv(path, inferSchema=True, header=True)

                                                                                

#### Print the schema and verify the data types of each column

In [4]:
# Printing schema of the DataFrame
songs_df.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_artist: string (nullable = true)
 |-- track_popularity: integer (nullable = true)
 |-- track_album_id: string (nullable = true)
 |-- track_album_name: string (nullable = true)
 |-- track_album_release_date: string (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- playlist_genre: string (nullable = true)
 |-- playlist_subgenre: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: double (nullable = true)


### 2. Data Aggregation

#### Calculate the average danceability, energy, and tempo of tracks by artist

In [9]:
# Importing the 'avg' function from pyspark.sql.functions for aggregation
from pyspark.sql.functions import avg

# Aggregating the data to calculate the average danceability, energy, and tempo for each artist
# Grouping the data by 'track_artist'
# Using the 'agg' function to apply the 'avg' function on 'danceability', 'energy', and 'tempo' columns
# Renaming the resulting columns to 'avg_danceability', 'avg_energy', and 'avg_tempo' for clarity
avg_by_artist = songs_df.groupBy('track_artist').agg(
                avg('danceability').alias('avg_danceability'),
                avg('energy').alias('avg_energy'),
                avg('tempo').alias('avg_tempo')
                )

In [12]:
# Displaying the result of the aggregation
avg_by_artist.show()



+---------------+-------------------+------------------+------------------+
|   track_artist|   avg_danceability|        avg_energy|         avg_tempo|
+---------------+-------------------+------------------+------------------+
|      Lil Nas X| 0.7744999999999999| 0.618590909090909|145.49290909090908|
|   INTERSECTION| 0.6513333333333334|0.7153333333333333|           110.282|
|       CHUNG HA|             0.6915|0.8160000000000001|          117.4565|
|      Henri PFR|              0.688|             0.693|           114.051|
|         *NSYNC|              0.581|0.8040000000000002|           132.168|
|      TheLavish|              0.699|             0.803|           106.025|
|         Grimes|             0.5555|0.7853333333333333| 141.9238333333333|
|     Eurythmics|  0.668142857142857|0.7522857142857143|126.45485714285714|
|   Shawn Desman|              0.767|             0.633|           120.031|
|     Snoop Dogg| 0.7402777777777779|0.7422500000000002| 98.20511111111111|
|     Chroma

                                                                                

#### Identify the top 5 artists with the highest average track popularity

In [14]:
# Grouping the dataset by 'track_artist' and calculating the average track popularity for each artist
# Sorting the results in descending order of average popularity and limiting the result to the top 5 artists
top_df = songs_df.groupBy('track_artist').agg(
         avg('track_popularity').alias('avg_popularity')
         ).orderBy('avg_popularity', ascending=False).limit(5)

In [15]:
# Displaying the top 5 artists with the highest average track popularity.
top_df.show()

+-------------+-----------------+
| track_artist|   avg_popularity|
+-------------+-----------------+
|Trevor Daniel|             97.0|
|          Y2K|             91.0|
|  Don Toliver|90.71428571428571|
|  Roddy Ricch|88.21052631578948|
|       DaBaby|87.85714285714286|
+-------------+-----------------+



> Top 5 artists based on the average popularity of their tracks. <br>
"Trevor Daniel" has the highest average track popularity of 97.0

### 3. Data Transformation

#### Create a new column called “energy_level” that classifies tracks as 'High Energy' (energy > 0.8) or 'Regular Energy' (energy ≤ 0.8)

In [18]:
# Importing necessary functions from pyspark.sql.functions
from pyspark.sql.functions import when, col

# Adding a new column 'energy_level' to classify tracks as 'High Energy' (energy > 0.8) or 'Regular Energy' (energy ≤ 0.8)
songs_df = songs_df.withColumn('energy_level', when(col('energy') > 0.8, 'High Energy').otherwise('Regular Energy'))

In [20]:
# Displaying a few rows to verify the new 'energy_level' classification.
songs_df.select('energy','energy_level').show()

+------+--------------+
|energy|  energy_level|
+------+--------------+
| 0.916|   High Energy|
| 0.815|   High Energy|
| 0.931|   High Energy|
|  0.93|   High Energy|
| 0.833|   High Energy|
| 0.919|   High Energy|
| 0.856|   High Energy|
| 0.903|   High Energy|
| 0.935|   High Energy|
| 0.818|   High Energy|
| 0.923|   High Energy|
| 0.774|Regular Energy|
| 0.726|Regular Energy|
| 0.915|   High Energy|
|  0.78|Regular Energy|
| 0.835|   High Energy|
| 0.901|   High Energy|
| 0.747|Regular Energy|
| 0.557|Regular Energy|
| 0.821|   High Energy|
+------+--------------+
only showing top 20 rows



> Classified tracks as 'High Energy' (energy > 0.8) or 'Regular Energy' (energy ≤ 0.8)

#### Group the data by this new energy classification and calculate the average popularity and loudness for each energy_level

In [21]:
# Grouping the data by the newly created 'energy_level' column
# Calculating the average popularity and loudness for each energy level
energy_classification = songs_df.groupBy('energy_level').agg(
                avg('track_popularity').alias('avg_popularity'),
                avg('loudness').alias('avg_loudness')
                )

In [23]:
# Displaying the average popularity and loudness for each energy classification
energy_classification.show()

+--------------+-----------------+------------------+
|  energy_level|   avg_popularity|      avg_loudness|
+--------------+-----------------+------------------+
|   High Energy|38.10813030385984|-4.875180217173095|
|Regular Energy|44.66595044344884|-7.636742925067416|
+--------------+-----------------+------------------+



> By above output it is observed how energy levels in tracks correlate with their popularity and loudness.

### 4. Data Exporting

#### Export the data that have been classified as 'High Energy'

In [24]:
# Filtering the DataFrame to include only tracks classified as 'High Energy'
high_energy_df = songs_df.filter(col('energy_level') == 'High Energy')

In [27]:
# Displaying the first few rows of the filtered DataFrame
high_energy_df.show(2)

+--------------------+--------------------+------------+----------------+--------------------+--------------------+------------------------+-------------+--------------------+--------------+-----------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+------------+
|            track_id|          track_name|track_artist|track_popularity|      track_album_id|    track_album_name|track_album_release_date|playlist_name|         playlist_id|playlist_genre|playlist_subgenre|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration_ms|energy_level|
+--------------------+--------------------+------------+----------------+--------------------+--------------------+------------------------+-------------+--------------------+--------------+-----------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+---------

In [28]:
# Writing the 'High Energy' tracks to a CSV file in Google Cloud Storage.
high_energy_df.write.csv("gs://hive-storage2/spotify_songs_high_energy.csv",header = True)

                                                                                