In [3]:
# =========================
# Product Label → Sales: Clean, modular ML pipeline
# Paste into a new notebook cell or save as pipeline.py
# =========================

# 0. Requirements (install if needed)
# !pip install pandas numpy scikit-learn shap matplotlib seaborn xgboost category_encoders joblib

# 1. Imports
import os
import json
import numpy as np
import pandas as pd
from typing import List, Optional, Tuple, Dict

# sklearn
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV, RandomizedSearchCV, KFold
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder, FunctionTransformer
from sklearn.impute import SimpleImputer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.inspection import permutation_importance, PartialDependenceDisplay

# Optional XGBoost
try:
    from xgboost import XGBRegressor
    has_xgb = True
except Exception:
    has_xgb = False

# SHAP
%pip install shap
import shap

# utilities
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
import warnings
warnings.filterwarnings("ignore")

Defaulting to user installation because normal site-packages is not writeable
Collecting shap
  Using cached shap-0.50.0-cp312-cp312-macosx_11_0_arm64.whl.metadata (25 kB)
Using cached shap-0.50.0-cp312-cp312-macosx_11_0_arm64.whl (555 kB)
Installing collected packages: shap
Successfully installed shap-0.50.0

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip3 install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


  from .autonotebook import tqdm as notebook_tqdm


In [4]:
# 1. Imports
import os
import json
import numpy as np
import pandas as pd
from typing import List, Optional, Tuple, Dict

# sklearn
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV, RandomizedSearchCV, KFold
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder, FunctionTransformer
from sklearn.impute import SimpleImputer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.inspection import permutation_importance, PartialDependenceDisplay

# Optional XGBoost
try:
    from xgboost import XGBRegressor
    has_xgb = True
except Exception:
    has_xgb = False

# SHAP
%pip install shap
import shap

# utilities
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
import warnings
warnings.filterwarnings("ignore")

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip3 install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [6]:
# =========================
# 2. Utility / helper functions
# =========================

def load_data(path: str) -> pd.DataFrame:
    """Load a tabular dataset, picking the pandas reader from the file extension.

    The extension check is case-insensitive and ``path`` may be a ``str`` or any
    ``os.PathLike`` object (for example ``pathlib.Path``).

    Args:
        path: Location of the file to read.

    Returns:
        The file contents as a DataFrame. ``.csv`` uses ``pd.read_csv``,
        ``.xlsx``/``.xls`` use ``pd.read_excel``, ``.parquet`` uses
        ``pd.read_parquet``; any other extension is parsed as CSV.

    Raises:
        ValueError: If ``path`` ends with ``.ipynb``.
    """
    path_str = os.fspath(path)
    # Compare on a lower-cased copy so "DATA.CSV" / "Report.XLSX" hit the right reader.
    lowered = path_str.lower()
    if lowered.endswith('.csv'):
        return pd.read_csv(path_str)
    if lowered.endswith(('.xlsx', '.xls')):
        return pd.read_excel(path_str)
    if lowered.endswith('.parquet'):
        return pd.read_parquet(path_str)
    if lowered.endswith('.ipynb'):
        raise ValueError("Notebook input given — load the contained CSV/Excel instead.")
    # Unknown extension: best-effort CSV parse of the whole file in one pass.
    return pd.read_csv(path_str, low_memory=False)

def summarize_df(df: pd.DataFrame, n: int = 5):
    """Print the frame's shape, then display a preview and a per-column summary.

    Args:
        df: Frame to inspect.
        n: Number of leading rows to show in the preview.
    """
    print("Shape:", df.shape)

    # First n rows, rendered through the notebook's rich display.
    preview = df.head(n)
    display(preview)

    # describe() over every dtype, transposed so each source column is one row.
    stats_by_column = df.describe(include='all').T
    display(stats_by_column)

def identify_feature_groups(df: pd.DataFrame,
                            text_features: Optional[List[str]] = None,
                            numeric_features: Optional[List[str]] = None,
                            categorical_features: Optional[List[str]] = None,
                            target: Optional[str] = 'Sales') -> Dict[str, List[str]]:
    """
    Identify candidate feature columns by dtype when the caller did not supply them.

    Args:
        df: Frame whose columns are classified.
        text_features: Explicit text columns. When None, the first two
            object-dtype columns with more than 20 distinct values are used.
        numeric_features: Explicit numeric columns. When None, every numeric
            column is used.
        categorical_features: Explicit categorical columns. When None, every
            object/category column that is not a text feature is used.
        target: Name of the target column. It is left out of every
            auto-detected group so the target cannot leak into the features.
            Defaults to 'Sales'; pass None to disable the exclusion.

    Returns:
        Dict with keys "text", "num" and "cat", each a new list of column names.
        Explicitly supplied lists are copied, not filtered.
    """
    if text_features is None:
        # Heuristic: high-cardinality object columns are treated as free text.
        text_features = [
            c for c in df.columns
            if c != target and df[c].dtype == "object" and df[c].nunique() > 20
        ][:2]
    else:
        text_features = list(text_features)

    if numeric_features is None:
        numeric_features = [
            c for c in df.select_dtypes(include=[np.number]).columns if c != target
        ]
    else:
        numeric_features = list(numeric_features)

    if categorical_features is None:
        categorical_features = [
            c for c in df.select_dtypes(include=['object', 'category']).columns
            if c != target and c not in text_features
        ]
    else:
        categorical_features = list(categorical_features)

    return {"text": text_features, "num": numeric_features, "cat": categorical_features}

# Small helper to create label-specific engineered features — customize these based on your dataset
def create_label_features(df: pd.DataFrame, label_column: str = "LabelText") -> pd.DataFrame:
    """
    Add simple label-derived numeric features to a copy of ``df``.

    Columns added:
     - label_text_length: number of characters in the label text
     - label_word_count: number of whitespace-separated tokens
     - label_has_digits: 1 if any character is a digit, else 0

    Missing labels count as empty text. Non-string label values (numbers,
    categoricals, ...) are converted with ``str()`` before measuring, so mixed
    columns do not raise. When ``label_column`` is absent all three columns
    are filled with 0.

    These are heuristics; replace them with real detection if label images or
    annotations are available.

    Args:
        df: Input frame; it is not modified.
        label_column: Name of the column holding the label text.

    Returns:
        A copy of ``df`` with the three ``label_*`` columns added.
    """
    df = df.copy()
    if label_column in df.columns:
        raw = df[label_column]
        # Go through object dtype so '' is a legal fill value for any source dtype,
        # then stringify so len()/split() work on every element.
        text = raw.astype(object).where(raw.notna(), '').astype(str)
        df['label_text_length'] = text.map(len).astype(int)
        df['label_word_count'] = text.map(lambda s: len(s.split())).astype(int)
        df['label_has_digits'] = text.map(lambda s: any(ch.isdigit() for ch in s)).astype(int)
    else:
        # No label text column: emit neutral placeholders so downstream code sees the columns.
        df['label_text_length'] = 0
        df['label_word_count'] = 0
        df['label_has_digits'] = 0
    return df

# Simple evaluation function
def evaluate_regression(y_true, y_pred) -> Dict[str, float]:
    """Compute RMSE, MAE and R² for a set of regression predictions.

    RMSE is computed as the square root of ``mean_squared_error`` rather than
    with the ``squared=False`` keyword, which is deprecated and later removed
    in scikit-learn. For single-output targets the result is the same.

    Args:
        y_true: Ground-truth target values.
        y_pred: Predicted target values, aligned with ``y_true``.

    Returns:
        Dict with float entries "RMSE", "MAE" and "R2".
    """
    rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))
    mae = float(mean_absolute_error(y_true, y_pred))
    r2 = float(r2_score(y_true, y_pred))
    return {"RMSE": rmse, "MAE": mae, "R2": r2}

# Plotting helper for feature importances
def plot_feature_importances(features: List[str], importances: np.ndarray, top_n: int = 25, title: str = "Feature importances"):
    """Draw a horizontal bar chart of the ``top_n`` largest feature importances.

    Args:
        features: Feature names, one per importance value.
        importances: 1-D array of importance values aligned with ``features``.
        top_n: Maximum number of bars to draw (largest importances first).
        title: Figure title.

    Raises:
        ValueError: If ``importances`` is not 1-D or its length differs from
            ``features``.
    """
    features = list(features)
    importances = np.asarray(importances)
    if importances.ndim != 1 or importances.shape[0] != len(features):
        raise ValueError(
            f"features ({len(features)}) and importances {importances.shape} must be 1-D and the same length."
        )

    fi = pd.DataFrame({"feature": features, "importance": importances})
    fi = fi.sort_values("importance", ascending=False).head(top_n)
    if fi.empty:
        # A zero-height figure is invalid; there is nothing to draw anyway.
        print("No feature importances to plot.")
        return

    # 0.25 inch per bar, capped at 6 as before, with a floor of 2 so the title
    # and axis labels still fit when only a few features are shown.
    height = min(6, max(2, 0.25 * len(fi)))
    fig, ax = plt.subplots(figsize=(8, height))
    sns.barplot(x="importance", y="feature", data=fi, ax=ax)
    ax.set_title(title)
    fig.tight_layout()
    plt.show()


In [7]:
# =========================
# 3. Preprocessing + Pipelines
# =========================

def build_preprocessor(numeric_features: List[str],
                       categorical_features: List[str],
                       text_features: List[str]) -> ColumnTransformer:
    """
    Build the ColumnTransformer for numeric and categorical features.

    Numeric columns are median-imputed and standardized; categorical columns
    are imputed with the most frequent value and one-hot encoded (dense
    output, unknown categories ignored). All other columns are dropped.

    Args:
        numeric_features: Columns routed through the numeric pipeline.
        categorical_features: Columns routed through the categorical pipeline.
        text_features: Accepted for interface compatibility only. Text columns
            are NOT vectorized here; this function does not read the argument.

    Returns:
        An unfitted ColumnTransformer with transformers named 'num' and 'cat'.
    """
    num_pipe = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])

    # The dense-output keyword was renamed from `sparse` to `sparse_output`;
    # try the current name first and fall back for older scikit-learn releases.
    try:
        onehot = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
    except TypeError:
        onehot = OneHotEncoder(handle_unknown='ignore', sparse=False)

    cat_pipe = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', onehot),
    ])

    # sparse_threshold=0 forces a dense result from the combined transformer.
    preprocessor = ColumnTransformer(transformers=[
        ('num', num_pipe, list(numeric_features)),
        ('cat', cat_pipe, list(categorical_features)),
    ], remainder='drop', sparse_threshold=0)
    return preprocessor

# Text vectorizer wrapper to integrate with preprocessor output
from scipy.sparse import hstack
def transform_with_text(preprocessor: ColumnTransformer, df: pd.DataFrame, text_features: List[str], tfidf_vectorizer: Optional[TfidfVectorizer] = None, fit: Optional[bool] = None):
    """Transform ``df`` with the preprocessor and append TF-IDF features for the text columns.

    Args:
        preprocessor: ColumnTransformer with transformers named 'num' and 'cat'
            (the 'cat' transformer must be a Pipeline with an 'onehot' step).
        df: Rows to transform.
        text_features: Text columns; they are joined with a space into one
            document per row before vectorizing. Empty/None disables TF-IDF.
        tfidf_vectorizer: Vectorizer to use. When None a new
            ``TfidfVectorizer(max_features=2000, ngram_range=(1, 2))`` is created.
        fit: True to (re)fit the preprocessor and vectorizer on ``df``; False
            to only transform with already-fitted objects. When None (default)
            each object is fitted only if it is not fitted yet, so a first call
            on training data fits and a later call on held-out data reuses the
            training statistics instead of refitting.

    Returns:
        Tuple ``(X, feature_names, tfidf)``. With text features ``X`` is a CSR
        sparse matrix and ``tfidf`` the vectorizer used; without text features
        ``X`` is the preprocessor output and ``tfidf`` is None.

    Raises:
        ValueError: If ``fit`` is False, text features are given, and no
            vectorizer was supplied.
    """
    from scipy import sparse

    # --- numeric + categorical part -------------------------------------
    if fit is None:
        # `transformers_` only exists after the ColumnTransformer has been fitted.
        fit_preprocessor = not hasattr(preprocessor, "transformers_")
    else:
        fit_preprocessor = fit
    X_pre = preprocessor.fit_transform(df) if fit_preprocessor else preprocessor.transform(df)

    feature_names = []
    for name, transformer, cols in preprocessor.transformers_:
        if name not in ('num', 'cat'):
            continue
        cols = list(cols)
        if not cols:
            # Nothing was selected for this transformer, so it contributes no columns.
            continue
        if name == 'num':
            feature_names.extend(cols)
            continue
        ohe = transformer.named_steps['onehot']
        if hasattr(ohe, 'get_feature_names_out'):
            ohe_features = list(ohe.get_feature_names_out(cols))
        elif hasattr(ohe, 'categories_'):
            # Older encoders: rebuild one name per (column, category) pair.
            ohe_features = [f"{col}_{cat}" for col, cats in zip(cols, ohe.categories_) for cat in cats]
        else:
            ohe_features = cols
        feature_names.extend(ohe_features)

    if not text_features:
        return X_pre, feature_names, None

    # --- text part -------------------------------------------------------
    if tfidf_vectorizer is None:
        if fit is not None and not fit:
            raise ValueError("fit=False requires an already fitted tfidf_vectorizer.")
        tfidf = TfidfVectorizer(max_features=2000, ngram_range=(1, 2))
        fit_tfidf = True
    else:
        tfidf = tfidf_vectorizer
        # `vocabulary_` only exists after the vectorizer has been fitted.
        fit_tfidf = (not hasattr(tfidf, "vocabulary_")) if fit is None else fit

    # One document per row: all text columns joined with a single space.
    text_series = df[text_features].fillna('').astype(str).agg(' '.join, axis=1)
    X_text = tfidf.fit_transform(text_series) if fit_tfidf else tfidf.transform(text_series)
    feature_names.extend("tfidf_" + t for t in tfidf.get_feature_names_out())

    # CSR (not the default COO) so callers can slice rows of the result.
    X_left = X_pre if sparse.issparse(X_pre) else sparse.csr_matrix(X_pre)
    X_full = hstack([X_left, X_text], format='csr')
    return X_full, feature_names, tfidf


In [8]:
# =========================
# 4. Modeling utilities
# =========================

def get_models(random_state: int = 42):
    """Return the candidate regressors keyed by display name.

    Linear, Ridge, Lasso, RandomForest and GBDT are always present; XGBoost is
    added last when the module-level ``has_xgb`` flag is true. ``random_state``
    is forwarded to every estimator that is given one here.
    """
    linear_family = {
        "Linear": LinearRegression(),
        "Ridge": Ridge(random_state=random_state),
        "Lasso": Lasso(random_state=random_state),
    }
    tree_family = {
        "RandomForest": RandomForestRegressor(n_estimators=200, random_state=random_state, n_jobs=-1),
        "GBDT": GradientBoostingRegressor(n_estimators=200, random_state=random_state),
    }
    candidates = {**linear_family, **tree_family}
    if not has_xgb:
        return candidates
    candidates["XGBoost"] = XGBRegressor(
        n_estimators=200,
        random_state=random_state,
        n_jobs=-1,
        objective='reg:squarederror',
    )
    return candidates

def cross_validate_models(models: Dict[str, object], X, y, cv=5, scoring='r2'):
    """Cross-validate every model and collect its fold scores.

    Args:
        models: Mapping of display name to estimator.
        X, y: Training features and target passed to ``cross_val_score``.
        cv: Cross-validation setting forwarded to ``cross_val_score``.
        scoring: Scoring name forwarded to ``cross_val_score``.

    Returns:
        Dict keyed by model name; each value holds "mean_score", "std" and
        "all_scores" (the raw per-fold array). Progress is printed per model.
    """
    results = {}
    for name in models:
        print(f"Cross-validating {name} ...")
        fold_scores = cross_val_score(models[name], X, y, cv=cv, scoring=scoring, n_jobs=-1)
        results[name] = dict(
            mean_score=float(np.mean(fold_scores)),
            std=float(np.std(fold_scores)),
            all_scores=fold_scores,
        )
        print(f"  {name}: mean {scoring} = {results[name]['mean_score']:.4f} ± {results[name]['std']:.4f}")
    return results

# Train final model and evaluate
def train_and_evaluate(model, X_train, y_train, X_valid, y_valid, feature_names: Optional[List[str]] = None):
    """Fit ``model`` on the training split and report metrics on both splits.

    Prints the train and validation metric dicts. When ``feature_names`` is
    given and the fitted model exposes ``feature_importances_``, the top 30
    importances are plotted as well.

    Returns:
        Tuple ``(model, metrics_train, metrics_valid)``; ``model`` is the same
        object that was passed in, now fitted.
    """
    model.fit(X_train, y_train)

    train_pred = model.predict(X_train)
    valid_pred = model.predict(X_valid)
    metrics_train, metrics_valid = (
        evaluate_regression(truth, guess)
        for truth, guess in ((y_train, train_pred), (y_valid, valid_pred))
    )
    print("Train:", metrics_train)
    print("Valid:", metrics_valid)

    can_plot = feature_names is not None and hasattr(model, "feature_importances_")
    if can_plot:
        plot_feature_importances(
            feature_names,
            model.feature_importances_,
            top_n=30,
            title=f"{model.__class__.__name__} feature importances",
        )
    return model, metrics_train, metrics_valid

# Permutation importance wrapper
def compute_permutation_importance(model, X_valid, y_valid, feature_names, n_repeats=10, random_state=0):
    """Compute and plot permutation importances on held-out data.

    Args:
        model: Fitted estimator.
        X_valid: Validation features. A sparse matrix is converted to a dense
            array first, because ``permutation_importance`` does not accept
            sparse input.
        y_valid: Validation target.
        feature_names: Names aligned with the columns of ``X_valid``.
        n_repeats: Number of shuffles per feature.
        random_state: Seed forwarded to ``permutation_importance``.

    Returns:
        Tuple ``(result, names, importances)``: the raw result object, then
        feature names and mean importances both sorted from most to least
        important.
    """
    # Anything exposing .toarray() is treated as sparse and densified.
    X_dense = X_valid.toarray() if hasattr(X_valid, "toarray") else X_valid
    r = permutation_importance(model, X_dense, y_valid, n_repeats=n_repeats, random_state=random_state, n_jobs=-1)
    sorted_idx = r.importances_mean.argsort()[::-1]
    importances = r.importances_mean[sorted_idx]
    names = [feature_names[i] for i in sorted_idx]
    plot_feature_importances(names, importances, top_n=30, title="Permutation importances (validation)")
    return r, names, importances

# SHAP analysis
def shap_analysis(model, X_sample, feature_names, model_name="model", n_samples=1000):
    """
    Compute SHAP values for ``model`` on ``X_sample`` and show the summary plot.

    RandomForestRegressor, GradientBoostingRegressor and XGBRegressor use
    ``shap.TreeExplainer``; every other model falls back to
    ``shap.Explainer(model.predict, ...)``, which can be slow.

    Args:
        model: Fitted estimator.
        X_sample: Rows to explain: numpy array, sparse matrix (densified) or
            DataFrame. A DataFrame must already contain every name in
            ``feature_names``; its columns are selected in that order.
        feature_names: Column names for the explained features.
        model_name: Not used by this function; kept for interface compatibility.
        n_samples: Upper bound on the number of rows explained. Only the first
            ``n_samples`` rows are used; pass None to explain all rows.

    Returns:
        Tuple ``(explainer, shap_values)`` computed on the (possibly truncated)
        sample.

    Raises:
        ValueError: If ``X_sample`` is a DataFrame missing any of
            ``feature_names``.
    """
    feature_names = list(feature_names)
    if isinstance(X_sample, pd.DataFrame):
        # Building a DataFrame from a DataFrame selects by label, so a missing
        # label would silently become an all-NaN column. Fail loudly instead.
        missing = [f for f in feature_names if f not in X_sample.columns]
        if missing:
            raise ValueError(f"X_sample is missing feature columns: {missing[:10]}")
        X_df = X_sample.loc[:, feature_names]
    elif hasattr(X_sample, "toarray"):
        # Sparse input: SHAP plotting needs a dense, labelled frame.
        X_df = pd.DataFrame(X_sample.toarray(), columns=feature_names)
    else:
        X_df = pd.DataFrame(X_sample, columns=feature_names)

    if n_samples is not None and len(X_df) > n_samples:
        X_df = X_df.iloc[:n_samples]

    if model.__class__.__name__ in ["RandomForestRegressor", "GradientBoostingRegressor", "XGBRegressor"]:
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_df)
    else:
        # Model-agnostic fallback using X_df as background data.
        explainer = shap.Explainer(model.predict, X_df)
        shap_values = explainer(X_df)

    shap.summary_plot(shap_values, X_df, show=True)
    return explainer, shap_values

# Label-sensitivity scoring
def label_sensitivity_score(shap_values, feature_names, label_feature_patterns: List[str]):
    """
    Share of total mean-|SHAP| attributable to label-related features.

    Args:
        shap_values: Either an object with a ``.values`` array or an
            array-like of shape (n_samples, n_features). A 1-D input is treated
            as a single sample.
        feature_names: Names aligned with the feature axis.
        label_feature_patterns: Regular-expression fragments; a feature counts
            as label-related when any fragment is found in its name. An empty
            list (or only empty strings) matches nothing.

    Returns:
        Dict with "label_importance" (sum of mean |SHAP| over matching
        features), "total_importance" (sum over all features), "score"
        (their ratio, 0.0 when the total is 0) and "matching_features"
        (DataFrame of the matching rows, largest first).

    Raises:
        ValueError: If the SHAP array is not 1-D/2-D or its feature axis does
            not match ``feature_names``.
    """
    sv = shap_values.values if hasattr(shap_values, 'values') else shap_values
    abs_sv = np.abs(np.asarray(sv, dtype=float))
    if abs_sv.ndim == 1:
        abs_sv = abs_sv.reshape(1, -1)
    feature_names = list(feature_names)
    if abs_sv.ndim != 2 or abs_sv.shape[1] != len(feature_names):
        raise ValueError(
            f"shap_values shape {abs_sv.shape} does not match {len(feature_names)} feature names."
        )

    total_by_feat = abs_sv.mean(axis=0)
    feature_df = pd.DataFrame({"feature": feature_names, "mean_abs_shap": total_by_feat})

    # An empty alternation would be the regex '' and match every feature, so
    # drop empty fragments and treat "no patterns" as "no matches".
    patterns = [p for p in label_feature_patterns if p]
    if patterns:
        mask = feature_df['feature'].astype(str).str.contains('|'.join(patterns), regex=True, na=False)
    else:
        mask = pd.Series(False, index=feature_df.index)

    label_importance = float(feature_df.loc[mask, 'mean_abs_shap'].sum())
    total = float(feature_df['mean_abs_shap'].sum())
    score = float(label_importance / total) if total > 0 else 0.0
    return {
        "label_importance": label_importance,
        "total_importance": total,
        "score": score,
        "matching_features": feature_df.loc[mask].sort_values('mean_abs_shap', ascending=False),
    }


In [9]:
# =========================
# 5. Example end-to-end usage
# =========================

def _transform_validation(preprocessor, tfidf, X_valid, text_features):
    """Apply the already-fitted preprocessor and TF-IDF to held-out rows without refitting."""
    from scipy import sparse

    X_pre = preprocessor.transform(X_valid)
    if not text_features or tfidf is None:
        return X_pre
    # Same document construction as on the training side: text columns joined by a space.
    documents = X_valid[text_features].fillna('').astype(str).agg(' '.join, axis=1)
    X_text = tfidf.transform(documents)
    X_left = X_pre if sparse.issparse(X_pre) else sparse.csr_matrix(X_pre)
    return hstack([X_left, X_text], format='csr')


def run_full_workflow(df: pd.DataFrame,
                      target: str = "Sales",
                      test_size: float = 0.2,
                      random_state: int = 42,
                      text_columns: Optional[List[str]] = None,
                      label_text_column: Optional[str] = None):
    """
    Full run: feature engineering, preprocessing, model CV, final training,
    permutation importance and SHAP-based label sensitivity.

    Args:
        df: Input data; must contain ``target``. Rows with a missing target
            are dropped. The caller's frame is not modified.
        target: Name of the regression target column. It is removed from every
            feature group so it cannot leak into the model inputs.
        test_size: Fraction of rows held out for validation.
        random_state: Seed for the split and the models.
        text_columns: Explicit text columns for TF-IDF; auto-detected when None.
        label_text_column: Column with label text used for engineered
            ``label_*`` features; defaults to 'LabelText' when that column exists.

    Returns:
        Dict with the trained model, feature names, fitted preprocessor and
        TF-IDF vectorizer, transformed train/validation matrices, targets, CV
        results, permutation-importance result (None on failure) and a "shap"
        dict holding the explainer, SHAP values and sensitivity (each None on
        failure).

    Raises:
        ValueError: If ``target`` is not a column of ``df``.
    """
    from scipy import sparse

    # 1) Quick summary
    summarize_df(df, n=3)

    if target not in df.columns:
        raise ValueError(f"Target '{target}' not found in dataframe columns.")

    # 2) Basic feature engineering for labels
    if label_text_column is None and 'LabelText' in df.columns:
        label_text_column = 'LabelText'
    if label_text_column:
        df = create_label_features(df, label_column=label_text_column)

    # 3) Identify features, then drop rows with missing target
    groups = identify_feature_groups(df, text_features=text_columns)
    df = df[df[target].notna()].copy()

    # The target must never be part of X, whatever its name or dtype is.
    text_features = [c for c in (groups['text'] if text_columns is None else text_columns) if c != target]
    numeric_features = [c for c in groups['num'] if c != target]
    categorical_features = [c for c in groups['cat'] if c != target and c not in text_features]

    # Make sure numeric label-derived columns are used; non-numeric 'label_*'
    # columns stay in their own group instead of breaking the numeric imputer.
    assigned = set(numeric_features) | set(categorical_features) | set(text_features)
    for col in df.columns:
        if (isinstance(col, str) and col.startswith('label_') and col != target
                and col not in assigned and pd.api.types.is_numeric_dtype(df[col])):
            numeric_features.append(col)
            assigned.add(col)

    # 4) train/valid split
    X = df[numeric_features + categorical_features + text_features]
    y = df[target]
    X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=test_size, random_state=random_state)

    # 5) Fit preprocessing on the training rows only; validation rows are
    #    transformed with the fitted objects so both matrices share columns.
    preprocessor = build_preprocessor(numeric_features, categorical_features, text_features)
    X_train_trans, feature_names, tfidf = transform_with_text(preprocessor, X_train, text_features)
    if sparse.issparse(X_train_trans):
        X_train_trans = X_train_trans.tocsr()
    X_valid_trans = _transform_validation(preprocessor, tfidf, X_valid, text_features)

    # 6) Compare models via 3-fold CV on the training set
    models = get_models(random_state=random_state)
    cv_results = cross_validate_models(models, X_train_trans, y_train, cv=3, scoring='r2')

    # 7) Choose best model (by mean CV r2)
    best_name = max(cv_results.items(), key=lambda kv: kv[1]['mean_score'])[0]
    print("Best by CV (r2):", best_name)
    best_model = models[best_name]

    # 8) Train final model and evaluate
    trained_model, metrics_train, metrics_valid = train_and_evaluate(best_model, X_train_trans, y_train, X_valid_trans, y_valid, feature_names)

    # 9) Permutation importance on validation (best effort).
    #    Densify first: permutation_importance does not accept sparse input.
    try:
        X_valid_dense = X_valid_trans.toarray() if sparse.issparse(X_valid_trans) else X_valid_trans
        perm_res = compute_permutation_importance(trained_model, X_valid_dense, y_valid, feature_names)
    except Exception as e:
        print("Permutation importance failed:", e)
        perm_res = None

    # 10) SHAP analysis (best effort)
    explainer = None
    shap_values = None
    sens = None
    try:
        # explain at most 500 validation rows to keep SHAP fast
        n_shap = min(500, X_valid_trans.shape[0])
        X_shap_sample = X_valid_trans[:n_shap]
        explainer, shap_values = shap_analysis(trained_model, X_shap_sample, feature_names, model_name=best_name)
        # label sensitivity
        label_patterns = ['label_', 'tfidf_', 'Color', 'color', 'logo', 'Logo']  # heuristics: extend based on your dataset
        sens = label_sensitivity_score(shap_values, feature_names, label_patterns)
        print("Label sensitivity score:", sens['score'])
        display(sens['matching_features'].head(20))
    except Exception as e:
        print("SHAP analysis failed:", e)
        shap_values = None
        sens = None

    # 11) Return objects for further analysis / saving
    return {
        "trained_model": trained_model,
        "feature_names": feature_names,
        "preprocessor": preprocessor,
        "tfidf_vectorizer": tfidf,
        "X_train_trans": X_train_trans,
        "X_valid_trans": X_valid_trans,
        "y_train": y_train,
        "y_valid": y_valid,
        "cv_results": cv_results,
        "perm_res": perm_res,
        "shap": {"explainer": explainer, "shap_values": shap_values, "sensitivity": sens}
    }
