## Paralelizar el entrenamiento

La idea es dividir los datos por países y entrenar cada grupo de países en paralelo. Si por ejemplo tienes 100 países, divides los datos en 4 grupos de países y entrenas todos los modelos para todos los productos de cada grupo en paralelo. 

A cada grupo se le asigna un core. La clave es encontrar el número óptimo de grupos y cores ya que no siempre aumentando el número de grupos y cores el entrenamiento va a ir más rápido (puede incluso ir bastante más lento), es un equilibrio entre tener grupos suficientemente grandes y añadir más cores para paralelizarlo más.


He hecho algunas pruebas con varios escenarios para que te hagas una idea de qué puede funcionar mejor dependiendo del volumen de datos que tengas:

- 50 países / 36 productos por país -> 6 grupos/cores.
- 50 países / 50 productos por país -> 6 grupos/cores, aunque también se podrían considerar 8.
- 70 países / 500 productos por país -> 8 grupos/cores.
- 100 países / 100 productos por país -> 10 grupos/cores

Las pruebas las he hecho con los siguientes modelos: LinearRegression, RandomForestRegressor (este es el modelo más lento con diferencia), GradientBoostingRegressor, XGBRegressor, LGBMRegressor.

Ten en cuenta que si tienes un volumen de datos todavía mayor (+100 países / +100 productos por país) puede que usar más de 10 cores sea factible, pero en mi PC no lo he podido probar.

Para paralelizar el entrenamiento puedes usar:

- **Multiprocessing**: https://docs.python.org/3.9/library/multiprocessing.html
- **Joblib**: https://joblib.readthedocs.io/en/latest/parallel.html

En este caso he optado por usar joblib, ya que es más sencillo de usar (acuérdate de `pip install joblib`).

In [6]:
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from lightgbm import LGBMRegressor
from pandas.tseries.offsets import MonthBegin
from skforecast.ForecasterAutoreg import ForecasterAutoreg
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from xgboost import XGBRegressor

Esta es la función de entrenamiento. Le pasas la lista de modelos y el subset de los datos de cada grupo de países. Los datos se agrupan por país y producto y se itera por cada modelo y grupo, añadiendo las predicciones a un diccionario que luego se convierte en DataFrame.

In [10]:
def train_predict(models: list, data: pd.DataFrame) -> pd.DataFrame:
    """
    Trains multiple models on the given data and makes predictions.
    The data is grouped by country and product, and each model is trained on each group.
    The predictions are stored in a DataFrame along with the model name, country, product,
    and prediction date.

    Args:
        models: A list of models to be trained.
        data: A DataFrame containing the data to train the models on.

    Returns:
        A DataFrame containing the predictions.
    """
    # Predecimos 6 meses en el futuro
    prediction_horizon = 6

    # Creamos un diccionario para guardar las predicciones.
    predictions = {
        "country": [],
        "product": [],
        "model": [],
        "date": [],
        "prediction": [],
    }

    # Agrupamos los datos por país y producto para luego iterar sobre los grupos.
    grouped = data.groupby(["country", "product"])

    # Iteramos sobre los modelos y los grupos de datos, entrenamos cada modelo para cada grupo.
    # Luego hacemos las predicciones y las guardamos en el diccionario.
    for model in models:
        for (country, product), group in grouped:
            forecaster = ForecasterAutoreg(regressor=model, lags=4)
            forecaster.fit(y=group["sales"])
            predictions = forecaster.predict(steps=prediction_horizon)

            # Generamos las fechas de las predicciones
            prediction_dates = [
                group["date"].max() + MonthBegin(i)
                for i in range(1, prediction_horizon + 1)
            ]

            for date, predicted_sales in zip(prediction_dates, predictions):
                predictions["country"].append(country)
                predictions["product"].append(product)
                predictions["model"].append(type(model).__name__)
                predictions["date"].append(date)
                predictions["prediction"].append(predicted_sales)

    # Convertimos el diccionario en un DataFrame
    predictions_dataframe = pd.DataFrame(predictions)

    return predictions_dataframe

Voy a suponer que tienes los datos de todos los países y productos en el mismo DataFrame

Esta es la función en la que se ejecuta `train_predict()` de forma paralela. Tienes dos opciones:

- **Definir manualmente los grupos**: lo ideal es que los grupos estén lo más balanceados posible en términos de volumen de datos, por lo que si sabes que hay países que tienen más productos, puedes pensar en qué países agrupar juntos para que todos los grupos tengan más o menos el mismo número de datos. Ten en cuenta que los grupos tampoco pueden ser muy pequeños porque entonces puede que no merezca la pena paralelizar. En este caso le pasamos una lista de DataFrames, en el que cada DataFrame es un grupo que ya has creado previamente (o puedes dividirlos dentro de este función, como veas).

In [11]:
def parallel_train_manual(input_data: list[pd.DataFrame], models: list, n_cores: int) -> pd.DataFrame:
    """Trains models on multiple dataframes in parallel using the specified number of cores.

    Args:
        input_data: A list of DataFrames to train the models on.
        models: A list of models to be trained.
        n_cores: The number of cores to use for parallel processing.

    Returns:
        A DataFrame containing the predictions from all models and dataframes.

    Raises:
        AssertionError: If the number of dataframes does not match the number of cores.
    """
    # Primero comprobamos que el número de dataframes sea igual al número de cores.
    assert len(input_data) == n_cores, "The number of dataframes must match the number of cores."

    # Luego usamos la Parallel de joblib para entrenar los modelos en paralelo.
    predictions = Parallel(n_jobs=n_cores)(delayed(train_predict)(models, data) for data in input_data)

    # Finalmente concatenamos los resultados en un solo DataFrame.
    predictions_dataframe = pd.concat([result for result in predictions])
    return predictions_dataframe

- **Dividir los grupos en base al número de cores que quieres utilizar**: esta opción es más "automática" pero también puede ser más problemática ya que pueden crearse grupos demasiado pequeños. Si siempre tienes los mismos países y casi todos los países tienen más o menos el mismo volumen de datos, puedes decantarte por esta opción. En este caso se crean los grupos en base al número de cores que definas, simplemente dividiendo la lista de países entre el número de cores y creando una lista de dataframes filtrados por grupo.

In [12]:
def parallel_train_auto(input_data: pd.DataFrame, models: list, n_cores: int) -> pd.DataFrame:
    """
    Automatically groups the input data by country and trains models on these groups
    in parallel using the specified number of cores.

    Args:
        input_data: Input DataFrame to train the models on.
        models: A list of models to be trained.
        n_cores: The number of cores to use for parallel processing.

    Returns:
        A DataFrame containing the predictions from all models and groups.

    Raises:
        AssertionError: If the number of groups does not match the number of cores.
    """
    # Dividimos el DataFrame en grupos por país, en base al número de cores,
    # mapeamos cada país a un grupo para que todos los productos de un mismo país
    # siempre estén en el mismo grupo.
    country_list = input_data["country"].unique()
    country_to_group = {country: i % n_cores for i, country in enumerate(country_list)}
    input_data["group"] = input_data["country"].map(country_to_group)
    dataframes = [group.drop(columns="group") for _, group in input_data.groupby("group")]

    # Comprobamos que el número de grupos sea igual al número de cores.
    assert len(dataframes) == n_cores, "The number of dataframes must match the number of cores."

    # Luego usamos Parallel de joblib para entrenar los modelos en paralelo.
    predictions = Parallel(n_jobs=n_cores)(delayed(train_predict)(models, data) for data in dataframes)

    # Finalmente concatenamos los resultados en un solo DataFrame.
    predictions_dataframe = pd.concat([result for result in predictions])
    return predictions_dataframe

Mi recomendación es que inicialmente definas manualmente los grupos, puedes empezar con 2 o 4 grupos, agrupas los países para que queden más o menos balanceados, si te funciona bien puedes ir probando a hacer más grupos hasta que veas que ya no te merece la pena añadir más cores. 

Comprueba por si acaso cuántos cores tienes disponibles.

In [2]:
from joblib import cpu_count

print(f'Number of available cores: {cpu_count()}')

Number of available cores: 12


### Notas

- Lo ideal es que muevas el preprocesamiento de los datos fuera de la paralelización, realizando el procesamiento con `pyspark`, y en la función de entrenamiento ya metas los datos listos para entrenar.
- Para los modelos que acepten `n_jobs` es mejor fijarlo en 1 para evitar problemas cuando se usan muchos cores (sobretodo `lightgbm`).