## Carregar Dados

In [None]:
import os
from IPython.display import clear_output

# Step up to the parent directory only when the current directory *is* the
# "notebooks" folder. A substring test on the whole path would also trigger
# for any ancestor directory whose name merely contains "notebooks", and
# would then move one level up on every re-run of this cell.
if os.path.basename(os.getcwd()) == "notebooks":
    os.chdir("..")
    print(f"Changed working directory to {os.getcwd()}")

In [None]:
import re
import pandas as pd

from beer_color_prediction import config


def slugify(s):
    """Turn a label into a lowercase, underscore-separated identifier.

    Every run of non-word characters becomes a single underscore, the result
    is lowercased, and leading/trailing underscores are removed.
    """
    underscored = re.sub(r"\W+", "_", s)
    lowered = underscored.lower()
    return lowered.strip("_")


# Read the raw CSV, discard the two unused columns, index the rows by
# "Job ID" and normalise the remaining column names with slugify.
df = (
    pd.read_csv(config.RAW_DATA_DIR / "dataset.csv")
    .drop(columns=["Unnamed: 0", "Date/Time"])
    .set_index("Job ID")
    .rename(columns=slugify)
)

O objetivo do problema é entender quais variáveis influenciam a cor da cerveja Amstel. Para isso, separamos de antemão os dados em dois subconjuntos: Heineken e Amstel. Como a coloração não pode ser negativa e há valores faltantes, removemos as amostras com cor negativa ou ausente.

In [None]:
# Keep only rows whose target is present and non-negative.
df = df.dropna(subset=["color"]).query("color >= 0")

# Per-brand views of the cleaned frame.
df_amstel, df_heineken = (
    df.query("product == 'AMST'"),
    df.query("product == 'HNK'"),
)

In [None]:
# Display the Amstel subset as the cell output.
df_amstel

In [None]:
# Display the Heineken subset as the cell output.
df_heineken

Separamos de antemão um conjunto de teste (gold) contendo apenas amostras da Amstel e um conjunto de treino (train) contendo amostras de ambas as marcas. Esse subconjunto será utilizado nas análises da predição do modelo.

In [None]:
# Hold out 20% of the Amstel rows (fixed seed) as the gold test set, then
# remove those same index labels from both the Amstel frame and the full frame.
df_test = df_amstel.sample(frac=0.2, random_state=42)
held_out_labels = df_test.index
df_amstel = df_amstel.drop(index=held_out_labels)
df = df.drop(index=held_out_labels)
(df_test.shape, df_amstel.shape)

## Treinar modelo

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.dummy import DummyRegressor

from sklearn.compose import TransformedTargetRegressor

from sklearn.metrics import (
    mean_squared_error,
    r2_score,
    mean_absolute_error,
    mean_absolute_percentage_error,
    root_mean_squared_error,
)

import numpy as np


def evaluate(y_true, y_pred):
    """Compute the regression metrics for a set of predictions.

    Args:
        y_true: Ground-truth target values.
        y_pred: Predicted target values.

    Returns:
        tuple: ``(mse, rmse, r2, mae, mape)`` in that order.
    """
    metric_functions = (
        mean_squared_error,
        root_mean_squared_error,
        r2_score,
        mean_absolute_error,
        mean_absolute_percentage_error,
    )
    return tuple(metric(y_true, y_pred) for metric in metric_functions)


def print_metrics(y_true, y_pred):
    """Print the metrics returned by ``evaluate``, one per line, to two decimals.

    Args:
        y_true: Ground-truth target values.
        y_pred: Predicted target values.
    """
    mse, rmse, r2, mae, mape = evaluate(y_true, y_pred)
    report_lines = [
        f"MSE: {mse:.2f}",
        f"RMSE: {rmse:.2f}",
        f"R^2: {r2:.2f}",
        f"MAE: {mae:.2f}",
        f"MAPE: {mape:.2f}",
    ]
    for line in report_lines:
        print(line)


def preprocess_data(
    data: pd.DataFrame,
    handle_negative_values: str = "keep",
    handle_outliers: str = "keep",
    outlier_threshold: float = 1.5,
    lower_percentile: float = 0.05,
    upper_percentile: float = 0.95,
) -> pd.DataFrame:
    """Drop non-feature columns and optionally treat negatives and outliers.

    The columns ``product`` and ``roast_color`` are removed when present.
    Every remaining column, including the target if it is still in ``data``,
    goes through the selected treatments. Missing values are never treated as
    negative or as outliers, so the "drop" options do not remove rows merely
    because they contain NaN.

    Args:
        data (pd.DataFrame): Input DataFrame. It is not modified in place.
        handle_negative_values (str, optional): How to handle negative values.
            One of "keep", "replace_with_zero", "replace_with_nan", "drop"
            (drop the whole row). Defaults to "keep".
        handle_outliers (str, optional): How to handle outliers.
            One of "keep", "clip", "replace_with_nan", "drop" (drop the whole
            row). Defaults to "keep".
        outlier_threshold (float, optional): Multiplier applied to the
            inter-quantile spread when building the outlier bounds.
            Defaults to 1.5.
        lower_percentile (float, optional): Lower quantile, as a fraction in
            [0, 1], passed to ``DataFrame.quantile``. Defaults to 0.05.
        upper_percentile (float, optional): Upper quantile, as a fraction in
            [0, 1], passed to ``DataFrame.quantile``. Defaults to 0.95.

    Returns:
        pd.DataFrame: Preprocessed DataFrame.

    Warns:
        UserWarning: If either option string is not one of the documented
            values. The corresponding treatment is then skipped, exactly as
            with "keep".
    """
    import warnings  # local import keeps this definition self-contained

    negative_options = ("keep", "replace_with_zero", "replace_with_nan", "drop")
    outlier_options = ("keep", "clip", "replace_with_nan", "drop")

    # An unrecognised option used to be a silent no-op; surface it instead.
    if handle_negative_values not in negative_options:
        warnings.warn(
            f"Unknown handle_negative_values={handle_negative_values!r}; "
            f"expected one of {negative_options}. Negative values are kept.",
            stacklevel=2,
        )
    if handle_outliers not in outlier_options:
        warnings.warn(
            f"Unknown handle_outliers={handle_outliers!r}; "
            f"expected one of {outlier_options}. Outliers are kept.",
            stacklevel=2,
        )

    data = data.drop(columns=["product", "roast_color"], errors="ignore")

    if handle_negative_values == "replace_with_zero":
        data = data.clip(lower=0)
    elif handle_negative_values == "replace_with_nan":
        data = data.where(data >= 0)
    elif handle_negative_values == "drop":
        # Test for "is negative" rather than "is >= 0": a comparison with NaN
        # is False, so the latter would also discard rows with missing values.
        has_negative = (data < 0).any(axis=1)
        data = data[~has_negative]

    if handle_outliers in ("clip", "replace_with_nan", "drop"):
        lower_quantile = data.quantile(lower_percentile)
        upper_quantile = data.quantile(upper_percentile)
        spread = upper_quantile - lower_quantile
        lower_bound = lower_quantile - outlier_threshold * spread
        upper_bound = upper_quantile + outlier_threshold * spread

        if handle_outliers == "clip":
            data = data.clip(lower=lower_bound, upper=upper_bound, axis=1)
        elif handle_outliers == "replace_with_nan":
            data = data.where((data >= lower_bound) & (data <= upper_bound))
        else:  # "drop"
            # Same NaN-safe formulation as for negatives above.
            is_outlier = (data < lower_bound) | (data > upper_bound)
            data = data[~is_outlier.any(axis=1)]

    return data

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import SimpleImputer, KNNImputer, IterativeImputer
from sklearn import preprocessing
from sklearn.ensemble import ExtraTreesRegressor
# Modelling pipeline: impute missing values, rescale the features with a
# min-max scaler, then fit a seeded ExtraTrees regressor.
pipe = Pipeline(
    steps=[
        ("imputer", SimpleImputer()),
        ("scaler", preprocessing.MinMaxScaler()),
        ("regressor", ExtraTreesRegressor(random_state=42)),
    ],
)

# pipe = TransformedTargetRegressor(
#     regressor=pipe,
#     transformer=preprocessing.RobustScaler(),
# )

In [None]:
# Preprocess the training frame (both brands, gold rows already removed).
# "replace_with_zero" is the supported option that clips negatives at zero;
# the previous value "clip" is not an option preprocess_data recognises for
# negative values, so negatives were silently left untouched.
df_train = preprocess_data(
    df,
    handle_negative_values="replace_with_zero",
    handle_outliers="clip",
)

X_train = df_train.drop(columns=["color"])
y_train = df_train["color"]

pipe.fit(X_train, y_train)

# Apply the same treatment to the gold test set.
# NOTE(review): preprocess_data derives the outlier bounds from the frame it
# receives, so these bounds come from the test rows themselves, and the
# "color" target is still in the frame when clipping is applied. Confirm
# that this is the intended evaluation protocol.
df_test = preprocess_data(
    df_test,
    handle_negative_values="replace_with_zero",
    handle_outliers="clip",
)
X_test = df_test.drop(columns=["color"])
y_test = df_test["color"]

y_pred = pipe.predict(X_test)

print_metrics(y_test, y_pred)

## Interpretabilidade

### Feature Importance

In [None]:
# Display the raw importance array of the fitted regressor step.
pipe.named_steps["regressor"].feature_importances_

In [None]:
#feature importance
import matplotlib.pyplot as plt

# Importances of the fitted regressor, ordered from most to least important.
importances = pipe.named_steps["regressor"].feature_importances_
indices = np.argsort(importances)[::-1]

# One bar per training column, labelled with the column name.
n_features = X_train.shape[1]
bar_positions = range(n_features)

plt.figure(figsize=(10, 5))
plt.title("Feature importances")
plt.bar(bar_positions, importances[indices], align="center")
plt.xticks(bar_positions, X_train.columns[indices], rotation=90)
plt.xlim([-1, n_features])


### Shapley

In [None]:
# A few SHAP plots
import shap

# Set up SHAP's JavaScript support for plots rendered in this notebook.
shap.initjs()

In [None]:
# Display the pipeline steps by name (imputer, scaler, regressor).
pipe.named_steps

In [None]:
# Run the training features through the fitted imputer and then the fitted
# scaler, so they match what the regressor step received during fit.
X_train_transformed = pipe.named_steps["scaler"].transform(
    pipe.named_steps["imputer"].transform(X_train)
)

In [None]:
# Build a SHAP explainer around the fitted regressor step only, then compute
# SHAP values for the imputed and scaled training features.
explainer = shap.Explainer(pipe.named_steps["regressor"])
shap_values = explainer(X_train_transformed)

In [None]:
# Feature importance according to SHAP (bar summary). Column names are passed
# explicitly via feature_names, taken from X_train.
shap.summary_plot(shap_values, X_train_transformed, plot_type="bar",feature_names=X_train.columns)

In [None]:
# Default SHAP summary plot over the same values, labelled with the X_train
# column names.
shap.summary_plot(shap_values, X_train_transformed,feature_names=X_train.columns)