#Manejo de datos y modelos de Machine Learning con Dask.

El objetivo principal de este trabajo busca poner en práctica las herramientas aprendidas en el curso para el manejo de grandes volúmenes de datos. Puntualmente se busca integrar Dask con el trabajo del módulo pasado de machine learning el cual se concentraba en predecir y clasificar el nivel de tráfico de acuerdo a ciertas características como el volumen de tráfico de carros, camiones, bicicletas o el horario y el día.

Instalamos Dask

In [None]:
!pip install dask[complete] h5py



In [None]:
!pip install dask-ml



# Librerias y dataset.


In [None]:
#Librerias
import dask.dataframe as dd
import pandas as pd
import dask.array as da
import dask_ml.preprocessing as dask_pre
from dask_ml.model_selection import train_test_split
import plotly.express as px
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

In [None]:
ddf = dd.read_csv("Traffi.csv")
ddf

Unnamed: 0_level_0,Time,Date,CarCount,BikeCount,BusCount,TruckCount,Total,Day,Traffic_Situation
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,float64,int64,int64,int64,int64,int64,int64,int64,int64
,...,...,...,...,...,...,...,...,...


Como se mencionó anteriormente usaremos una base de datos “traffic.csv” usada en el taller de machine learning

# Exploración de datos con Spark


In [None]:
ddf.head(10)


Unnamed: 0,Time,Date,Day of the week,CarCount,BikeCount,BusCount,TruckCount,Total,Traffic Situation
0,12:00:00 AM,10,Tuesday,31,0,4,4,39,low
1,12:15:00 AM,10,Tuesday,49,0,3,3,55,low
2,12:30:00 AM,10,Tuesday,46,0,3,6,55,low
3,12:45:00 AM,10,Tuesday,51,0,2,5,58,low
4,1:00:00 AM,10,Tuesday,57,6,15,16,94,normal
5,1:15:00 AM,10,Tuesday,44,0,5,4,53,low
6,1:30:00 AM,10,Tuesday,37,0,1,4,42,low
7,1:45:00 AM,10,Tuesday,42,4,4,5,55,low
8,2:00:00 AM,10,Tuesday,51,0,9,7,67,low
9,2:15:00 AM,10,Tuesday,34,0,4,7,45,low


In [None]:
print(type(ddf.head(10)))

<class 'pandas.core.frame.DataFrame'>


In [None]:
print(ddf.shape[0].compute(), ddf.shape[1])

2976 9


In [None]:
print(ddf.columns)

Index(['Time', 'Date', 'Day of the week', 'CarCount', 'BikeCount', 'BusCount',
       'TruckCount', 'Total', 'Traffic Situation'],
      dtype='object')


In [None]:
print(ddf.dtypes)

Time                 object
Date                  int64
Day of the week      object
CarCount              int64
BikeCount             int64
BusCount              int64
TruckCount            int64
Total                 int64
Traffic Situation    object
dtype: object


In [None]:
print(ddf.info())

<class 'dask.dataframe.core.DataFrame'>
Columns: 9 entries, Time to Traffic Situation
dtypes: object(3), int64(6)None


Al observar de que forma están compuestos los datos, podemos darnos cuenta en primer lugar que no hay datos faltantes. En segundo lugar podemos darnos cuenta que hay dos variables que son discretas: "Traffic situation" y "Day of the Week". También tenemos un varaible que aunque esta códificada como entero en realidad es de tipo fecha, la cual es "Time".

Debido a la anterior, más adelante debemos transformar los datos a niveles que puedan ser procesados por los modelos.

In [None]:
desc = ddf.describe(include="all")
desc.compute()

Unnamed: 0,Time,Date,Day of the week,CarCount,BikeCount,BusCount,TruckCount,Total,Traffic Situation
unique,96,,7,,,,,,4
count,2976,2976.0,2976,2976.0,2976.0,2976.0,2976.0,2976.0,2976
top,10:00:00 AM,,Thursday,,,,,,normal
freq,31,,480,,,,,,1669
mean,,16.0,,68.696573,14.917339,15.27957,15.324933,114.218414,
std,,8.945775,,45.850693,12.847518,14.341986,10.603833,60.190627,
min,,1.0,,6.0,0.0,0.0,0.0,21.0,
25%,,8.0,,19.0,5.0,1.0,6.0,55.0,
50%,,16.0,,64.0,12.0,12.0,14.0,109.0,
75%,,24.0,,107.0,22.0,25.0,23.0,164.0,



##Datos nulos




In [None]:
print(ddf.isnull().any().compute())

Time                 False
Date                 False
Day of the week      False
CarCount             False
BikeCount            False
BusCount             False
TruckCount           False
Total                False
Traffic Situation    False
dtype: bool


# Entendimiento de los datos a partir de un análisis gráfico


En esta sección se pretende darle una mirada gráfica a los datos, con el objetivo de poder entender la forma en que se comportan los datos y poder extraer posible información relevante que nos ayude en el momento de obtener los resultados de los modelos, a poder contrastar la información obetenida en este punto con la que obtendremos más adelante.

In [None]:
fig = px.bar(ddf.compute(), x=ddf["Total"], y=ddf["Day of the week"], color="Traffic Situation", title="Trafico por día")
fig.show()

Aquí pasa algo muy interesante. Podemos darnos cuenta que la situación del tráfico no depende en una forma deterministica de la cantidad de automotores circulando en la vía. Es decir, podemos fijarnos en la cantidad exacta de automotores en la cual el tráfico pasa de estar en un clase a otra (low, normal, heavy, high), la cual es diferente para cada día.

Lo anterior puede deberse a que la diferencia que puede tener un carro, entre un bus o un camión, puede tener incidencia en el tráfico que puede ser generado. Esto es importante tenerlo en cuenta, porque en el momento que deseemos crear, evaluar y predecir nuestro modelo, podemos tener una idea sobre cuales son las posilbes variables que más lo están afectando.

In [None]:
print(ddf['Date'].value_counts().compute())

1     96
17    96
30    96
29    96
28    96
27    96
26    96
25    96
24    96
23    96
22    96
21    96
20    96
19    96
18    96
16    96
2     96
15    96
14    96
13    96
12    96
11    96
10    96
9     96
8     96
7     96
6     96
5     96
4     96
3     96
31    96
Name: Date, dtype: int64


In [None]:
dia_10 = ddf[ddf['Date'] == 10].compute()

In [None]:
dias_unicos = ddf['Date'].unique().compute()
dias_unicos

0     10
1     11
2     12
3     13
4     14
5     15
6     16
7     17
8     18
9     19
10    20
11    21
12    22
13    23
14    24
15    25
16    26
17    27
18    28
19    29
20    30
21    31
22     1
23     2
24     3
25     4
26     5
27     6
28     7
29     8
30     9
Name: Date, dtype: int64

In [None]:
for dia in dias_unicos:
    dia_data = ddf[ddf['Date'] == dia].compute()

    dia_semana = dia_data['Day of the week'].iloc[0]  # Obtiene el valor de 'Day of the week' para el día actual

    fig = px.line(dia_data, x='Time', y='Total', title=f'Gráfico para el Día {dia}, {dia_semana}')
    fig.show()

En esta ocasión, podemos darnos cuenta de que el tráfico no se comporta de forma constante a lo largo de los días. Esto no es algo nuevo, ya que todos podemos intuir que cada día no es para nada igual al siguiente. Lo que si es importante, es ver que los días como viernes o sabádos tienen un comportamiento similar, esto puede deberse al comportamiento social ya que días como viernes o sabádos, son días de alta fricción social sobretodo en horas de la tarde hasta la madrugada.

# Procesamiento de los datos


# Construcción del modelo

En este apartado, modificaremos todos los datos necesarios de nuestro data frame para posteriormente ser procesados y modelados.

Además, implentaremos las tecnicas de spark para observar cuál es el efecto del tiempo sobre el procesamiento.


In [None]:
#Construcción del modelo
columnas_ = ['Time', 'Date', 'CarCount', 'BikeCount', 'BusCount', 'TruckCount',
              'Total', 'Day']
columnas2 = ["Traffic_Situation"]
X = ddf.loc[:, columnas_]
y = ddf.loc[:, columnas2]
X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                    test_size=0.3,
                                                    random_state=42,
                                                    shuffle=False)

X_train_arr = X_train.to_dask_array(lengths=True)
y_train_arr = y_train.to_dask_array(lengths=True)
X_test_arr = X_test.to_dask_array(lengths=True)
y_test_arr = y_test.to_dask_array(lengths=True)

In [None]:

y_train_arr = y_train_arr.reshape(-1)

In [None]:
y_test_arr = y_test_arr.reshape(-1)
y_test_arr

Unnamed: 0,Array,Chunk
Bytes,7.15 kiB,7.15 kiB
Shape,"(915,)","(915,)"
Dask graph,1 chunks in 5 graph layers,1 chunks in 5 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 7.15 kiB 7.15 kiB Shape (915,) (915,) Dask graph 1 chunks in 5 graph layers Data type int64 numpy.ndarray",915  1,

Unnamed: 0,Array,Chunk
Bytes,7.15 kiB,7.15 kiB
Shape,"(915,)","(915,)"
Dask graph,1 chunks in 5 graph layers,1 chunks in 5 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray


In [None]:
X_train_arr.shape

(2061, 8)

In [None]:
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import GridSearchCV
from sklearn.metrics import f1_score, precision_score, recall_score

In [None]:
lr = LogisticRegression()
lr.get_params()

{'C': 1.0,
 'class_weight': None,
 'dual': False,
 'fit_intercept': True,
 'intercept_scaling': 1.0,
 'max_iter': 100,
 'multi_class': 'ovr',
 'n_jobs': 1,
 'penalty': 'l2',
 'random_state': None,
 'solver': 'admm',
 'solver_kwargs': None,
 'tol': 0.0001,
 'verbose': 0,
 'warm_start': False}

In [None]:
param_grid_lr = {
    'C': [0.001, 0.01, 0.1, 1, 10, 100],
    'solver': ['newton', 'admm', 'gradient_descent', 'lbfgs', 'proximal_grad']
}

In [None]:
grid_search_lr = GridSearchCV(estimator=lr, param_grid=param_grid_lr, cv=5)
grid_search_lr.fit(X_train_arr, y_train_arr)


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


overflow encountered in exp


Configuration key "fuse_ave_width" has been deprecated. Please use "optimization.fuse.ave-width" instead


overflow encountered in exp


Configuration key "fuse_ave_width" has been deprecated. Please use "optimization.fuse.ave-width" instead


overflow encountered in exp


Configuration key "fuse_ave_width" has been deprecated. Please us

In [None]:
grid_search_lr.best_params_

{'C': 100, 'solver': 'lbfgs'}

In [None]:
grid_search_lr.best_score_

0.3158660844250364

In [None]:
prediction_lr = grid_search_lr.predict(X_test_arr)


#SVC



In [None]:
import dask_ml.model_selection as dcv
from dask_ml.wrappers import ParallelPostFit
from sklearn.svm import SVC
m_svc = SVC()
m_svc.get_params()

{'C': 1.0,
 'break_ties': False,
 'cache_size': 200,
 'class_weight': None,
 'coef0': 0.0,
 'decision_function_shape': 'ovr',
 'degree': 3,
 'gamma': 'scale',
 'kernel': 'rbf',
 'max_iter': -1,
 'probability': False,
 'random_state': None,
 'shrinking': True,
 'tol': 0.001,
 'verbose': False}

In [None]:
param_grid = {'C': [10,20,30],
              'kernel': ['linear'],
              'gamma': [1000,1500,2000]}

grid_clf = GridSearchCV(m_svc,
                        param_grid=param_grid,
                        return_train_score=True)


grid_clf.fit(X_train_arr, y_train_arr)

In [None]:
grid_clf.best_params_

{'C': 10, 'gamma': 1000, 'kernel': 'linear'}

In [None]:
grid_clf.best_score_

0.8874332848131975

In [None]:
prediction_svc = grid_clf.predict(X_test_arr)

# Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier
rf = RandomForestClassifier()
rf.get_params()

{'bootstrap': True,
 'ccp_alpha': 0.0,
 'class_weight': None,
 'criterion': 'gini',
 'max_depth': None,
 'max_features': 'sqrt',
 'max_leaf_nodes': None,
 'max_samples': None,
 'min_impurity_decrease': 0.0,
 'min_samples_leaf': 1,
 'min_samples_split': 2,
 'min_weight_fraction_leaf': 0.0,
 'n_estimators': 100,
 'n_jobs': None,
 'oob_score': False,
 'random_state': None,
 'verbose': 0,
 'warm_start': False}

In [None]:
param_grid = {
    'n_estimators': [300, 400, 500],
}


grid_search_2 = GridSearchCV(estimator=rf,
                           param_grid=param_grid,
                           cv=5)

grid_search_2.fit(X_train_arr, y_train_arr)

In [None]:
grid_search_2.best_params_

{'n_estimators': 300}

In [None]:
grid_search_2.best_score_

0.9917515769044153

In [None]:
prediction_RF = grid_search_2.predict(X_test_arr)
prediction_RF

array([3, 2, 2, 2, 2, 2, 3, 3, 3, 0, 0, 0, 0, 1, 3, 3, 3, 3, 1, 2, 3, 3,
       3, 0, 0, 0, 0, 3, 1, 3, 1, 3, 3, 3, 3, 3, 2, 3, 3, 3, 3, 3, 0, 0,
       0, 1, 3, 3, 3, 3, 3, 1, 2, 1, 3, 3, 3, 3, 3, 3, 3, 2, 3, 3, 3, 3,
       3, 3, 3, 0, 1, 3, 3, 3, 3, 1, 3, 3, 3, 2, 3, 3, 3, 3, 2, 3, 3, 2,
       3, 2, 3, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 2, 2, 2, 3, 3, 2, 3,
       3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 0, 0, 0, 0, 1, 3, 3, 3, 2, 3, 3, 3,
       3, 3, 0, 3, 2, 3, 1, 3, 3, 3, 3, 3, 3, 2, 3, 3, 2, 0, 1, 3, 0, 0,
       1, 3, 0, 0, 0, 1, 3, 1, 3, 3, 3, 3, 2, 3, 3, 3, 3, 3, 2, 3, 1, 3,
       0, 0, 0, 3, 3, 3, 1, 2, 2, 3, 1, 2, 0, 0, 1, 3, 1, 1, 2, 2, 3, 3,
       3, 3, 3, 3, 0, 0, 1, 0, 1, 3, 3, 3, 3, 3, 3, 1, 1, 0, 1, 0, 0, 3,
       3, 3, 3, 3, 3, 3, 3, 2, 3, 3, 3, 3, 3, 3, 3, 3, 0, 0, 3, 3, 3, 3,
       3, 3, 3, 0, 0, 0, 0, 0, 3, 1, 3, 3, 2, 3, 3, 3, 3, 3, 1, 1, 0, 0,
       0, 0, 0, 2, 3, 3, 3, 3, 3, 2, 3, 3, 3, 0, 0, 0, 0, 1, 3, 3, 1, 3,
       3, 3, 2, 3, 3, 3, 2, 3, 3, 2, 3, 2, 0, 0, 0,

In [None]:
from sklearn.metrics import f1_score, precision_score, recall_score

print(f"Precisión ponderada de LR: {precision_score(y_test_arr, prediction_lr, average='weighted'):.4f}")
print(f"Recall ponderada de LR: {recall_score(y_test, prediction_lr, average='weighted'):.4f}")
print(f"Precisión ponderada de LR: {precision_score(y_test_arr, prediction_lr, average='weighted'):.4f}\n")

print(f"Precisión ponderada de RF: {precision_score(y_test_arr, prediction_RF, average='weighted'):.4f}")
print(f"Recall ponderada de RF: {recall_score(y_test_arr, prediction_RF, average='weighted'):.4f}")
print(f"Precisión ponderada de RF: {precision_score(y_test_arr, prediction_RF, average='weighted'):.4f}\n")

print(f"Precisión ponderada de SVC: {precision_score(y_test_arr, prediction_svc, average='weighted'):.4f}")
print(f"Recall ponderada de SVC: {recall_score(y_test_arr, prediction_svc, average='weighted'):.4f}")
print(f"F1 ponderada de SVC: {f1_score(y_test_arr, prediction_svc, average='weighted'):.4f}")



Precision is ill-defined and being set to 0.0 in labels with no predicted samples. Use `zero_division` parameter to control this behavior.



Precisión ponderada de LR: 0.2205
Recall ponderada de LR: 0.3060
Precisión ponderada de LR: 0.2205




Precision is ill-defined and being set to 0.0 in labels with no predicted samples. Use `zero_division` parameter to control this behavior.



Precisión ponderada de RF: 0.9967
Recall ponderada de RF: 0.9967
Precisión ponderada de RF: 0.9967

Precisión ponderada de SVC: 0.9243
Recall ponderada de SVC: 0.9257
F1 ponderada de SVC: 0.9247


# Metricas.

Al momento de verificar las metricas resultante de los modelos, podemos observar que los resultados entre el modelo random forest y logistic regression son identicos. A diferencia de los resultados para el modelo de support vector machine que es minimamente más bajo.

Ahora, al enfocarnos en el tiempo que se demoran en correr los modelos, vemos un resultado increíblemente positivo. Para que el lector tenga un contexto de lo dicho anteriormente, en el taller de machine learning al correr el modelo SVC se demoró 4 horas en calibrarse a diferencia de ahora que gracias a Dask tan solo se demoró 6 minutos. Un resultado increíble