# Setup Spark in Google Colab
*reference: https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/*


*to install other versions, get the download link from https://spark.apache.org/downloads.html*

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz

--2023-12-14 19:12:08--  https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299426263 (286M) [application/x-gzip]
Saving to: ‘spark-3.3.3-bin-hadoop3.tgz.2’


2023-12-14 19:12:14 (50.8 MB/s) - ‘spark-3.3.3-bin-hadoop3.tgz.2’ saved [299426263/299426263]



In [None]:
!tar -xvf spark-3.3.3-bin-hadoop3.tgz

spark-3.3.3-bin-hadoop3/
spark-3.3.3-bin-hadoop3/LICENSE
spark-3.3.3-bin-hadoop3/NOTICE
spark-3.3.3-bin-hadoop3/R/
spark-3.3.3-bin-hadoop3/R/lib/
spark-3.3.3-bin-hadoop3/R/lib/SparkR/
spark-3.3.3-bin-hadoop3/R/lib/SparkR/DESCRIPTION
spark-3.3.3-bin-hadoop3/R/lib/SparkR/INDEX
spark-3.3.3-bin-hadoop3/R/lib/SparkR/Meta/
spark-3.3.3-bin-hadoop3/R/lib/SparkR/Meta/Rd.rds
spark-3.3.3-bin-hadoop3/R/lib/SparkR/Meta/features.rds
spark-3.3.3-bin-hadoop3/R/lib/SparkR/Meta/hsearch.rds
spark-3.3.3-bin-hadoop3/R/lib/SparkR/Meta/links.rds
spark-3.3.3-bin-hadoop3/R/lib/SparkR/Meta/nsInfo.rds
spark-3.3.3-bin-hadoop3/R/lib/SparkR/Meta/package.rds
spark-3.3.3-bin-hadoop3/R/lib/SparkR/Meta/vignette.rds
spark-3.3.3-bin-hadoop3/R/lib/SparkR/NAMESPACE
spark-3.3.3-bin-hadoop3/R/lib/SparkR/R/
spark-3.3.3-bin-hadoop3/R/lib/SparkR/R/SparkR
spark-3.3.3-bin-hadoop3/R/lib/SparkR/R/SparkR.rdb
spark-3.3.3-bin-hadoop3/R/lib/SparkR/R/SparkR.rdx
spark-3.3.3-bin-hadoop3/R/lib/SparkR/doc/
spark-3.3.3-bin-hadoop3/R/lib/Spar

In [None]:
!pip install findspark



In [None]:
import os

# Tell Spark where the Java runtime and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.3-bin-hadoop3",
})

In [None]:
import findspark
# Initialise findspark so that the `pyspark` imports in later cells resolve.
findspark.init()

In [None]:
# Sanity check: display the Spark home directory that findspark resolved.
findspark.find()

'/content/spark-3.3.3-bin-hadoop3'

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the notebook's Spark session: local master, app name
# "Colab", Spark UI port set to 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Handle to the session's SparkContext, used below for the RDD smoke test.
sc = spark.sparkContext

In [None]:
# Smoke test: distribute the numbers 1..5 and pair each with its square.
test = sc.parallelize(list(range(1, 6)))
test.map(lambda value: (value, value ** 2)).collect()

[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]

In [None]:
from google.colab import drive

# Mount Google Drive; the CSV files read below live under this mount point.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# One CSV export per artist, stored in the 'raw-csvs-top-10' folder on Drive.
file_names = ['the_weeknd.csv', 'taylor_swift.csv', 'sza.csv', 'rihanna.csv', 'justin_bieber.csv', 'ed_sheeran.csv', 'drake.csv', 'doja_cat.csv', 'billie_eilish.csv', 'bad_bunny.csv']

# Accumulator for the stacked DataFrame; stays None until the first file is read.
combined_df = None

# Read each CSV (header row, inferred column types) and stack it onto the result.
for file_name in file_names:
    file_path = f'/content/drive/My Drive/raw-csvs-top-10/{file_name}'
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    # unionByName matches columns by header name. A plain union() matches by
    # position, so a file whose columns are ordered differently would be
    # silently misaligned; unionByName raises instead if the names do not match.
    combined_df = df if combined_df is None else combined_df.unionByName(df)

# `combined_df` now holds the rows of all listed CSV files.
combined_df.show(5)  # Show the first 5 rows

+--------------------+--------------------+--------------------+--------------------+--------------+-------------------+-------------+----------+--------------------+-------------------+--------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+
|          Spotify ID|          Artist IDs|          Track Name|          Album Name|Artist Name(s)|       Release Date|Duration (ms)|Popularity|            Added By|           Added At|              Genres|Danceability|Energy|Key|Loudness|Mode|Speechiness|Acousticness|Instrumentalness|Liveness|Valence|  Tempo|Time Signature|
+--------------------+--------------------+--------------------+--------------------+--------------+-------------------+-------------+----------+--------------------+-------------------+--------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+
|2ye9iWj5V4g6k6H

In [None]:
from pyspark.sql.functions import col, split

row_count_before_clean = combined_df.count()
print(f"The number of rows before data cleaning is: {row_count_before_clean}")

# The ten artists whose tracks are kept.
top_10_artist_names = [
    "The Weeknd", "Taylor Swift", "SZA", "Rihanna", "Justin Bieber",
    "Ed Sheeran", "Drake", "Doja Cat", "Billie Eilish", "Bad Bunny",
]

# Step 1: keep only tracks whose first listed artist (the text before the first
# comma in "Artist Name(s)") is one of the top 10 artists.
first_artist = split(col("Artist Name(s)"), ",").getItem(0)
filtered_df = combined_df.withColumn("FirstArtist", first_artist)
filtered_df = filtered_df.filter(col("FirstArtist").isin(top_10_artist_names))

# Step 2: keep a single row per 'Track Name'.
no_duplicates_df = filtered_df.dropDuplicates(["Track Name"])

# Step 3: drop live recordings, i.e. track names containing "- Live" or
# "Live Version".
track_name = col("Track Name")
is_live = track_name.like("%- Live%") | track_name.like("%Live Version%")
cleaned_df = no_duplicates_df.filter(~is_live)

row_count_clean = cleaned_df.count()
print("Removed duplicates, live versions, and tracks where the top 10 artist is NOT a primary artist.")
print(f"The number of rows after data cleaning is: {row_count_clean}")


The number of rows before data cleaning is: 3084
Removed duplicates, live versions, and tracks where the top 10 artist is NOT a primary artist.
The number of rows after data cleaning is: 1906


In [None]:


# Identifier, text and playlist-metadata columns that are removed before modelling.
columns_to_drop = [
    "Album Name",
    "Release Date",
    "Added By",
    "Added At",
    "Genres",
    "Mode",
    "Spotify ID",
    "Artist IDs",
    "Track Name",
    "Artist Name(s)",
    "FirstArtist",
]

cleaned_df = cleaned_df.drop(*columns_to_drop)


In [None]:
from pyspark.sql.functions import when

# Text label for popularity: 'low' when Popularity <= 50, 'high' otherwise.
popularity = cleaned_df["Popularity"]
pop_rating = when(popularity <= 50, 'low').otherwise('high')
cleaned_df = cleaned_df.withColumn('pop_rating', pop_rating)


In [None]:
from pyspark.sql.functions import when

# Numeric class label used by the models: 1 when Popularity <= 50, 2 otherwise.
popularity_level = when(cleaned_df["Popularity"] <= 50, 1).otherwise(2)
data = cleaned_df.withColumn('popularity_level', popularity_level)

# Preview the first 10 labelled rows.
data.show(10)


+-------------+----------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+--------------+----------+----------------+
|Duration (ms)|Popularity|Danceability|Energy|Key|Loudness|Speechiness|Acousticness|Instrumentalness|Liveness|Valence|  Tempo|Time Signature|pop_rating|popularity_level|
+-------------+----------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+--------------+----------+----------------+
|       230026|        75|       0.476| 0.718|  5|  -7.227|      0.149|       0.263|         0.00261|   0.109|  0.361|183.932|             4|      high|               2|
|       170573|        68|       0.404| 0.564| 11|  -7.013|     0.0344|       0.915|         0.00252|   0.134|  0.371| 93.631|             4|      high|               2|
|       230453|        94|       0.679| 0.587|  7|  -7.015|      0.276|       0.141|         6.35E-6|   0.137|  0.486|186.003|             4|      hig

In [None]:
from pyspark.sql.functions import col

# Number of tracks per popularity level, ordered by level.
popularity_level_counts = (
    data
    .groupBy("popularity_level")
    .count()
    .orderBy("popularity_level")
)

popularity_level_counts.show()


+----------------+-----+
|popularity_level|count|
+----------------+-----+
|               1|  654|
|               2| 1252|
+----------------+-----+



**Next, we balance the two classes by downsampling the larger class (level 2) to roughly the size of the smaller one (level 1), since classification algorithms tend to perform poorly on imbalanced data.**

In [None]:
from pyspark.sql.functions import col

# Split the labelled data by class.
level_1_data = data.filter(col('popularity_level') == 1)
level_2_data = data.filter(col('popularity_level') == 2)

# Count the classes from the data itself instead of hard-coding numbers copied
# from an earlier output, so the cell stays correct if the input changes.
level_1_count = level_1_data.count()
level_2_count = level_2_data.count()

# Fraction of level 2 to keep so that it roughly matches level 1. It is capped
# at 1.0 because sampling without replacement cannot return more rows than
# exist, and falls back to 0.0 when there are no level-2 rows at all.
level_2_fraction = min(1.0, level_1_count / level_2_count) if level_2_count else 0.0

# A fixed seed makes the sample (and everything trained on it) reproducible.
# Spark's sample() returns approximately, not exactly, the requested fraction.
level_2_sample = level_2_data.sample(withReplacement=False, fraction=level_2_fraction, seed=42)

# Combine the level-2 sample with all level-1 rows.
balanced_data = level_1_data.union(level_2_sample)


In [None]:
# Peek at the first row of the balanced data set.
balanced_data.head()


Row(Duration (ms)=345251, Popularity=47, Danceability=0.425, Energy=0.782, Key=5, Loudness=-6.52, Speechiness=0.0892, Acousticness=0.737, Instrumentalness=0.000181, Liveness=0.5, Valence=0.41, Tempo=150.092, Time Signature=4, pop_rating='low', popularity_level=1)

In [None]:
# Every column except the target. NOTE(review): this list still contains
# 'Popularity' and 'pop_rating', which the target is derived from.
target_column = 'popularity_level'
feature_columns = list(filter(lambda name: name != target_column, balanced_data.columns))
X = balanced_data.select(*feature_columns)
y = balanced_data.select(target_column)

# 75% / 25% train/test split with a fixed seed.
splits = balanced_data.randomSplit([0.75, 0.25], seed=42)
train_data, test_data = splits


In [None]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline

# Approximate median of 'Tempo' (relative error 0.01), used to replace zero values.
median_tempo = balanced_data.approxQuantile('Tempo', [0.5], 0.01)[0]


def replace_zero_tempo(df):
    """Return `df` with every 'Tempo' value equal to 0 replaced by `median_tempo`."""
    return df.withColumn('Tempo', when(col('Tempo') == 0, median_tempo).otherwise(col('Tempo')))


balanced_data = replace_zero_tempo(balanced_data)

# train_data and test_data were split off from balanced_data in an earlier cell,
# so fixing balanced_data alone never reaches the data the models are fitted and
# evaluated on. Apply the same replacement to both splits.
train_data = replace_zero_tempo(train_data)
test_data = replace_zero_tempo(test_data)

# Numeric audio features that are fed to the models.
feature_inputs = [
    'Duration (ms)', 'Danceability', 'Energy', 'Key', 'Loudness', 'Speechiness',
    'Acousticness', 'Instrumentalness', 'Liveness', 'Valence', 'Tempo', 'Time Signature',
]

# Assemble the numeric features into a single vector column.
assembler = VectorAssembler(inputCols=feature_inputs, outputCol='features_vector')

# Rescale the assembled features with a min-max scaler.
scaler = MinMaxScaler(inputCol='features_vector', outputCol='scaled_features')

# Pipeline: assemble, then scale.
pipeline = Pipeline(stages=[assembler, scaler])





In [None]:
from pyspark.sql.functions import col, isnan, when, count

# For each training column, count the rows whose value is NaN or null.
missing_counts = []
for column_name in train_data.columns:
    is_missing = isnan(column_name) | col(column_name).isNull()
    missing_counts.append(count(when(is_missing, column_name)).alias(column_name))

nulls_in_each_column = train_data.select(missing_counts)
nulls_in_each_column.show()


+-------------+----------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-----+--------------+----------+----------------+
|Duration (ms)|Popularity|Danceability|Energy|Key|Loudness|Speechiness|Acousticness|Instrumentalness|Liveness|Valence|Tempo|Time Signature|pop_rating|popularity_level|
+-------------+----------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-----+--------------+----------+----------------+
|            0|         0|           1|     1|  1|       1|          1|           1|               1|       1|      1|    1|             1|         0|               0|
+-------------+----------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-----+--------------+----------+----------------+



In [None]:
# Drop every row that contains a null value from both splits.
cleaned_train_data, cleaned_test_data = (
    part.dropna() for part in (train_data, test_data)
)


In [None]:
# Fit the preprocessing pipeline on the training split only, then apply the
# fitted pipeline to both the training and the test split.
fitted_pipeline = pipeline.fit(cleaned_train_data)

transformed_train_data, transformed_test_data = (
    fitted_pipeline.transform(cleaned_train_data),
    fitted_pipeline.transform(cleaned_test_data),
)



# **Modelling**:
## (1) Logistic Regression
## (2) Decision Tree
## (3) Random Forest Classifier

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the scaled features, predicting 'popularity_level'.
lr = LogisticRegression(
    labelCol='popularity_level',
    featuresCol='scaled_features',
)
lr_model = lr.fit(transformed_train_data)

# Score the held-out test split.
lr_predictions = lr_model.transform(transformed_test_data)


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy evaluator; the later model-evaluation cells reuse it.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="popularity_level",
    predictionCol="prediction",
)

accuracy = evaluator.evaluate(lr_predictions)
print("Accuracy of Logistic Regression model: ", accuracy)


Accuracy of Logistic Regression model:  0.6363636363636364


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree on the scaled features, predicting 'popularity_level'.
dt = DecisionTreeClassifier(
    labelCol='popularity_level',
    featuresCol='scaled_features',
)
dt_model = dt.fit(transformed_train_data)

# Score the held-out test split.
dt_predictions = dt_model.transform(transformed_test_data)

In [None]:
# Accuracy of the decision tree on the test split, using the shared evaluator.
dt_accuracy = evaluator.evaluate(dataset=dt_predictions)
print("Accuracy of Decision Tree model: ", dt_accuracy)

Accuracy of Decision Tree model:  0.6201298701298701


In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest on the scaled features, predicting 'popularity_level'.
rf = RandomForestClassifier(
    labelCol='popularity_level',
    featuresCol='scaled_features',
)
rf_model = rf.fit(transformed_train_data)

# Score the held-out test split.
rf_predictions = rf_model.transform(transformed_test_data)


In [None]:
# Accuracy of the random forest on the test split, using the shared evaluator.
rf_accuracy = evaluator.evaluate(dataset=rf_predictions)
print("Accuracy of Random Forest model: ", rf_accuracy)


Accuracy of Random Forest model:  0.6558441558441559


# Hyperparameter tuning of the Random Forest model

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fresh random forest estimator to be tuned.
rf = RandomForestClassifier(
    labelCol='popularity_level',
    featuresCol='scaled_features',
)

# Small search grid: 2 values for numTrees x 2 values for maxDepth.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 15])
    .addGrid(rf.maxDepth, [5, 8])
    .build()
)

In [None]:
# Accuracy evaluator used to score each parameter combination.
cv_evaluator = MulticlassClassificationEvaluator(
    labelCol="popularity_level",
    predictionCol="prediction",
    metricName="accuracy",
)

# 3-fold cross-validation over the parameter grid.
crossval = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=cv_evaluator,
    numFolds=3,
)

In [None]:
# Run cross-validation on the training split and choose the best set of parameters.
cvModel = crossval.fit(transformed_train_data)

In [None]:
# Score the test split with the cross-validated model (cvModel applies the best
# model it found), then measure accuracy with the shared evaluator.
cv_predictions = cvModel.transform(transformed_test_data)
cv_accuracy = evaluator.evaluate(dataset=cv_predictions)

print("Accuracy after Hyperparameter Tuning: ", cv_accuracy)

Accuracy after Hyperparameter Tuning:  0.6558441558441559


In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics


def _to_prediction_label(row):
    """Turn a (prediction, popularity_level) row into a (prediction, float label) tuple."""
    return (row[0], float(row[1]))


# MulticlassMetrics takes an RDD of (prediction, label) pairs.
cv_predictions_and_labels = (
    cv_predictions
    .select("prediction", "popularity_level")
    .rdd
    .map(_to_prediction_label)
)

cv_metrics = MulticlassMetrics(cv_predictions_and_labels)

# Confusion matrix as a NumPy array.
cv_confusion_matrix = cv_metrics.confusionMatrix().toArray()
print("Confusion Matrix:\n", cv_confusion_matrix)

# Read the four cells of the matrix. Index 1 is treated as the "positive"
# class and index 0 as the "negative" class; per the Spark documentation,
# rows are actual labels and columns are predicted labels.
cv_tn = cv_confusion_matrix[0, 0]
cv_fp = cv_confusion_matrix[0, 1]
cv_fn = cv_confusion_matrix[1, 0]
cv_tp = cv_confusion_matrix[1, 1]

for cell_name, cell_value in (
    ("True Positives", cv_tp),
    ("False Positives", cv_fp),
    ("True Negatives", cv_tn),
    ("False Negatives", cv_fn),
):
    print(f"{cell_name}: {cell_value}")

Confusion Matrix:
 [[ 86.  52.]
 [ 54. 116.]]
True Positives: 116.0
False Positives: 52.0
True Negatives: 86.0
False Negatives: 54.0
