In [None]:
# ******************************************
# Ionel Rodriguez 8-764-368
# Modelos Predictivos
# ******************************************

import pandas
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose
from scipy import stats
import warnings
import math
import sys

from statsmodels.tsa.api import ExponentialSmoothing, Holt, SimpleExpSmoothing
from sklearn.linear_model import LinearRegression, PoissonRegressor
from sklearn.preprocessing import PolynomialFeatures
from sklearn import metrics as skm
from math import sqrt, pow
import itertools

sys.path.append('./.')
from lib import comun

warnings.filterwarnings('ignore')
df_main = comun.load_base_dataframe()

In [None]:
# Se realizan descomposiciones de series de tiempo para evaluar
# tendencias y estacionalidad a lo largo del período 2022 a 2024 para cada modelo de auto.

def seasonal_main():
	recap = []

	for id,m in enumerate(df_main.modelo.cat.categories):
		sub_df = df_main[df_main.modelo_id == id].copy()
		sub_df['periodo'] = sub_df.fecha.dt.to_period('M')
		sub_df = sub_df[['periodo', 'cantidad']].groupby('periodo').sum()
		sub_df.index = sub_df.index.to_timestamp()

		#sub_df.plot(kind='line', y='cantidad', xlabel='periodo',title='{}: Cantidad por mes'.format(m), figsize=(10,2))
		result = seasonal_decompose(sub_df.cantidad, model='add', period=12)
		
		attr = ['observed', 'trend', 'seasonal', 'resid']
		fig, ax = plt.subplots(len(attr), 1, sharex=True, figsize=(8,5))

		for idx, a in enumerate(attr):
			fax = ax[idx]
			s_attr = getattr(result, a)
			s_attr.plot(ax=fax, title=m if idx == 0 else None)
			fax.set(ylabel=a)
			fax.grid(True)

			if a in ('trend', ):
				df = s_attr.to_frame()[a]
				recap.append(dict(
					modelo=m,
					min=df.min(),
					min_fecha=df.idxmin(),
					max=df.max(),
					max_fecha=df.idxmax(),
					dif=df.max() - df.min(),
				))
		
		plt.show()
	
	display(pandas.DataFrame(recap))

seasonal_main()

In [None]:
# Evaluación de los modelos predictivos escogidos para el estudio. Se utiliza la librería
# statsmodels para los modelos de Suavización Exponencial, y sklearn para las Regresiones.
# Adicionalmente se calculan los coeficientes y errores que se utilizarán para determinar
# qué modelo tiene mejor desempeño.

def best_exp_smooth(train):
	params = itertools.product(
		[dict(trend=v) for v in ('add', 'mul', None)],
		[dict(seasonal=v) for v in ('add', 'mul', None)],
		[dict(damped_trend=v) for v in (True, False)],
		[dict(use_boxcox=v) for v in (True, False)],
		[dict(initialization_method=v) for v in (None, 'estimated', 'heuristic', 'legacy-heuristic')],

		[dict(optimized=v) for v in (True, False)],
		[dict(remove_bias=v) for v in (True, False)],
	)

	best_mad = float('inf')
	best_params = None
	
	for t in params:
		instance_params = dict([(k,v) for o in t[0:5] for k,v in o.items()])
		fit_params = dict([(k,v) for o in t[5:] for k,v in o.items()])

		try:
			model = ExponentialSmoothing(train.cantidad, seasonal_periods=12, **instance_params)
			fit = model.fit(smoothing_level=0.1, smoothing_trend=0, **fit_params)
			
			dat = fit.predict(start=train.index[0], end=train.index[-1])
			MAD = skm.mean_absolute_error(train.cantidad.tolist(), dat.tolist())
		
		except:
			continue
		
		if MAD < best_mad:
			best_mad = MAD
			best_params = (instance_params, fit_params)
	
	return best_params
def model_exp_smooth(df):
	max_train = int(len(df) * 0.9)
	train = df.loc[:max_train]
	
	best_params = best_exp_smooth(train)
	if not best_params: best_params = [dict(seasonal='mul', trend='add', damped_trend=True), dict()]

	model = ExponentialSmoothing(train.cantidad, **best_params[0], seasonal_periods=12)
	fit = model.fit(smoothing_level=0.1, smoothing_trend=0, **best_params[1])

	return fit.fittedvalues

def model_simple_exp_smooth(df):
	fit = SimpleExpSmoothing(df.cantidad).fit(smoothing_level=0.9, optimized=True)
	return fit.fittedvalues[1:]

def model_holt(df):
	fit = Holt(df.cantidad).fit(smoothing_level=0.9, smoothing_trend=0.2, optimized=False)
	return fit.fittedvalues[1:]

def model_linear_regression(df):
	x = df.index.to_frame()
	y = df['cantidad']

	model = LinearRegression()
	model.fit(x, y)
	
	return model.predict(x)

def model_poly_regression(df, grado=3):
	x = df.index.to_frame()
	y = df['cantidad']

	feat = PolynomialFeatures(degree=grado)
	x_poly = feat.fit_transform(x)
	
	model = LinearRegression()
	model.fit(x_poly, y)

	x = feat.fit_transform(x)
	return model.predict(x)

def model_poisson(df):
	model = PoissonRegressor()
	x = df.index.to_frame()
	y = df['cantidad']
	
	model.fit(x,y)
	return model.predict(x)




def build_models(only_first=False):
	all_models = [
		(model_poisson, 'Poisson'),
		(model_linear_regression, 'Regresión Lineal'),
		(model_poly_regression, 'Regresión Polinomial (grado 3)', 3),
		(model_poly_regression, 'Regresión Polinomial (grado 9)', 9),
		(model_simple_exp_smooth, 'Exp. Suavizada Simple'),
		(model_holt, 'Holt'),
		(model_exp_smooth, 'Exp. Suavizada'),
	]

	all_df_errors = []

	for idx, m in enumerate(df_main.modelo.cat.categories):
		sub_df = df_main[df_main.modelo_id == idx].copy()
		sub_df.reset_index(drop=True, inplace=True)
		
		xplots = math.ceil(len(all_models)/2)
		fig, ax = plt.subplots(xplots, 2, sharex=True, figsize=(10,8))
		fig.delaxes(ax[-1][-1])
		ax = ax.flatten()
		fig.tight_layout()
		
		all_errors = []
		all_model_names = []
		dat_real = sub_df.cantidad.tolist()

		for axi, (fn, name, *rest) in enumerate(all_models):
			dat_fitted = fn(sub_df, *rest)

			all_errors.append(comun.build_err_details(dat_real[0:len(dat_fitted)], dat_fitted))
			all_model_names.append(name)
			
			fax = ax[axi]
			
			fax.plot(dat_real, color='red', label='Observado')
			fax.plot(sub_df.index[0:len(dat_fitted)], dat_fitted, label=name, linestyle='dashed')
			fax.set(title='{} - {}'.format(m, name), ylabel='cantidad')
			fax.grid()
		
		df_err = pandas.DataFrame(all_errors, index=all_model_names)
		all_df_errors.append((m, df_err))

		plt.subplots_adjust(hspace=0.5)
		plt.xlim((0, len(sub_df.index)-1))
		#fig.legend(loc='lower right')
		plt.show()
		
		if only_first: break
	
	from IPython.core.display import HTML
	a = HTML(''.join(['<h3>{0}</h3>{1}Tabla {2}. Errores de regresión para modelos predictivos de {0}'.format(m, df.to_html().replace('\n', '').replace('border="1"', ''), i) for i,(m, df) in enumerate(all_df_errors, 3)]))
	display(a)

build_models()


In [None]:
# Se utiliza la librería lazypredict para evaluar más de 40 modelos predictivos en un solo paso.
# La librería no tiene mucho espacio para configuraciones, y el resultado es un listado de los
# coeficientes y errores obtenidos para cada modelo evaluado. Solo se utiliza como referencia.

def lazypredict_main():
	from lazypredict.Supervised import LazyRegressor
	selected_models = []

	for idx, m in enumerate(df_main.modelo.cat.categories):
		sub_df = df_main[df_main.modelo_id == idx].copy()
		sub_df.reset_index(drop=True, inplace=True)
		
		x = sub_df.index.to_frame()
		offset = 24

		X_train, y_train = x[:offset], sub_df.cantidad[:offset]
		X_test, y_test = x[offset:], sub_df.cantidad[offset:]

		reg = LazyRegressor(verbose=0, ignore_warnings=True, custom_metric=None)
		models, predictions = reg.fit(X_train, X_test, y_train, y_test);
		
		selected_models.append((m, predictions.sort_values(by=['R-Squared'], ascending=False).iloc[0:6]))
	
	from IPython.core.display import HTML
	return HTML('<br><br>'.join(['<h3>{0}</h3>{1}Tabla {2}. Mejores modelos predictivos según R^2 de {0}'.format(m, df.to_html().replace('\n', '').replace('border="1"', ''), i) for i,(m, df) in enumerate(selected_models, 11)]))

from IPython.utils import io as ipio

with ipio.capture_output() as captured: a = lazypredict_main()
display(a)