# PySpark ML Pipeline

The goal is to apply the feature engineering step of an ML pipeline using Spark.

I follow these steps:

0. Start with some basic exploratory analysis.

1. Split the data into train and test sets.

2. Drop the rows with more than 50% of NA values.

3. Fill the remaining NA values with the median of the column, or with the mode if the column is categorical.

4. Apply a normalization technique to the data (min-max scaling or standardization).

5. Generate 2 new columns based on the relations with the existing columns.

# Exploratory analysis

In [4]:
# Import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pyspark
import pyspark_dist_explore as pde
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, rank, dense_rank, row_number
from pyspark.sql.window import Window

In [5]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()


In this assignment I use a DataFrame of my Spotify 'Liked Songs' playlist. I built this dataset in a Medium article; take a look if you like:
https://medium.com/@rubentak/spotify-in-python-part-1-preprocessing-67e690450522

In [8]:
pandas_df = pd.read_csv("tracklist.csv")  # pandas copy for quick local inspection; not used below

In [9]:
# Read the dataset into a DataFrame
df = spark.read.csv("tracklist.csv", header=True, inferSchema=True)

Number of rows: 1380
Number of columns: 24


In [10]:
# Check the number of rows and columns
print(f"Number of rows: {df.count()}")
print(f"Number of columns: {len(df.columns)}")

root
 |-- added_at: timestamp (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- uri: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- album: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- length: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- key: double (nullable = true)
 |-- genres: string (nullable = true)
 |-- genre_group: string (nullable = true)



In [11]:
# Check the schema
df.printSchema()

Number of missing values per column:
+--------+---+----+----------+---+------+-----+------------+-----------+------+------------+------------+------+----------------+--------+--------+-----------+-----+--------------+-------+----+---+------+-----------+
|added_at| id|name|popularity|uri|artist|album|release_date|duration_ms|length|danceability|acousticness|energy|instrumentalness|liveness|loudness|speechiness|tempo|time_signature|valence|mode|key|genres|genre_group|
+--------+---+----+----------+---+------+-----+------------+-----------+------+------------+------------+------+----------------+--------+--------+-----------+-----+--------------+-------+----+---+------+-----------+
|       0|  0|   0|         0|  0|     0|    0|           0|          0|     2|           2|           2|     2|               2|       2|       2|          2|    2|             2|      2|   2|  2|   233|        288|
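+--------+---+----+----------+---+------+-----+------------+-----------+------+------------+------------+------+----------------+--------+--------+-----------+-----+--------------+-------+----+---+------+-----------+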

In [12]:
# Check for missing values
from pyspark.sql.functions import count, when, isnan, col
print("Number of missing values per column:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


Number of duplicate rows: 0


In [13]:
# Check for duplicates
print(f"Number of duplicate rows: {df.count() - df.dropDuplicates().count()}")


+-------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+-----------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+-----------------+---------------+-----------+
|summary|                  id|                name|       popularity|                 uri|              artist|               album|      release_date|      duration_ms|            length|      danceability|       acousticness|             energy|   instrumentalness|           liveness|          loudness|        speechiness|             tempo|    time_signature|           valence|               mode|              key|         genres|genre_group|
+-------+--------------------+--------------------+-----------------+--------------------+----------

                                                                                

In [55]:
# Get summary statistics (describe() reports on string columns as well as numerical ones)
df.describe().show()

# Split the data between train and test

In [56]:
# Split the data into train and test sets
train, test = df.randomSplit([0.7, 0.3], seed=42)
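
One reason to split before imputing or scaling is that any statistic learned from the data (a fill value, a minimum, a maximum) should come from the training rows only and then be reused unchanged on the test rows. The plain-Python sketch below illustrates the idea on made-up numbers; `fit_min_max` and `apply_min_max` are hypothetical helpers, not part of PySpark.

```python
# Illustrative values only; they are not taken from tracklist.csv.
train_values = [10.0, 20.0, 30.0, 40.0]
test_values = [25.0, 50.0]

def fit_min_max(values):
    """Learn the scaling parameters from the training data only."""
    return min(values), max(values)

def apply_min_max(values, lo, hi):
    """Rescale values with previously learned parameters."""
    return [(v - lo) / (hi - lo) for v in values]

lo, hi = fit_min_max(train_values)
scaled_train = apply_min_max(train_values, lo, hi)
scaled_test = apply_min_max(test_values, lo, hi)

print(scaled_train)
print(scaled_test)  # test values may fall outside [0, 1]
```

In Spark terms this corresponds to calling `fit` on `train` and `transform` on both `train` and `test`.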


# Drop the rows with more than 50% of NA values

In [57]:
# Drop rows in which more than 50% of the values are null
import math
initial_count = df.count()
# Keep only rows that have at least half of their columns populated
min_non_null = math.ceil(len(df.columns) / 2)
df = df.na.drop(thresh=min_non_null)
print(f"Number of rows dropped: {initial_count - df.count()}")
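
To make the 50% rule concrete, the following plain-Python sketch counts the missing fields of a few made-up rows and keeps only the rows in which at least half of the fields are present. `None` stands in for a Spark null, and `keep_row` is a hypothetical helper used only for illustration.

```python
import math

# Made-up rows with four fields each; None marks a missing value.
rows = [
    (1, "a", 0.5, 2.0),      # 0 of 4 missing -> keep
    (2, None, None, 3.0),    # 2 of 4 missing (exactly 50%) -> keep
    (3, None, None, None),   # 3 of 4 missing (75%) -> drop
]

n_fields = 4
# A row survives when it has at least ceil(n_fields / 2) non-missing values,
# i.e. when at most 50% of its fields are missing.
min_non_null = math.ceil(n_fields / 2)

def keep_row(row):
    non_null = sum(value is not None for value in row)
    return non_null >= min_non_null

kept = [row for row in rows if keep_row(row)]
print(len(rows) - len(kept), "row(s) dropped")
```

The same threshold can be passed to `DataFrame.na.drop(thresh=...)`, which drops rows that have fewer than `thresh` non-null values.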


# Fill the remaining NA values with the median of the column or the mode if it's a category

In [17]:
# Select only the columns I want to use for my model: popularity, danceability, energy, key, loudness, mode, speechiness, acousticness, instrumentalness, liveness, valence, tempo and time_signature.
df2 = df.select("popularity", "danceability", "energy", "key", "loudness", "mode", "speechiness", "acousticness", "instrumentalness", "liveness", "valence", "tempo", "time_signature")

In [18]:
# Calculate the median (numeric columns) or the mode (string columns) of a column
from pyspark.sql.types import StringType
def calculate_median_mode(column):
    column_type = df2.schema[column].dataType
    if isinstance(column_type, StringType):
        # Mode: the most frequent non-null value
        non_null = df2.where(col(column).isNotNull())
        mode = non_null.groupBy(column).count().orderBy("count", ascending=False).first()
        return mode[0] if mode else "Unknown"
    # Median: approxQuantile ignores nulls and needs no un-partitioned window;
    # relativeError=0.0 requests the exact quantile
    median = df2.approxQuantile(column, [0.5], 0.0)
    return median[0] if median else 0
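
As a small worked example of the fill rule, the plain-Python sketch below computes a median for a numeric list and a mode for a categorical list, ignoring missing entries, and then fills the gaps. The values are made up and `None` stands in for a Spark null.

```python
from statistics import median, mode

# Made-up column values; None marks a missing entry.
tempo = [120.0, None, 128.0, 124.0, None, 140.0]
genre = ["house", None, "techno", "house"]

# Ignore missing entries before computing the statistic.
tempo_present = [v for v in tempo if v is not None]
genre_present = [v for v in genre if v is not None]

# statistics.median averages the two middle values when the count is even.
tempo_fill = median(tempo_present)
# The mode is the most frequent value.
genre_fill = mode(genre_present)

tempo_filled = [tempo_fill if v is None else v for v in tempo]
genre_filled = [genre_fill if v is None else v for v in genre]

print(tempo_fill, genre_fill)
```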

In [61]:
# Fill NA values with median or mode
filled_data = df2
for column in df2.columns:
    median_mode = calculate_median_mode(column)
    filled_data = filled_data.na.fill(median_mode, [column])

# Show the result
filled_data.show()

+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+
|popularity|danceability|energy| key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|
+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+
|        42|       0.729| 0.998| 8.0|  -8.161| 1.0|     0.0614|     0.00283|           0.298|    0.09|  0.653|146.006|           4.0|
|        52|        0.83| 0.688| 5.0|  -9.539| 0.0|     0.0706|      0.0621|           0.708|    0.12|   0.73|123.979|           4.0|
|         3|       0.923| 0.564|11.0|  -9.748| 0.0|      0.123|     0.00506|           0.164|  0.0423|  0.734|124.979|           4.0|
|        42|       0.374| 0.571| 3.0| -10.513| 0.0|     0.0748|        0.71|           0.538|   0.139| 0.0379|127.014|           4.0|
|        39|       0.649| 0.861|11.0|  -6.255| 0.0|     0.0472

# Apply a normalization technique to the data (min-max or standard deviation)

In [62]:
# Normalize the data
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=filled_data.columns, outputCol="features")
assembled_data = assembler.transform(filled_data)

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(assembled_data)
scaled_data = scaler_model.transform(assembled_data)

# Show the result
scaled_data.show()

+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+--------------------+
|popularity|danceability|energy| key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|            features|      scaledFeatures|
+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+--------------------+
|        42|       0.729| 0.998| 8.0|  -8.161| 1.0|     0.0614|     0.00283|           0.298|    0.09|  0.653|146.006|           4.0|[42.0,0.729,0.998...|[0.46153846153846...|
|        52|        0.83| 0.688| 5.0|  -9.539| 0.0|     0.0706|      0.0621|           0.708|    0.12|   0.73|123.979|           4.0|[52.0,0.83,0.688,...|[0.57142857142857...|
|         3|       0.923| 0.564|11.0|  -9.748| 0.0|      0.123|     0.00506|           0.164|  0.0423|  0.734|124.979|  
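
Min-max scaling maps each value x to (x - min) / (max - min). The two `scaledFeatures` entries visible above, 0.4615... for a popularity of 42 and 0.5714... for a popularity of 52, are consistent with a minimum of 0 and a maximum of 91 for `popularity`. That range is inferred from the displayed values and is not stated in the notebook. A quick plain-Python check of the arithmetic, with `min_max_scale` as a hypothetical helper:

```python
import math

def min_max_scale(x, lo, hi):
    """Rescale x to [0, 1] given the column minimum lo and maximum hi."""
    return (x - lo) / (hi - lo)

# Assumed range for popularity, inferred from the output shown above.
POP_MIN, POP_MAX = 0.0, 91.0

scaled_42 = min_max_scale(42.0, POP_MIN, POP_MAX)
scaled_52 = min_max_scale(52.0, POP_MIN, POP_MAX)

print(scaled_42)  # starts with 0.461538...
print(scaled_52)  # starts with 0.571428...
```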

# Generate (at least) 2 new columns based on the relations with the existing columns.

In [21]:
# Generate new columns
from pyspark.sql.functions import col

new_data = (
    scaled_data
    .withColumn("energy_danceability", col("energy") * col("danceability"))
    .withColumn("energy_valence", col("energy") * col("valence"))
)

# Show the result
new_data.show()
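
The two new columns are simple interaction features: each is the product of two existing audio features. For the first row shown earlier (energy 0.998, danceability 0.729, valence 0.653) the arithmetic works out as follows. The snippet is plain Python and only illustrates the calculation.

```python
import math

# Values from the first row of the table shown earlier.
energy, danceability, valence = 0.998, 0.729, 0.653

energy_danceability = energy * danceability
energy_valence = energy * valence

print(round(energy_danceability, 6))
print(round(energy_valence, 6))
```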



+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+-------+
|popularity|danceability|energy| key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|            features|cluster|
+----------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+-------+
|        42|       0.729| 0.998| 8.0|  -8.161| 1.0|     0.0614|     0.00283|           0.298|    0.09|  0.653|146.006|           4.0|[42.0,0.729,0.998...|      2|
|        52|        0.83| 0.688| 5.0|  -9.539| 0.0|     0.0706|      0.0621|           0.708|    0.12|   0.73|123.979|           4.0|[52.0,0.83,0.688,...|      2|
|         3|       0.923| 0.564|11.0|  -9.748| 0.0|      0.123|     0.00506|           0.164|  0.0423|  0.734|124.979|           4.0|[3.0,0.923,0.564,...|      0|
|        42|       0.3

In [22]:
# add a k-means clustering column
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

# Create a feature vector using the columns you want to include in the clustering
feature_columns = ["popularity", "danceability", "energy", "key", "loudness", "mode", "speechiness", "acousticness", "instrumentalness", "liveness", "valence", "tempo", "time_signature"]
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Initialize KMeans algorithm
kmeans = KMeans(k=3, seed=1, featuresCol="features", predictionCol="cluster")

# Create a pipeline with the VectorAssembler and KMeans
pipeline = Pipeline(stages=[vector_assembler, kmeans])

# Fit the model and make predictions
model = pipeline.fit(filled_data)
clustered_data = model.transform(filled_data)

# Show the result
clustered_data.show()

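
K-means assigns points by Euclidean distance, so columns with a large numeric range can dominate the clustering. The sketch below compares the per-column contribution to the squared distance between the first two rows shown earlier, using three of their columns. It is plain Python for illustration only; feeding a scaled vector such as `scaledFeatures` into `KMeans` is a suggestion, not something the notebook does.

```python
# popularity, danceability and tempo for the first two rows shown earlier.
row_a = {"popularity": 42.0, "danceability": 0.729, "tempo": 146.006}
row_b = {"popularity": 52.0, "danceability": 0.83, "tempo": 123.979}

# Squared difference per column: the terms that sum to the squared Euclidean distance.
contributions = {name: (row_a[name] - row_b[name]) ** 2 for name in row_a}
total = sum(contributions.values())

for name, value in contributions.items():
    print(f"{name}: {value:.4f} ({value / total:.2%} of the squared distance)")
```

On these unscaled values `tempo` and `popularity` account for almost all of the distance, while `danceability` contributes a negligible share.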