In [40]:
from pyspark.sql.types import *
import pandas as pd
import pyspark 
import os 
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, udf, rank, asc, sum as spark_sum
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.window import Window

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator


In [41]:
# Obtain a Spark session for this notebook (getOrCreate reuses an active one
# if the kernel already has it) and load the source Parquet file.
spark = (
    SparkSession.builder
    .appName("s33ding")
    .getOrCreate()
)
df = spark.read.parquet("dataset/enem.parquet")

In [42]:

# Column roles: the target column is predicted from the three other columns.
label_col = "NOTA_CH_CIENCIAS_HUMANAS"
feature_cols = ["NOTA_LC_LINGUAGENS_E_CODIGOS", "NOTA_MT_MATEMATICA", "NOTA_REDACAO"]

# Keep only the columns the models need (target first, then the predictors).
selected_cols = [label_col] + feature_cols
df_selected = df.select(selected_cols)

# 60 / 20 / 20 random split with a fixed seed.
train_data, validation_data, test_data = df_selected.randomSplit([0.6, 0.2, 0.2], seed=42)

# Pack the predictor columns into the single vector column Spark ML expects,
# then reduce every split to just ("features", label).
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data, validation_data, test_data = [
    assembler.transform(split).select("features", label_col)
    for split in (train_data, validation_data, test_data)
]

# Three regressors with default hyper-parameters, each fitted on the training
# split only (the validation split is used below for predictions, not fitting).
lr = LinearRegression(labelCol=label_col)
dt = DecisionTreeRegressor(labelCol=label_col)
rf = RandomForestRegressor(labelCol=label_col)

lr_model = lr.fit(train_data)
dt_model = dt.fit(train_data)
rf_model = rf.fit(train_data)

# Predictions of every fitted model on the validation split.
lr_predictions, dt_predictions, rf_predictions = [
    fitted.transform(validation_data)
    for fitted in (lr_model, dt_model, rf_model)
]

23/06/14 00:27:13 WARN Instrumentation: [4e50c318] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [44]:
# Temporary views of the per-model predictions, kept so that any SQL elsewhere
# in the notebook that queries them by name continues to work.
lr_predictions.createOrReplaceTempView("lr_predictions")
dt_predictions.createOrReplaceTempView("dt_predictions")
rf_predictions.createOrReplaceTempView("rf_predictions")

# Build one row per validation example holding all three predictions.
#
# The models are applied one after another to the *same* rows instead of
# joining the three prediction tables on the `features` vector: feature vectors
# are not unique keys, so a join cross-multiplies every group of rows that
# share the same features (and can pair a prediction with another row's
# label), besides requiring two shuffle joins.
# Each model writes to the default "prediction" column, so it is renamed
# before the next model runs; the linear-regression output keeps the name
# "prediction", as in the per-model DataFrames.
with_dt = dt_model.transform(validation_data).withColumnRenamed("prediction", "dt_prediction")
with_rf = rf_model.transform(with_dt).withColumnRenamed("prediction", "rf_prediction")
joined_predictions = (
    lr_model.transform(with_rf)
    .select(
        "features",
        "NOTA_CH_CIENCIAS_HUMANAS",
        "prediction",
        "dt_prediction",
        "rf_prediction",
    )
    .orderBy("NOTA_CH_CIENCIAS_HUMANAS")
)

# Take a ~10% sample (fixed seed) of the combined predictions.
sample_joined_predictions = joined_predictions.sample(fraction=0.1, seed=42)

# Save the sample as a Parquet file, replacing any previous export.
sample_joined_predictions.write.mode('overwrite').parquet('data_for_dashboards/models/joined_predictions.parquet')

# Show the first rows of the combined predictions.
joined_predictions.show()



                                                                                

+--------------------+------------------------+------------------+------------------+------------------+
|            features|NOTA_CH_CIENCIAS_HUMANAS|        prediction|     dt_prediction|     rf_prediction|
+--------------------+------------------------+------------------+------------------+------------------+
| [413.0,444.0,360.0]|                   311.6| 420.0014176407547| 429.0985439250563|432.19727563223535|
|[345.399993896484...|                   314.7| 371.4723714050142| 415.3153703219153|423.97934549882996|
|[326.700012207031...|                   319.5| 366.4658276543193| 415.3153703219153| 435.4225155561647|
|[385.200012207031...|                   319.8|  386.307863024124| 415.3153703219153|426.84003556090846|
|[334.600006103515...|                   323.1|393.97663016879727|439.61351666538144|453.77265998940663|
|[434.200012207031...|                   323.7|  467.787728390522|439.61351666538144| 467.5242495362507|
|[498.0,458.299987...|                   324.5|482.8692

In [45]:
# RegressionEvaluator is already imported in the notebook's import cell.

# Evaluator comparing the label column against each model's "prediction" column.
evaluator = RegressionEvaluator(labelCol="NOTA_CH_CIENCIAS_HUMANAS", predictionCol="prediction")


def _score(predictions, metric_name):
    """Return the metric `metric_name` ("mse" or "mae") for a predictions DataFrame."""
    return evaluator.evaluate(predictions, {evaluator.metricName: metric_name})


# Each evaluate() call runs a Spark job, so every metric is computed exactly
# once here and the stored values are reused when building the table below.
lr_mse = _score(lr_predictions, "mse")
dt_mse = _score(dt_predictions, "mse")
rf_mse = _score(rf_predictions, "mse")

lr_mae = _score(lr_predictions, "mae")
dt_mae = _score(dt_predictions, "mae")
rf_mae = _score(rf_predictions, "mae")

# (label, value) rows; the label's suffix encodes which metric the row holds.
metrics = [
    ("Linear Regression - MSE", lr_mse),
    ("Decision Tree Regression - MSE", dt_mse),
    ("Random Forest Regression - MSE", rf_mse),
    ("Linear Regression - MAE", lr_mae),
    ("Decision Tree Regression - MAE", dt_mae),
    ("Random Forest Regression - MAE", rf_mae),
]

# Derive the "Type" column from the label with a native column expression
# (no Python UDF needed): "MSE" if the label contains "MSE", otherwise "MAE".
df_comparing_models = spark.createDataFrame(metrics, ["Model", "Metric"]).withColumn(
    "Type", F.when(F.col("Model").contains("MSE"), "MSE").otherwise("MAE")
)

# Within each metric type, rank models by ascending error (lower is better).
window_spec = Window.partitionBy("Type").orderBy(asc("Metric"))

# "Best" is True for the model(s) ranked first within their metric type.
df_comparing_models = df_comparing_models.withColumn("Best", rank().over(window_spec) == 1)

# Display the comparison and export it for the dashboards.
df_comparing_models.show(truncate=False)
tmp = df_comparing_models.toPandas()
tmp.to_parquet('data_for_dashboards/eda/models')

                                                                                

In [None]:
# Destination directory for the persisted linear-regression model.
model_path = "models/linear_regression_model"

# Persist the fitted model. A plain `save()` raises if the path already
# exists, which breaks re-running the notebook; going through the writer with
# `overwrite()` replaces any previously saved copy instead.
lr_model.write().overwrite().save(model_path)