# Spark Pipeline : Predicting a student's GPA from his performances
DEMBELE Mathilda, MARSOT Elouan


## 1. Starting the Spark session

In [1]:
# Initialisation des librairies pyspark

# Initialisation de Spark
import pyspark

from pyspark.sql import SparkSession

# import ML pyspark modules
# some examples
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint

from pyspark import SparkConf, SparkContext, SQLContext

import findspark
findspark.init()

spark = SparkSession \
    .builder \
    .appName("GPAPredictor") \
    .getOrCreate()
 


## 2. Data reading

In [2]:
"""
DATASET DESCRIPTION
- TRAIN : 1531 samples 
- TEST : 384 samples 
- each line in the dataset stands for some student
- each column is a feature of performance for the student

14 features :
- StudentID : int, a four-figures unique number 
- Age : int 
- Gender : binary, 0 for a man, 1 for a woman 
- Ethnicity : categorial (Caucasian, Asian, African American, Other)
- ParentalEducation : categorial (High School, Bachelor, Some College, Higher)
- StudyTimeWeekly : float, nb of hours per week 
- Absences : int 
- Tutoring : binary, 1 if yes, 0 otherwise 
- ParentalSupport : categorial (Low, Moderate, High, Very High)
- Extracurricular : binary
- Sports : binary 
- Music : binary 
- Volunteering : binary 

- GPA : float (from 0 to 4)

"""

fileNameTrain = "datasets/train.csv"
fileNameTest = "datasets/test.csv"

# Reading the datasets
train_set = spark.read.csv(fileNameTrain, header=True, inferSchema=True)
test_set = spark.read.csv(fileNameTest, header=True, inferSchema=True)



In [3]:
# CHECKING THAT THEY HAVE THE SAME SCHEMA
train_set.printSchema()
test_set.printSchema()
train_set.take(1)
test_set.take(1)

root
 |-- StudentID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: integer (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- ParentalEducation: string (nullable = true)
 |-- StudyTimeWeekly: double (nullable = true)
 |-- Absences: integer (nullable = true)
 |-- Tutoring: integer (nullable = true)
 |-- ParentalSupport: string (nullable = true)
 |-- Extracurricular: integer (nullable = true)
 |-- Sports: integer (nullable = true)
 |-- Music: integer (nullable = true)
 |-- Volunteering: integer (nullable = true)
 |-- GPA: double (nullable = true)

root
 |-- StudentID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: integer (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- ParentalEducation: string (nullable = true)
 |-- StudyTimeWeekly: double (nullable = true)
 |-- Absences: integer (nullable = true)
 |-- Tutoring: integer (nullable = true)
 |-- ParentalSupport: string (nullable = true)
 |-- Extracurricul

[Row(StudentID=2340, Age=16, Gender=1, Ethnicity='Other', ParentalEducation='Higher', StudyTimeWeekly=5.04404804318662, Absences=25, Tutoring=1, ParentalSupport='Moderate', Extracurricular=1, Sports=0, Music=0, Volunteering=0, GPA=0.886889415770466)]

## 3. Data preprocessing

### 3.1. Handling missing values

In [4]:
# Missing values
from pyspark.sql.functions import col

train_set.select([(col(c).isNull().cast("int")).alias(c) for c in train_set.columns]).groupBy().sum().show()

+--------------+--------+-----------+--------------+----------------------+--------------------+-------------+-------------+--------------------+--------------------+-----------+----------+-----------------+--------+
|sum(StudentID)|sum(Age)|sum(Gender)|sum(Ethnicity)|sum(ParentalEducation)|sum(StudyTimeWeekly)|sum(Absences)|sum(Tutoring)|sum(ParentalSupport)|sum(Extracurricular)|sum(Sports)|sum(Music)|sum(Volunteering)|sum(GPA)|
+--------------+--------+-----------+--------------+----------------------+--------------------+-------------+-------------+--------------------+--------------------+-----------+----------+-----------------+--------+
|             0|       0|          0|             0|                   142|                   0|            0|            0|                 132|                   0|          0|         0|                0|       0|
+--------------+--------+-----------+--------------+----------------------+--------------------+-------------+-------------+--------

only ParentalEducation and ParentalSupport have some missing values : this should be handled in our future pipelines

In [5]:
categories = train_set.select("ParentalEducation").distinct().collect()
for row in categories:
    print(row["ParentalEducation"])

High School
Higher
Bachelor
Some College
None


In [6]:
categories = train_set.select("ParentalSupport").distinct().collect()
for row in categories:
    print(row["ParentalSupport"])

High
Very High
Low
Moderate
None


Because there is an order among those categories, let's map them then we will try to imput them.

In [7]:
ordinal_mapping_education = {"High School": 1, "Some College": 2, "Bachelor": 3, "Higher": 4}
ordinal_mapping_support = {"Low": 0, "Moderate": 1, "High": 2, "Very High": 4}

In [8]:
from pyspark.ml import Transformer
from pyspark.sql.functions import when, col

class OrdinalEncoder(Transformer):
    def __init__(self, mappings, inputCols, outputCols):
        super(OrdinalEncoder, self).__init__()
        self.mappings = mappings  
        self.inputCols = inputCols
        self.outputCols = outputCols

    def _transform(self, df):
        for inputCol, outputCol, mapping in zip(self.inputCols, self.outputCols, self.mappings):
            expr = None
            for category, value in mapping.items():
                if expr is None:
                    expr = when(col(inputCol) == category, value)
                else:
                    expr = expr.when(col(inputCol) == category, value)
            df = df.withColumn(outputCol, expr.otherwise(None)) 
        return df

In [9]:
from pyspark.ml.feature import Imputer

"""
ORDINAL ENCODING 
maps categorical values in the columns ParentalEducation and ParentalSupport to numeric values based 
on a predefined mapping
"""
encoder = OrdinalEncoder(
    mappings=[ordinal_mapping_education, ordinal_mapping_support],
    inputCols=["ParentalEducation", "ParentalSupport"],
    outputCols=["ParentalEducationEncoded", "ParentalSupportEncoded"]
)


"""
IMPUTING 
replaces missing values in the same columns 
mode strategy = replace with the most frequently occurring value
"""
imputer = Imputer(
    inputCols=["ParentalEducationEncoded", "ParentalSupportEncoded"],
    outputCols=["ParentalEducationImputed", "ParentalSupportImputed"]
).setStrategy("mode")


intermediate_assembler = VectorAssembler(
    inputCols=["ParentalSupportImputed", "ParentalEducationImputed"],  
    outputCol="parents_features_processed"
)


### 3.2. Processing other features 

In [10]:
numerical_features = ["Age", "StudyTimeWeekly", "Absences"]
categorical_features = ["Ethnicity"]

In [11]:
from pyspark.ml.feature import OneHotEncoder, StandardScaler, VectorAssembler, StringIndexer

"""
ONE-HOT ENCODING
transform the categorical feature Ethnicity into a numeric format 
and then into a sparse one-hot encoded vector
"""
indexer = StringIndexer(inputCol="Ethnicity", outputCol="Ethnicity_indexed")
onehot_encoder= OneHotEncoder(inputCol="Ethnicity_indexed", outputCol="Ethnicity_encoded")

"""
VECTOR ASSEMBLY 
combines multiple numeric columns into a single vector column called numeric_features
--> input features in vector form 
"""
#numerical_features = ["Age", "StudyTimeWeekly", "Absences"]
numeric_assembler = VectorAssembler(inputCols=numerical_features, outputCol="numeric_features")

"""
SCALING 
scales the numeric_features vector to have zero mean and unit variance
"""
scaler = StandardScaler(inputCol="numeric_features", outputCol="scaled_numeric_features")

final_assembler = VectorAssembler(
    inputCols=["scaled_numeric_features", "Ethnicity_encoded", "parents_features_processed", "Gender", "Tutoring", "Extracurricular", "Sports", "Music", "Volunteering"],  # Scaled and encoded features
    outputCol="features"
)



## 4. Building the pipeline

### 4.1. ML model and chaining

In [13]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
"""
TO PREDICT THE GPA : 
- Linear Regression : for interpretability and simplicity
- RANDOM FOREST REGRESSOR or GRADIENT-BOOSTED TREE REGRESSOR: for data with complex, non-linear patterns """

from pyspark.ml import Pipeline

#lr = LinearRegression(featuresCol="features", labelCol="GPA")
#rf = RandomForestRegressor(featuresCol="features", labelCol="GPA")
gbt = GBTRegressor(featuresCol="features", labelCol="GPA")


pipeline = Pipeline(stages=[
    encoder, # encode string values 
    imputer,    # handle missing values 
    intermediate_assembler, # assemble parents features into a vector
    indexer,    # index categorical features 
    onehot_encoder, # one-hot encode 
    numeric_assembler,  # assemble numerical features 
    scaler,     # scale numerical features
    final_assembler,    # assemble all features 
    gbt # ML algorithm
])


In [14]:
# TESTING EACH ELEMENT OF THE PIPELINE 

# Step 1: Apply the encoder
encoded_df = encoder.transform(test_set)
print("Columns after encoder:", encoded_df.columns)

# Step 2: Apply the imputer
imputed_df = imputer.fit(encoded_df).transform(encoded_df)
print("Columns after imputer:", imputed_df.columns)

# Step 3: Apply the intermediate assembler
assembled_df = intermediate_assembler.transform(imputed_df)
print("Columns after intermediate assembler:", assembled_df.columns)
assembled_df.select("parents_features_processed").show(truncate=False)

indexed_df = indexer.fit(assembled_df).transform(assembled_df)
print("After StringIndexer:", indexed_df.columns)

# Step 2: Apply OneHotEncoder
encoded_df = onehot_encoder.fit(indexed_df).transform(indexed_df)
print("After OneHotEncoder:", encoded_df.columns)

# Step 3: Apply VectorAssembler
assembled_df = numeric_assembler.transform(encoded_df)
print("After VectorAssembler:", assembled_df.columns)

# Step 4: Apply StandardScaler
scaled_df = scaler.fit(assembled_df).transform(assembled_df)
print("After StandardScaler:", scaled_df.columns)

# Step 5: Apply Final Assembler
final_df = final_assembler.transform(scaled_df)
print("After Final Assembler:", final_df.columns)


Columns after encoder: ['StudentID', 'Age', 'Gender', 'Ethnicity', 'ParentalEducation', 'StudyTimeWeekly', 'Absences', 'Tutoring', 'ParentalSupport', 'Extracurricular', 'Sports', 'Music', 'Volunteering', 'GPA', 'ParentalEducationEncoded', 'ParentalSupportEncoded']
Columns after imputer: ['StudentID', 'Age', 'Gender', 'Ethnicity', 'ParentalEducation', 'StudyTimeWeekly', 'Absences', 'Tutoring', 'ParentalSupport', 'Extracurricular', 'Sports', 'Music', 'Volunteering', 'GPA', 'ParentalEducationEncoded', 'ParentalSupportEncoded', 'ParentalEducationImputed', 'ParentalSupportImputed']
Columns after intermediate assembler: ['StudentID', 'Age', 'Gender', 'Ethnicity', 'ParentalEducation', 'StudyTimeWeekly', 'Absences', 'Tutoring', 'ParentalSupport', 'Extracurricular', 'Sports', 'Music', 'Volunteering', 'GPA', 'ParentalEducationEncoded', 'ParentalSupportEncoded', 'ParentalEducationImputed', 'ParentalSupportImputed', 'parents_features_processed']
+--------------------------+
|parents_features_proce

### 4.2. Fitting the pipeline on the train set

In [15]:

model = pipeline.fit(train_set)


### 4.3. Evaluating on the test set

In [16]:
transformed_test_set = model.transform(test_set)

# PREDICTIONS
transformed_test_set.select("GPA", "prediction").show(truncate=False)

+-----------------+-------------------+
|GPA              |prediction         |
+-----------------+-------------------+
|0.886889415770466|0.9727391403691974 |
|2.23469628732449 |2.335452454402756  |
|0.875367123899009|0.7666055944440374 |
|0.648705394877608|0.48681221942689495|
|3.46368754912343 |3.3225752150522894 |
|3.10949357887921 |3.1795632390698363 |
|3.0092383227043  |2.462957335582419  |
|2.67682739428337 |2.487941445459509  |
|2.94871767191192 |3.0405219610634155 |
|1.70361180323237 |1.6085162975087512 |
|0.655954077962677|0.8389381365573322 |
|1.97279141390272 |1.4271693337155378 |
|2.23217527771598 |1.9639529561985694 |
|2.97440601491104 |2.4883209962013133 |
|1.60941027714416 |1.7198115303771646 |
|2.30759662333527 |2.673305545430982  |
|1.78996687416233 |1.7377355494889484 |
|2.36601087218728 |2.4604526012399575 |
|0.864785083851288|1.2048523040810628 |
|2.85480392898132 |2.716027484743984  |
+-----------------+-------------------+
only showing top 20 rows



## 5. Evaluation

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define the evaluator
evaluator_rmse = RegressionEvaluator(labelCol="GPA", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="GPA", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="GPA", predictionCol="prediction", metricName="r2")

# Evaluate the model
rmse = evaluator_rmse.evaluate(transformed_test_set)
mae = evaluator_mae.evaluate(transformed_test_set)
r2 = evaluator_r2.evaluate(transformed_test_set)

# Print results
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R² (Coefficient of Determination): {r2}")


### Predicting on a random individual 

In [None]:
from pyspark.sql.functions import rand

# Select a random individual from the test set
# WARNING : DON'T REEXECUTE THE CELL TO COMPARE THE ALGORITHMS ON THE SAME INDIVIDUAL 
random_individual = test_set.limit(27) 
print(random_individual)

In [None]:


# Step 2: Apply the pipeline to the random individual
prediction = model.transform(random_individual)

# Step 3: Display the result
prediction.select("GPA", "prediction").show(truncate=False)

### Cross val for the best model

In [19]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="GPA", predictionCol="prediction", metricName="rmse")
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=[{}],
                          evaluator=evaluator,
                          numFolds=5)

cvModel = crossval.fit(train_set)

print("RMSE:", cvModel.avgMetrics)

RMSE: [0.27689339193059037]


In [None]:
spark.stop()