In [1]:
# ============================================================================
# Price Elasticity Estimation Using Deep Learning
# Topic: Own- and cross-price elasticities, non-linear response, what-if pricing
# Input: Panel price–sales data for multiple products, competitors, and features
# Output: Elasticity curves, cross-price effects, pricing scenarios, revenue/margin
# ============================================================================

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

import warnings
warnings.filterwarnings('ignore')

In [2]:
# ==== 1. CONFIG & CONSTANTS =================================================

RANDOM_SEED = 42
TEST_SIZE = 0.2
VALIDATION_SPLIT = 0.15

# Model hyperparameters
NN_HIDDEN_UNITS = [64, 32, 16]
NN_LEARNING_RATE = 0.001
NN_EPOCHS = 100
NN_BATCH_SIZE = 32

# Business parameters
N_PRODUCTS = 3                  # focal product + 2 related SKUs
BASE_PRICE = np.array([10.0, 9.0, 11.0])    # base prices per product
UNIT_COST = np.array([6.0, 5.0, 7.0])       # unit costs per product

# Simulation grid for what-if analysis
PRICE_GRID_STEPS = 20
PRICE_CHANGE_RANGE = (-0.3, 0.3)  # -30% to +30% around base price

np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

In [3]:
# ==== 2. SYNTHETIC DATA GENERATION ==========================================

def generate_price_elasticity_data(
    n_periods=365,
    n_stores=20,
    n_products=N_PRODUCTS
):
    """
    Create synthetic panel data with:
    - Own-price effects (negative elasticity)
    - Cross-price effects (substitution/complementarity)
    - Seasonality and store heterogeneity
    """

    periods = np.arange(n_periods)
    stores = np.arange(n_stores)

    rows = []

    # True (latent) elasticities for simulation
    # own-price elasticities per product (negative)
    own_elasticities = np.array([-1.5, -2.0, -1.2])
    # cross-price elasticities (matrix), off-diagonals only
    cross_elasticities = np.array([
        [0.0, 0.3, 0.1],
        [0.2, 0.0, 0.15],
        [0.1, 0.25, 0.0],
    ])

    for t in periods:
        # simple seasonality factor (e.g., weekly or annual)
        seasonality = 1.0 + 0.2 * np.sin(2 * np.pi * t / 30.0)

        for s in stores:
            store_factor = 1.0 + np.random.normal(0, 0.05)

            # randomized prices around base
            price_multiplier = np.random.uniform(0.7, 1.3, size=n_products)
            prices = BASE_PRICE * price_multiplier

            # competitor price (simplified, one competitor per product)
            comp_price = prices * np.random.uniform(0.9, 1.1, size=n_products)

            # underlying log-demand function to generate sales
            base_log_demand = np.log(100) + np.random.normal(0, 0.1)

            # own-price impact
            own_effect = own_elasticities * np.log(prices / BASE_PRICE)

            # cross-price impact: sum over j != i
            cross_effect = np.zeros(n_products)
            for i in range(n_products):
                for j in range(n_products):
                    if i == j:
                        continue
                    cross_effect[i] += cross_elasticities[i, j] * np.log(
                        prices[j] / BASE_PRICE[j]
                    )

            # competitor price effect (assume substitutable)
            comp_effect = 0.3 * np.log(comp_price / prices)

            # total log demand per product
            log_demand = (
                base_log_demand
                + seasonality
                + np.log(store_factor)
                + own_effect
                + cross_effect
                + comp_effect
                + np.random.normal(0, 0.1, size=n_products)  # noise
            )

            sales = np.exp(log_demand)  # ensure positive

            for i in range(n_products):
                rows.append({
                    "period": t,
                    "store": s,
                    "product_id": i,
                    "price": prices[i],
                    "comp_price": comp_price[i],
                    "sales": sales[i],
                    "seasonality": seasonality,
                    "store_factor": store_factor,
                    # relative prices of other products for cross-price features
                    "price_p0": prices[0],
                    "price_p1": prices[1],
                    "price_p2": prices[2],
                })

    df = pd.DataFrame(rows)
    return df

In [4]:
# ==== 3. FEATURE ENGINEERING ===============================================

def engineer_features(df):
    """
    Prepare features for modeling:
    - Log prices for semi-log demand models
    - Product dummies
    - Relative prices (own vs others)
    """
    df_feat = df.copy()

    # Log transforms
    df_feat["log_price"] = np.log(df_feat["price"])
    df_feat["log_comp_price"] = np.log(df_feat["comp_price"])

    # Product dummies (one-hot)
    for p in range(N_PRODUCTS):
        df_feat[f"prod_{p}"] = (df_feat["product_id"] == p).astype(int)

    # Relative prices vs. other products (cross-price signals)
    df_feat["rel_price_p0"] = np.log(df_feat["price"] / df_feat["price_p0"])
    df_feat["rel_price_p1"] = np.log(df_feat["price"] / df_feat["price_p1"])
    df_feat["rel_price_p2"] = np.log(df_feat["price"] / df_feat["price_p2"])

    # Target: log-sales (common in elasticity modeling)
    df_feat["log_sales"] = np.log(df_feat["sales"])

    return df_feat

In [5]:
# ==== 4. LINEAR ECONOMETRIC BASELINE ========================================

def train_log_linear_baseline(X_train, y_train, X_test, y_test, feature_names):
    """
    Classic log-log demand model: log(Q) = beta0 + beta1*log(P) + ...
    This is the econometric benchmark.
    """
    model = LinearRegression()
    model.fit(X_train, y_train)

    y_pred_train = model.predict(X_train)
    y_pred_test = model.predict(X_test)

    rmse_train = np.sqrt(mean_squared_error(y_train, y_pred_train))
    rmse_test = np.sqrt(mean_squared_error(y_test, y_pred_test))

    print("=== LOG-LINEAR ECONOMETRIC BASELINE ===")
    print(f"RMSE (train): {rmse_train:.3f}")
    print(f"RMSE (test) : {rmse_test:.3f}")
    print()

    # Coefficients for interpretation (approximate elasticities)
    coef_df = pd.DataFrame({
        "feature": feature_names,
        "coef": model.coef_
    })
    print("Top 10 coefficients (by magnitude):")
    print(coef_df.reindex(coef_df.coef.abs().sort_values(ascending=False).index).head(10))
    print()

    return model, y_pred_test


In [6]:
# ==== 5. NEURAL NETWORK MODEL ==============================================

def build_nn_model(input_dim):
    """
    Deep neural network for flexible, non-linear demand response.
    """
    model = keras.Sequential([
        layers.Dense(NN_HIDDEN_UNITS[0], activation="relu", input_dim=input_dim),
        layers.Dropout(0.2),
        layers.Dense(NN_HIDDEN_UNITS[1], activation="relu"),
        layers.Dropout(0.2),
        layers.Dense(NN_HIDDEN_UNITS[2], activation="relu"),
        layers.Dense(1, activation="linear")
    ])

    optimizer = keras.optimizers.Adam(learning_rate=NN_LEARNING_RATE)
    model.compile(optimizer=optimizer, loss="mse", metrics=["mae"])

    return model


def train_nn_model(X_train, y_train, X_test, y_test, X_val, y_val):
    """
    Train NN and return model and history.
    """
    model = build_nn_model(X_train.shape[1])

    history = model.fit(
        X_train, y_train,
        epochs=NN_EPOCHS,
        batch_size=NN_BATCH_SIZE,
        validation_data=(X_val, y_val),
        verbose=0
    )

    y_pred_test = model.predict(X_test, verbose=0).flatten()

    rmse_train = np.sqrt(mean_squared_error(y_train, model.predict(X_train, verbose=0).flatten()))
    rmse_test = np.sqrt(mean_squared_error(y_test, y_pred_test))

    print("=== NEURAL NETWORK DEMAND MODEL ===")
    print(f"RMSE (train): {rmse_train:.3f}")
    print(f"RMSE (test) : {rmse_test:.3f}")
    print()

    return model, y_pred_test, history

In [7]:
# ==== 6. ELASTICITY ESTIMATION ==============================================

def compute_point_elasticity(model_nn, scaler, base_row, feature_cols, product_idx, price_delta=0.01):
    """
    Local own-price elasticity from NN via finite differences:
    E = dQ/Q / dP/P ≈ [ (Q2 - Q1)/Q1 ] / [ (P2 - P1)/P1 ]
    """
    row1 = base_row.copy()
    row2 = base_row.copy()

    p1 = row1["price"]
    p2 = p1 * (1 + price_delta)

    # update price and derived log_price/rel_price features
    row2["price"] = p2
    row2["log_price"] = np.log(p2)

    # update relative prices vs. others (if using global price_pX columns)
    for p in range(N_PRODUCTS):
        row1[f"price_p{p}"] = row1[f"price_p{p}"]
        row2[f"price_p{p}"] = row1[f"price_p{p}"]
    row1["rel_price_p0"] = np.log(row1["price"] / row1["price_p0"])
    row1["rel_price_p1"] = np.log(row1["price"] / row1["price_p1"])
    row1["rel_price_p2"] = np.log(row1["price"] / row1["price_p2"])

    row2["rel_price_p0"] = np.log(row2["price"] / row2["price_p0"])
    row2["rel_price_p1"] = np.log(row2["price"] / row2["price_p1"])
    row2["rel_price_p2"] = np.log(row2["price"] / row2["price_p2"])

    X1 = scaler.transform(row1[feature_cols].values.reshape(1, -1))
    X2 = scaler.transform(row2[feature_cols].values.reshape(1, -1))

    log_q1 = model_nn.predict(X1, verbose=0).flatten()[0]
    log_q2 = model_nn.predict(X2, verbose=0).flatten()[0]

    Q1 = np.exp(log_q1)
    Q2 = np.exp(log_q2)

    dQ_over_Q = (Q2 - Q1) / Q1
    dP_over_P = (p2 - p1) / p1

    elasticity = dQ_over_Q / dP_over_P
    return elasticity


def simulate_price_curve(model_nn, scaler, base_row, feature_cols, product_idx, n_steps=PRICE_GRID_STEPS):
    """
    Generate demand, revenue, and margin curve vs. price for a given product.
    """
    base_price = base_row["price"]
    low_factor, high_factor = 1 + PRICE_CHANGE_RANGE[0], 1 + PRICE_CHANGE_RANGE[1]
    price_grid = np.linspace(base_price * low_factor, base_price * high_factor, n_steps)

    demands = []
    revenues = []
    margins = []

    for p in price_grid:
        row = base_row.copy()
        row["price"] = p
        row["log_price"] = np.log(p)

        for k in range(N_PRODUCTS):
            row[f"price_p{k}"] = row[f"price_p{k}"]

        row["rel_price_p0"] = np.log(row["price"] / row["price_p0"])
        row["rel_price_p1"] = np.log(row["price"] / row["price_p1"])
        row["rel_price_p2"] = np.log(row["price"] / row["price_p2"])

        X = scaler.transform(row[feature_cols].values.reshape(1, -1))
        log_q = model_nn.predict(X, verbose=0).flatten()[0]
        Q = np.exp(log_q)

        demands.append(Q)
        rev = p * Q
        mar = (p - UNIT_COST[product_idx]) * Q

        revenues.append(rev)
        margins.append(mar)

    curve_df = pd.DataFrame({
        "price": price_grid,
        "demand": demands,
        "revenue": revenues,
        "margin": margins
    })

    return curve_df


def simulate_cross_price_effect(
    model_nn,
    scaler,
    base_row,
    feature_cols,
    focal_product,
    cross_product,
    n_steps=PRICE_GRID_STEPS
):
    """
    Vary price of cross_product and measure demand for focal_product.
    """
    base_price_cross = base_row[f"price_p{cross_product}"]
    low_factor, high_factor = 1 + PRICE_CHANGE_RANGE[0], 1 + PRICE_CHANGE_RANGE[1]
    price_grid = np.linspace(base_price_cross * low_factor,
                             base_price_cross * high_factor,
                             n_steps)

    demands_focal = []

    for p in price_grid:
        row = base_row.copy()
        row[f"price_p{cross_product}"] = p

        X = scaler.transform(row[feature_cols].values.reshape(1, -1))
        log_q = model_nn.predict(X, verbose=0).flatten()[0]
        Q = np.exp(log_q)

        demands_focal.append(Q)

    return pd.DataFrame({
        "cross_price": price_grid,
        "focal_demand": demands_focal
    })

In [8]:
# ==== 7. VISUALIZATION & SCENARIOS ==========================================

def plot_price_response(curve_df, product_idx):
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))

    axes[0].plot(curve_df["price"], curve_df["demand"])
    axes[0].set_title(f"Product {product_idx}: Demand vs Price")
    axes[0].set_xlabel("Price")
    axes[0].set_ylabel("Demand")

    axes[1].plot(curve_df["price"], curve_df["revenue"])
    axes[1].set_title(f"Product {product_idx}: Revenue vs Price")
    axes[1].set_xlabel("Price")
    axes[1].set_ylabel("Revenue")

    axes[2].plot(curve_df["price"], curve_df["margin"])
    axes[2].set_title(f"Product {product_idx}: Margin vs Price")
    axes[2].set_xlabel("Price")
    axes[2].set_ylabel("Margin")

    plt.tight_layout()
    plt.savefig(f"price_response_product_{product_idx}.png", dpi=100)
    print(f"Plots saved: price_response_product_{product_idx}.png")
    plt.close()


def plot_cross_price(df_cross, focal_product, cross_product):
    plt.figure(figsize=(6, 4))
    plt.plot(df_cross["cross_price"], df_cross["focal_demand"])
    plt.title(f"Cross-Price Effect: Focal {focal_product} vs. Product {cross_product} Price")
    plt.xlabel(f"Price of Product {cross_product}")
    plt.ylabel(f"Demand of Product {focal_product}")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    fname = f"cross_price_effect_{focal_product}_vs_{cross_product}.png"
    plt.savefig(fname, dpi=100)
    print(f"Plot saved: {fname}")
    plt.close()

In [9]:
# ==== 8. MAIN PIPELINE ======================================================

def main():
    """
    End-to-end price elasticity modeling:
    1. Generate synthetic panel data
    2. Feature engineering
    3. Train log-linear econometric baseline
    4. Train deep neural network
    5. Estimate local elasticities
    6. Run price and cross-price scenarios
    7. Output revenue & margin-optimizing prices
    """

    print("Generating synthetic price–demand data...")
    df = generate_price_elasticity_data()

    print("Engineering features...")
    df = engineer_features(df)

    # Define feature set
    feature_cols = [
        "log_price",
        "log_comp_price",
        "seasonality",
        "store_factor",
        "rel_price_p0",
        "rel_price_p1",
        "rel_price_p2",
    ]
    # add product dummies
    for p in range(N_PRODUCTS):
        feature_cols.append(f"prod_{p}")

    X = df[feature_cols].values
    y = df["log_sales"].values

    # Scale
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Train/val/test split
    X_train, X_temp, y_train, y_temp = train_test_split(
        X_scaled, y, test_size=0.3, random_state=RANDOM_SEED
    )
    X_test, X_val, y_test, y_val = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=RANDOM_SEED
    )

    print("\n" + "=" * 50)
    print("MODEL TRAINING")
    print("=" * 50 + "\n")

    # Econometric baseline
    baseline_model, y_pred_lin = train_log_linear_baseline(
        X_train, y_train, X_test, y_test, feature_cols
    )

    # Neural network
    nn_model, y_pred_nn, history = train_nn_model(
        X_train, y_train, X_test, y_test, X_val, y_val
    )

    print("=" * 50)
    print("ELASTICITY & PRICING SCENARIOS")
    print("=" * 50 + "\n")

    # Choose a representative row from test set for focal product 0
    df_test = df.iloc[len(X_train):len(X_train) + len(X_test)].reset_index(drop=True)
    focal_product = 0
    base_row = df_test[df_test["product_id"] == focal_product].iloc[0].copy()

    # Own-price elasticity around base point
    own_elast = compute_point_elasticity(
        nn_model, scaler, base_row, feature_cols, product_idx=focal_product
    )
    print(f"Approx. local own-price elasticity for product {focal_product}: {own_elast:.2f}")

    # Price curve and optimal price (by margin)
    curve_df = simulate_price_curve(
        nn_model, scaler, base_row, feature_cols, product_idx=focal_product
    )
    idx_opt_revenue = curve_df["revenue"].idxmax()
    idx_opt_margin = curve_df["margin"].idxmax()

    print("\nOptimal prices from NN (simulation):")
    print(f"- Revenue-maximizing price: {curve_df.loc[idx_opt_revenue, 'price']:.2f}")
    print(f"- Margin-maximizing price : {curve_df.loc[idx_opt_margin, 'price']:.2f}")
    print()

    plot_price_response(curve_df, product_idx=focal_product)

    # Cross-price effect: impact of product 1 price on product 0 demand
    cross_product = 1
    df_cross = simulate_cross_price_effect(
        nn_model, scaler, base_row, feature_cols,
        focal_product=focal_product,
        cross_product=cross_product
    )
    plot_cross_price(df_cross, focal_product=focal_product, cross_product=cross_product)

    print("=" * 50)
    print("DONE")
    print("=" * 50 + "\n")

    return nn_model, scaler, curve_df, df_cross


if __name__ == "__main__":
    nn_model, scaler, price_curve, cross_price_df = main()
    print("\n✓ Price elasticity modeling complete!")
    print("✓ Try changing N_PRODUCTS, NN_HIDDEN_UNITS, or PRICE_CHANGE_RANGE for experiments.")

Generating synthetic price–demand data...
Engineering features...

MODEL TRAINING

=== LOG-LINEAR ECONOMETRIC BASELINE ===
RMSE (train): 0.153
RMSE (test) : 0.153

Top 10 coefficients (by magnitude):
          feature      coef
0       log_price -0.288946
2     seasonality  0.142872
8          prod_1 -0.081283
9          prod_2  0.079342
6    rel_price_p2 -0.059075
1  log_comp_price  0.058326
3    store_factor  0.049376
4    rel_price_p0 -0.040121
5    rel_price_p1 -0.021699
7          prod_0  0.001941

=== NEURAL NETWORK DEMAND MODEL ===
RMSE (train): 0.160
RMSE (test) : 0.162

ELASTICITY & PRICING SCENARIOS

Approx. local own-price elasticity for product 0: 0.22

Optimal prices from NN (simulation):
- Revenue-maximizing price: 6.25
- Margin-maximizing price : 11.61

Plots saved: price_response_product_0.png
Plot saved: cross_price_effect_0_vs_1.png
DONE


✓ Price elasticity modeling complete!
✓ Try changing N_PRODUCTS, NN_HIDDEN_UNITS, or PRICE_CHANGE_RANGE for experiments.
