## 1.Data Loading and Schema Understanding 

In [19]:
# !pip install pyspark

from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("MusicDataAnalysis").getOrCreate()

# Load csv into a Spark DataFrame
df = spark.read.csv("./dataset.csv", header=True, inferSchema=True)

# Print the schema to verify the data types
df.printSchema()


root
 |-- id: integer (nullable = true)
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- explicit: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- track_genre: string (nullable = true)



## 2.Data Aggregation

In [20]:
from pyspark.sql import functions as F

# Calculate the averages by artist
df_aggregated = df.groupBy("artists").agg(
    F.avg("danceability").alias("avg_danceability"),
    F.avg("energy").alias("avg_energy"),
    F.avg("tempo").alias("avg_tempo")
)

# Top 5 artists by average track popularity
top_artists = df.groupBy("artists").agg(
    F.avg("popularity").alias("avg_popularity")
).orderBy(F.desc("avg_popularity")).limit(5)

top_artists.show()

+--------------------+--------------+
|             artists|avg_popularity|
+--------------------+--------------+
|Sam Smith;Kim Petras|         100.0|
|    Bizarrap;Quevedo|          99.0|
|       Manuel Turizo|          98.0|
|Bad Bunny;Chencho...|          97.0|
|Bad Bunny;Bomba E...|          94.5|
+--------------------+--------------+



## 3. Data Transformation

In [21]:
# Create a new column "energy_level"
df_with_energy_level = df.withColumn(
    "energy_level",
    F.when(F.col("energy") > 0.8, "High Energy").otherwise("Regular Energy")
)

# Calculate the average popularity and loudness for each energy level
df_grouped_by_energy = df_with_energy_level.groupBy("energy_level").agg(
    F.avg("popularity").alias("avg_popularity"),
    F.avg("loudness").alias("avg_loudness")
)

df_grouped_by_energy.show()

+--------------+------------------+------------------+
|  energy_level|    avg_popularity|      avg_loudness|
+--------------+------------------+------------------+
|   High Energy|31.786889141167464|-5.121428733599088|
|Regular Energy|33.990498624800956| 2.625479753796758|
+--------------+------------------+------------------+



## 4. Data Exporting


In [23]:
# Filter the data for 'High Energy' tracks
high_energy_tracks = df_with_energy_level.filter(F.col("energy_level") == "High Energy")

# Export the high energy tracks
high_energy_tracks.coalesce(1).write.option("header", "true").csv("./high_energy")
