# SparkDataFrames

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html

In [1]:
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pandas

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=52ef22980b0ed095344fc78066afe3d8d8a4874bd573c1d3fa9607e59175c814
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


## Imports & Load Data

In [24]:
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark import SparkContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, avg , sum

import findspark

findspark.init()

df = pd.read_csv("cleaned_music_streaming.csv")

df.head()

Unnamed: 0,artist,track,popularity,danceability,energy,key,loudness,mode,speechiness,acousticness,instrumentalness,liveness,valence,tempo,duration,time_signature,genre
0,Bruno Mars,That's What I Like (feat. Gucci Mane),60.0,0.854,0.564,1.0,-4.964,1,0.0485,0.0171,0.177348,0.0849,0.899,134.071,234.596,4,5
1,Boston,Hitch a Ride,54.0,0.382,0.814,3.0,-7.23,1,0.0406,0.0011,0.00401,0.101,0.569,116.454,251.733,4,10
2,The Raincoats,No Side to Fall In,35.0,0.434,0.614,6.0,-8.334,1,0.0525,0.486,0.000196,0.394,0.787,147.681,109.667,4,6
3,Deno,Lingo (feat. J.I & Chunkz),66.0,0.853,0.597,10.0,-6.528,0,0.0555,0.0212,0.177348,0.122,0.569,107.033,173.968,4,5
4,Red Hot Chili Peppers,Nobody Weird Like Me - Remastered,53.0,0.167,0.975,2.0,-4.279,1,0.216,0.000169,0.0161,0.172,0.0918,199.06,229.96,4,10


## Create Spark Context and Sessoin

In [3]:
# Context
SparkContext = SparkContext()

# Session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
spark_df = spark.createDataFrame(df)

## PrintSchema & Show

In [5]:
spark_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- track: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: long (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration: double (nullable = true)
 |-- time_signature: long (nullable = true)
 |-- genre: long (nullable = true)



In [6]:
spark_df.show(5)

+--------------------+--------------------+----------+------------+------+----+--------+----+-----------+------------+------------------+--------+-------+-------+--------+--------------+-----+
|              artist|               track|popularity|danceability|energy| key|loudness|mode|speechiness|acousticness|  instrumentalness|liveness|valence|  tempo|duration|time_signature|genre|
+--------------------+--------------------+----------+------------+------+----+--------+----+-----------+------------+------------------+--------+-------+-------+--------+--------------+-----+
|          Bruno Mars|That's What I Lik...|      60.0|       0.854| 0.564| 1.0|  -4.964|   1|     0.0485|      0.0171|0.1773476204719195|  0.0849|  0.899|134.071| 234.596|             4|    5|
|              Boston|        Hitch a Ride|      54.0|       0.382| 0.814| 3.0|   -7.23|   1|     0.0406|      0.0011|           0.00401|   0.101|  0.569|116.454| 251.733|             4|   10|
|       The Raincoats|  No Side to 

## Temp View of Data

In [7]:
spark_df.createOrReplaceTempView("music_streaming")

##Spark operations

### Genre with the highest average popularity

In [8]:
avg_popularity_by_genre_df = spark_df.groupBy("genre").agg(avg("popularity").alias("avg_popularity"))
top_5_popular_genres = avg_popularity_by_genre_df.orderBy(col("avg_popularity").desc()).limit(5)

print("Top 5 genres with the highest average popularity:")
top_5_popular_genres.show()

Top 5 genres with the highest average popularity:
+-----+-----------------+
|genre|   avg_popularity|
+-----+-----------------+
|    4|56.77410851584384|
|    9|54.87524150977302|
|    5|51.07870208095271|
|   10|48.40947614489272|
|    1|45.86965588796474|
+-----+-----------------+



which artists have recorded the most number of songs with a duration of more than 5 minutes

In [9]:
# Filter songs with a duration of more than 5 minutes
songs_gt_5min_df = spark_df.filter(col("duration") > 5)

# Group by artist and count the number of songs
artist_song_count_df = songs_gt_5min_df.groupBy("artist").count()

# Sort the result in descending order by the count of songs
artist_song_count_df = artist_song_count_df.orderBy(col("count").desc())

# Display the top artist(s) with the most number of songs
print("Top artist(s) with the most number of songs with duration > 5 minutes:")
artist_song_count_df.show(5)

Top artist(s) with the most number of songs with duration > 5 minutes:
+------------------+-----+
|            artist|count|
+------------------+-----+
|The Rolling Stones|   31|
|                U2|   27|
|         Metallica|   27|
|    The Black Keys|   22|
|           Nirvana|   22|
+------------------+-----+
only showing top 5 rows



How many songs are included in every Genre?

In [10]:
# Group by genre and count the number of songs for each genre
genre_song_count_df = spark_df.groupBy("genre").count()

print("Number of songs included in every genre:")
genre_song_count_df.show()

Number of songs included in every genre:
+-----+-----+
|genre|count|
+-----+-----+
|    0|  586|
|    7|  465|
|    6| 2263|
|    9| 1828|
|    5| 1210|
|    1| 1268|
|   10| 4264|
|    3|  371|
|    8| 1704|
|    2| 1182|
|    4|  376|
+-----+-----+



 Which artists dominated the charts?

In [25]:
# Group by artist and calculate the total sum of popularity for each artist
artist_popularity_sum_df = spark_df.groupBy("artist").agg(sum("popularity").alias("total_popularity"))

# Sort the result in descending order by the total sum of popularity
dominant_artists_df = artist_popularity_sum_df.orderBy(col("total_popularity").desc())

# Display the top artist(s) with the highest total sum of popularity
print("Top artist(s) that dominated the charts based on total sum of popularity:")
dominant_artists_df.show(5)

Top artist(s) that dominated the charts based on total sum of popularity:
+------------------+------------------+
|            artist|  total_popularity|
+------------------+------------------+
|    Britney Spears| 2637.241221979766|
|   Backstreet Boys|            2615.0|
|The Rolling Stones|1838.3294319910071|
|         Metallica|            1710.0|
|                U2|            1648.0|
+------------------+------------------+
only showing top 5 rows



 Recommend at least 5 fun/not-boring songs that can be played at a party, you can use features like energy, danceability etc.. to represent cheerfulness.

In [14]:
fun_songs_df = spark_df.filter((col("energy") > 0.7) & (col("danceability") > 0.7))
top_fun_songs_df = fun_songs_df.orderBy(col("popularity").desc()).limit(5)

print("Recommended fun/not-boring songs for a party:")
top_fun_songs_df.select("artist", "track").show(truncate=False)


Recommended fun/not-boring songs for a party:
+---------------+------------------------+
|artist         |track                   |
+---------------+------------------------+
|Måneskin       |Beggin'                 |
|Doja Cat       |Kiss Me More (feat. SZA)|
|Ed Sheeran     |Bad Habits              |
|Doja Cat, SZA  |Kiss Me More (feat. SZA)|
|Los Legendarios|Fiel                    |
+---------------+------------------------+



# Spark ML

### Split the data into training and testing sets & Vector assembler

Vector assembler : A feature transformer that merges multiple columns into a vector column.

In [27]:
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the data into training and testing sets
train_df, test_df = spark_df.randomSplit([0.8, 0.2])

# Vector assembler
assembler = VectorAssembler(inputCols=['acousticness', 'danceability', 'energy', 'instrumentalness', 'liveness', 'valence', 'loudness', 'speechiness', 'tempo'],
                           outputCol='features')

# Transform the data
train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)

# Train and fit the data in the models
knn = RandomForestClassifier(featuresCol='features', labelCol='genre')
dt = DecisionTreeClassifier(featuresCol='features', labelCol='genre')
rf = RandomForestClassifier(featuresCol='features', labelCol='genre')

knnModel = knn.fit(train_df)
dtModel = dt.fit(train_df)
rfModel = rf.fit(train_df)

### Predict and Evaluate the models

In [28]:
# Make predictions on the test data
predictions_knn = knnModel.transform(test_df)
predictions_dt = dtModel.transform(test_df)
predictions_rf = rfModel.transform(test_df)

# Evaluate the accuracy of the models
evaluator = MulticlassClassificationEvaluator(labelCol="genre", predictionCol="prediction", metricName="accuracy")
accuracy_knn = evaluator.evaluate(predictions_knn)
accuracy_dt = evaluator.evaluate(predictions_dt)
accuracy_rf = evaluator.evaluate(predictions_rf)

# Print the results
print("k-Nearest Neighbors Accuracy:", accuracy_knn)
print("Decision Tree Accuracy:", accuracy_dt)
print("Random Forest Accuracy:", accuracy_rf)


k-Nearest Neighbors Accuracy: 0.4085771276595745
Decision Tree Accuracy: 0.39029255319148937
Random Forest Accuracy: 0.4085771276595745
