### Importo

In [None]:
from sklearn.ensemble import RandomForestRegressor
import seaborn as sns
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import r2_score
from pyspark.sql.functions import col,sum
from pyspark.ml.stat import Correlation
import datetime
from pyspark.sql.functions import year, month, dayofmonth, hour

### Cargo archivos

In [None]:
df_train = spark.read.csv('dbfs:/FileStore/tables/train.csv', inferSchema=True, header=True)
df_test = spark.read.csv('dbfs:/FileStore/tables/test.csv', inferSchema=True, header=True)

### Check dataset

In [None]:
df_train.show(5)

+-------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+
|           datetime|season|holiday|workingday|weather|temp| atemp|humidity|windspeed|casual|registered|count|
+-------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+
|2011-01-01 00:00:00|     1|      0|         0|      1|9.84|14.395|      81|      0.0|     3|        13|   16|
|2011-01-01 01:00:00|     1|      0|         0|      1|9.02|13.635|      80|      0.0|     8|        32|   40|
|2011-01-01 02:00:00|     1|      0|         0|      1|9.02|13.635|      80|      0.0|     5|        27|   32|
|2011-01-01 03:00:00|     1|      0|         0|      1|9.84|14.395|      75|      0.0|     3|        10|   13|
|2011-01-01 04:00:00|     1|      0|         0|      1|9.84|14.395|      75|      0.0|     0|         1|    1|
+-------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+
o

- Información de mis columnas

In [None]:
df_train.describe()

Out[295]: DataFrame[summary: string, season: string, holiday: string, workingday: string, weather: string, temp: string, atemp: string, humidity: string, windspeed: string, casual: string, registered: string, count: string]

### Check missings

In [None]:
df_train.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df_train.columns)).show()

+--------+------+-------+----------+-------+----+-----+--------+---------+------+----------+-----+
|datetime|season|holiday|workingday|weather|temp|atemp|humidity|windspeed|casual|registered|count|
+--------+------+-------+----------+-------+----+-----+--------+---------+------+----------+-----+
|       0|     0|      0|         0|      0|   0|    0|       0|        0|     0|         0|    0|
+--------+------+-------+----------+-------+----+-----+--------+---------+------+----------+-----+



### Check correlation

In [None]:
list_columns = df_train.columns
list_columns.remove('datetime')

- Elimino los valores menores 0.11 en 'count'

In [None]:
for c in list_columns:
    corr = df_train.corr('count',c)
    if abs(corr) < 0.11:
        df_train = df_train.drop(c)
        df_test = df_test.drop(c)

- Obveservo mis datos df_train

In [None]:
df_train.display()

datetime,season,weather,temp,atemp,humidity,casual,registered,count
2011-01-01T00:00:00.000+0000,1,1,9.84,14.395,81,3,13,16
2011-01-01T01:00:00.000+0000,1,1,9.02,13.635,80,8,32,40
2011-01-01T02:00:00.000+0000,1,1,9.02,13.635,80,5,27,32
2011-01-01T03:00:00.000+0000,1,1,9.84,14.395,75,3,10,13
2011-01-01T04:00:00.000+0000,1,1,9.84,14.395,75,0,1,1
2011-01-01T05:00:00.000+0000,1,2,9.84,12.88,75,0,1,1
2011-01-01T06:00:00.000+0000,1,1,9.02,13.635,80,2,0,2
2011-01-01T07:00:00.000+0000,1,1,8.2,12.88,86,1,2,3
2011-01-01T08:00:00.000+0000,1,1,9.84,14.395,75,1,7,8
2011-01-01T09:00:00.000+0000,1,1,13.12,17.425,76,8,6,14


- Observo mis datos df_test

In [None]:
df_test.display()

datetime,season,weather,temp,atemp,humidity
2011-01-20T00:00:00.000+0000,1,1,10.66,11.365,56
2011-01-20T01:00:00.000+0000,1,1,10.66,13.635,56
2011-01-20T02:00:00.000+0000,1,1,10.66,13.635,56
2011-01-20T03:00:00.000+0000,1,1,10.66,12.88,56
2011-01-20T04:00:00.000+0000,1,1,10.66,12.88,56
2011-01-20T05:00:00.000+0000,1,1,9.84,11.365,60
2011-01-20T06:00:00.000+0000,1,1,9.02,10.605,60
2011-01-20T07:00:00.000+0000,1,1,9.02,10.605,55
2011-01-20T08:00:00.000+0000,1,1,9.02,10.605,55
2011-01-20T09:00:00.000+0000,1,2,9.84,11.365,52


### Eliminación de columnas no deseadas

In [None]:
df_train = df_train.drop("casual", "registered", "atemp")
df_test = df_test.drop("atemp")

### Convierto datetime to year, month, day and hour

In [None]:
df_train = df_train.withColumn('year', year('datetime'))
df_train = df_train.withColumn('month', month('datetime'))
df_train = df_train.withColumn('day', dayofmonth('datetime'))
df_train = df_train.withColumn('hour', hour('datetime'))

In [None]:
df_test = df_test.withColumn('year', year('datetime'))
df_test = df_test.withColumn('month', month('datetime'))
df_test = df_test.withColumn('day', dayofmonth('datetime'))
df_test = df_test.withColumn('hour', hour('datetime'))

### Elimino datetime

In [None]:
df_train = df_train.drop('datetime')
df_test = df_test.drop('datetime')

### Resultado Final

In [None]:
df_train.show(5)

+------+-------+----+--------+-----+----+-----+---+----+
|season|weather|temp|humidity|count|year|month|day|hour|
+------+-------+----+--------+-----+----+-----+---+----+
|     1|      1|9.84|      81|   16|2011|    1|  1|   0|
|     1|      1|9.02|      80|   40|2011|    1|  1|   1|
|     1|      1|9.02|      80|   32|2011|    1|  1|   2|
|     1|      1|9.84|      75|   13|2011|    1|  1|   3|
|     1|      1|9.84|      75|    1|2011|    1|  1|   4|
+------+-------+----+--------+-----+----+-----+---+----+
only showing top 5 rows



In [None]:
df_test.show(5)

+------+-------+-----+--------+----+-----+---+----+
|season|weather| temp|humidity|year|month|day|hour|
+------+-------+-----+--------+----+-----+---+----+
|     1|      1|10.66|      56|2011|    1| 20|   0|
|     1|      1|10.66|      56|2011|    1| 20|   1|
|     1|      1|10.66|      56|2011|    1| 20|   2|
|     1|      1|10.66|      56|2011|    1| 20|   3|
|     1|      1|10.66|      56|2011|    1| 20|   4|
+------+-------+-----+--------+----+-----+---+----+
only showing top 5 rows



### Creo variables X e y

In [None]:
X = np.array(df_train.drop("count").collect())
y = np.array(df_train.select("count").collect())

- Divide en tran y test

In [None]:
import mlflow.sklearn

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3 ,random_state=42)

- Feature importance RandomForest y entreno modelo

In [None]:
param_grid = {
    'bootstrap': [True],
    'max_depth': [80, 90, 100, 110],
    'max_features': [2, 3],
    'min_samples_leaf': [3, 4],
    'min_samples_split': [8, 10],
    'n_estimators': [100, 200, 300]}
rf = RandomForestRegressor()
grid_search = GridSearchCV(estimator = rf, param_grid = param_grid, 
                              cv = 3, n_jobs = -1, verbose = 2)
grid_search.fit(X_train, y_train)

Fitting 3 folds for each of 96 candidates, totalling 288 fits
  original_result = original(self, *args, **kwargs)
Out[14]: GridSearchCV(cv=3, estimator=RandomForestRegressor(), n_jobs=-1,
             param_grid={'bootstrap': [True], 'max_depth': [80, 90, 100, 110],
                         'max_features': [2, 3], 'min_samples_leaf': [3, 4],
                         'min_samples_split': [8, 10],
                         'n_estimators': [100, 200, 300]},
             verbose=2)

- Obtengo resultado

In [None]:
y_pred = grid_search.predict(X_test)
r2_score(y_test, y_pred)

Out[15]: 0.7818002604549164

- Guardo

In [None]:
import pickle

pickle.dump(grid_search, open("rf_model.pkl", "wb"))