# PDP-based Feature Engineering for Wine Quality (Kaggle WineQT)
# ワイン品質予測におけるPDP特徴変換の検証（Kaggle WineQTデータ使用）

This notebook performs feature engineering based on Partial Dependence Plots (PDP) to improve linear regression performance on the Wine Quality dataset from Kaggle.

このノートブックでは、KaggleのWine Qualityデータセットを使い、PDPに基づく特徴変換で線形回帰の精度向上を試みます。
## Setup

### 1. Install dependencies

Run in your environment:dependencies:  results/.

In [None]:
pip install numpy pandas scikit-learn xgboost matplotlib scipy

### 2. Prepare data

Download the dataset from Kaggle:  
https://www.kaggle.com/datasets/yasserh/wine-quality-dataset

Place `WineQT.csv` into the `data/` folder relative to this notebook.

---

## Load libraries and data

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.inspection import PartialDependenceDisplay
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt

# Load dataset
df = pd.read_csv("data/WineQT.csv")  # Adjust path if needed
df = df.drop(columns=['Id'])

X = df.drop(columns=['quality'])
y = df['quality']

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

---

## Define baseline and advanced models

In [None]:
models = {
    "Linear Regression (Baseline)": LinearRegression(),
    "Random Forest": RandomForestRegressor(n_estimators=100, random_state=42),
    "XGBoost": XGBRegressor(n_estimators=100, random_state=42, verbosity=0),
    "MLP (Neural Network)": MLPRegressor(hidden_layer_sizes=(100,), max_iter=500, random_state=42)
}

results = {}

for name, model in models.items():
    if "Linear" in name:
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        model.fit(X_train_scaled, y_train)
        y_pred = model.predict(X_test_scaled)
    else:
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

    r2 = r2_score(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    results[name] = {"R2": r2, "RMSE": rmse, "model": model}
    print(f"{name} R2: {r2:.4f}, RMSE: {rmse:.4f}")


---

## Define candidate transformation functions used for PDP fitting

In [None]:
def f1(x, a, b): return a * x + b
def f2(x, a, b, c): return a * x**2 + b * x + c
def f3(x, a, b, c, d): return a * x**3 + b * x**2 + c * x + d
def flog(x, a, b): return a * np.log(x + 1e-9) + b

monotonic_functions = [f1, f2, f3, flog]


---

## Function to transform features with PDP-based fitting

In [None]:
def get_transformed_features(base_model, X_data, feature_names, functions):
    best_functions = {}
    for feature in feature_names:
        try:
            fig, ax = plt.subplots()
            display = PartialDependenceDisplay.from_estimator(base_model, X_data, [feature], ax=ax)
            plt.close(fig)

            pd_dict = display.pd_results[0]
            x_pd = np.array(pd_dict['grid_values']).ravel()
            y_pd = np.array(pd_dict['average']).ravel()

            mse_best = np.inf
            best_fn, best_popt = None, None

            for fn in functions:
                try:
                    p0 = np.ones(fn.__code__.co_argcount - 1)
                    popt, _ = curve_fit(fn, x_pd, y_pd, maxfev=20000, p0=p0)
                    mse = np.mean((y_pd - fn(x_pd, *popt)) ** 2)
                    if mse < mse_best:
                        mse_best = mse
                        best_fn, best_popt = fn, popt
                except Exception:
                    continue

            if best_fn is not None:
                best_functions[feature] = (best_fn, best_popt)
        except Exception:
            continue

    X_transformed = pd.DataFrame(index=X_data.index)
    for feature in feature_names:
        if feature in best_functions:
            fn, params = best_functions[feature]
            vals = fn(X_data[feature], *params)
            vals = np.clip(vals, -1e6, 1e6)
            X_transformed[feature + "_trans"] = vals
        else:
            X_transformed[feature] = X_data[feature]

    return X_transformed, best_functions


---

## Apply PDP-based transformation with base models and evaluate with Linear Regression

In [None]:
pdp_base_models = {
    "Random Forest": results["Random Forest"]["model"],
    "XGBoost": results["XGBoost"]["model"],
    "MLP (Neural Network)": results["MLP (Neural Network)"]["model"]
}

print("\n--- PDP-based Feature Engineering Results ---")
final_results = {}
for name, base_model in pdp_base_models.items():
    X_train_transformed, _ = get_transformed_features(base_model, X_train, X_train.columns, monotonic_functions)
    X_test_transformed, _ = get_transformed_features(base_model, X_test, X_test.columns, monotonic_functions)

    scaler_t = StandardScaler()
    X_train_t_scaled = scaler_t.fit_transform(X_train_transformed)
    X_test_t_scaled = scaler_t.transform(X_test_transformed)

    lr_final = LinearRegression()
    lr_final.fit(X_train_t_scaled, y_train)
    y_pred_final = lr_final.predict(X_test_t_scaled)

    r2_final = r2_score(y_test, y_pred_final)
    rmse_final = np.sqrt(mean_squared_error(y_test, y_pred_final))
    final_results[name] = {"R2": r2_final, "RMSE": rmse_final}
    print(f"Linear Regression R2 with {name} PDP-transformed features: {r2_final:.4f}, RMSE: {rmse_final:.4f}")

# 3. 最終結果まとめ表示
print("\n--- Summary of R2 and RMSE Scores ---")
print(f"Baseline Linear Regression R2: {results['Linear Regression (Baseline)']['R2']:.4f}, RMSE: {results['Linear Regression (Baseline)']['RMSE']:.4f}")
for name, score_dict in final_results.items():
    print(f"R2 and RMSE after {name} PDP-based transformation: R2={score_dict['R2']:.4f}, RMSE={score_dict['RMSE']:.4f}")


---

## Visualize PDPs and their fitted approximation curves for comparative analysis and interpretation

In [None]:
def get_transformed_features(base_model, X_data, feature_names, functions):
    best_functions = {}
    for feature in feature_names:
        try:
            fig, ax = plt.subplots()
            display = PartialDependenceDisplay.from_estimator(base_model, X_data, [feature], ax=ax)
            plt.close(fig)

            pd_dict = display.pd_results[0]
            x_pd = np.array(pd_dict['grid_values']).ravel()
            y_pd = np.array(pd_dict['average']).ravel()

            mse_best = np.inf
            best_fn, best_popt = None, None

            for fn in functions:
                try:
                    p0 = np.ones(fn.__code__.co_argcount - 1)
                    popt, _ = curve_fit(fn, x_pd, y_pd, maxfev=20000, p0=p0)
                    mse = np.mean((y_pd - fn(x_pd, *popt)) ** 2)
                    if mse < mse_best:
                        mse_best = mse
                        best_fn, best_popt = fn, popt
                except Exception:
                    continue

            if best_fn is not None:
                best_functions[feature] = (best_fn, best_popt, (x_pd, y_pd))
        except Exception:
            continue

    X_transformed = pd.DataFrame(index=X_data.index)
    for feature in feature_names:
        if feature in best_functions:
            fn, params = best_functions[feature][:2]
            vals = fn(X_data[feature], *params)
            vals = np.clip(vals, -1e6, 1e6)
            X_transformed[feature + "_trans"] = vals
        else:
            X_transformed[feature] = X_data[feature]

    return X_transformed, best_functions

feature_names = list(X_train.columns)

best_functions_dicts = {}
for name, base_model in pdp_base_models.items():
    _, best_functions = get_transformed_features(base_model, X_train, feature_names, monotonic_functions)
    best_functions_dicts[name] = best_functions

for feature in feature_names:
    fig, axs = plt.subplots(1, 3, figsize=(18, 4), sharey=True)
    for idx, (model_name, best_functions) in enumerate(best_functions_dicts.items()):
        ax = axs[idx]
        if feature in best_functions:
            fn, params = best_functions[feature][:2]
            x_pd, y_pd = best_functions[feature][2]
            ax.scatter(x_pd, y_pd, label='PDP', alpha=0.75)
            y_fit = fn(x_pd, *params)
            ax.plot(x_pd, y_fit, 'r--', label=f'Fit: {fn.__name__}')
            ax.set_title(model_name)
        else:
            ax.text(0.5, 0.5, 'N/A', fontsize=12, ha='center', va='center')
        ax.set_xlabel(feature)
        if idx == 0:
            ax.set_ylabel('Partial Dependence')
        ax.legend()
    plt.suptitle(f'PDP & Fit Comparison: {feature}')
    plt.tight_layout()
    plt.show()