<a href="https://colab.research.google.com/github/solrepresa/RF-Valencia/blob/master/RF_valencia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


Apache Spark and Random Forest in Python.

Very important: install Java first. The cell below installs Java 8, Apache Spark 2.4.5, and findspark, a library that helps Python locate Spark.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [101]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Note: this prints the default JVM found on the PATH, which may differ from JAVA_HOME
! java -version

openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime Environment (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1)
OpenJDK 64-Bit Server VM (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1, mixed mode, sharing)
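
Since the environment variables are set by hand, it can help to confirm that they point to directories that actually exist before starting Spark. The following is a minimal sketch using only the standard library; `check_spark_env` is a hypothetical helper, not part of Spark or findspark.

```python
import os

def check_spark_env(env=None):
    """Return {variable: True/False} telling whether each path is an existing directory."""
    env = os.environ if env is None else env
    report = {}
    for name in ("JAVA_HOME", "SPARK_HOME"):
        path = env.get(name)
        # A missing or empty variable counts as not found
        report[name] = bool(path) and os.path.isdir(path)
    return report

print(check_spark_env())
```

A `False` entry suggests that the download or the path above needs another look.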


Import the libraries we are going to use.
Most of them come from pyspark.

In [102]:
!pip install --upgrade pyspark

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.context import SparkContext
from pyspark.sql.functions import col
from pyspark.sql.session import SparkSession


import pandas as pd
import numpy as np

Requirement already up-to-date: pyspark in /usr/local/lib/python3.6/dist-packages (2.4.5)


In [0]:
import findspark
findspark.init()

Create a Spark context and session. We close them at the end of the notebook.

In [0]:
sc = SparkContext('local')
spark = SparkSession(sc)

Load the data. This is a 10% sample of my dataset, stored on GitHub. We read it with pandas and convert it to a Spark DataFrame.


In [0]:
# URL of the data
url = "https://raw.githubusercontent.com/solrepresa/RF-Valencia/master/prueba_datos.csv"
df1 = pd.read_csv(url)

In [0]:
type(df1)
data = spark.createDataFrame(df1)

In [214]:
type(data)



In [0]:
#data = spark.read.format("csv").option("header", True).option("delimiter", ",").load("/variables_estacion_aod_IDW_modelo_paper.csv")

Check that the table was loaded correctly.

In [215]:
#display(data)
#data.show()
#data.describe().show()

data.show(10)
data.printSchema() # check the types here; the variables can show up as string


+-----+------------------+-----+-----------+--------------------+----------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+------------------+-------------------+-------------------+------------------+-------------------+
| PM25|               DEM|CLC_1|         PS|                  RH|               T|                 U|                  V|             DUSMASS|             OCSMASS|            SO2SMASS|            SO4SMASS|             SSSMASS|             PBLH|             PRECTOT|             SPEED|             CLDHGH|             CLDLOW|             H1000|                AOD|
+-----+------------------+-----+-----------+--------------------+----------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+-------

Is the format correct?

In [0]:
# data = data.select(*(col(c).cast("double").alias(c) for c in data.columns))
# data.printSchema()
# data.show()

Build the list of variables that go into the model:

In [0]:
# Every column except the target (PM25) is a predictor.
# The loop variable is named c so it does not shadow pyspark's col().
feature_list = [c for c in data.columns if c != 'PM25']

In [0]:
#feature_list

Create the features column, which combines all the predictors into a single vector column:

In [0]:
assembler = VectorAssembler(inputCols=feature_list, outputCol="features")

In [0]:
output = assembler.transform(data)
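
Conceptually, the assembler takes the values of the listed input columns, in order, and packs them into one vector per row. The sketch below illustrates that idea in plain Python; it is not Spark's implementation, `assemble` is a hypothetical helper, and the row uses only a few columns with values copied from the first training row printed in this notebook.

```python
def assemble(row, input_cols):
    """Collect the values of input_cols, in order, into one list (the 'vector')."""
    return [float(row[name]) for name in input_cols]

# A reduced row: the target plus three predictors
row = {"PM25": 0.08, "DEM": 26.12251, "CLC_1": 66.67, "PS": 96517.06}
input_cols = [name for name in row if name != "PM25"]

features = assemble(row, input_cols)
print(features)  # [26.12251, 66.67, 96517.06]
```

The target stays in its own column, which is why PM25 is left out of the input columns.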

In [224]:
output.printSchema()
#output.show(10)
#output.select("features", "PM25").show(truncate=False)

root
 |-- PM25: double (nullable = true)
 |-- DEM: double (nullable = true)
 |-- CLC_1: double (nullable = true)
 |-- PS: double (nullable = true)
 |-- RH: double (nullable = true)
 |-- T: double (nullable = true)
 |-- U: double (nullable = true)
 |-- V: double (nullable = true)
 |-- DUSMASS: double (nullable = true)
 |-- OCSMASS: double (nullable = true)
 |-- SO2SMASS: double (nullable = true)
 |-- SO4SMASS: double (nullable = true)
 |-- SSSMASS: double (nullable = true)
 |-- PBLH: double (nullable = true)
 |-- PRECTOT: double (nullable = true)
 |-- SPEED: double (nullable = true)
 |-- CLDHGH: double (nullable = true)
 |-- CLDLOW: double (nullable = true)
 |-- H1000: double (nullable = true)
 |-- AOD: double (nullable = true)
 |-- features: vector (nullable = true)



In [0]:
#output = output.select("features", "PM25")

Define the training set and the test set (30% of the data):

In [0]:
(trainingData, testData) = output.randomSplit([0.7, 0.3], 123)
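
Note that randomSplit assigns each row to a split at random, so the 70/30 proportions are approximate rather than exact, while the seed makes the assignment repeatable. A plain-Python sketch of that behaviour, assuming one independent uniform draw per row (an illustration of the concept, not Spark's implementation; `random_split` is a hypothetical helper):

```python
import random

def random_split(rows, train_fraction, seed):
    """Send each row to train with probability train_fraction, otherwise to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))
train, test = random_split(rows, 0.7, seed=123)

# The sizes are close to 700/300 but usually not exactly that
print(len(train), len(test))
```

Calling the function again with the same seed returns the same split, which is the role of the `123` passed to randomSplit.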

In [240]:
#type(trainingData)
#trainingData.printSchema()
trainingData.take(10)

[Row(PM25=0.08, DEM=26.122510000000002, CLC_1=66.67, PS=96517.06, RH=6.357457e-05, T=282.1785, U=43.54033, V=-11.12738, DUSMASS=1.509183e-09, OCSMASS=2.244379e-10, SO2SMASS=7.037031e-10, SO4SMASS=6.266534e-10, SSSMASS=5.443616999999999e-09, PBLH=1503.381, PRECTOT=2.213341e-08, SPEED=11.666889999999999, CLDHGH=0.0, CLDLOW=0.02347503, H1000=191.373, AOD=0.07806851, features=DenseVector([26.1225, 66.67, 96517.06, 0.0001, 282.1785, 43.5403, -11.1274, 0.0, 0.0, 0.0, 0.0, 0.0, 1503.381, 0.0, 11.6669, 0.0, 0.0235, 191.373, 0.0781])),
 Row(PM25=0.08, DEM=116.0358, CLC_1=11.11, PS=92265.45, RH=2.559837e-06, T=286.1741, U=-0.6763776, V=12.62625, DUSMASS=7.032930000000001e-09, OCSMASS=4.78972e-10, SO2SMASS=9.196215e-10, SO4SMASS=2.360359e-09, SSSMASS=2.9303479999999997e-09, PBLH=1467.385, PRECTOT=8.542975000000001e-09, SPEED=2.5002020000000003, CLDHGH=0.0, CLDLOW=2.471996e-05, H1000=199.3047, AOD=0.06417276, features=DenseVector([116.0358, 11.11, 92265.45, 0.0, 286.1741, -0.6764, 12.6262, 0.0, 0.

Define the model:

In [0]:
rf = RandomForestRegressor(labelCol="PM25", featuresCol="features", numTrees=100, seed= 123)

In [241]:
model = rf.fit(trainingData)

IllegalArgumentException: ignored

Create the Pipeline:

In [0]:
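# trainingData already contains the "features" column, so the assembler
# must not run again (VectorAssembler fails if its output column exists)
pipeline = Pipeline(stages=[rf])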

In [0]:
model = pipeline.fit(trainingData)

Build the parameter grid that the cross-validation will search:

In [0]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start = 10, stop = 50, num = 10)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 25, num = 10)]) \
    .build()
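
It is worth checking which values this grid actually contains, because `int()` truncates the evenly spaced values. The sketch below reproduces the two lists in plain Python; `linspace_ints` is a hypothetical stand-in for `np.linspace` followed by `int()`, written here only so the example has no dependencies.

```python
from itertools import product

def linspace_ints(start, stop, num):
    """Evenly spaced values from start to stop (inclusive), truncated to int."""
    step = (stop - start) / (num - 1)
    values = [start + i * step for i in range(num)]
    values[-1] = stop  # pin the endpoint exactly, as np.linspace does
    return [int(v) for v in values]

num_trees = linspace_ints(10, 50, 10)
max_depth = linspace_ints(5, 25, 10)
grid = list(product(num_trees, max_depth))

print(num_trees)  # [10, 14, 18, 23, 27, 32, 36, 41, 45, 50]
print(max_depth)  # [5, 7, 9, 11, 13, 16, 18, 20, 22, 25]
print(len(grid))  # 100
```

With numFolds = 10, every one of the 100 combinations is fitted ten times, so the search trains 1000 random forests before the final refit. On a single local core a coarser grid may be worth considering.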

Set up the cross-validation:

In [0]:
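# The evaluator must be told that the target column is PM25
# (its default label column is "label", which this dataset does not have)
crossval = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = paramGrid,
                          evaluator = RegressionEvaluator(labelCol="PM25", predictionCol="prediction", metricName="rmse"),
                          numFolds = 10)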
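
K-fold cross-validation partitions the training rows into numFolds groups and, for each parameter combination, trains on all groups but one and evaluates on the held-out group. The sketch below shows only the partitioning, in plain Python; `k_fold_indices` is a hypothetical helper, and the round-robin assignment is chosen for simplicity (Spark assigns rows to folds at random).

```python
def k_fold_indices(n_rows, num_folds):
    """Yield (train_idx, validation_idx) pairs, one per fold."""
    # Round-robin assignment: row i goes to fold i % num_folds
    folds = [list(range(f, n_rows, num_folds)) for f in range(num_folds)]
    for held_out in range(num_folds):
        validation = folds[held_out]
        train = [i for f, fold in enumerate(folds) if f != held_out for i in fold]
        yield train, validation

splits = list(k_fold_indices(n_rows=20, num_folds=10))
print(len(splits))   # 10
print(splits[0][1])  # [0, 10]
```

Each row is used for validation exactly once and for training in the other nine folds.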

Finally, fit the model on the training set:

In [0]:
cvModel = crossval.fit(trainingData)

Generate the model's predictions on the test set:

In [0]:
predictions = cvModel.transform(testData)

From here on, the code is borrowed from elsewhere. It plots the results.

In [0]:
import matplotlib.pyplot as plt

# The target column is PM25; this dataset has no column named "label"
evaluator = RegressionEvaluator(labelCol="PM25", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)

# Reuse the test-set predictions computed above, keeping only the plotted columns
rfResult = predictions.select("PM25", "prediction").toPandas()

plt.plot(rfResult.PM25, rfResult.prediction, 'bo')
plt.xlabel('Observed PM25')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()
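
For reference, the RMSE shown in the plot title is the square root of the mean squared difference between observed and predicted values. A minimal plain-Python sketch of the formula, using made-up numbers rather than results from this model (`root_mean_squared_error` is a hypothetical helper):

```python
import math

def root_mean_squared_error(observed, predicted):
    """Square root of the mean of the squared prediction errors."""
    squared_errors = [(o - p) ** 2 for o, p in zip(observed, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Errors are 2 and -4, so the mean squared error is (4 + 16) / 2 = 10
print(root_mean_squared_error([3.0, 5.0], [1.0, 9.0]))
```

The result is in the same units as PM25, which makes it easier to interpret than the mean squared error.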

In [0]:

bestPipeline = cvModel.bestModel
# The random forest is the last stage of the fitted pipeline
bestModel = bestPipeline.stages[-1]

# featureImportances is a Spark vector; convert it to a NumPy array for plotting
importances = bestModel.featureImportances.toArray()

x_values = list(range(len(importances)))

plt.bar(x_values, importances, orientation = 'vertical')
plt.xticks(x_values, feature_list, rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')
plt.show()
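
To read the bar chart as a ranking, the importances can be paired with the feature names and sorted. A small sketch with made-up importance values (the numbers are illustrative only, not results from this model, and `rank_features` is a hypothetical helper):

```python
def rank_features(names, importances):
    """Return (name, importance) pairs sorted from most to least important."""
    return sorted(zip(names, importances), key=lambda pair: pair[1], reverse=True)

names = ["DEM", "T", "AOD"]
illustrative_importances = [0.2, 0.5, 0.3]  # made-up values

for name, value in rank_features(names, illustrative_importances):
    print(name, value)
```

In this notebook the same helper could be called as `rank_features(feature_list, bestModel.featureImportances.toArray())`.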

In [0]:
sc.stop()