                           Part 1: Data Exploration with Apache Spark

1.	Data Loading and Schema Understanding 

a) Load the dataset into a Spark Data Frame

In [1]:
# Starting sparksession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("spark-spotifymusicdata").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/03 19:32:00 INFO SparkEnv: Registering MapOutputTracker
23/12/03 19:32:00 INFO SparkEnv: Registering BlockManagerMaster
23/12/03 19:32:00 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/12/03 19:32:00 INFO SparkEnv: Registering OutputCommitCoordinator


In [2]:
#Loading the data:

path = 'gs://sparkassignmentstorage/spotify_songs.csv'
file_type = "csv"

#CSV
infer_schema = 'true'
first_row_is_header = 'true'
delimiter = ','


#Import csv

df = spark.read.format(file_type)\
    .option("inferSchema",infer_schema)\
    .option("header", first_row_is_header) \
    .option("sep", delimiter)\
    .load(path)

                                                                                

In [3]:
#Verify if the data is loaded--sanity check
total_rows = df.count()
print("Total number of rows:", total_rows)
total_column = df.count()
print("Total number of column:", total_column)

                                                                                

Total number of rows: 32833
Total number of column: 32833


Part 2: b) Print the schema and verify the data types of each column

In [4]:
df.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_artist: string (nullable = true)
 |-- track_popularity: integer (nullable = true)
 |-- track_album_id: string (nullable = true)
 |-- track_album_name: string (nullable = true)
 |-- track_album_release_date: string (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- playlist_genre: string (nullable = true)
 |-- playlist_subgenre: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: double (nullable = true)


2.	Data Aggregation 

In [None]:
a) Calculate the average danceability, energy, and tempo of tracks by artist: 

In [6]:
# importing avg from pyspark sql functions
from pyspark.sql.functions import avg

# average danceability, energy, and tempo of tracks by artist
average_by_artist = df.groupBy('track_artist').agg(
    avg('danceability').alias('avg_danceability'),
    avg('energy').alias('avg_energy'),
    avg('tempo').alias('avg_tempo')
)

average_by_artist.show()



+---------------+-------------------+------------------+------------------+
|   track_artist|   avg_danceability|        avg_energy|         avg_tempo|
+---------------+-------------------+------------------+------------------+
|      Lil Nas X| 0.7744999999999999| 0.618590909090909|145.49290909090908|
|   INTERSECTION| 0.6513333333333334|0.7153333333333333|           110.282|
|       CHUNG HA|             0.6915|0.8160000000000001|          117.4565|
|      Henri PFR|              0.688|             0.693|           114.051|
|         *NSYNC|              0.581|0.8040000000000002|           132.168|
|      TheLavish|              0.699|             0.803|           106.025|
|         Grimes|             0.5555|0.7853333333333333| 141.9238333333333|
|     Eurythmics|  0.668142857142857|0.7522857142857143|126.45485714285714|
|   Shawn Desman|              0.767|             0.633|           120.031|
|     Snoop Dogg| 0.7402777777777779|0.7422500000000002| 98.20511111111111|
|     Chroma

                                                                                

b) Identify the top 5 artists with the highest average track popularity: 

In [7]:
# Calculate average popularity by artist
average_popularity_by_artist = df.groupBy('track_artist').agg(
    avg('track_popularity').alias('avg_popularity')
)

# Identify the top 5 artists with the highest average track popularity
top_artists = average_popularity_by_artist.orderBy('avg_popularity', ascending=False).limit(5)

top_artists.show()

[Stage 11:>                                                         (0 + 2) / 2]

+-------------+-----------------+
| track_artist|   avg_popularity|
+-------------+-----------------+
|Trevor Daniel|             97.0|
|          Y2K|             91.0|
|  Don Toliver|90.71428571428571|
|  Roddy Ricch|88.21052631578948|
|       DaBaby|87.85714285714286|
+-------------+-----------------+



                                                                                

3.	Data Transformation 

a) Create a new column called “energy_level” that classifies tracks as 'High Energy' (energy > 0.8) or 'Regular Energy' (energy ≤ 0.8):

In [8]:
# importing when function from pyspark.sql.functions
from pyspark.sql.functions import when, col

# Create a new column 'energy_level' based on the 'energy' column
df = df.withColumn(
    'energy_level',
    when(col('energy') > 0.8, 'High Energy').otherwise('Regular Energy')
)
# dispalying few columns with new energy level column created
df.select('track_id','track_name','track_artist','track_popularity','energy_level').show()

+--------------------+--------------------+----------------+----------------+--------------+
|            track_id|          track_name|    track_artist|track_popularity|  energy_level|
+--------------------+--------------------+----------------+----------------+--------------+
|6f807x0ima9a1j3VP...|I Don't Care (wit...|      Ed Sheeran|              66|   High Energy|
|0r7CVbZTWZgbTCYdf...|Memories - Dillon...|        Maroon 5|              67|   High Energy|
|1z1Hg7Vb0AhHDiEmn...|All the Time - Do...|    Zara Larsson|              70|   High Energy|
|75FpbthrwQmzHlBJL...|Call You Mine - K...|The Chainsmokers|              60|   High Energy|
|1e8PAfcKUYoKkxPhr...|Someone You Loved...|   Lewis Capaldi|              69|   High Energy|
|7fvUMiyapMsRRxr07...|Beautiful People ...|      Ed Sheeran|              67|   High Energy|
|2OAylPUDDfwRGfe0l...|Never Really Over...|      Katy Perry|              62|   High Energy|
|6b1RNvAcJjQH73eZO...|Post Malone (feat...|       Sam Feldt|          

b) Group the data by this new energy classification and calculate the average popularity and loudness for each energy_level:

In [9]:
# importing average function from pyspark.sql.functions
from pyspark.sql.functions import avg

# Group data by 'energy_level' and calculate average popularity and loudness
energy_level_stats = df.groupBy('energy_level').agg(
    avg('track_popularity').alias('avg_popularity'),
    avg('loudness').alias('avg_loudness')
)

energy_level_stats.show()



+--------------+-----------------+------------------+
|  energy_level|   avg_popularity|      avg_loudness|
+--------------+-----------------+------------------+
|   High Energy|38.10813030385984|-4.875180217173095|
|Regular Energy|44.66595044344884|-7.636742925067416|
+--------------+-----------------+------------------+



                                                                                

4.	Data Exporting 

a) Export the data that have been classified as 'High Energy':

In [10]:
# Filter data for 'High Energy'
high_energy_data = df.filter(col('energy_level') == 'High Energy')

# Specify the output path for 'High Energy' data in a cloud bucket
output_path = 'gs://sparkassignmentstorage/high_energy_data.csv'

# Write 'High Energy' data to a CSV file
high_energy_data.write.csv(output_path, header=True, mode='overwrite')

# Stop Spark gracefully
spark.stop()

                                                                                