# PySpark Assignment
## María Ferrero and Lara Monteserín


The main goal of this assignment is to check whether feature selection can improve results by removing
irrelevant variables or, at least, maintain the results while using fewer features.

We will do that with a LinearRegression algorithm (with no HPO, in order to keep the assignment short):
we fit different feature selection approaches on the training partition and compare them on a test
set. In any case, the main aim of the assignment is technical (i.e. being able to use PySpark with a dataset).

**WHAT TO HAND IN:** A notebook with some explanations about what you are doing in each step, and also draw some short conclusions at the end of the notebook. Submit the notebook in two formats: (ipynb) and html. Please, submit also a screen capture showing (at least) the last cells of your executed script.


# READ THIS
There is a very important problem: the assignment says to use LogisticRegression, but that is a classification model, so the labels (the target) cannot be continuous (it does not work). Our target is necessarily continuous, so I am using LinearRegression instead.

## PART 0: Creating the Spark session, loading the data and preparing the dataframe for ML use.

In Google Colab, it is necessary to install pyspark in every new session. We also upload the data file to Google Colab.

In [1]:
from google.colab import files

# Upload the CSV file
uploaded = files.upload()

Saving wind_available_second.csv to wind_available_second.csv


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     316.9/316.9 MB 4.3 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=dd2ef47282cd48bfd97a7f01e1b83523d2797a37962ecb79289b39402bec3926
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


Now we initialize the Spark context and create a Spark session. Then, we read the data in Pandas.

In [3]:
# SPARK CONTEXT INITIALIZATION
from pyspark.sql import SparkSession
import pandas as pd

# Create a Spark session
spark = SparkSession.builder.master("local[*]").appName("App").getOrCreate()

# Get the Spark context
sc = spark.sparkContext

In [4]:
# Read the CSV file into a Pandas Dataframe
wind_ava = pd.read_csv('wind_available_second.csv')

To first see the structure of the dataframe, we visualize the first rows.



In [22]:
wind_ava.head()


Unnamed: 0,energy,year,month,day,hour,p54_162_1,p54_162_2,p54_162_3,p54_162_4,p54_162_5,...,v100_16,v100_17,v100_18,v100_19,v100_20,v100_21,v100_22,v100_23,v100_24,v100_25
0,402.71,2005,1,2,18,2534970.0,2526864.0,2518754.0,2510648.0,2502537.0,...,-4.683596,,-4.407196,,-4.131295,-4.669626,-4.528932,-4.388736,-4.24854,-4.107846
1,696.8,2005,1,3,0,,,2521184.0,2513088.0,,...,-3.397886,-3.257192,-3.115998,-2.975304,-2.834609,-3.39639,-3.254198,-3.112506,-2.970314,
2,1591.15,2005,1,3,6,2533727.0,2525703.0,2517678.0,2509654.0,,...,-1.454105,,-1.13829,,-0.822476,-1.459094,-1.302933,-1.147271,-0.99111,-0.834949
3,1338.62,2005,1,3,12,,2526548.0,2518609.0,2510670.0,2502732.0,...,1.255015,1.370265,1.485515,1.600765,1.716015,1.210612,1.319376,1.42814,1.536405,1.645169
4,562.5,2005,1,3,18,2529543.0,,2513702.0,2505782.0,2497861.0,...,1.939031,,,2.193977,2.278793,1.873673,1.953,2.031829,2.111157,2.189986


Before preparing the dataframe for ML use, it is necessary to treat the missing values, which we do by imputation. In the first assignment, the Iterative Imputer gave the best results in later predictions in most cases for this data, so it is the one we use here. After the imputation, we transform the pandas dataframe into a Spark dataframe.


In [5]:
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

# Use scikit-learn's IterativeImputer to impute the missing values
imputer = IterativeImputer(max_iter=10, random_state=100514164)
wind_ava = pd.DataFrame(imputer.fit_transform(wind_ava), columns=wind_ava.columns)

# Convert the imputed pandas DataFrame to a PySpark DataFrame
wind_ava = spark.createDataFrame(wind_ava)


Finally, we prepare the dataframe for ML use. The algorithms in the Spark ML library need a dataframe with just two columns: the first one (typically named features) must contain, for each row, a vector with the input attributes; the second one must contain the output attribute (typically named label). To build the first column, VectorAssembler is used to put together all the input attributes.
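As a rough illustration of the reshaping described above, the following plain-Python sketch (no Spark involved) turns one row of named columns into a (label, features) pair. The row values and the helper `assemble_row` are hypothetical; they only mimic, per row, what VectorAssembler does.

```python
def assemble_row(row, label_col, ignore=()):
    """Split one row (a dict) into (label, features), keeping column order."""
    feature_cols = [c for c in row if c != label_col and c not in ignore]
    features = [float(row[c]) for c in feature_cols]
    return float(row[label_col]), features

# Hypothetical row holding only a handful of the dataset's columns
row = {"energy": 402.71, "year": 2005, "month": 1, "day": 2, "hour": 18}
label, features = assemble_row(row, label_col="energy")
print(label, features)
```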

In [28]:
import pandas as pd
import os

'''
pandas_df = wind_ava.toPandas()

# Local path where the CSV file will be saved
ruta_local = 'C:/Users/laram/Desktop/Todo/UC3M/Second Bimester/Intelligence/Assignment2/wind_ava.csv'

# Split the path into directory and file name
directorio, nombre_archivo = os.path.split(ruta_local)

# Create the directories if they do not exist
if not os.path.exists(directorio):
    os.makedirs(directorio)

# Save the pandas DataFrame as a local CSV file
pandas_df.to_csv(ruta_local, header=True, index=False)
'''

In [9]:
wind_ava.show()


In [29]:
df = pd.read_csv('C:/Users/laram/Desktop/Todo/UC3M/Second Bimester/Intelligence/Assignment2/wind_ava.csv')
#wind_ava2 = spark.createDataFrame(df)
wind_ava = spark.createDataFrame(df)

In [30]:
wind_ava.show() # works now, no need to rerun the imputation

+-------+--------------------+
|  label|            features|
+-------+--------------------+
| 402.71|[2005.0,1.0,2.0,1...|
|  696.8|[2005.0,1.0,3.0,0...|
|1591.15|[2005.0,1.0,3.0,6...|
|1338.62|[2005.0,1.0,3.0,1...|
|  562.5|[2005.0,1.0,3.0,1...|
|  232.3|[2005.0,1.0,4.0,0...|
| 329.95|[2005.0,1.0,4.0,6...|
| 960.51|[2005.0,1.0,4.0,1...|
| 194.62|[2005.0,1.0,4.0,1...|
| 358.51|[2005.0,1.0,5.0,0...|
|  808.8|[2005.0,1.0,5.0,6...|
|  93.36|[2005.0,1.0,5.0,1...|
| 155.94|[2005.0,1.0,5.0,1...|
|   0.01|[2005.0,1.0,6.0,0...|
|   4.85|[2005.0,1.0,6.0,1...|
| 218.76|[2005.0,1.0,7.0,0...|
| 906.21|[2005.0,1.0,7.0,6...|
| 201.42|[2005.0,1.0,7.0,1...|
| 641.34|[2005.0,1.0,7.0,1...|
|1524.05|[2005.0,1.0,8.0,0...|
+-------+--------------------+
only showing top 20 rows



In [6]:
from pyspark.ml.feature import VectorAssembler

# In Pyspark, typically the response is called label
wind_ava = wind_ava.withColumnRenamed("energy", "label") #we don't do this step because when transforming the pandas dataset

ignore = ['label']

assembler = VectorAssembler(
    inputCols=[x for x in wind_ava.columns if x not in ignore],
    outputCol='features')

wind_ava = assembler.transform(wind_ava).select(['label', 'features'])

Now the first rows of the dataframe look as follows:

In [13]:
wind_ava.show()

+-------+--------------------+
|  label|            features|
+-------+--------------------+
| 402.71|[2005.0,1.0,2.0,1...|
|  696.8|[2005.0,1.0,3.0,0...|
|1591.15|[2005.0,1.0,3.0,6...|
|1338.62|[2005.0,1.0,3.0,1...|
|  562.5|[2005.0,1.0,3.0,1...|
|  232.3|[2005.0,1.0,4.0,0...|
| 329.95|[2005.0,1.0,4.0,6...|
| 960.51|[2005.0,1.0,4.0,1...|
| 194.62|[2005.0,1.0,4.0,1...|
| 358.51|[2005.0,1.0,5.0,0...|
|  808.8|[2005.0,1.0,5.0,6...|
|  93.36|[2005.0,1.0,5.0,1...|
| 155.94|[2005.0,1.0,5.0,1...|
|   0.01|[2005.0,1.0,6.0,0...|
|   4.85|[2005.0,1.0,6.0,1...|
| 218.76|[2005.0,1.0,7.0,0...|
| 906.21|[2005.0,1.0,7.0,6...|
| 201.42|[2005.0,1.0,7.0,1...|
| 641.34|[2005.0,1.0,7.0,1...|
|1524.05|[2005.0,1.0,8.0,0...|
+-------+--------------------+
only showing top 20 rows



## PART 1: Split data into train and test


In [7]:
(trainingData_sd, testData_sd) = wind_ava.randomSplit([0.7, 0.3])
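
The split above is unseeded, so the partitions (and every RMSE reported below) can change from run to run. `randomSplit` accepts an optional `seed` argument for reproducibility. The plain-Python sketch below only illustrates the idea of a seeded 70/30 split; the helper `seeded_split` and the seed value are hypothetical, and this is not how Spark implements it.

```python
import random

def seeded_split(rows, train_fraction=0.7, seed=42):
    """Assign each row to train or test using a seeded random generator."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))
train_a, test_a = seeded_split(rows)
train_b, test_b = seeded_split(rows)

print(len(train_a), len(test_a))   # roughly 700 / 300, not exactly
print(train_a == train_b)          # same seed, same partition
```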

## PART 2: Formulate three pipelines, train and evaluate them:
- a. Feature selection with the UnivariateFeatureSelector and the fpr strategy (least conservative)
- b. Same, with the fwe strategy (most conservative).
- c. Same, but doing PCA and using 3 components
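
To make the difference between the first two strategies concrete: according to the Spark documentation, `fpr` keeps every feature whose p-value is below the threshold, while `fwe` compares each p-value against the threshold divided by the number of features. The plain-Python sketch below applies both rules to a made-up list of p-values; the values and the helper names are hypothetical.

```python
def select_fpr(p_values, threshold=0.05):
    """Keep the indices whose p-value is below the threshold."""
    return [i for i, p in enumerate(p_values) if p < threshold]

def select_fwe(p_values, threshold=0.05):
    """Keep the indices whose p-value is below threshold / number of features."""
    scaled = threshold / len(p_values)
    return [i for i, p in enumerate(p_values) if p < scaled]

# Made-up p-values for five features
p_values = [0.0001, 0.004, 0.03, 0.2, 0.0499]

print(select_fpr(p_values))  # [0, 1, 2, 4]
print(select_fwe(p_values))  # [0, 1]
```

With the same threshold, `fwe` can never keep more features than `fpr`, which is why it is the more conservative of the two.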

### PIPELINE 1: Feature selection with the UnivariateFeatureSelector and the fpr strategy (least conservative)

In [18]:
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml import Pipeline
#from pyspark.ml.classification import LogisticRegression #as it is the chosen algorithm for this assignment
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: UnivariateFeatureSelector
selector_fpr = UnivariateFeatureSelector(
    featuresCol='features',
    outputCol='selected_features',
    labelCol='label',
    selectionMode='fpr'  # False Positive Rate strategy
)
selector_fpr.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05) # change to make it less conservative

# Step 2: Linear Regression
lr = LinearRegression(
    labelCol='label',
    featuresCol='selected_features',
    maxIter=10,
)

# Step 3: Create the pipeline
pipeline_fpr = Pipeline(stages=[selector_fpr, lr])

# Step 4: Train the pipeline on the training data
model_fpr = pipeline_fpr.fit(trainingData_sd)

# Step 5: Make predictions on the test data
predictions = model_fpr.transform(testData_sd)

# Step 6: Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')
rmse_fpr = evaluator.evaluate(predictions)

# Print the RMSE
print(f"Root Mean Squared Error (RMSE): {rmse_fpr}")

Root Mean Squared Error (RMSE): 432.7580737306437
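
All pipelines in this notebook are compared by RMSE on the test partition, i.e. the square root of the mean squared difference between label and prediction, expressed in the same units as the label (lower is better). A minimal plain-Python sketch with made-up numbers follows; the helper `rmse` is hypothetical, and the notebook itself relies on RegressionEvaluator.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    squared_errors = [(y - y_hat) ** 2 for y, y_hat in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up labels and predictions
labels = [3.0, 5.0, 7.0]
predictions = [2.0, 5.0, 9.0]
print(rmse(labels, predictions))
```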


### PIPELINE 2: Same as PIPELINE 1, with the fwe strategy (most conservative).

In [19]:
# Step 1: UnivariateFeatureSelector with FWE strategy
selector_fwe = UnivariateFeatureSelector(
    featuresCol='features',
    outputCol='selected_features',
    labelCol='label',
    selectionMode='fwe'  # Family-Wise Error Rate strategy (most conservative)
)

selector_fwe.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.05) # most conservative

# Step 2 is the same as before
# Step 3: Create the pipeline
pipeline_fwe = Pipeline(stages=[selector_fwe, lr])

# Step 4: Train the pipeline on the training data
model_fwe = pipeline_fwe.fit(trainingData_sd)

# Step 5: Make predictions on the test data
predictions = model_fwe.transform(testData_sd)

# Step 6: Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')
rmse_fwe = evaluator.evaluate(predictions)

# Print the RMSE
print(f"Root Mean Squared Error (RMSE): {rmse_fwe}")

Root Mean Squared Error (RMSE): 429.6432987151365


### PIPELINE 3: Same as PIPELINES 1 AND 2, but doing PCA and using 3 components

In [20]:
from pyspark.ml.feature import PCA

# Step 1: PCA
pca = PCA(
    k=3,  # principal components
    inputCol='features',
    outputCol='pca_features'
)

# Step 2: Linear Regression
lr = LinearRegression(
    labelCol='label',
    featuresCol='pca_features',
    maxIter=10,
)

# Step 3: Create the pipeline
pipeline_pca = Pipeline(stages=[pca, lr])

# Step 4: Train the pipeline on the training data
model_pca = pipeline_pca.fit(trainingData_sd)

# Step 5: Make predictions on the test data
predictions = model_pca.transform(testData_sd)

# Step 6: Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')
rmse_pca = evaluator.evaluate(predictions)

# Print the RMSE
print(f"Root Mean Squared Error (RMSE) with PCA: {rmse_pca}")

Root Mean Squared Error (RMSE) with PCA: 596.0593000590766
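
PCA replaces the original features with k linear combinations of them, chosen along the directions of largest variance. The following plain-Python sketch finds the first such direction for a tiny two-dimensional example and projects the points onto it. The data, the helper names, and the choice to project the raw (uncentered) points are assumptions of the sketch, not a description of Spark's implementation.

```python
import math

def first_principal_axis(points):
    """Unit vector along the direction of largest variance of 2-D points."""
    n = len(points)
    mean_x = sum(p[0] for p in points) / n
    mean_y = sum(p[1] for p in points) / n
    var_x = sum((p[0] - mean_x) ** 2 for p in points) / (n - 1)
    var_y = sum((p[1] - mean_y) ** 2 for p in points) / (n - 1)
    cov_xy = sum((p[0] - mean_x) * (p[1] - mean_y) for p in points) / (n - 1)
    # Orientation of the major axis of a 2x2 covariance matrix
    theta = 0.5 * math.atan2(2.0 * cov_xy, var_x - var_y)
    return (math.cos(theta), math.sin(theta))

def project(points, axis):
    """One-dimensional score of each point along the given axis."""
    return [p[0] * axis[0] + p[1] * axis[1] for p in points]

# Made-up, strongly correlated 2-D points
points = [(1.0, 1.1), (2.0, 1.9), (3.0, 3.2), (4.0, 3.8)]
axis = first_principal_axis(points)
scores = project(points, axis)
print(axis)
print(scores)
```

Each point is reduced from two numbers to one, so any information lying off that axis is lost to the model trained afterwards.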


## PART 3: Formulate, train and evaluate another pipeline that uses features obtained from two sources:

*   Feature selection with the fpr
*   The 3 components from PCA

We reuse selector_fpr, which was created before.

Then, PCA is used to perform dimensionality reduction and obtain 3 principal components from the selected features.

VectorAssembler is needed again to combine the features from both sources.

Finally, a new pipeline (pipeline_combined) is created with the FPR selector, PCA, the assembler, and linear regression.

In [21]:
# Step 1: PCA
pca = PCA(
    k=3,
    inputCol='selected_features',
    outputCol='pca_features'
)

# Step 3: Assemble features from both sources
assembler = VectorAssembler(
    inputCols=['selected_features', 'pca_features'],
    outputCol='combined_features'
)

# Step 4: Linear Regression
lr = LinearRegression(
    labelCol='label',
    featuresCol='combined_features',
    maxIter=10,
)

# Step 5: Create the pipeline
pipeline_combined = Pipeline(stages=[selector_fpr, pca, assembler, lr])

# Step 6: Train the pipeline on the training data
model_combined = pipeline_combined.fit(trainingData_sd)

# Step 7: Make predictions on the test data
predictions_combined = model_combined.transform(testData_sd)

# Step 8: Evaluate the model using RMSE
evaluator_combined = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')
rmse_combined = evaluator_combined.evaluate(predictions_combined)

# Print the RMSE
print(f"Root Mean Squared Error (RMSE) with combined features: {rmse_combined}")

Root Mean Squared Error (RMSE) with combined features: 504.7804488189157


## PART 4: Can you determine how many features are selected by fpr and fwe?

In [22]:
# Get the indices of the features selected by FPR
selected_features_fpr = model_fpr.stages[0].selectedFeatures
print("Number of features selected by FPR:", len(selected_features_fpr))

# Get the indices of the features selected by FWE
selected_features_fwe = model_fwe.stages[0].selectedFeatures
print("Number of features selected by FWE:", len(selected_features_fwe))


Number of features selected by FPR: 523
Number of features selected by FWE: 470
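
`selectedFeatures` holds positions in the assembled feature vector rather than column names. Assuming the list passed as `inputCols` to the first VectorAssembler is still available, the indices can be mapped back to names. The sketch below uses a short hypothetical column list and index list in place of the real ones.

```python
# Hypothetical stand-ins for the assembler's inputCols and for selectedFeatures
input_cols = ["year", "month", "day", "hour", "p54_162_1", "v100_25"]
selected_indices = [0, 3, 5]

selected_set = set(selected_indices)
selected_names = [input_cols[i] for i in selected_indices]
dropped_names = [c for i, c in enumerate(input_cols) if i not in selected_set]

print(selected_names)
print(dropped_names)
```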


In [None]:
#The following is to stop the cluster.
#
spark.stop()

# Conclusions

- Keep varying the thresholds and establish a criterion for the final decision (FPR and FWE)
- Explain that we evaluate with RMSE
- Review the imputation done before the assembler
- The LinearRegression vs LogisticRegression issue
