<a href="https://colab.research.google.com/github/mgonzalz/ppd_regresion-distribuida/blob/main/regresion_distribuida.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#  **Regresión con PySpark.**
Mediante las técnicas de programación paralela y distribuida que ofrece PySpark, debemos de determinar el índice de Rendimiento Académico (*Performance Index*) que tienen los estudiantes atendiendo a una serie de variables.

El dataset del que partimos tiene las siguientes variables independientes:

* Hours Studied: El número total de horas que ha estudiado cada estudiante.
* Previous Scores: Las notas que ha obtenido un estudiante en exámenes anteriores.
* Extracurricular Activities: Si el estudiante participa en actividades extraescolares (Yes or No).
* Sleep Hours: El número de horas que un estudiante duerme al día.
* Sample Question Papers Practiced: El número de hojas de ejercicios que el estudiante realizó.
Se trata de predecir el valor de la variable dependiente:

* Performance Index: La medida del rendimiento académico de cada estudiante. Los valores están comprendidos entre 10 y 100, indicando los valores más altos un mayor rendimiento académico.

Se valuará el modelo mediante **Root Mean Squared Error**.

## Importaciones e instalación.

Para evitar la subida de archivos manualmente en Google Colab, clonaremos directamente el repositorio donde se encuentran alojados estos archivos y luego los añadiremos al directorio actual.







In [None]:
!git clone https://github.com/mgonzalz/ppd_regresion-distribuida.git

Cloning into 'ppd_regresion-distribuida'...
remote: Enumerating objects: 17, done.[K
remote: Counting objects: 100% (17/17), done.[K
remote: Compressing objects: 100% (14/14), done.[K
remote: Total 17 (delta 4), reused 8 (delta 1), pack-reused 0[K
Receiving objects: 100% (17/17), 56.56 KiB | 815.00 KiB/s, done.
Resolving deltas: 100% (4/4), done.


In [None]:
!mv ppd_regresion-distribuida/* ./ # Mover los archivos al directorio general

In [None]:
!rm -rf ppd_regresion-distribuida # Eliminación de la carpeta vacía

El archivo `requirements.txt` contiene todas las versiones de los paquetes utilizados. A través de la siguiente secuencia de código los instalaremos e importaremos los métodos necesarios.

In [None]:
!pip install -r requirements.txt

Collecting findspark==2.0.1 (from -r requirements.txt (line 1))
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting pyspark==3.5.1 (from -r requirements.txt (line 2))
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting jedi>=0.16 (from IPython==7.34.0->-r requirements.txt (line 3))
  Downloading jedi-0.19.1-py2.py3-none-any.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m43.6 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=03b781719a6ff3475abcd46b272a79b11da611f939294f44759b78fa441f9027
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38ddd

In [None]:
# Python estándar
import os
import sys
from typing import List

# Configuración de Spark
import findspark
import pyspark
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Modelado y evaluación de Spark ML
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline

## Creación del entorno.

Se preparará el entorno para el uso de Spark, descargando este y configurando las variables de entorno necesarias. Mediante `findspark` se habilitará la accesibilidad de Spark desde el entorno de ejecución. Finalmente, se crea la sesión de Spark.

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

findspark.init()
findspark.find()

spark= SparkSession \
       .builder \
       .appName("regresion") \
       .getOrCreate()

spark

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
[33m0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [[0m[33m0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [[0m                                                                               Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
[33m0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [[0m                                                                               Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [784 kB]
Hit:7 http://archive.ubuntu.com/ubuntu jammy-ba

In [None]:
spark=SparkSession.builder.appName('regresion').getOrCreate()

## Estudio del rendimiento académico.

### Importación del dataset.

In [None]:
data = spark.read.csv("/content/data/Student_Performance.csv", header=True, inferSchema=True)
data.show(5)

+-------------+---------------+--------------------------+-----------+--------------------------------+-----------------+
|Hours Studied|Previous Scores|Extracurricular Activities|Sleep Hours|Sample Question Papers Practiced|Performance Index|
+-------------+---------------+--------------------------+-----------+--------------------------------+-----------------+
|            7|             99|                       Yes|          9|                               1|             91.0|
|            4|             82|                        No|          4|                               2|             65.0|
|            8|             51|                       Yes|          7|                               2|             45.0|
|            5|             52|                       Yes|          5|                               2|             36.0|
|            7|             75|                        No|          8|                               5|             66.0|
+-------------+---------

### Preparación del dataset.
Antes del ánalisis de datos, debemos de preparar los datos y tener un conocimiento sobre estos.

In [None]:
data.toPandas().isnull().sum() # Valores nulos.

Hours Studied                       0
Previous Scores                     0
Extracurricular Activities          0
Sleep Hours                         0
Sample Question Papers Practiced    0
Performance Index                   0
dtype: int64

In [None]:
data.printSchema()

root
 |-- Hours Studied: integer (nullable = true)
 |-- Previous Scores: integer (nullable = true)
 |-- Extracurricular Activities: string (nullable = true)
 |-- Sleep Hours: integer (nullable = true)
 |-- Sample Question Papers Practiced: integer (nullable = true)
 |-- Performance Index: double (nullable = true)



In [None]:
data.dtypes

[('Hours Studied', 'int'),
 ('Previous Scores', 'int'),
 ('Extracurricular Activities', 'string'),
 ('Sleep Hours', 'int'),
 ('Sample Question Papers Practiced', 'int'),
 ('Performance Index', 'double')]

Miramos la existencia de variables categóricas. Observamos que solo la columna `Extracurricular Activities` contiene de estos y realizaremos la codificación ordinal o *Label Encoding* de estos valores para poder realizar un análisis.





In [None]:
data.select('Extracurricular Activities').distinct().show()

+--------------------------+
|Extracurricular Activities|
+--------------------------+
|                        No|
|                       Yes|
+--------------------------+



En este caso al solo haber dos valores se puede hacer la codificación de manera manual, sin usar el `StringIndexer`. Identificamos lo siguiente:
- Yes equivale a 1.
- No equivale a 0.

In [None]:
data = data.withColumn('Extracurricular Activities', F.when(data["Extracurricular Activities"] == "Yes", 1).otherwise(0))

In [None]:
data.show(5)

+-------------+---------------+--------------------------+-----------+--------------------------------+-----------------+
|Hours Studied|Previous Scores|Extracurricular Activities|Sleep Hours|Sample Question Papers Practiced|Performance Index|
+-------------+---------------+--------------------------+-----------+--------------------------------+-----------------+
|            7|             99|                         1|          9|                               1|             91.0|
|            4|             82|                         0|          4|                               2|             65.0|
|            8|             51|                         1|          7|                               2|             45.0|
|            5|             52|                         1|          5|                               2|             36.0|
|            7|             75|                         0|          8|                               5|             66.0|
+-------------+---------

Consideramos que el dataset ya esta preparado. Además en caso de necesitar volver a utilizar este y no volver a ejecutar el código, se ha guardado una copia.

In [None]:
df_original = data.select(*data.columns)

### Regresión lineal.

In [None]:
feature_cols = [col for col in data.columns if col != 'Performance Index'] #identificación variables independientes

In [None]:
assembler = VectorAssembler(
    inputCols= feature_cols,
    outputCol="features")

data = assembler.transform(data)

#### MinMaxScaler

Mediante el `MinMaxScaler` transformamos las características escalándolas a un rango dado, por defecto (0,1). Este se utiliza para mejrar el modelo en caso de tener funciones con rangos muy diferentes y querer escalarlas a un rango común.

In [None]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
pipeline = Pipeline(stages=[scaler])
scalerModel = pipeline.fit(data)
scaledData = scalerModel.transform(data)

Hemos dividido los datos en un 80% para el entrenamiento y un 20% para el test.`Seed` permite establecer una semilla para la generación de números aleatorios, lo que asegura que cada vez que se ejecute este código, la división sea consistente.

In [None]:
train_data, test_data = scaledData.randomSplit([0.8, 0.2], seed=70)

Finalmente se realiza la regresión entre la variable dependiente `Performance Index` y las características ya escaladas.

In [None]:
lr = LinearRegression(featuresCol="scaled_features", labelCol="Performance Index", predictionCol="y_pred")
lr_model = lr.fit(train_data)

## Evaluación del modelo.
Para la evaluación de este, utilizaremos el RSME.

In [None]:
predictions = lr_model.transform(test_data)

evaluator = RegressionEvaluator(labelCol="Performance Index", predictionCol="y_pred", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

Root Mean Squared Error (RMSE) on test data: 2.068


In [None]:
predictions.show()

+-------------+---------------+--------------------------+-----------+--------------------------------+-----------------+--------------------+--------------------+------------------+
|Hours Studied|Previous Scores|Extracurricular Activities|Sleep Hours|Sample Question Papers Practiced|Performance Index|            features|     scaled_features|            y_pred|
+-------------+---------------+--------------------------+-----------+--------------------------------+-----------------+--------------------+--------------------+------------------+
|            1|             40|                         0|          4|                               3|             15.0|[1.0,40.0,0.0,4.0...|(5,[4],[0.3333333...|11.981318172072116|
|            1|             40|                         0|          5|                               6|             13.0|[1.0,40.0,0.0,5.0...|(5,[3,4],[0.2,0.6...|13.062506152473556|
|            1|             40|                         0|          6|               