In [49]:
import numpy as np
import pandas as pd

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import make_column_selector, ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder

from statsmodels.tsa.deterministic import CalendarFourier, DeterministicProcess

# Functions for loading CSV files as DataFrames:
Each of the functions below reads one CSV file, processes it (e.g., parsing dates, setting index columns, casting dtypes), and returns a Pandas DataFrame. Together they provide a convenient way to load the various data sets in a consistent, structured format for further analysis.







In [50]:
def load_train_df(filepath, onpromotion=False) -> pd.DataFrame:
    """Read the training CSV into a frame indexed by (store_nbr, family, date).

    Parameters
    ----------
    filepath : path or file-like object accepted by ``pd.read_csv``.
    onpromotion : bool, default False
        When True the ``onpromotion`` column is read in addition to ``id``
        and ``sales``.

    Returns
    -------
    pd.DataFrame
        Rows sorted by index, columns sorted alphabetically.
    """
    wanted_columns = ["id", "store_nbr", "family", "date", "sales"]
    if onpromotion:
        wanted_columns.append("onpromotion")

    raw_df = pd.read_csv(
        filepath,
        usecols=wanted_columns,
        parse_dates=["date"],
        index_col=["store_nbr", "family", "date"],
        dtype={"store_nbr": np.uint8, "sales": np.float32},
    )
    return raw_df.sort_index(axis=0).sort_index(axis=1)


def load_test_df(filepath, onpromotion=False) -> pd.DataFrame:
    """Read the test CSV into a frame indexed by (store_nbr, family, date).

    Parameters
    ----------
    filepath : path or file-like object accepted by ``pd.read_csv``.
    onpromotion : bool, default False
        When True the ``onpromotion`` column is read in addition to ``id``.

    Returns
    -------
    pd.DataFrame
        Rows sorted by index, columns sorted alphabetically. No ``sales``
        column is read by this loader.
    """
    if onpromotion:
        _usecols = ["id", "store_nbr", "family", "date", "onpromotion"]
    else:
        _usecols = ["id", "store_nbr", "family", "date"]

    df = (
        pd.read_csv(
            filepath,
            usecols=_usecols,
            parse_dates=["date"],
            index_col=["store_nbr", "family", "date"],
            # Only columns listed in `_usecols` are read, so no dtype is
            # declared for `sales` here (it was a leftover from the train loader).
            dtype={"store_nbr": np.uint8},
        )
        .sort_index(axis=0)
        .sort_index(axis=1))
    return df

def load_transactions_df(filepath) -> pd.DataFrame:
    """Read the transactions CSV into a frame indexed by (store_nbr, date).

    The ``date`` column is converted to daily periods before it becomes an
    index level, so the resulting ``date`` level is Period-typed rather than
    Timestamp-typed.

    Returns
    -------
    pd.DataFrame
        Rows sorted by index, columns sorted alphabetically.
    """
    raw_df = pd.read_csv(
        filepath,
        parse_dates=["date"],
        dtype={"store_nbr": np.uint8, "transactions": np.float32},
    )
    # Replace the parsed timestamps with daily periods.
    raw_df["date"] = raw_df.loc[:, "date"].dt.to_period('D')
    indexed_df = raw_df.set_index(["store_nbr", "date"])
    return indexed_df.sort_index(axis=0).sort_index(axis=1)


def load_oil_df(filepath) -> pd.DataFrame:
    """Read the oil price CSV into a date-indexed frame with gaps back-filled.

    Parameters
    ----------
    filepath : path or file-like object accepted by ``pd.read_csv``.

    Returns
    -------
    pd.DataFrame
        Indexed by ``date`` in ascending order; ``dcoilwtico`` is float32.
        Every missing price is replaced by the next available price. A
        trailing run of missing values (nothing after it) stays missing.
    """
    df = (
        pd.read_csv(filepath, parse_dates=["date"], index_col=["date"],
                    dtype={"dcoilwtico": np.float32})
        # Sort first so the back-fill always follows chronological order.
        .sort_index(axis=0)
        # `.bfill()` replaces the deprecated `fillna(method="bfill")`; it fills
        # every gap, including a missing first observation.
        .bfill())
    return df


def load_stores_df(filepath) -> pd.DataFrame:
    """Read the stores CSV into a frame indexed by ``store_nbr``.

    The ``type`` and ``cluster`` columns are renamed to ``store_type`` and
    ``store_cluster``. Rows are sorted by index and columns alphabetically.
    """
    new_names = {"type": "store_type", "cluster": "store_cluster"}
    raw_df = pd.read_csv(
        filepath,
        index_col=["store_nbr"],
        dtype={"store_nbr": np.uint8, "cluster": np.uint8},
    )
    renamed_df = raw_df.rename(columns=new_names)
    return renamed_df.sort_index(axis=0).sort_index(axis=1)


def load_holidays_events_df(filepath) -> pd.DataFrame:
    """Read the holidays/events CSV, parsing ``date`` as datetimes.

    The default integer index is kept; rows are sorted by index and columns
    alphabetically.
    """
    raw_df = pd.read_csv(filepath, parse_dates=["date"])
    rows_sorted_df = raw_df.sort_index(axis=0)
    return rows_sorted_df.sort_index(axis=1)

# **Data Preprocessing Pipeline**

***Pipeline for cleaning the training data:***
The quality of the data prior to mid-2015 is poor for many of the data series so I decided to drop it; many of the remaining series have significant numbers of leading zero values for sales. These leading zero sales values are better coded as missing values. Replace all the leading zero values for sales and then drop all the missing values.

In [51]:
def _all_false_mask(df):
    mask_df = pd.DataFrame(False, columns=df.columns, index=df.index)
    return mask_df


def _create_leading_zero_sales_mask(df):
    """Build a boolean mask flagging the leading run of zero ``sales`` values.

    All columns other than ``sales`` are left ``False``. When every ``sales``
    value is zero the mask is entirely ``False`` (nothing is flagged).
    """
    is_zero = df.loc[:, 'sales'].eq(0)
    mask_df = _all_false_mask(df)
    if is_zero.all():
        return mask_df
    # The running minimum stays True only until the first non-zero value.
    mask_df['sales'] = is_zero.cummin()
    return mask_df


def _drop_leading_zero_sales(df):
    """Mask leading zero sales as missing, then drop rows with missing ``sales``."""
    return _mask_leading_zero_sales(df).dropna(axis=0, how="any", subset=["sales"])


def _mask_leading_zero_sales(df):
    """Replace leading zero ``sales`` values with NaN within each series.

    A series is one (store_nbr, family) group of the index; the mask for each
    group comes from ``_create_leading_zero_sales_mask``.
    """
    per_series = df.groupby(level=["store_nbr", "family"], group_keys=False)
    leading_zero_mask = per_series.apply(_create_leading_zero_sales_mask)
    return df.mask(leading_zero_mask, np.nan)


def _use_only_recent_data(df, start):
    recent_df = df.loc[pd.IndexSlice[:, :, start:]]
    return recent_df

def _make_data_cleaning_pipeline(start, verbose=True) -> Pipeline:
    """Build the two-step cleaning pipeline.

    Step 1 keeps data from ``start`` onwards; step 2 drops leading zero sales
    values. The pipeline is configured to output pandas objects.
    """
    keep_recent = FunctionTransformer(func=_use_only_recent_data, kw_args={"start": start})
    drop_leading_zeros = FunctionTransformer(func=_drop_leading_zero_sales)
    cleaning_steps = [
        (f"Use only data after {start}", keep_recent),
        ("Drop leading zero sales values", drop_leading_zeros),
    ]
    return Pipeline(steps=cleaning_steps, verbose=verbose).set_output(transform="pandas")

# _make_data_cleaning_pipeline(start, verbose=True):

 This function creates a data cleaning pipeline using scikit-learn's Pipeline class.
It includes two steps in the pipeline: The first step uses the _use_only_recent_data function to filter the DataFrame to include only data
after the start date. The second step uses the _drop_leading_zero_sales function to drop rows with leading zero sales values.
The verbose parameter controls whether the pipeline prints verbose information. The pipeline is configured to output the result as a Pandas DataFrame.


---


# Pipeline for joining the data sets


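After cleaning the training data, combine it with the other data sets: transactions.csv, oil.csv, and stores.csv are joined on their shared keys, and the rows of test.csv are concatenated below the training rows. Leave holidays_events.csv alone for now, as it will be added later during the feature engineering step.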

In [52]:
def _load_and_join_oil_df(df, filepath):
    """Left-join the oil price data onto ``df`` by ``date`` and sort both axes."""
    with_oil_df = df.join(load_oil_df(filepath), on=["date"], how="left")
    return with_oil_df.sort_index(axis=0).sort_index(axis=1)


def _load_and_join_stores_df(df, filepath):
    """Left-join the store metadata onto ``df`` by ``store_nbr`` and sort both axes."""
    with_stores_df = df.join(load_stores_df(filepath), on=["store_nbr"], how="left")
    return with_stores_df.sort_index(axis=0).sort_index(axis=1)


def _load_and_concat_test_df(df, filepath, onpromotion):
    """Append the test rows to ``df`` and sort the rows by index.

    Columns present in only one of the two frames are kept and filled with
    missing values for the other frame's rows (outer concatenation).
    """
    frames = [df, load_test_df(filepath, onpromotion)]
    return pd.concat(frames, axis=0, sort=True).sort_index(axis=0)


def _load_and_join_transactions_df(df, filepath):
    """Left-join the transactions data onto ``df`` by (store_nbr, date).

    ``load_transactions_df`` returns a Period-typed ``date`` level. When
    ``df`` carries Timestamp dates the two key types do not compare equal, so
    the transactions dates are converted to timestamps (start of each period)
    before joining.

    Returns
    -------
    pd.DataFrame
        ``df`` with the transactions columns added, both axes sorted.
    """
    transactions_df = load_transactions_df(filepath)

    # Align the join key dtype with the left frame before joining.
    left_dates = df.index.get_level_values("date")
    if isinstance(left_dates, pd.DatetimeIndex) and isinstance(transactions_df.index, pd.MultiIndex):
        date_position = transactions_df.index.names.index("date")
        right_dates = transactions_df.index.levels[date_position]
        if isinstance(right_dates, pd.PeriodIndex):
            transactions_df.index = transactions_df.index.set_levels(
                right_dates.to_timestamp(), level="date"
            )

    joined_df = (
        df.join(transactions_df, how="left", on=["store_nbr", "date"])
          .sort_index(axis=0)
          .sort_index(axis=1)
    )
    return joined_df


def _make_oil_joining_pipeline(filepath, verbose):
    """Build the pipeline that joins the oil prices and forward-fills gaps.

    Step 1 loads the oil data from ``filepath`` and joins it; step 2
    forward-fills missing ``dcoilwtico`` values and passes every other column
    through unchanged.

    NOTE(review): the forward fill follows the row order of the joined frame,
    so a value can be carried from the end of one series into the start of
    the next — confirm this is acceptable.
    """
    pipeline = Pipeline(
        steps=[
            ("Load and join the data", FunctionTransformer(func=_load_and_join_oil_df, kw_args={"filepath": filepath})),
            ("Impute missing values", ColumnTransformer(
                transformers=[
                    ("Forward fill missing dcoilwtico values",
                     # `DataFrame.ffill` replaces the deprecated
                     # `fillna(method="ffill")` and avoids an anonymous lambda.
                     FunctionTransformer(pd.DataFrame.ffill),
                     ["dcoilwtico"]),
                ],
                n_jobs=1, remainder="passthrough",
                verbose=verbose, verbose_feature_names_out=False)),
        ],
        verbose=verbose)
    return pipeline


def _make_transactions_joining_pipeline(filepath, verbose):
    """Build the pipeline that joins transactions and zero-fills missing values.

    Step 1 loads the transactions data from ``filepath`` and joins it; step 2
    replaces missing ``transactions`` values with 0.0 and passes every other
    column through unchanged.
    """
    join_step = FunctionTransformer(
        func=_load_and_join_transactions_df,
        kw_args={"filepath": filepath},
    )
    zero_fill_step = ColumnTransformer(
        transformers=[
            (
                "Fill missing transactions with zeros",
                SimpleImputer(strategy="constant", fill_value=0.0),
                ["transactions"],
            )
        ],
        n_jobs=1,
        remainder="passthrough",
        verbose=verbose,
        verbose_feature_names_out=False,
    )
    joining_steps = [
        ("Load and join the data", join_step),
        ("Impute missing values", zero_fill_step),
    ]
    return Pipeline(steps=joining_steps, verbose=verbose)


def _make_data_joining_pipeline(onpromotion, join_oil, join_transactions, verbose):
    """Assemble the pipeline that combines the training data with the other files.

    Steps, in order: optional transactions join, optional oil price join,
    concatenation of the test rows, and the stores join. The pipeline is
    configured to output pandas objects.
    """
    optional_steps = []
    if join_transactions:
        transactions_pipeline = _make_transactions_joining_pipeline("transactions.csv", verbose)
        optional_steps.append(("Load and join the transactions data", transactions_pipeline))
    if join_oil:
        oil_pipeline = _make_oil_joining_pipeline("oil.csv", verbose)
        optional_steps.append(("Load and join the oil price data", oil_pipeline))

    # The test rows are appended after the optional joins above.
    concat_test_step = (
        "Load and concatenate the test data",
        FunctionTransformer(
            func=_load_and_concat_test_df,
            kw_args={"filepath": "test.csv", "onpromotion": onpromotion},
        ),
    )
    join_stores_step = (
        "Load and join the stores data",
        FunctionTransformer(
            func=_load_and_join_stores_df,
            kw_args={"filepath": "stores.csv"},
        ),
    )

    all_steps = [*optional_steps, concat_test_step, join_stores_step]
    return Pipeline(all_steps, verbose=verbose).set_output(transform="pandas")

# Full preprocessing pipeline

The full preprocessing pipeline combines the cleaning steps and the joining steps with a step that clips outliers, followed by a final step that sorts the columns.


In [53]:
def _clip_outliers(df, q):
    """Clip values that exceed their series' expanding ``q``-quantile.

    Expanding quantiles are computed separately for each (store_nbr, family)
    group of the index. Any value greater than its expanding quantile is
    replaced by that quantile; all other values are returned unchanged.
    """
    # Per-group expanding quantiles, aligned to the rows of `df`.
    expanding_quantiles_df = (
        df.groupby(level=["store_nbr", "family"], group_keys=False)
          .apply(lambda _: _compute_expanding_quantiles(_, q))
    )
    # Start from an all-False mask and overwrite it column by column with the
    # "greater than quantile" comparison.
    mask = _all_false_mask(df)
    is_outlier = df.gt(expanding_quantiles_df).to_dict()
    cond = mask.assign(**is_outlier)
    # Where the mask is True, take the quantile value instead of the original.
    clipped_df = df.mask(cond, expanding_quantiles_df)
    return clipped_df

def _compute_expanding_quantiles(df, q):
    """Return the expanding ``q``-quantile of every column of ``df`` as float32.

    The window grows from the first row (``min_periods=1``), so each row's
    quantile is computed from that row and the rows before it.
    """
    # Expanding-window quantile per column, converted to {column: {index: value}}.
    expanding_quantiles = (
        df.expanding(min_periods=1)
          .quantile(q)
          .to_dict()
    )
    # Write the quantiles back over the original columns and cast to float32.
    expanding_quantile_df = (
        df.assign(**expanding_quantiles)
          .astype(np.float32)
    )
    return expanding_quantile_df

# Transformers and Pipelines

def _make_clip_outliers_transformer(onpromotion, dcoilwtico, transactions, q, n_jobs, verbose):
    """Build a ColumnTransformer that clips outliers in the selected columns.

    Each of ``onpromotion``, ``dcoilwtico`` and ``transactions`` is a flag; a
    truthy flag adds a ``_clip_outliers`` step (with quantile ``q``) for the
    column of the same name. All remaining columns are passed through, and the
    transformer is configured to output pandas objects.
    """
    candidates = (
        (onpromotion, "Clip onpromotion outliers", "onpromotion"),
        (dcoilwtico, "Clip dcoilwtico outliers", "dcoilwtico"),
        (transactions, "Clip transactions outliers", "transactions"),
    )
    transformers = [
        (
            step_name,
            FunctionTransformer(func=_clip_outliers, kw_args={'q': q}),
            [column],
        )
        for enabled, step_name, column in candidates
        if enabled
    ]

    return ColumnTransformer(
        transformers,
        n_jobs=n_jobs,
        remainder="passthrough",
        verbose=verbose,
        verbose_feature_names_out=False,
    ).set_output(transform="pandas")

def make_data_preprocessing_pipeline(onpromotion=False, join_oil=True, join_transactions=True,
                                     start="20150701", q=0.99, n_jobs=-1, verbose=True) -> Pipeline:
    """Build the full preprocessing pipeline.

    The four steps are: clean the training data (from ``start`` onwards),
    join the additional data sets, clip outliers at the expanding
    ``q``-quantile, and sort the columns. ``join_oil`` and
    ``join_transactions`` control both whether the data set is joined and
    whether its column is clipped. The pipeline outputs pandas objects.
    """
    cleaning = _make_data_cleaning_pipeline(start, verbose)
    joining = _make_data_joining_pipeline(onpromotion, join_oil, join_transactions, verbose)
    clipping = _make_clip_outliers_transformer(
        onpromotion, join_oil, join_transactions, q, n_jobs, verbose
    )
    column_sorting = FunctionTransformer(func=lambda frame: frame.sort_index(axis=1))

    preprocessing_steps = [
        ("Clean the training data", cleaning),
        ("Join the additional data sets", joining),
        ("Clip outliers", clipping),
        ("Sort the columns", column_sorting),
    ]
    return Pipeline(steps=preprocessing_steps, verbose=verbose).set_output(transform="pandas")

def _load_and_join_oil_df(df, filepath):
    """Left-join the oil price data onto ``df`` by ``date`` and sort both axes."""
    with_oil_df = df.join(load_oil_df(filepath), on=["date"], how="left")
    return with_oil_df.sort_index(axis=0).sort_index(axis=1)

def _load_and_join_stores_df(df, filepath):
    """Left-join the store metadata onto ``df`` by ``store_nbr`` and sort both axes."""
    with_stores_df = df.join(load_stores_df(filepath), on=["store_nbr"], how="left")
    return with_stores_df.sort_index(axis=0).sort_index(axis=1)

def _load_and_concat_test_df(df, filepath, onpromotion):
    """Append the test rows to ``df`` and sort the rows by index.

    Columns present in only one of the two frames are kept and filled with
    missing values for the other frame's rows (outer concatenation).
    """
    frames = [df, load_test_df(filepath, onpromotion)]
    return pd.concat(frames, axis=0, sort=True).sort_index(axis=0)

def _load_and_join_transactions_df(df, filepath):
    """Left-join the transactions data onto ``df`` by (store_nbr, date).

    ``load_transactions_df`` returns a Period-typed ``date`` level. When
    ``df`` carries Timestamp dates the two key types do not compare equal, so
    the transactions dates are converted to timestamps (start of each period)
    before joining.

    Returns
    -------
    pd.DataFrame
        ``df`` with the transactions columns added, both axes sorted.
    """
    transactions_df = load_transactions_df(filepath)

    # Align the join key dtype with the left frame before joining.
    left_dates = df.index.get_level_values("date")
    if isinstance(left_dates, pd.DatetimeIndex) and isinstance(transactions_df.index, pd.MultiIndex):
        date_position = transactions_df.index.names.index("date")
        right_dates = transactions_df.index.levels[date_position]
        if isinstance(right_dates, pd.PeriodIndex):
            transactions_df.index = transactions_df.index.set_levels(
                right_dates.to_timestamp(), level="date"
            )

    joined_df = (
        df.join(transactions_df, how="left", on=["store_nbr", "date"])
          .sort_index(axis=0)
          .sort_index(axis=1)
    )
    return joined_df

# Pipelines for Data Joining

def _make_oil_joining_pipeline(filepath, verbose):
    """Build the pipeline that joins the oil prices and forward-fills gaps.

    Step 1 loads the oil data from ``filepath`` and joins it; step 2
    forward-fills missing ``dcoilwtico`` values and passes every other column
    through unchanged.

    NOTE(review): the forward fill follows the row order of the joined frame,
    so a value can be carried from the end of one series into the start of
    the next — confirm this is acceptable.
    """
    pipeline = Pipeline(
        steps=[
            ("Load and join the data", FunctionTransformer(func=_load_and_join_oil_df, kw_args={"filepath": filepath})),
            ("Impute missing values", ColumnTransformer(
                transformers=[
                    ("Forward fill missing dcoilwtico values",
                     # `DataFrame.ffill` replaces the deprecated
                     # `fillna(method="ffill")` and avoids an anonymous lambda.
                     FunctionTransformer(pd.DataFrame.ffill),
                     ["dcoilwtico"]),
                ],
                n_jobs=1, remainder="passthrough", verbose=verbose, verbose_feature_names_out=False)),
        ],
        verbose=verbose
    )
    return pipeline

def _make_transactions_joining_pipeline(filepath, verbose):
    """Build the pipeline that joins transactions and zero-fills missing values.

    Step 1 loads the transactions data from ``filepath`` and joins it; step 2
    replaces missing ``transactions`` values with 0.0 and passes every other
    column through unchanged.
    """
    join_step = FunctionTransformer(
        func=_load_and_join_transactions_df,
        kw_args={"filepath": filepath},
    )
    zero_fill_step = ColumnTransformer(
        transformers=[
            (
                "Fill missing transactions with zeros",
                SimpleImputer(strategy="constant", fill_value=0.0),
                ["transactions"],
            )
        ],
        n_jobs=1,
        remainder="passthrough",
        verbose=verbose,
        verbose_feature_names_out=False,
    )
    joining_steps = [
        ("Load and join the data", join_step),
        ("Impute missing values", zero_fill_step),
    ]
    return Pipeline(steps=joining_steps, verbose=verbose)

def _make_data_joining_pipeline(onpromotion, join_oil, join_transactions, verbose):
    """Assemble the pipeline that combines the training data with the other files.

    Steps, in order: optional transactions join, optional oil price join,
    concatenation of the test rows, and the stores join. The pipeline is
    configured to output pandas objects.
    """
    optional_steps = []
    if join_transactions:
        transactions_pipeline = _make_transactions_joining_pipeline("transactions.csv", verbose)
        optional_steps.append(("Load and join the transactions data", transactions_pipeline))
    if join_oil:
        oil_pipeline = _make_oil_joining_pipeline("oil.csv", verbose)
        optional_steps.append(("Load and join the oil price data", oil_pipeline))

    # The test rows are appended after the optional joins above.
    concat_test_step = (
        "Load and concatenate the test data",
        FunctionTransformer(
            func=_load_and_concat_test_df,
            kw_args={"filepath": "test.csv", "onpromotion": onpromotion},
        ),
    )
    join_stores_step = (
        "Load and join the stores data",
        FunctionTransformer(
            func=_load_and_join_stores_df,
            kw_args={"filepath": "stores.csv"},
        ),
    )

    all_steps = [*optional_steps, concat_test_step, join_stores_step]
    return Pipeline(all_steps, verbose=verbose).set_output(transform="pandas")




_clip_outliers(df, q): This function clips outliers in the input DataFrame df based on a quantile q. It first computes expanding quantiles for each group defined by the "store_nbr" and "family" columns and then masks the outliers in the original DataFrame with the corresponding quantile values.

_compute_expanding_quantiles(df, q): This function computes expanding quantiles for the input DataFrame df based on the specified quantile q. It calculates the quantiles in an expanding window fashion and returns a DataFrame with quantile values.

_make_clip_outliers_transformer(onpromotion, dcoilwtico, transactions, q, n_jobs, verbose): This function creates a transformer for clipping outliers in specific columns of a DataFrame. The columns to be processed are determined by the input parameters (onpromotion, dcoilwtico, transactions). For each specified column, it adds a transformer to clip outliers using the _clip_outliers function.

make_data_preprocessing_pipeline(onpromotion, join_oil, join_transactions, start, q, n_jobs, verbose): This is the main function that creates the overall data preprocessing pipeline. It consists of several steps:

"Clean the training data": Uses _make_data_cleaning_pipeline to clean and filter the data.
"Join the additional data sets": Uses _make_data_joining_pipeline to join additional data sets such as oil prices, transactions, and store information.
"Clip outliers": Uses _make_clip_outliers_transformer to clip outliers in specified columns.
"Sort the columns": Sorts the columns of the final DataFrame.
The pipeline is configured to output the final preprocessed DataFrame.



---

# Example usage


In [54]:
# Load the training data, including the onpromotion column.
_train_df = load_train_df(
    "train.csv",
    onpromotion=True,
)
# NOTE(review): the pipeline is built with its default onpromotion=False, so
# the test rows are loaded without onpromotion and that column is not clipped,
# even though the training frame above includes it — confirm this is intended.
_pipeline = make_data_preprocessing_pipeline()
_preprocessed_df = _pipeline.fit_transform(_train_df)



[Pipeline]  (step 1 of 2) Processing Use only data after 20150701, total=   0.0s
[Pipeline]  (step 2 of 2) Processing Drop leading zero sales values, total=   2.6s
[Pipeline]  (step 1 of 4) Processing Clean the training data, total=   2.7s
[Pipeline]  (step 1 of 2) Processing Load and join the data, total=   5.1s
[ColumnTransformer]  (1 of 2) Processing Fill missing transactions with zeros, total=   0.0s
[ColumnTransformer] ..... (2 of 2) Processing remainder, total=   0.0s
[Pipeline]  (step 2 of 2) Processing Impute missing values, total=   0.0s
[Pipeline]  (step 1 of 4) Processing Load and join the transactions data, total=   5.1s
[Pipeline]  (step 1 of 2) Processing Load and join the data, total=   0.3s
[ColumnTransformer]  (1 of 2) Processing Forward fill missing dcoilwtico values, total=   0.0s
[ColumnTransformer] ..... (2 of 2) Processing remainder, total=   0.0s
[Pipeline]  (step 2 of 2) Processing Impute missing values, total=   0.0s
[Pipeline]  (step 2 of 4) Processing Load an



[Pipeline] ..... (step 3 of 4) Processing Clip outliers, total=  48.9s
[Pipeline] .. (step 4 of 4) Processing Sort the columns, total=   0.1s


Note that the future values for dcoilwtico, onpromotion, and transactions would not be available at the time that we need to generate our sales forecasts in a real forecasting setting. For the purposes of the competition we are given the values for dcoilwtico and onpromotion for the test window. We are not given the transactions values and will need to forecast them.

In [55]:
# Inspect dtypes and non-null counts: dcoilwtico, onpromotion, sales, and
# transactions each have fewer non-null values than the frame has rows.
_preprocessed_df.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 1353218 entries, (1, 'AUTOMOTIVE', Timestamp('2015-07-01 00:00:00')) to (54, 'SEAFOOD', Timestamp('2017-08-31 00:00:00'))
Data columns (total 9 columns):
 #   Column         Non-Null Count    Dtype  
---  ------         --------------    -----  
 0   city           1353218 non-null  object 
 1   dcoilwtico     1324706 non-null  float32
 2   id             1353218 non-null  int64  
 3   onpromotion    1324706 non-null  float64
 4   sales          1324706 non-null  float32
 5   state          1353218 non-null  object 
 6   store_cluster  1353218 non-null  uint8  
 7   store_type     1353218 non-null  object 
 8   transactions   1324706 non-null  float32
dtypes: float32(3), float64(1), int64(1), object(3), uint8(1)
memory usage: 73.6+ MB


# Feature Engineering Pipeline

---

***Date Features***

In [56]:
def _create_date_features(df):
    _df = df.reset_index("date")
    date_features = {
        "year": _df.loc[:, "date"].dt.year,
        "month": _df.loc[:, "date"].dt.month.astype(np.uint8),
        "day": _df.loc[:, "date"].dt.day,
        "day_of_week": _df.loc[:, "date"].dt.day_of_week.astype(np.uint8),
        "is_month_start": _df.loc[:, "date"].dt.is_month_start.astype(np.uint8),
        "is_month_middle": _df.loc[:, "date"].dt.day.eq(15).astype(np.uint8),
        "is_month_end": _df.loc[:, "date"].dt.is_month_end.astype(np.uint8),
        "is_weekend": _df.loc[:, "date"].dt.day_of_week.floordiv(5).astype(np.uint8),
    }
    with_date_features_df = (
        _df.assign(**date_features)
           .set_index("date", append=True)
           .sort_index(axis=0)
           .sort_index(axis=1)
    )
    return with_date_features_df


def make_date_features_transformer(verbose=True):
    """Build a ColumnTransformer that engineers the date features.

    Only the ``id`` column is handed to ``_create_date_features``; all other
    columns are dropped. The transformer outputs pandas objects.
    """
    date_features_step = (
        "Engineer date features",
        FunctionTransformer(func=_create_date_features),
        ["id"],
    )
    column_transformer = ColumnTransformer(
        transformers=[date_features_step],
        remainder="drop",
        verbose=verbose,
        verbose_feature_names_out=False,
    )
    return column_transformer.set_output(transform="pandas")

_create_date_features(df): This function takes a DataFrame as input and creates several date-related features from the "date" column. It does the following:

Resets the index of the DataFrame to include "date" as a regular column.
Extracts various date-related features such as year, month, day, day of the week, and others from the "date" column using the .dt accessor provided by Pandas.
Assigns these new features to the DataFrame as additional columns.
Sets the "date" column back as the index and sorts the DataFrame.
The resulting DataFrame contains the original data along with the newly engineered date-related features.

make_date_features_transformer(verbose=True): This function creates a transformer that can be used within a data preprocessing pipeline to add date-related features to a DataFrame. It uses scikit-learn's ColumnTransformer and FunctionTransformer to apply the _create_date_features function to the DataFrame. The "id" column is specified as the input column, although it is not used for the feature engineering. The remainder parameter is set to "drop," which means that all columns other than the specified input column will be dropped.

The transformer is configured to output the transformed DataFrame.

Overall, these functions are useful for adding date-related features to a DataFrame, which can be valuable for time series analysis and modeling tasks. The make_date_features_transformer function provides a convenient way to incorporate this feature engineering step into a data preprocessing pipeline.


---

# Holidays and Events Features

I make heavy use of Pandas string processing methods to extract a unique description for each holiday at the national, regional, and local level. I will use these descriptions to create indicator/dummy variables and add them as features in our model.

In [57]:
# Holidays/events table, loaded once at module level and shared by the
# holiday feature functions.
_HOLIDAYS_EVENTS_DF = load_holidays_events_df("holidays_events.csv")


# Values of the `type` column that are kept when selecting holidays.
HOLIDAY_TYPES = ["Additional", "Bridge", "Event", "Holiday", "Transfer", "Work Day"]


# Unique descriptions of non-transferred local holidays, with a leading
# "Traslado " prefix and any leading/trailing "-", "+" or digit characters
# removed.
LOCAL_HOLIDAY_DESCRIPTIONS = (
    _HOLIDAYS_EVENTS_DF.query("(locale == 'Local') & (type in @HOLIDAY_TYPES) & (~transferred)")
                       .loc[:, "description"]
                       .str
                       .removeprefix("Traslado ")
                       .str
                       .strip("-+0123456789")
                       .unique()
                       .tolist()
)

# Unique descriptions of non-transferred national holidays. On top of the
# local clean-up, the "Puente " and "Inauguracion " prefixes are removed,
# everything from the first ":" onwards is dropped, and the lowercase words
# "puente" / "primer" are capitalised.
NATIONAL_HOLIDAY_DESCRIPTIONS = (
    _HOLIDAYS_EVENTS_DF.query("(locale == 'National') & (type in @HOLIDAY_TYPES) & (~transferred)")
                       .loc[:, "description"]
                       .str
                       .removeprefix("Traslado ")
                       .str
                       .removeprefix("Puente ")
                       .str
                       .removeprefix("Inauguracion ")
                       .str
                       .replace(":.*", '', regex=True)
                       .str
                       .replace("puente", "Puente")
                       .str
                       .replace("primer", "Primer")
                       .str
                       .strip("-+0123456789")
                       .unique()
                       .tolist()
)


# Unique descriptions of non-transferred regional holidays, used as-is.
REGIONAL_HOLIDAY_DESCRIPTIONS = (
    _HOLIDAYS_EVENTS_DF.query("(locale == 'Regional') & (type in @HOLIDAY_TYPES) & (~transferred)")
                       .loc[:, "description"]
                       .unique()
                       .tolist()
)

I define some functions which will use the unique descriptions and create an indicator for each individual holiday. I need separate functions for the different locales as the joining logic is different.

In [58]:
def _create_some_holiday_indicator(df, description, locale, location_column=None):
    """Add a 0/1 ``is_<description>`` column for one holiday of one locale.

    Parameters
    ----------
    df : pd.DataFrame
        Frame that can be joined on ``date`` (and on ``location_column`` when
        one is given).
    description : str
        Holiday description; matched as a literal, case-insensitive substring
        of the holiday table's ``description`` column.
    locale : str
        Value of the holiday table's ``locale`` column to select.
    location_column : str or None
        Name under which the holiday's ``locale_name`` is joined to ``df``
        (e.g. "city" or "state"). ``None`` joins on ``date`` only.

    Returns
    -------
    pd.DataFrame
        ``df`` plus the uint8 indicator column, with columns sorted.
    """
    # Select the locale's holidays once and reuse the result for both the
    # description match and the rows that are joined.
    holidays_df = _HOLIDAYS_EVENTS_DF.query(
        f"(locale == '{locale}') & (type in {HOLIDAY_TYPES}) & (~transferred)"
    )
    # regex=False: the description is plain text, not a pattern.
    matches = (
        holidays_df.loc[:, "description"]
                   .str
                   .contains(description, case=False, regex=False)
    )
    indicator_name = f"is_{description.lower().replace(' ', '_')}"

    if location_column is None:
        join_keys = ["date"]
        matched_df = holidays_df.loc[matches, ["date"]]
    else:
        join_keys = ["date", location_column]
        matched_df = (
            holidays_df.loc[matches, ["date", "locale_name"]]
                       .rename(columns={"locale_name": location_column})
        )

    # One row per join key so the left join cannot multiply rows of `df`.
    indicator_df = (
        matched_df.assign(**{indicator_name: 1})
                  .drop_duplicates()
                  .set_index(join_keys)
    )
    with_indicator_df = (
        df.join(indicator_df, how="left", on=join_keys)
          .fillna(value={indicator_name: 0})
          .astype({indicator_name: np.uint8})
          .sort_index(axis=1)
    )
    return with_indicator_df


def _create_some_local_holiday_indicator(df, description):
    """Add the indicator for a local holiday, joined on ``date`` and ``city``."""
    return _create_some_holiday_indicator(df, description, "Local", "city")


def _create_some_national_holiday_indicator(df, description):
    """Add the indicator for a national holiday, joined on ``date`` only."""
    return _create_some_holiday_indicator(df, description, "National")


def _create_some_regional_holiday_indicator(df, description):
    """Add the indicator for a regional holiday, joined on ``date`` and ``state``."""
    return _create_some_holiday_indicator(df, description, "Regional", "state")


These functions are used to create holiday indicators based on specific descriptions and types of holidays from a holiday events DataFrame (_HOLIDAYS_EVENTS_DF). These holiday indicators are added to a given DataFrame and help capture whether a particular holiday is present or not for each date or date and location (city or state). Let's break down what each function does:

_create_some_local_holiday_indicator(df, description): This function creates a local holiday indicator based on a specific holiday description. It does the following:

Filters the holiday events DataFrame _HOLIDAYS_EVENTS_DF to select holidays whose locale is "Local", whose type is one of HOLIDAY_TYPES, whose description contains the specified description (case-insensitively), and which are not transferred.
Constructs the name of the indicator column based on the description.
Creates a DataFrame (is_local_holiday_df) with columns "date," "city," and the indicator column.
Joins this indicator DataFrame with the input DataFrame df on "date" and "city."
Fills missing values with 0 (indicating no holiday) and converts the indicator column to uint8 data type.
Sorts the resulting DataFrame.
_create_some_national_holiday_indicator(df, description): This function is similar to the previous one but creates a national holiday indicator instead. It filters holidays whose locale is "National" in the holiday events DataFrame and follows a similar process to create the indicator, joining it on "date" only because a national holiday applies to every location.

_create_some_regional_holiday_indicator(df, description): This function creates a regional holiday indicator based on the specified description. It filters holidays whose locale is "Regional" in the holiday events DataFrame and follows a similar process to create the indicator, joining it on "date" and "state" so that the indicator is associated with a specific state.

Each of these functions allows you to create holiday indicators for different types of holidays (local, national, regional) and specific holiday descriptions. These indicators are useful in time series analysis and modeling to account for the impact of holidays on the data.

It's important to note that these functions work with a pre-defined holiday events DataFrame (_HOLIDAYS_EVENTS_DF) and are designed to be used within a larger data preprocessing pipeline to enhance the feature set of the data with holiday-related information.


---
Now I define functions that build column transformers for each locale's holidays and events. Using column transformers allows me to parallelize the feature creation process.

In [59]:
def _make_local_holidays_transformer(n_jobs=-1, verbose=True):
    """Build a ColumnTransformer with one indicator step per local holiday.

    Each step receives the ``city`` column; all other columns are dropped.
    The transformer outputs pandas objects.
    """
    indicator_steps = [
        (
            f"Create {description} indicator",
            FunctionTransformer(
                func=_create_some_local_holiday_indicator,
                kw_args={"description": description},
            ),
            ["city"],
        )
        for description in LOCAL_HOLIDAY_DESCRIPTIONS
    ]
    return ColumnTransformer(
        indicator_steps,
        n_jobs=n_jobs,
        remainder="drop",
        verbose=verbose,
        verbose_feature_names_out=False,
    ).set_output(transform="pandas")


def _make_national_holidays_transformer(n_jobs=-1, verbose=True):
    """Build a ColumnTransformer with one indicator step per national holiday.

    Each step receives the ``id`` column; all other columns are dropped.
    The transformer outputs pandas objects.
    """
    indicator_steps = [
        (
            f"Create {description} indicator",
            FunctionTransformer(
                func=_create_some_national_holiday_indicator,
                kw_args={"description": description},
            ),
            ["id"],
        )
        for description in NATIONAL_HOLIDAY_DESCRIPTIONS
    ]
    return ColumnTransformer(
        indicator_steps,
        n_jobs=n_jobs,
        remainder="drop",
        verbose=verbose,
        verbose_feature_names_out=False,
    ).set_output(transform="pandas")


def _make_regional_holidays_transformer(n_jobs=-1, verbose=True):
    """Build a ColumnTransformer with one indicator step per regional holiday.

    Each description in REGIONAL_HOLIDAY_DESCRIPTIONS gets its own
    FunctionTransformer that wraps _create_some_regional_holiday_indicator
    and is applied to the "state" column. Columns that are not selected are
    dropped, and the transformer is configured to return pandas output.

    Parameters
    ----------
    n_jobs : forwarded to ColumnTransformer (default -1).
    verbose : forwarded to ColumnTransformer (default True).
    """
    indicator_steps = [
        (
            f"Create {description} indicator",
            FunctionTransformer(
                func=_create_some_regional_holiday_indicator,
                kw_args={"description": description},
            ),
            ["state"],
        )
        for description in REGIONAL_HOLIDAY_DESCRIPTIONS
    ]

    column_transformer = ColumnTransformer(
        indicator_steps,
        remainder="drop",
        n_jobs=n_jobs,
        verbose=verbose,
        verbose_feature_names_out=False,
    )
    return column_transformer.set_output(transform="pandas")

Next, I use a FeatureUnion to join together the three ColumnTransformer objects. This adds a further level of parallelism because now each of these three transformers can be computed in parallel.

In [60]:
def _create_local_holiday_indicator(df):
    indicator = (
        df.select_dtypes(include=np.uint8)
          .max(axis=1)
    )
    with_indicator_df = (
        df.assign(is_local_holiday=indicator)
    )
    return with_indicator_df


def _create_national_holiday_indicator(df):
    indicator = (
        df.select_dtypes(include=np.uint8)
          .max(axis=1)
    )
    with_indicator_df = (
        df.assign(is_national_holiday=indicator)
    )
    return with_indicator_df


def _create_regional_holiday_indicator(df):
    indicator = (
        df.select_dtypes(include=np.uint8)
          .max(axis=1)
    )
    with_indicator_df = (
        df.assign(is_regional_holiday=indicator)
    )
    return with_indicator_df


def _remove_duplicated_columns(df):
    return df.loc[:, ~df.columns.duplicated()]


def make_holidays_events_feature_union(n_jobs=-1, verbose=True):
    """Build a FeatureUnion that engineers local, national and regional holiday features.

    Each of the three branches is a Pipeline with the same three steps:

    1. a ColumnTransformer that creates one indicator per holiday description,
    2. removal of duplicated columns,
    3. creation of a single aggregate ``is_<locale>_holiday`` indicator.

    Parameters
    ----------
    n_jobs : number of parallel jobs. It is passed to the FeatureUnion and
        also forwarded to the three inner column transformers (previously
        the inner transformers ignored this argument and always used -1;
        with the default value the behaviour is unchanged).
    verbose : forwarded to the FeatureUnion, every Pipeline and every
        inner column transformer.

    Returns
    -------
    FeatureUnion configured to return pandas output.
    """

    def _make_branch(individual_step_name, individual_transformer,
                     aggregate_step_name, aggregate_func):
        # All three locale branches share this exact three-step layout.
        return Pipeline(
            steps=[
                (individual_step_name, individual_transformer),
                (
                    "Remove any duplicated columns",
                    FunctionTransformer(func=_remove_duplicated_columns),
                ),
                (aggregate_step_name, FunctionTransformer(func=aggregate_func)),
            ],
            verbose=verbose,
        )

    # Step names (including the "indivdual" spelling) are kept exactly as they
    # were, because they are part of the estimators' parameter names.
    feature_union = FeatureUnion(
        transformer_list=[
            (
                "Engineer local holiday features",
                _make_branch(
                    "Engineer individual local holiday indicators",
                    _make_local_holidays_transformer(n_jobs=n_jobs, verbose=verbose),
                    "Engineer local holiday indicator",
                    _create_local_holiday_indicator,
                ),
            ),
            (
                "Engineer national holiday and event features",
                _make_branch(
                    "Engineer indivdual national holiday and event indicators",
                    _make_national_holidays_transformer(n_jobs=n_jobs, verbose=verbose),
                    "Engineer national holiday and event indicator",
                    _create_national_holiday_indicator,
                ),
            ),
            (
                "Engineer regional holiday features",
                _make_branch(
                    "Engineer individual regional holiday indicators",
                    _make_regional_holidays_transformer(n_jobs=n_jobs, verbose=verbose),
                    "Engineer regional holiday indicator",
                    _create_regional_holiday_indicator,
                ),
            ),
        ],
        n_jobs=n_jobs,
        verbose=verbose,
    ).set_output(transform="pandas")

    return feature_union

The code above defines several functions and a feature union that is used to create holiday-related indicators and features from a given DataFrame. Here's a breakdown of the key components and what this code is doing:

_create_local_holiday_indicator(df): This function creates a binary indicator for local holidays. It selects the columns with uint8 data type in the input DataFrame df and, for each row, takes the maximum value across those columns. The result is a binary indicator column "is_local_holiday" that is added to the DataFrame.

_create_national_holiday_indicator(df): Similar to the previous function, this one creates a binary indicator for national holidays based on uint8 columns in the input DataFrame. It adds an "is_national_holiday" column to the DataFrame.

_create_regional_holiday_indicator(df): This function creates a binary indicator for regional holidays using uint8 columns in the input DataFrame. It adds an "is_regional_holiday" column to the DataFrame.

_remove_duplicated_columns(df): This utility function removes duplicated columns from the input DataFrame df, keeping the first occurrence of each column name. In the pipelines below it runs after the individual holiday indicators are created and before the overall indicator is added.

make_holidays_events_feature_union(n_jobs=-1, verbose=True): This function creates a feature union using scikit-learn's FeatureUnion class. The feature union combines multiple pipelines for engineering holiday-related features. It consists of three main parts:

"Engineer local holiday features": This pipeline first generates individual local holiday indicators using _make_local_holidays_transformer, removes duplicated columns, and then creates an overall "is_local_holiday" indicator using _create_local_holiday_indicator.
"Engineer national holiday and event features": This pipeline follows a similar pattern to create national holiday and event indicators.
"Engineer regional holiday features": This pipeline is dedicated to creating regional holiday indicators.
The feature union combines these three pipelines, and the resulting feature union can be used to engineer holiday-related features from a DataFrame. The n_jobs and verbose parameters control parallel processing and verbosity during feature engineering.


---

# Example Usage

In [61]:
# Build the holidays/events feature union and apply it to _preprocessed_df.
# NOTE(review): _preprocessed_df is not defined in this cell; it is assumed to
# already exist in the notebook namespace from an earlier cell.
_feature_union = make_holidays_events_feature_union(
    n_jobs=-1,
    verbose=True
)
_df = _feature_union.fit_transform(_preprocessed_df)



# Lagged Feature Engineering Pipeline
Notice that, in order to properly create the lagged features from the transactions data (which we need both to train our model and to forecast sales), we will need a model to forecast the missing transactions over the test window. Typically we would also need models for other features like dcoilwtico and onpromotion (and any other features we use during training that are not known at the time we make our forecast).

In [62]:
def _totals_by_store_nbr(df, onpromotion, transactions):
    features = ["sales"]
    if onpromotion:
        features.append("onpromotion")
    if transactions:
        features.append(transactions)
    totals_df = (
        df.loc[:, features]
          .groupby(["store_nbr", "date"])
          .sum()
          .rename(columns={feature: f"total_{feature}" for feature in features})
    )
    return totals_df


def _sales_per_transaction(df):
    totals_df = _totals_by_store_nbr(df, onpromotion=False, transactions=True)
    sales_per_transaction = (
        totals_df.loc[:, "sales"]
                 .div(totals_df.loc[:, "transactions"])
    )
    return sales_per_transaction

In [63]:
def _create_lagged_features(s, lags, drop):
    feature = s.name
    lagged_features = {}
    for lag in lags:
        lagged_features[f"{feature}_lag{lag:02d}"] = (
            s.groupby(["store_nbr", "family"])
             .shift(periods=lag)
             .fillna(method="bfill")
             .astype(np.float32)
        )
    if drop:
        with_lagged_features_df = (
            s.to_frame()
             .assign(**lagged_features)
             .drop(feature, axis=1)
             .sort_index(axis=1)
        )
    else:
        with_lagged_features_df = (
            s.to_frame()
             .assign(**lagged_features)
             .sort_index(axis=1)
        )
    return with_lagged_features_df


def _create_rolling_mean(s, lag, window):
    rolling_mean = {
        f"{s.name}_{window:02d}_day_rolling_mean": (
            s.groupby(["store_nbr", "family"])
             .rolling(min_periods=1, window=window)
             .mean()
             .astype(np.float32)
             .droplevel([0, 1])
        )
    }
    with_rolling_mean_df = (
        s.to_frame()
         .assign(**rolling_mean)
         .sort_index(axis=1)
    )
    return with_rolling_mean_df


def _fillna_with_forecast(df, forecast_df):
    with_forecast_df = (
        df.fillna(forecast_df)
    )
    return with_forecast_df


def _make_fillna_with_forecast_transformer(
    dcoilwtico_forecast_df,
    onpromotion_forecast_df,
    transactions_forecast_df,
    n_jobs,
    verbose):
    """Build a ColumnTransformer that fills missing values from forecasts.

    The "dcoilwtico", "onpromotion" and "transactions" columns are each
    passed, as single-column frames, to _fillna_with_forecast together with
    the matching forecast DataFrame. All other columns are dropped and the
    transformer is configured to return pandas output.
    """
    forecast_by_feature = {
        "dcoilwtico": dcoilwtico_forecast_df,
        "onpromotion": onpromotion_forecast_df,
        "transactions": transactions_forecast_df,
    }
    fill_steps = [
        (
            f"Insert {feature} forecast",
            FunctionTransformer(
                func=_fillna_with_forecast,
                kw_args={"forecast_df": forecast_df},
            ),
            [feature],
        )
        for feature, forecast_df in forecast_by_feature.items()
    ]

    column_transformer = ColumnTransformer(
        transformers=fill_steps,
        remainder="drop",
        n_jobs=n_jobs,
        verbose=verbose,
        verbose_feature_names_out=False,
    )
    return column_transformer.set_output(transform="pandas")


def _make_lagged_feature_transformer(
    max_lags,
    n_jobs,
    verbose):
    """Build a ColumnTransformer that creates lags 1..max_lags per feature.

    The "dcoilwtico", "onpromotion" and "transactions" columns are each
    selected by a scalar column name (so the function receives a Series) and
    passed to _create_lagged_features with ``drop=True``. All other columns
    are dropped.

    The transformer is configured to return pandas output, consistent with
    the other transformer factories in this notebook. Previously it relied
    on the enclosing Pipeline's ``set_output`` call to get that behaviour.
    """
    lagged_steps = [
        (
            f"Engineer lagged {feature} features",
            FunctionTransformer(
                func=_create_lagged_features,
                kw_args={
                    "lags": range(1, max_lags + 1),
                    "drop": True,
                },
            ),
            # Scalar selector, not a list: the function expects a Series.
            feature,
        )
        for feature in ("dcoilwtico", "onpromotion", "transactions")
    ]
    transformer = ColumnTransformer(
        transformers=lagged_steps,
        n_jobs=n_jobs,
        remainder="drop",
        verbose=verbose,
        verbose_feature_names_out=False,
    ).set_output(transform="pandas")
    return transformer


def _make_rolling_means_transformer(
    lag,
    window,
    n_jobs,
    verbose) -> ColumnTransformer:
    """Build a ColumnTransformer that adds rolling means of lagged features.

    For each of "dcoilwtico", "onpromotion" and "transactions", the column
    ``<feature>_lagNN`` (NN = ``lag``) is selected by scalar name and passed
    to _create_rolling_mean with the given ``lag`` and ``window``. Columns
    that are not selected are passed through unchanged, and the transformer
    is configured to return pandas output.
    """
    rolling_steps = []
    for feature in ("dcoilwtico", "onpromotion", "transactions"):
        lagged_column = f"{feature}_lag{lag:02d}"
        rolling_steps.append(
            (
                f"Engineer {lagged_column} {window:02d}-day rolling mean",
                FunctionTransformer(
                    func=_create_rolling_mean,
                    kw_args={"lag": lag, "window": window},
                ),
                lagged_column,
            )
        )

    column_transformer = ColumnTransformer(
        transformers=rolling_steps,
        remainder="passthrough",
        n_jobs=n_jobs,
        verbose=verbose,
        verbose_feature_names_out=False,
    )
    return column_transformer.set_output(transform="pandas")


def make_lagged_feature_engineering_pipeline(
    max_lags,
    dcoilwtico_forecast_df,
    onpromotion_forecast_df,
    transactions_forecast_df,
    n_jobs=-1,
    verbose=True) -> Pipeline:
    """Build the four-step lagged feature engineering pipeline.

    Steps, in order:

    1. fill missing values with the supplied forecasts,
    2. create lags 1..max_lags,
    3. add a 7-day rolling mean of the lag-1 features,
    4. add a 28-day rolling mean of the lag-1 features.

    The pipeline is configured to return pandas output.
    """
    insert_forecasts = _make_fillna_with_forecast_transformer(
        dcoilwtico_forecast_df,
        onpromotion_forecast_df,
        transactions_forecast_df,
        n_jobs,
        verbose,
    )
    create_lags = _make_lagged_feature_transformer(max_lags, n_jobs, verbose)
    # Both rolling means are computed from the lag-1 columns.
    weekly_means = _make_rolling_means_transformer(1, 7, n_jobs, verbose)
    monthly_means = _make_rolling_means_transformer(1, 28, n_jobs, verbose)

    pipeline = Pipeline(
        steps=[
            ("Insert test window forecasts", insert_forecasts),
            ("Create lagged features", create_lags),
            ("Create weekly rolling means", weekly_means),
            ("Create monthly rolling means", monthly_means),
        ],
        verbose=verbose,
    )
    return pipeline.set_output(transform="pandas")

The provided code defines a set of functions and a pipeline for feature engineering and lagging for time series data. Here's an explanation of what each function does and how they contribute to the pipeline:

_create_lagged_features(s, lags, drop): This function creates lagged features for a given time series column s. It generates lagged versions of the column with different time periods specified in the lags list. If drop is True, it drops the original column and only keeps the lagged features.

_create_rolling_mean(s, lag, window): This function calculates rolling mean features for a given time series column s. It calculates the rolling mean over a specified window size and assigns the result to a new column with a descriptive name.

_fillna_with_forecast(df, forecast_df): This function fills missing values in a DataFrame df with corresponding values from a forecast DataFrame forecast_df.

_make_fillna_with_forecast_transformer(dcoilwtico_forecast_df, onpromotion_forecast_df, transactions_forecast_df, n_jobs, verbose): This function creates a transformer to fill missing values in specific columns of a DataFrame using forecast data. It handles columns like "dcoilwtico," "onpromotion," and "transactions."

_make_lagged_feature_transformer(max_lags, n_jobs, verbose): This function creates a transformer to generate lagged features for columns such as "dcoilwtico," "onpromotion," and "transactions" up to a maximum number of lags specified by max_lags.

_make_rolling_means_transformer(lag, window, n_jobs, verbose): This function creates a transformer to calculate rolling mean features for columns with lagged data. It computes rolling means with a specific window size.

make_lagged_feature_engineering_pipeline(max_lags, dcoilwtico_forecast_df, onpromotion_forecast_df, transactions_forecast_df, n_jobs=-1, verbose=True): This function creates a feature engineering pipeline that consists of the following steps:

Filling missing values with forecast data using _make_fillna_with_forecast_transformer.
Generating lagged features using _make_lagged_feature_transformer.
Creating weekly rolling mean features using _make_rolling_means_transformer.
Creating monthly rolling mean features using _make_rolling_means_transformer.
The pipeline allows for the inclusion of forecast DataFrames for "dcoilwtico," "onpromotion," and "transactions." It also supports parallel processing (n_jobs) and verbosity control (verbose).


---
# Example Usage

We are given data on dcoilwtico and onpromotion for the test window. However, for a real deployment of our forecasting pipeline we would NOT know the future values of the price of oil, the number of products on promotion, or transactions. If we want to use these series to create features for training our forecasting pipeline, then we also need to be able to generate forecasts of these variables prior to generating predictions from our model.


In [64]:
def forecast_dcoilwtico():
    """Return the "dcoilwtico" values for the test window, forward-filled.

    Loads "test.csv" and "oil.csv", left-joins the oil data onto the test
    rows by "date" and forward-fills missing values within each
    (store_nbr, family) group.

    Returns
    -------
    DataFrame with the test index and a single "dcoilwtico" column.
    """
    test_df = load_test_df("test.csv")
    oil_df = load_oil_df("oil.csv")
    forecast_df = (
        test_df.join(oil_df, how="left", on=["date"])
               .groupby(["store_nbr", "family"])
               # GroupBy.ffill() replaces the deprecated
               # GroupBy.fillna(method="ffill") spelling.
               .ffill()
               .loc[:, ["dcoilwtico"]]
    )
    return forecast_df


def forecast_onpromotion():
    """Return the "onpromotion" column of "test.csv" as a one-column DataFrame."""
    test_with_promotions_df = load_test_df("test.csv", onpromotion=True)
    return test_with_promotions_df.loc[:, ["onpromotion"]]


def forecast_transactions():
    """Return "transactions" for the test window by forward-filling.

    Loads "test.csv", "train.csv" and "transactions.csv", left-joins the
    transactions onto the training rows by (store_nbr, date), appends the
    test rows, forward-fills within each (store_nbr, family) group, and
    returns the rows dated "2017-08-16" or later.

    Returns
    -------
    DataFrame with a single "transactions" column.
    """
    test_df = load_test_df("test.csv")
    train_df = load_train_df("train.csv")
    transactions_df = load_transactions_df("transactions.csv")

    joined_df = (
        train_df.join(
            transactions_df,
            how="left",
            on=["store_nbr", "date"]
        )
    )
    concat_df = (
        pd.concat(
            [joined_df, test_df],
            axis=0,
            sort=True,
        ).sort_index(axis=0)
    )
    forecast_df = (
        concat_df.groupby(["store_nbr", "family"])
                 # GroupBy.ffill() replaces the deprecated
                 # GroupBy.fillna(method="ffill") spelling.
                 .ffill()
                 .loc[pd.IndexSlice[:, :, "2017-08-16":], ["transactions"]]
    )
    return forecast_df


The provided code defines three functions for forecasting values for different features using the test dataset. Each function reads the test data and performs a forecast for a specific feature:

forecast_dcoilwtico(): This function forecasts the "dcoilwtico" feature. It loads the test data and the oil price data, joins them based on the "date" column, and fills missing values using forward fill (method="ffill"). Then, it selects the "dcoilwtico" column and returns a DataFrame with the forecasted values for "dcoilwtico."

forecast_onpromotion(): This function forecasts the "onpromotion" feature. It loads the test data with "onpromotion" information, and then it selects only the "onpromotion" column. The function returns a DataFrame with the forecasted values for "onpromotion."

forecast_transactions(): This function forecasts the "transactions" feature. It loads the train data, transactions data, and the test data. It joins the train data and transactions data based on the "store_nbr" and "date" columns. It concatenates the train and test data, sorts it, and fills missing values using forward fill (method="ffill"). Finally, it selects the "transactions" column for dates starting from "2017-08-16" and returns a DataFrame with the forecasted values for "transactions."

In [None]:
# Build the lagged feature engineering pipeline with freshly computed
# forecasts and apply it to _preprocessed_df.
# NOTE(review): _preprocessed_df is not defined in this cell; it is assumed to
# already exist in the notebook namespace from an earlier cell.
_pipeline = make_lagged_feature_engineering_pipeline(
    max_lags=28,
    dcoilwtico_forecast_df=forecast_dcoilwtico(),
    onpromotion_forecast_df=forecast_onpromotion(),
    transactions_forecast_df=forecast_transactions(),
    n_jobs=-1,
    verbose=True
)
_df = _pipeline.fit_transform(_preprocessed_df)



[Pipeline]  (step 1 of 4) Processing Insert test window forecasts, total=  12.5s
[Pipeline]  (step 2 of 4) Processing Create lagged features, total=  22.0s
[Pipeline]  (step 3 of 4) Processing Create weekly rolling means, total=   6.0s
[Pipeline]  (step 4 of 4) Processing Create monthly rolling means, total=   6.2s


In [None]:
# Summarise the columns, non-null counts and dtypes of the engineered frame.
_df.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 1353218 entries, (1, 'AUTOMOTIVE', Timestamp('2015-07-01 00:00:00')) to (54, 'SEAFOOD', Timestamp('2017-08-31 00:00:00'))
Data columns (total 90 columns):
 #   Column                                  Non-Null Count    Dtype  
---  ------                                  --------------    -----  
 0   dcoilwtico_lag01                        1353218 non-null  float32
 1   dcoilwtico_lag01_28_day_rolling_mean    1353218 non-null  float32
 2   onpromotion_lag01                       1353218 non-null  float32
 3   onpromotion_lag01_28_day_rolling_mean   1353218 non-null  float32
 4   transactions_lag01                      1353203 non-null  float32
 5   transactions_lag01_28_day_rolling_mean  1353218 non-null  float32
 6   dcoilwtico_lag01_07_day_rolling_mean    1353218 non-null  float32
 7   onpromotion_lag01_07_day_rolling_mean   1353218 non-null  float32
 8   transactions_lag01_07_day_rolling_mean  1353209 non-null  float32
 9   dcoilwtic

# Pipeline to encode categorical features


In [None]:
def make_feature_encoding_transformer(n_jobs=-1, verbose=True):
    """Build a ColumnTransformer that one-hot encodes the categorical features.

    "city", "state", "store_cluster" and "store_type" each get their own
    dense, uint8 OneHotEncoder. All other columns are dropped and the
    transformer is configured to return pandas output.
    """
    encoding_steps = []
    for feature in ("city", "state", "store_cluster", "store_type"):
        encoder = OneHotEncoder(sparse_output=False, dtype=np.uint8)
        encoding_steps.append(
            (f"One-hot encode {feature} feature", encoder, [feature])
        )

    column_transformer = ColumnTransformer(
        transformers=encoding_steps,
        remainder="drop",
        n_jobs=n_jobs,
        verbose=verbose,
        verbose_feature_names_out=False,
    )
    return column_transformer.set_output(transform="pandas")

# Combined Feature Engineering Pipeline


In [None]:
def make_feature_engineering_feature_union(
    max_lags,
    dcoilwtico_forecast_df,
    onpromotion_forecast_df,
    transactions_forecast_df,
    n_jobs=-1,
    verbose=True) -> FeatureUnion:
    """Build the FeatureUnion that combines every feature engineering branch.

    Branches, in order: the "sales" target passed through unchanged, date
    features, holidays and events features, lagged features, and one-hot
    encoded categorical features. The union is configured to return pandas
    output.
    """
    # Identity transform on the "sales" column so that the target is kept
    # alongside the engineered features.
    sales_passthrough = ColumnTransformer(
        transformers=[
            (
                "Identity transform",
                FunctionTransformer(func=lambda frame: frame),
                ["sales"],
            )
        ],
        n_jobs=1,
        remainder="drop",
        verbose=verbose,
        verbose_feature_names_out=False,
    )
    date_features = make_date_features_transformer(verbose)
    holidays_events_features = make_holidays_events_feature_union(
        n_jobs,
        verbose,
    )
    lagged_features = make_lagged_feature_engineering_pipeline(
        max_lags,
        dcoilwtico_forecast_df,
        onpromotion_forecast_df,
        transactions_forecast_df,
        n_jobs,
        verbose,
    )
    encoded_features = make_feature_encoding_transformer(
        n_jobs=n_jobs,
        verbose=verbose,
    )

    feature_union = FeatureUnion(
        transformer_list=[
            ("Passthrough sales target", sales_passthrough),
            ("Engineer date features", date_features),
            ("Engineer holidays and events features", holidays_events_features),
            ("Engineer lagged features", lagged_features),
            ("Encode categorical features", encoded_features),
        ],
        n_jobs=n_jobs,
        verbose=verbose,
    )
    return feature_union.set_output(transform="pandas")


In [None]:
# Build the combined feature engineering union with freshly computed
# forecasts, apply it to _preprocessed_df and summarise the result.
# NOTE(review): _preprocessed_df is not defined in this cell; it is assumed to
# already exist in the notebook namespace from an earlier cell.
_feature_union_kwargs = {
    "max_lags": 28,
    "dcoilwtico_forecast_df": forecast_dcoilwtico(),
    "onpromotion_forecast_df": forecast_onpromotion(),
    "transactions_forecast_df": forecast_transactions(),
    "n_jobs": -1,
    "verbose": True
}
_feature_union = make_feature_engineering_feature_union(
    **_feature_union_kwargs
)
_df = _feature_union.fit_transform(_preprocessed_df)
_df.info()



<class 'pandas.core.frame.DataFrame'>
MultiIndex: 1353218 entries, (1, 'AUTOMOTIVE', Timestamp('2015-07-01 00:00:00')) to (54, 'SEAFOOD', Timestamp('2017-08-31 00:00:00'))
Columns: 211 entries, sales to store_type_E
dtypes: float32(91), int64(4), object(2), uint8(114)
memory usage: 684.0+ MB


# Pipeline for converting to nixtlats format

In [None]:
def _create_unique_id(df):
    unique_ids = (
        df.loc[:, ["store_nbr", "family"]]
          .apply(lambda feature: feature.cat.codes, axis=0)
          .apply(lambda cs: '_'.join(str(c) for c in cs), axis=1)
          .astype("category")
    )
    return unique_ids


def make_nixtlats_preparation_pipeline(verbose=True) -> Pipeline:
    """Build the pipeline that reshapes the engineered data for nixtlats.

    Steps, in order: reset the multi-index, cast "store_nbr" and "family" to
    category dtype, add a "unique_id" column, one-hot encode "store_nbr" and
    "family" (other columns pass through), rename "date" to "ds" and "sales"
    to "y", sort the columns, and split the rows on ``ds`` at 20170816.

    The final step returns a ``(train, test)`` tuple of frames, so that is
    what ``fit_transform`` on this pipeline yields.
    """
    reset_index_step = FunctionTransformer(
        func=lambda frame: frame.reset_index()
    )
    to_category_step = FunctionTransformer(
        func=lambda frame: frame.astype({"store_nbr": "category", "family": "category"})
    )
    unique_id_step = FunctionTransformer(
        func=lambda frame: frame.assign(unique_id=_create_unique_id(frame))
    )
    one_hot_step = ColumnTransformer(
        transformers=[
            (
                "One-hot encode store_nbr and family",
                OneHotEncoder(sparse_output=False, dtype=np.uint8),
                ["store_nbr", "family"],
            )
        ],
        remainder="passthrough",
        verbose=verbose,
        verbose_feature_names_out=False,
    )
    rename_step = FunctionTransformer(
        func=lambda frame: frame.rename(columns={"date": "ds", "sales": 'y'})
    )
    sort_columns_step = FunctionTransformer(
        func=lambda frame: frame.sort_index(axis=1)
    )
    # Rows before the cutoff form the training set, the rest the test set.
    split_step = FunctionTransformer(
        func=lambda frame: (frame.query("ds < 20170816"), frame.query("ds >= 20170816"))
    )

    pipeline = Pipeline(
        steps=[
            ("Reset multi-index", reset_index_step),
            ("Convert store_nbr and family to category dtype", to_category_step),
            ("Create unique_id column using store_nbr and family", unique_id_step),
            ("Encode remaining categorical features", one_hot_step),
            ("Rename columns for use with nixlats libraries", rename_step),
            ("Sort the columns", sort_columns_step),
            ("Split into train and test data sets", split_step),
        ],
        verbose=verbose,
    )
    return pipeline.set_output(transform="pandas")


# Complete Pipeline



In [None]:
def prepare_sales_forecasting_data(
    onpromotion=True,
    join_oil=True,
    join_transactions=True,
    start="20150701",
    q=0.99,
    max_lags=28,
    dcoilwtico_forecast_df=None,
    onpromotion_forecast_df=None,
    transactions_forecast_df=None,
    n_jobs=-1,
    verbose=True):
    """Run the complete data preparation and return ``(train_df, test_df)``.

    Loads "train.csv", runs the data preprocessing pipeline, engineers the
    features and finally converts the result to the nixtlats layout, which
    also splits it into training and test frames.

    Parameters
    ----------
    onpromotion, join_oil, join_transactions, start, q :
        forwarded, in this order, to make_data_preprocessing_pipeline.
    max_lags : number of lags for the lagged feature pipeline.
    dcoilwtico_forecast_df, onpromotion_forecast_df, transactions_forecast_df :
        forecasts used for the test window; each one that is None is
        computed with the matching ``forecast_*`` function.
    n_jobs : forwarded to the feature engineering union.
    verbose : forwarded to every pipeline that is built.
    """
    # Step 1: load the raw training data.
    raw_train_df = load_train_df("train.csv")

    # Step 2: preprocess it.
    preprocessing_pipeline = make_data_preprocessing_pipeline(
        onpromotion,
        join_oil,
        join_transactions,
        start,
        q,
        verbose
    )
    preprocessed_df = preprocessing_pipeline.fit_transform(raw_train_df)

    # Step 3: fall back to the built-in forecasts for any exogenous variable
    # the caller did not supply.
    if dcoilwtico_forecast_df is None:
        dcoilwtico_forecast_df = forecast_dcoilwtico()
    if onpromotion_forecast_df is None:
        onpromotion_forecast_df = forecast_onpromotion()
    if transactions_forecast_df is None:
        transactions_forecast_df = forecast_transactions()

    # Step 4: engineer the features.
    engineering_union = make_feature_engineering_feature_union(
        max_lags,
        dcoilwtico_forecast_df,
        onpromotion_forecast_df,
        transactions_forecast_df,
        n_jobs,
        verbose
    )
    engineered_df = engineering_union.fit_transform(preprocessed_df)

    # Step 5: convert to the nixtlats layout and split into train and test.
    preparation_pipeline = make_nixtlats_preparation_pipeline(verbose)
    train_df, test_df = preparation_pipeline.fit_transform(engineered_df)

    return train_df, test_df

# Helper Function for Extracting Static Feature Names


In [None]:
def get_static_feature_names(df) -> list:
    """Return the column names of ``df`` that start with a static-feature prefix.

    A column is selected when its name starts with any of "city_", "state_",
    "store_nbr_", "store_type_", "store_cluster" or "family_". The names are
    returned in their original column order.
    """
    static_prefixes = (
        "city_",
        "state_",
        "store_nbr_",
        "store_type_",
        "store_cluster",
        "family_",
    )
    column_names = df.columns
    is_static_feature = column_names.str.startswith(static_prefixes[0])
    for prefix in static_prefixes[1:]:
        # Adding boolean masks combines them element-wise (logical OR).
        is_static_feature = is_static_feature + column_names.str.startswith(prefix)
    return column_names[is_static_feature].to_list()


# Example Usage




In [None]:
# Run the complete preparation with its default settings and summarise the
# resulting training and test frames.
train_df, test_df = prepare_sales_forecasting_data()
train_df.info()
test_df.info()