### Libraries to use

In [1]:
import os
import warnings
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer, MinMaxScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import matplotlib.pyplot as plt

warnings.filterwarnings('ignore')
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages  org.apache.spark:spark-avro_2.12:3.4.1,io.delta:delta-core_2.12:2.4.0 pyspark-shell'

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Spark_ML") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
        .enableHiveSupport()\
        .getOrCreate()

sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

your 131072x1 screen size is bogus. expect trouble
23/07/10 23:24:21 WARN Utils: Your hostname, DESKTOP-JKDOQO9 resolves to a loopback address: 127.0.1.1; using 172.26.19.142 instead (on interface eth0)
23/07/10 23:24:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/josealcocer27/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/josealcocer27/.ivy2/cache
The jars for the packages stored in: /home/josealcocer27/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-72ef704e-ec74-4d66-914d-a263d5db1020;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.4.1 in central
	found org.tukaani#xz;1.9 in central
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 261ms :: artifacts dl 12ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	org.apache.spark#spark-avro_2.12;3.4.1 from central in [default]
	org.tukaani#xz;1.9 from central in [default]
	--------------------------------------------

### Data reading

In [3]:
df = spark.read.format('delta')\
    .load('delta/saber11')

Number of rows and schema

In [4]:
df.count()

23/07/10 23:24:32 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

10361

In [5]:
df.printSchema()

root
 |-- lenguaje: string (nullable = true)
 |-- matematicas: string (nullable = true)
 |-- sociales: string (nullable = true)
 |-- Id: integer (nullable = true)
 |-- filosofia: string (nullable = true)
 |-- biologia: string (nullable = true)
 |-- quimica: string (nullable = true)
 |-- fisica: string (nullable = true)
 |-- nivel_ingles: string (nullable = true)
 |-- sisben: integer (nullable = true)
 |-- estrato: integer (nullable = true)
 |-- genero: string (nullable = true)
 |-- puntaje_saber11: string (nullable = true)



In [6]:
for column in df.columns:
    if column != 'Id':
        print(f'\nUnique values at column {column}\n')
        df.select(column).distinct().show()


Unique values at column lenguaje



                                                                                

+--------+
|lenguaje|
+--------+
|    bajo|
|   medio|
|superior|
|    alto|
+--------+


Unique values at column matematicas

+-----------+
|matematicas|
+-----------+
|       bajo|
|      medio|
|   superior|
|       alto|
+-----------+


Unique values at column sociales

+--------+
|sociales|
+--------+
|    bajo|
|   medio|
|superior|
|    alto|
+--------+


Unique values at column filosofia

+---------+
|filosofia|
+---------+
|     bajo|
|    medio|
| superior|
|     alto|
+---------+


Unique values at column biologia

+--------+
|biologia|
+--------+
|    bajo|
|   medio|
|superior|
|    alto|
+--------+


Unique values at column quimica

+--------+
| quimica|
+--------+
|    bajo|
|   medio|
|superior|
|    alto|
+--------+


Unique values at column fisica

+--------+
|  fisica|
+--------+
|    bajo|
|superior|
|   medio|
|    alto|
+--------+


Unique values at column nivel_ingles

+------------+
|nivel_ingles|
+------------+
|          A2|
|          A-|
|          B1|
|    

### Feature transform

In [7]:
df = df.drop('id')
df = df.dropna()

Target to binary values

In [8]:
df = df.withColumn('puntaje_saber11',
                   when(col('puntaje_saber11')=='bajo', 0)\
                   .otherwise(1))

Balanced or imbalanced dataset?

In [9]:
df.groupBy("puntaje_saber11").count().show()

+---------------+-----+
|puntaje_saber11|count|
+---------------+-----+
|              1| 7140|
|              0| 3221|
+---------------+-----+



Let's proceed to balance the dataset

In [10]:
class_0 = df.groupBy("puntaje_saber11").count().sort(asc("puntaje_saber11")).collect()[0][1]
class_1 = df.groupBy("puntaje_saber11").count().sort(asc("puntaje_saber11")).collect()[1][1]

# Find the ratio of the class sizes
class_ratio = class_0 / class_1 

# Define the sampling fractions for each class
sampling_fractions = {
    0: 1.0,  # No sampling for class 0 (majority class)
    1: class_ratio  # Sampling based on class ratio for class 1 (minority class)
}

balanced_df = df.sampleBy("puntaje_saber11", fractions=sampling_fractions, seed=1234)
balanced_df.groupBy("puntaje_saber11").count().show()

+---------------+-----+
|puntaje_saber11|count|
+---------------+-----+
|              1| 3219|
|              0| 3221|
+---------------+-----+



#### String Indexer, OneHotEncoder and MinMaxScaler

In [11]:
# features for OHE and MinMaxScaler

OHE_cols = [x[0] for x in df.dtypes if x[1] == 'string' and x[0] != 'puntaje_saber11']
to_scale = [x[0] for x in df.dtypes if x[1] in ['int', 'float'] and x[0] != 'puntaje_saber11']

In [12]:
# Multiple Indexers for every column in OHE_cols
indexer = [StringIndexer(inputCol = col, outputCol = col+'_indexed') for col in OHE_cols]

# Multiple OHE for every column in OHE_cols
encoder = [OneHotEncoder(inputCol = col+'_indexed', outputCol = col+'_encoded') for col in OHE_cols]

aux_features = [col+'_encoded' for col in OHE_cols] + to_scale

# Vector Assembler to can apply MinMaxScaler
vector_to_scale = [VectorAssembler(inputCols = [col], outputCol = col+'_vec') for col in aux_features]
scaler = [MinMaxScaler(inputCol = col+'_vec', outputCol = col+'_scaled') for col in aux_features]

# Final Vector Assembler
vector = VectorAssembler(inputCols = [col+'_scaled' for col in aux_features], outputCol ='features')

transform_steps = indexer + encoder + vector_to_scale + scaler

# Pipeline
features_pipeline = Pipeline(stages = transform_steps)
df_trans = features_pipeline.fit(balanced_df).transform(balanced_df)
to_select = [col for col in df_trans.columns if '_scaled' in col or col == 'puntaje_saber11']
df_trans.select(to_select).show(5)

+---------------+-----------------------+--------------------------+-----------------------+------------------------+-----------------------+----------------------+---------------------+---------------------------+---------------------+--------------------+--------------------+
|puntaje_saber11|lenguaje_encoded_scaled|matematicas_encoded_scaled|sociales_encoded_scaled|filosofia_encoded_scaled|biologia_encoded_scaled|quimica_encoded_scaled|fisica_encoded_scaled|nivel_ingles_encoded_scaled|genero_encoded_scaled|       sisben_scaled|      estrato_scaled|
+---------------+-----------------------+--------------------------+-----------------------+------------------------+-----------------------+----------------------+---------------------+---------------------------+---------------------+--------------------+--------------------+
|              0|          [1.0,0.0,0.0]|             [1.0,0.0,0.0]|          [1.0,0.0,0.0]|           [1.0,0.0,0.0]|          [0.0,1.0,0.0]|         [0.0,1.0,0.0]

### Modeling

Train test split

In [13]:
(trainingData, testingData) = balanced_df.randomSplit([0.70, 0.3], seed = 1234)

Baseline (logistic regression)

In [14]:
import mlflow
import mlflow.spark

mlflow.set_tracking_uri("sqlite:///mlflow.db")
mlflow.set_experiment(experiment_name='saber11')

2023/07/10 23:25:00 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2023/07/10 23:25:00 INFO mlflow.store.db.utils: Updating database tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
2023/07/10 23:25:00 INFO mlflow.tracking.fluent: Experiment with name 'saber11' does not exist. Creating a new experiment.


<Experiment: artifact_location='/home/josealcocer27/git-projects/PySpark-and-mlflow/mlruns/1', creation_time=1689049500078, experiment_id='1', last_update_time=1689049500078, lifecycle_stage='active', name='saber11', tags={}>

First let's create a function to create ROC curve that can be reused multiple times

In [16]:
def plot_roc(roc):
    fig = plt.figure()
    plt.plot(roc['FPR'],roc['TPR'])
    plt.ylabel('False Positive Rate')
    plt.xlabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.close(fig)
    return fig

In [17]:
# to_vector = to_select
# to_vector.append('puntaje_saber11')
mlflow.spark.autolog(disable=True)
with mlflow.start_run(run_name = 'log_reg_baseline'):
    # define model and set a tag
    model_lr = LogisticRegression(featuresCol = 'features', labelCol = 'puntaje_saber11')
    mlflow.set_tag('model_name', str(model_lr.__class__.__name__))

    # define pipeline and fit
    pipeline_lr = Pipeline(stages = [features_pipeline, vector, model_lr])
    lr_model = pipeline_lr.fit(trainingData)
    trainingSummary = lr_model.stages[-1].summary

    # evaluate and log metrics and model
    evaluator = BinaryClassificationEvaluator(rawPredictionCol = 'probability', labelCol = 'puntaje_saber11')
    trainPredictions = lr_model.transform(trainingData)
    trainAUC = evaluator.evaluate(trainPredictions)
    testPredictions = lr_model.transform(testingData)
    testAUC = evaluator.evaluate(testPredictions)
    mlflow.log_metrics({"train AUC": trainAUC, "test AUC": testAUC, "training accuracy": trainingSummary.accuracy,
                        "training weighted precision": trainingSummary.weightedPrecision, 
                        "training weighted recall": trainingSummary.weightedRecall})
    mlflow.spark.log_model(lr_model, 'log_reg_spark')
    roc = trainingSummary.roc.toPandas()
    fig = plot_roc(roc)
    mlflow.log_figure(fig, "ROC_training_curve.png")

23/07/10 23:26:06 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


Linear SVM

In [170]:
with mlflow.start_run(run_name = 'Linear_SVM_with_grid'):    
    # define model and set a tag
    svc_model = LinearSVC(featuresCol='features', labelCol='puntaje_saber11')
    mlflow.set_tag('model_name', str(svc_model.__class__.__name__))

    # build parameter grid
    paramGridSVM = ParamGridBuilder()\
        .addGrid(svc_model.regParam, [0.01, 0.5, 1, 5])\
        .addGrid(svc_model.maxIter, [100, 150])\
        .build()

    # create an evaluator
    evaluator = BinaryClassificationEvaluator(predictionCol='prediction', labelCol='puntaje_saber11')

    # create cross validator for tuning
    crossvalSVM = CrossValidator(estimator = svc_model,
                                estimatorParamMaps = paramGridSVM,
                                evaluator = evaluator,
                                numFolds = 3)

    # define pipeline with stages and train it
    pipeline = Pipeline(stages=[features_pipeline, vector, crossvalSVM])
    model = pipeline.fit(trainingData)

    # evaluate and log metrics and best model
    predictions = model.transform(testingData)
    accuracy = evaluator.evaluate(predictions)
    best_model = model.stages[-1].bestModel
    mlflow.log_metric("AUC", accuracy)
    mlflow.log_params(params={"RegParam": best_model.getRegParam(), "maxIter": best_model.getMaxIter()})
    mlflow.spark.log_model(best_model, "SparkLinearSVC")

TypeError: __init__() got an unexpected keyword argument 'predictionCol'

In [156]:
a = model.stages[-1].bestModel.summary

BinaryClassificationEvaluator()

In [158]:
a

<bound method LinearSVCModel.summary of LinearSVCModel: uid=LinearSVC_fa8e13bb2dc2, numClasses=2, numFeatures=28>

In [148]:
model.stages[-1].bestModel.getRegParam()

0.01

In [100]:
(trainingData, testingData) = df.randomSplit([0.70, 0.3])
lr_model = pipeline_lr.fit(trainingData)

In [101]:
lr_summary = lr_model.stages[2].summary.predictions
lr_summary = lr_summary.select('features', 'probability' ,'prediction')
lr_summary.show(10, truncate = False)

+---------------------------------------------------------------------------+----------------------------------------+----------+
|features                                                                   |probability                             |prediction|
+---------------------------------------------------------------------------+----------------------------------------+----------+
|(28,[0,3,6,10,12,15,18,22,25],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |[0.25516373223123756,0.7448362677687624]|1.0       |
|(28,[0,3,6,10,12,15,18,22,25],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |[0.25516373223123756,0.7448362677687624]|1.0       |
|(28,[0,3,6,10,12,15,18,22,25],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |[0.25516373223123756,0.7448362677687624]|1.0       |
|(28,[0,3,6,10,12,15,18,22,25],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |[0.25516373223123756,0.7448362677687624]|1.0       |
|(28,[0,3,6,10,12,15,18,22,25],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |[0.2551637322

In [102]:
test = lr_model.transform(testingData).withColumnRenamed("puntaje_saber11", "label")

In [103]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol = 'prediction', labelCol = 'label')
evaluator.evaluate(test,  {evaluator.metricName: "accuracy"})

0.6779062299293513