

Apache Spark and Random Forest in Python.

Import the libraries we are going to use.
Most of them come from pyspark.

In [8]:
!pip install --upgrade pyspark

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.context import SparkContext
from pyspark.sql.functions import col
from pyspark.sql.session import SparkSession


import pandas as pd
import numpy as np

Requirement already up-to-date: pyspark in /usr/local/lib/python3.6/dist-packages (2.4.5)


We create a Spark context and session. We will stop them at the end of the notebook.

In [0]:
sc = SparkContext('local')
spark = SparkSession(sc)

We load the data. It is a 10% sample of my dataset, stored on GitHub. We read it with pandas and convert it to a Spark DataFrame.


In [0]:
# Data URL
url = "https://raw.githubusercontent.com/solrepresa/RF-Valencia/master/prueba_datos.csv"
df1 = pd.read_csv(url)

In [0]:
type(df1)
data = spark.createDataFrame(df1)

In [16]:
type(data)


pyspark.sql.dataframe.DataFrame

In [0]:
#data = spark.read.format("csv").option("header", True).load("/variables_estacion_aod_IDW_modelo_paper.csv")

We check that the table loaded correctly.

In [100]:
#display(data)
#data.show()
#data.describe().show()

data.printSchema()  # the variables show up as strings


root
 |-- PM25: string (nullable = true)
 |-- DEM: string (nullable = true)
 |-- CLC_1: string (nullable = true)
 |-- PS: string (nullable = true)
 |-- RH: string (nullable = true)
 |-- T: string (nullable = true)
 |-- U: string (nullable = true)
 |-- V: string (nullable = true)
 |-- DUSMASS: string (nullable = true)
 |-- OCSMASS: string (nullable = true)
 |-- SO2SMASS: string (nullable = true)
 |-- SO4SMASS: string (nullable = true)
 |-- SSSMASS: string (nullable = true)
 |-- PBLH: string (nullable = true)
 |-- PRECTOT: string (nullable = true)
 |-- SPEED: string (nullable = true)
 |-- CLDHGH: string (nullable = true)
 |-- CLDLOW: string (nullable = true)
 |-- H1000: string (nullable = true)
 |-- AOD: string (nullable = true)



We cast every column from string to float:

In [0]:
data = data.select(*(col(c).cast("float").alias(c) for c in data.columns))
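One caveat about this cast, to be checked against your Spark version: in Spark 2.4 (and in later versions with ANSI mode disabled), casting a string that does not parse as a number yields `null` instead of raising an error, and `VectorAssembler` rejects rows with `null` inputs by default (`handleInvalid="error"`). The plain-Python sketch below only approximates that cast (Python's `float()` accepts a few spellings Java does not) in order to count, per column, how many values would become `null`. The helper names `to_float_or_none` and `count_cast_nulls` and the sample rows are hypothetical.

```python
def to_float_or_none(value):
    """Approximate a non-ANSI Spark cast to float: unparseable values become None (null)."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def count_cast_nulls(rows, columns):
    """Count, per column, how many values would turn into null after the cast."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if to_float_or_none(row.get(c)) is None:
                counts[c] += 1
    return counts


# Hypothetical string rows (printSchema() reported every column as string)
rows = [
    {"PM25": "12.5", "AOD": "0.31"},
    {"PM25": "NA", "AOD": "0.28"},
    {"PM25": "9.1", "AOD": ""},
]
print(count_cast_nulls(rows, ["PM25", "AOD"]))  # {'PM25': 1, 'AOD': 1}
```

In Spark itself, rows with nulls can be dropped with `data.na.drop()` before fitting, or the assembler can be built with `handleInvalid="skip"`.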

We check the result:

In [102]:
data.printSchema()
data.show()

root
 |-- PM25: float (nullable = true)
 |-- DEM: float (nullable = true)
 |-- CLC_1: float (nullable = true)
 |-- PS: float (nullable = true)
 |-- RH: float (nullable = true)
 |-- T: float (nullable = true)
 |-- U: float (nullable = true)
 |-- V: float (nullable = true)
 |-- DUSMASS: float (nullable = true)
 |-- OCSMASS: float (nullable = true)
 |-- SO2SMASS: float (nullable = true)
 |-- SO4SMASS: float (nullable = true)
 |-- SSSMASS: float (nullable = true)
 |-- PBLH: float (nullable = true)
 |-- PRECTOT: float (nullable = true)
 |-- SPEED: float (nullable = true)
 |-- CLDHGH: float (nullable = true)
 |-- CLDLOW: float (nullable = true)
 |-- H1000: float (nullable = true)
 |-- AOD: float (nullable = true)


We build the list of variables that enter the model. PM25 is the label, so it must be left out of the predictors.

In [0]:
# Every column except the label (PM25) is a predictor.
# The loop variable is `c` so it does not shadow pyspark.sql.functions.col.
feature_list = [c for c in data.columns if c != "PM25"]

In [104]:
feature_list  # a list

['DEM',
 'CLC_1',
 'PS',
 'RH',
 'T',
 'U',
 'V',
 'DUSMASS',
 'OCSMASS',
 'SO2SMASS',
 'SO4SMASS',
 'SSSMASS',
 'PBLH',
 'PRECTOT',
 'SPEED',
 'CLDHGH',
 'CLDLOW',
 'H1000',
 'AOD']

We combine all the predictors into a single vector column:

In [0]:
assembler = VectorAssembler(inputCols=feature_list, outputCol="features")
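Conceptually, `VectorAssembler` reads the columns listed in `inputCols` for each row, in that order, and packs them into one vector column, leaving the other columns (such as the label) untouched. The plain-Python sketch below illustrates only that idea; Spark actually produces `DenseVector` or `SparseVector` values. The helper `assemble_row` and the sample row are hypothetical.

```python
def assemble_row(row, input_cols, output_col="features"):
    """Return a copy of `row` with the input columns packed, in order, into one list."""
    assembled = dict(row)
    assembled[output_col] = [float(row[c]) for c in input_cols]
    return assembled


# Hypothetical row using a few of the notebook's column names
row = {"PM25": 12.5, "DEM": 15.0, "RH": 64.0, "AOD": 0.31}
out = assemble_row(row, ["DEM", "RH", "AOD"])
print(out["features"])  # [15.0, 64.0, 0.31]
```

The order matters: positions in this vector are what `featureImportances` refers to later, which is why the importance plot labels its bars with `feature_list`.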

We define the model:

In [0]:
# PM25 is the target; numTrees=100 is only a default, the grid search overrides it
rf = RandomForestRegressor(labelCol="PM25", featuresCol="features", numTrees=100)
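A random forest regressor averages the predictions of many decision trees, each trained on a bootstrap sample of the rows and a random subset of the features. The toy sketch below, in plain Python, shows only that idea and is not Spark's implementation: it uses depth-1 trees (stumps), so the feature subset is drawn once per tree, whereas Spark's `RandomForestRegressor` samples features at each node and grows trees up to `maxDepth`. The helpers `fit_stump` and `fit_forest` and the toy data are hypothetical.

```python
import random
from statistics import mean


def fit_stump(X, y, feature_ids):
    """Fit a depth-1 regression tree: the single split that minimises squared error."""
    best = None
    for j in feature_ids:
        for t in sorted({row[j] for row in X}):
            left = [yi for row, yi in zip(X, y) if row[j] <= t]
            right = [yi for row, yi in zip(X, y) if row[j] > t]
            if not left or not right:
                continue  # a split must send rows to both sides
            ml, mr = mean(left), mean(right)
            sse = sum((v - ml) ** 2 for v in left) + sum((v - mr) ** 2 for v in right)
            if best is None or sse < best[0]:
                best = (sse, j, t, ml, mr)
    if best is None:  # no usable split: predict the sample mean
        m = mean(y)
        return lambda row: m
    _, j, t, ml, mr = best
    return lambda row: ml if row[j] <= t else mr


def fit_forest(X, y, n_trees=20, features_per_tree=None, seed=0):
    """Stumps trained on bootstrap samples and random feature subsets."""
    rng = random.Random(seed)
    n, p = len(X), len(X[0])
    k = features_per_tree or p
    trees = []
    for _ in range(n_trees):
        idx = [rng.randrange(n) for _ in range(n)]  # bootstrap: rows drawn with replacement
        feats = rng.sample(range(p), k)  # random subset of feature indices
        trees.append(fit_stump([X[i] for i in idx], [y[i] for i in idx], feats))
    # The forest's prediction is the average of its trees' predictions
    return lambda row: mean(tree(row) for tree in trees)


# Toy data: the target jumps from 0 to 10 once the first feature exceeds 4
X = [[float(x), 0.0] for x in range(10)]
y = [0.0] * 5 + [10.0] * 5
forest = fit_forest(X, y)
print(forest([1.0, 0.0]), forest([8.0, 0.0]))
```

Averaging over bootstrap samples is what makes the forest less sensitive to any single noisy split than one tree would be.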

We create the Pipeline:

In [0]:
pipeline = Pipeline(stages=[assembler, rf])
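`Pipeline.fit` runs its stages in order: transformers such as the assembler are applied as they are, estimators such as `rf` are fitted to produce a model, and each stage's output is what the next stage sees. The fitted stages form a `PipelineModel` that replays them on new data. The plain-Python sketch below mirrors that flow; the classes `AssembleFeatures`, `MeanRegressor` and `MeanModel` are hypothetical stand-ins, not Spark classes.

```python
class AssembleFeatures:
    """Hypothetical transformer: packs the input columns into a 'features' list."""

    def __init__(self, input_cols):
        self.input_cols = input_cols

    def transform(self, rows):
        return [dict(r, features=[r[c] for c in self.input_cols]) for r in rows]


class MeanModel:
    """Hypothetical fitted model: predicts a constant."""

    def __init__(self, value):
        self.value = value

    def transform(self, rows):
        return [dict(r, prediction=self.value) for r in rows]


class MeanRegressor:
    """Hypothetical estimator: fit() learns the label mean and returns a model."""

    def __init__(self, label_col):
        self.label_col = label_col

    def fit(self, rows):
        return MeanModel(sum(r[self.label_col] for r in rows) / len(rows))


def fit_pipeline(stages, rows):
    """Fit estimators / keep transformers in order; each stage sees the previous output."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        fitted.append(model)
        rows = model.transform(rows)
    return fitted


def transform_pipeline(fitted, rows):
    """Replay the fitted stages on new rows, like PipelineModel.transform."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


train = [{"DEM": 1.0, "PM25": 10.0}, {"DEM": 2.0, "PM25": 20.0}]
fitted = fit_pipeline([AssembleFeatures(["DEM"]), MeanRegressor("PM25")], train)
print(transform_pipeline(fitted, [{"DEM": 3.0, "PM25": 30.0}]))
# [{'DEM': 3.0, 'PM25': 30.0, 'features': [3.0], 'prediction': 15.0}]
```

Because the whole pipeline is the estimator handed to `CrossValidator`, the assembler and the forest are refitted together on every fold.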

We build the hyperparameter grid that cross-validation will search:

In [0]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start = 10, stop = 50, num = 10)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 25, num = 10)]) \
    .build()
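`ParamGridBuilder` builds the Cartesian product of the values given to each `addGrid` call, so this grid has 10 × 10 = 100 parameter combinations. The stdlib-only sketch below reproduces the values that `[int(x) for x in np.linspace(...)]` generates (note that `int` truncates) and counts the combinations and the cross-validation fits; `linspace_ints` is a hypothetical helper, not a NumPy function.

```python
from itertools import product


def linspace_ints(start, stop, num):
    """int() of `num` evenly spaced values from start to stop, endpoint included."""
    step = (stop - start) / (num - 1)
    values = [start + i * step for i in range(num)]
    values[-1] = stop  # np.linspace places the endpoint exactly at `stop`
    return [int(v) for v in values]


num_trees = linspace_ints(10, 50, 10)
max_depth = linspace_ints(5, 25, 10)
grid = list(product(num_trees, max_depth))  # every (numTrees, maxDepth) pair
num_folds = 10

print(num_trees)  # [10, 14, 18, 23, 27, 32, 36, 41, 45, 50]
print(max_depth)  # [5, 7, 9, 11, 13, 16, 18, 20, 22, 25]
print(len(grid), len(grid) * num_folds)  # 100 1000
```

Every grid value for `numTrees` replaces the `numTrees=100` set in the constructor, so the final model has at most 50 trees.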

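We set up cross-validation. With 10 folds and a 10 × 10 grid, it fits 1,000 models (plus a final refit of the best combination on the whole training set), so it can take a long time on a single local machine: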

In [0]:
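crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          # RegressionEvaluator's default labelCol is "label", which this data lacks
                          evaluator=RegressionEvaluator(labelCol="PM25", metricName="rmse"),
                          numFolds=10)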
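For each parameter combination, cross-validation trains on k-1 folds, evaluates on the held-out fold, averages the metric over the k folds, and keeps the combination with the best average (the lowest RMSE here). The sketch below shows that bookkeeping in plain Python. To my understanding, PySpark's `CrossValidator` likewise assigns folds by bucketing a uniform random column, so fold sizes are only roughly equal. The helpers `kfold_assign`, `kfold_splits` and `pick_best` are hypothetical.

```python
import random


def kfold_assign(n_rows, num_folds, seed=0):
    """Draw a uniform random number per row and bucket it into a fold index."""
    rng = random.Random(seed)
    return [int(rng.random() * num_folds) for _ in range(n_rows)]


def kfold_splits(n_rows, num_folds, seed=0):
    """Yield (training, validation) row indices; each row is validated exactly once."""
    folds = kfold_assign(n_rows, num_folds, seed)
    for k in range(num_folds):
        validation = [i for i, f in enumerate(folds) if f == k]
        training = [i for i, f in enumerate(folds) if f != k]
        yield training, validation


def pick_best(avg_metrics, larger_is_better=False):
    """Index of the best parameter combination; RMSE is 'smaller is better'."""
    chooser = max if larger_is_better else min
    return chooser(range(len(avg_metrics)), key=avg_metrics.__getitem__)


# Fold sizes are only roughly equal because the assignment is random
sizes = [len(v) for _, v in kfold_splits(1000, 10, seed=42)]
print(sizes, sum(sizes))
```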

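The data do not need to be converted into an RDD of `LabeledPoint`s. That is the older `pyspark.mllib` API; `pyspark.ml` estimators such as the pipeline above expect a DataFrame, and `row[-1]` would have taken AOD (the last column) as the label instead of PM25. The pipeline's `VectorAssembler` already builds the `features` column, so we split the DataFrame directly.

We define the training and test sets (30% of the data for testing):

In [0]:
# Split the DataFrame itself; a fixed seed makes the split reproducible
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)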
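`randomSplit` normalises the weights and assigns each row independently, so with `[0.7, 0.3]` the test set holds about 30% of the rows rather than exactly 30%. The plain-Python sketch below illustrates that behaviour; `random_split` is a hypothetical helper, and Spark's own sampling works per partition.

```python
import random


def random_split(rows, weights, seed=42):
    """Assign each row independently to a split with probability proportional to its weight."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for k, b in enumerate(bounds):
            if u < b:
                splits[k].append(row)
                break
        else:  # guard against round-off leaving the last bound slightly below 1.0
            splits[-1].append(row)
    return splits


train, test = random_split(list(range(10000)), [0.7, 0.3])
print(len(train), len(test))  # close to, but usually not exactly, 7000 and 3000
```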

Finally, we fit the model on the training set:

In [0]:
cvModel = crossval.fit(trainingData)


In [0]:
predictions = cvModel.transform(testData)

From here on, the code is "borrowed". It plots the results.

In [0]:
import matplotlib.pyplot as plt

# RMSE on the held-out test set; the label column is PM25, not "label"
evaluator = RegressionEvaluator(labelCol="PM25", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)

# Bring only the columns needed for the plot to the driver
rfResult = predictions.select("PM25", "prediction").toPandas()

plt.plot(rfResult.PM25, rfResult.prediction, 'bo')
plt.xlabel('Observed PM25')
plt.ylabel('Predicted PM25')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()
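The RMSE reported by `RegressionEvaluator(metricName="rmse")` is the square root of the mean squared difference between observed and predicted values, expressed in the same units as PM25. A minimal stdlib sketch of the formula, with a hypothetical helper name `rmse_of` and made-up numbers:

```python
import math


def rmse_of(observed, predicted):
    """Root-mean-square error: sqrt(mean((y - y_hat) ** 2))."""
    if len(observed) != len(predicted) or not observed:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(observed, predicted)]
    return math.sqrt(sum(squared) / len(squared))


print(rmse_of([10.0, 20.0], [13.0, 16.0]))  # sqrt((9 + 16) / 2), about 3.54
```

Squaring before averaging means a few large errors weigh more than many small ones.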

In [0]:

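# The best pipeline's second stage is the fitted RandomForestRegressionModel
bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[1]

# Importances follow the order of the assembler's inputCols (feature_list)
importances = bestModel.featureImportances.toArray()

x_values = list(range(len(importances)))

plt.bar(x_values, importances, orientation = 'vertical')
plt.xticks(x_values, feature_list, rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')
plt.show()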
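Spark normalises a forest's `featureImportances` so that they sum to 1, and the entries follow the order of the assembler's `inputCols`. Besides the bar plot, a ranked list can be easier to read. The sketch below assumes you pass `feature_list` and `list(importances)`; the helper `rank_features` and the three importance values are hypothetical, not results from this notebook.

```python
def rank_features(names, importances):
    """Pair each feature name with its importance, highest first."""
    if len(names) != len(importances):
        raise ValueError("names and importances must have the same length")
    return sorted(zip(names, importances), key=lambda pair: pair[1], reverse=True)


# Hypothetical importances for three features (they sum to 1, as Spark's do)
ranked = rank_features(["DEM", "RH", "AOD"], [0.2, 0.3, 0.5])
for name, imp in ranked:
    print(f"{name:>8s} {imp:.3f}")
```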

In [0]:
sc.stop()