# 07a - Linear Regression Assumptions Validation

## Purpose
Validate that our demand forecasting data meets the assumptions required for Linear Regression and implement proper preprocessing strategies.

## Linear Regression Assumptions
1. **Linearity** - Linear relationship between features and target
2. **Independence** - Observations are independent (no autocorrelation)
3. **Homoscedasticity** - Constant variance of residuals
4. **Normality** - Residuals are normally distributed
5. **No Multicollinearity** - Features are not highly correlated

## Additional Preprocessing
- Outlier detection and handling
- Feature scaling considerations
- Data transformations if needed

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from scipy import stats
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler, RobustScaler
from statsmodels.stats.stattools import durbin_watson
from statsmodels.stats.diagnostic import het_breuschpagan
from statsmodels.stats.outliers_influence import variance_inflation_factor
import statsmodels.api as sm
import warnings
warnings.filterwarnings('ignore')

plt.style.use('seaborn-v0_8-whitegrid')

# Load data
DATA_DIR = Path('../..') / 'ml' / 'data' / 'processed'

orders = pd.read_csv(DATA_DIR / 'orders.csv')
order_items = pd.read_csv(DATA_DIR / 'order_items.csv')
products = pd.read_csv(DATA_DIR / 'products.csv')

orders['OrderDate'] = pd.to_datetime(orders['OrderDate'])
full_orders = orders.merge(order_items, on='OrderID').merge(products, on='ProductID')

print(f"Data loaded: {len(full_orders):,} records")

In [None]:
# Prepare daily demand data for a sample product
daily_demand = full_orders.groupby([full_orders['OrderDate'].dt.date, 'ProductID'])['Quantity'].sum().reset_index()
daily_demand.columns = ['Date', 'ProductID', 'Quantity']
daily_demand['Date'] = pd.to_datetime(daily_demand['Date'])

# Select top product for detailed analysis
top_product = full_orders.groupby('ProductID')['Quantity'].sum().idxmax()
product_data = daily_demand[daily_demand['ProductID'] == top_product].sort_values('Date').copy()

# Fill missing dates with 0
date_range = pd.date_range(product_data['Date'].min(), product_data['Date'].max())
product_data = product_data.set_index('Date').reindex(date_range, fill_value=0).reset_index()
product_data.columns = ['Date', 'ProductID', 'Quantity']
product_data['ProductID'] = top_product

# Create features
product_data['DayIndex'] = (product_data['Date'] - product_data['Date'].min()).dt.days
product_data['DayOfWeek'] = product_data['Date'].dt.dayofweek
product_data['Month'] = product_data['Date'].dt.month
product_data['WeekOfYear'] = product_data['Date'].dt.isocalendar().week.astype(int)

print(f"Analyzing product: {top_product}")
print(f"Data points: {len(product_data)}")
print(f"\nTarget variable (Quantity) statistics:")
print(product_data['Quantity'].describe())

## 1. Outlier Detection and Handling

In [None]:
# Method 1: IQR-based outlier detection
Q1 = product_data['Quantity'].quantile(0.25)
Q3 = product_data['Quantity'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

outliers_iqr = product_data[(product_data['Quantity'] < lower_bound) | (product_data['Quantity'] > upper_bound)]

# Method 2: Z-score based outlier detection
z_scores = np.abs(stats.zscore(product_data['Quantity']))
outliers_zscore = product_data[z_scores > 3]

# Method 3: Modified Z-score (robust to outliers)
median = product_data['Quantity'].median()
mad = np.median(np.abs(product_data['Quantity'] - median))
modified_z = 0.6745 * (product_data['Quantity'] - median) / (mad if mad != 0 else 1)
outliers_modified_z = product_data[np.abs(modified_z) > 3.5]

print("=== OUTLIER DETECTION RESULTS ===")
print(f"\nIQR Method:")
print(f"  Lower bound: {lower_bound:.2f}")
print(f"  Upper bound: {upper_bound:.2f}")
print(f"  Outliers found: {len(outliers_iqr)} ({len(outliers_iqr)/len(product_data)*100:.2f}%)")

print(f"\nZ-Score Method (threshold=3):")
print(f"  Outliers found: {len(outliers_zscore)} ({len(outliers_zscore)/len(product_data)*100:.2f}%)")

print(f"\nModified Z-Score Method (threshold=3.5):")
print(f"  Outliers found: {len(outliers_modified_z)} ({len(outliers_modified_z)/len(product_data)*100:.2f}%)")

In [None]:
# Visualize outliers
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Box plot
axes[0, 0].boxplot(product_data['Quantity'])
axes[0, 0].set_title('Box Plot - Quantity Distribution')
axes[0, 0].set_ylabel('Quantity')

# Histogram with normal curve
axes[0, 1].hist(product_data['Quantity'], bins=30, density=True, alpha=0.7, edgecolor='black')
xmin, xmax = axes[0, 1].get_xlim()
x = np.linspace(xmin, xmax, 100)
axes[0, 1].plot(x, stats.norm.pdf(x, product_data['Quantity'].mean(), product_data['Quantity'].std()), 'r-', linewidth=2)
axes[0, 1].set_title('Histogram with Normal Distribution Overlay')
axes[0, 1].set_xlabel('Quantity')

# Time series with outliers highlighted
axes[1, 0].plot(product_data['Date'], product_data['Quantity'], alpha=0.7)
axes[1, 0].scatter(outliers_iqr['Date'], outliers_iqr['Quantity'], color='red', s=50, label='Outliers (IQR)')
axes[1, 0].axhline(y=upper_bound, color='r', linestyle='--', alpha=0.5)
axes[1, 0].axhline(y=lower_bound, color='r', linestyle='--', alpha=0.5)
axes[1, 0].set_title('Time Series with Outliers Highlighted')
axes[1, 0].legend()

# Q-Q plot
stats.probplot(product_data['Quantity'], dist="norm", plot=axes[1, 1])
axes[1, 1].set_title('Q-Q Plot (Normality Check)')

plt.tight_layout()
plt.show()

In [None]:
# Outlier Handling Strategies
print("=== OUTLIER HANDLING STRATEGIES ===")

# Strategy 1: Remove outliers
data_no_outliers = product_data[(product_data['Quantity'] >= lower_bound) & (product_data['Quantity'] <= upper_bound)].copy()

# Strategy 2: Cap/Winsorize outliers
data_capped = product_data.copy()
data_capped['Quantity'] = data_capped['Quantity'].clip(lower=lower_bound, upper=upper_bound)

# Strategy 3: Log transformation (for right-skewed data)
data_log = product_data.copy()
data_log['Quantity_Log'] = np.log1p(data_log['Quantity'])  # log(1+x) handles zeros

# Strategy 4: Use robust scaler
robust_scaler = RobustScaler()
data_robust = product_data.copy()
data_robust['Quantity_Scaled'] = robust_scaler.fit_transform(data_robust[['Quantity']])

print(f"\nOriginal data: {len(product_data)} records")
print(f"After removing outliers: {len(data_no_outliers)} records ({len(product_data) - len(data_no_outliers)} removed)")
print(f"\nSkewness:")
print(f"  Original: {product_data['Quantity'].skew():.3f}")
print(f"  After capping: {data_capped['Quantity'].skew():.3f}")
print(f"  After log transform: {data_log['Quantity_Log'].skew():.3f}")

# Recommendation based on skewness
skewness = product_data['Quantity'].skew()
if abs(skewness) > 1:
    print(f"\n⚠️ Data is highly skewed ({skewness:.3f}). Consider log transformation.")
elif abs(skewness) > 0.5:
    print(f"\n⚠️ Data is moderately skewed ({skewness:.3f}). Consider capping outliers.")
else:
    print(f"\n✓ Data skewness is acceptable ({skewness:.3f}).")

## 2. Assumption 1: Linearity Check

In [None]:
# Check linearity between features and target
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# DayIndex vs Quantity
axes[0, 0].scatter(product_data['DayIndex'], product_data['Quantity'], alpha=0.5)
z = np.polyfit(product_data['DayIndex'], product_data['Quantity'], 1)
p = np.poly1d(z)
axes[0, 0].plot(product_data['DayIndex'], p(product_data['DayIndex']), 'r-', linewidth=2)
axes[0, 0].set_xlabel('Day Index')
axes[0, 0].set_ylabel('Quantity')
axes[0, 0].set_title('Linearity Check: DayIndex vs Quantity')

# DayOfWeek vs Quantity (categorical - use box plot)
product_data.boxplot(column='Quantity', by='DayOfWeek', ax=axes[0, 1])
axes[0, 1].set_title('Quantity by Day of Week')
axes[0, 1].set_xlabel('Day of Week (0=Mon, 6=Sun)')
plt.suptitle('')

# Month vs Quantity
product_data.boxplot(column='Quantity', by='Month', ax=axes[1, 0])
axes[1, 0].set_title('Quantity by Month')
axes[1, 0].set_xlabel('Month')
plt.suptitle('')

# Correlation heatmap
corr_cols = ['DayIndex', 'DayOfWeek', 'Month', 'Quantity']
corr_matrix = product_data[corr_cols].corr()
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, ax=axes[1, 1])
axes[1, 1].set_title('Feature Correlations')

plt.tight_layout()
plt.show()

# Calculate correlation coefficients
print("=== LINEARITY ASSESSMENT ===")
for col in ['DayIndex', 'DayOfWeek', 'Month']:
    corr, p_value = stats.pearsonr(product_data[col], product_data['Quantity'])
    print(f"{col}: r = {corr:.4f}, p-value = {p_value:.4f}")
    if p_value < 0.05:
        print(f"  → Statistically significant linear relationship")
    else:
        print(f"  → No significant linear relationship")

## 3. Fit Model and Check Residuals

In [None]:
# Fit the linear regression model
X = product_data[['DayIndex', 'DayOfWeek', 'Month']]
y = product_data['Quantity']

# Add constant for statsmodels
X_const = sm.add_constant(X)
model = sm.OLS(y, X_const).fit()

# Get predictions and residuals
predictions = model.predict(X_const)
residuals = y - predictions

print("=== MODEL SUMMARY ===")
print(model.summary())

## 4. Assumption 2: Independence (No Autocorrelation)

In [None]:
# Durbin-Watson test for autocorrelation
dw_statistic = durbin_watson(residuals)

print("=== INDEPENDENCE CHECK (Durbin-Watson Test) ===")
print(f"Durbin-Watson statistic: {dw_statistic:.4f}")
print(f"\nInterpretation:")
print(f"  DW ≈ 2: No autocorrelation (ideal)")
print(f"  DW < 2: Positive autocorrelation")
print(f"  DW > 2: Negative autocorrelation")

if 1.5 <= dw_statistic <= 2.5:
    print(f"\n✓ Durbin-Watson = {dw_statistic:.4f} is within acceptable range [1.5, 2.5]")
else:
    print(f"\n⚠️ Durbin-Watson = {dw_statistic:.4f} indicates autocorrelation issue")
    print("   Consider: Adding lag features, using time-series specific models (ARIMA)")

# Plot residuals over time
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].plot(product_data['Date'], residuals)
axes[0].axhline(y=0, color='r', linestyle='--')
axes[0].set_title('Residuals Over Time')
axes[0].set_xlabel('Date')
axes[0].set_ylabel('Residuals')

# Autocorrelation plot
pd.plotting.autocorrelation_plot(residuals, ax=axes[1])
axes[1].set_title('Autocorrelation Plot of Residuals')

plt.tight_layout()
plt.show()

## 5. Assumption 3: Homoscedasticity (Constant Variance)

In [None]:
# Breusch-Pagan test for heteroscedasticity
bp_test = het_breuschpagan(residuals, X_const)
bp_statistic, bp_pvalue, bp_f_stat, bp_f_pvalue = bp_test

print("=== HOMOSCEDASTICITY CHECK (Breusch-Pagan Test) ===")
print(f"Lagrange Multiplier statistic: {bp_statistic:.4f}")
print(f"p-value: {bp_pvalue:.4f}")
print(f"F-statistic: {bp_f_stat:.4f}")
print(f"F p-value: {bp_f_pvalue:.4f}")

if bp_pvalue > 0.05:
    print(f"\n✓ p-value = {bp_pvalue:.4f} > 0.05: Homoscedasticity assumption holds")
else:
    print(f"\n⚠️ p-value = {bp_pvalue:.4f} < 0.05: Heteroscedasticity detected")
    print("   Consider: Weighted Least Squares, log transformation, robust standard errors")

# Visual check: Residuals vs Fitted values
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].scatter(predictions, residuals, alpha=0.5)
axes[0].axhline(y=0, color='r', linestyle='--')
axes[0].set_xlabel('Fitted Values')
axes[0].set_ylabel('Residuals')
axes[0].set_title('Residuals vs Fitted Values (Homoscedasticity Check)')

# Scale-Location plot
axes[1].scatter(predictions, np.sqrt(np.abs(residuals)), alpha=0.5)
axes[1].set_xlabel('Fitted Values')
axes[1].set_ylabel('√|Residuals|')
axes[1].set_title('Scale-Location Plot')

plt.tight_layout()
plt.show()

## 6. Assumption 4: Normality of Residuals

In [None]:
# Shapiro-Wilk test for normality (use sample if data is large)
sample_size = min(5000, len(residuals))
residual_sample = residuals.sample(sample_size) if len(residuals) > 5000 else residuals
shapiro_stat, shapiro_pvalue = stats.shapiro(residual_sample)

# Jarque-Bera test
jb_stat, jb_pvalue = stats.jarque_bera(residuals)

# D'Agostino-Pearson test
dagostino_stat, dagostino_pvalue = stats.normaltest(residuals)

print("=== NORMALITY CHECK ===")
print(f"\nShapiro-Wilk Test:")
print(f"  Statistic: {shapiro_stat:.4f}")
print(f"  p-value: {shapiro_pvalue:.4f}")

print(f"\nJarque-Bera Test:")
print(f"  Statistic: {jb_stat:.4f}")
print(f"  p-value: {jb_pvalue:.4f}")

print(f"\nD'Agostino-Pearson Test:")
print(f"  Statistic: {dagostino_stat:.4f}")
print(f"  p-value: {dagostino_pvalue:.4f}")

# Interpretation
print(f"\nResidual Statistics:")
print(f"  Skewness: {stats.skew(residuals):.4f}")
print(f"  Kurtosis: {stats.kurtosis(residuals):.4f}")

if shapiro_pvalue > 0.05:
    print(f"\n✓ Residuals appear normally distributed (Shapiro p={shapiro_pvalue:.4f})")
else:
    print(f"\n⚠️ Residuals may not be normally distributed (Shapiro p={shapiro_pvalue:.4f})")
    print("   Note: With large samples, slight deviations are common.")
    print("   Check visual diagnostics for practical significance.")

In [None]:
# Visual normality checks
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Histogram of residuals
axes[0].hist(residuals, bins=30, density=True, alpha=0.7, edgecolor='black')
xmin, xmax = axes[0].get_xlim()
x = np.linspace(xmin, xmax, 100)
axes[0].plot(x, stats.norm.pdf(x, residuals.mean(), residuals.std()), 'r-', linewidth=2)
axes[0].set_title('Histogram of Residuals')
axes[0].set_xlabel('Residuals')

# Q-Q plot
stats.probplot(residuals, dist="norm", plot=axes[1])
axes[1].set_title('Q-Q Plot of Residuals')

# KDE plot
residuals.plot(kind='kde', ax=axes[2], label='Residuals')
x = np.linspace(residuals.min(), residuals.max(), 100)
axes[2].plot(x, stats.norm.pdf(x, residuals.mean(), residuals.std()), 'r-', label='Normal')
axes[2].set_title('Kernel Density Estimation')
axes[2].legend()

plt.tight_layout()
plt.show()

## 7. Assumption 5: No Multicollinearity

In [None]:
# Calculate Variance Inflation Factor (VIF)
vif_data = pd.DataFrame()
vif_data['Feature'] = X.columns
vif_data['VIF'] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]

print("=== MULTICOLLINEARITY CHECK (VIF) ===")
print(vif_data)
print(f"\nInterpretation:")
print(f"  VIF = 1: No correlation")
print(f"  VIF < 5: Acceptable")
print(f"  VIF > 5: Moderate multicollinearity")
print(f"  VIF > 10: High multicollinearity (problematic)")

max_vif = vif_data['VIF'].max()
if max_vif < 5:
    print(f"\n✓ All VIF values < 5: No multicollinearity issues")
elif max_vif < 10:
    print(f"\n⚠️ Max VIF = {max_vif:.2f}: Moderate multicollinearity")
else:
    print(f"\n⚠️ Max VIF = {max_vif:.2f}: High multicollinearity detected")
    print("   Consider: Removing correlated features, PCA, Ridge regression")

# Correlation matrix visualization
plt.figure(figsize=(8, 6))
sns.heatmap(X.corr(), annot=True, cmap='coolwarm', center=0)
plt.title('Feature Correlation Matrix')
plt.tight_layout()
plt.show()

## 8. Summary and Recommendations

In [None]:
print("="*70)
print("LINEAR REGRESSION ASSUMPTIONS VALIDATION SUMMARY")
print("="*70)

# Compile results
results = {
    'Linearity': 'Check correlation coefficients above',
    'Independence (Durbin-Watson)': f'{dw_statistic:.4f} (ideal ≈ 2)',
    'Homoscedasticity (Breusch-Pagan p-value)': f'{bp_pvalue:.4f} (> 0.05 is good)',
    'Normality (Shapiro-Wilk p-value)': f'{shapiro_pvalue:.4f} (> 0.05 is good)',
    'Multicollinearity (Max VIF)': f'{max_vif:.2f} (< 5 is good)'
}

print("\n=== ASSUMPTION CHECKS ===")
for assumption, result in results.items():
    print(f"{assumption}: {result}")

print("\n=== RECOMMENDATIONS ===")

recommendations = []

if dw_statistic < 1.5 or dw_statistic > 2.5:
    recommendations.append("- Autocorrelation detected: Consider adding lag features or using ARIMA")

if bp_pvalue < 0.05:
    recommendations.append("- Heteroscedasticity detected: Consider log transformation or WLS")

if shapiro_pvalue < 0.05:
    recommendations.append("- Non-normal residuals: Consider robust regression or transformation")

if max_vif > 5:
    recommendations.append("- Multicollinearity detected: Consider removing correlated features")

if abs(skewness) > 1:
    recommendations.append(f"- High skewness ({skewness:.2f}): Consider log transformation of target")

if len(outliers_iqr) > 0:
    recommendations.append(f"- {len(outliers_iqr)} outliers detected: Consider capping or robust methods")

if recommendations:
    for rec in recommendations:
        print(rec)
else:
    print("✓ All assumptions reasonably satisfied. Linear Regression is appropriate.")

print("\n=== ALTERNATIVE MODELS TO CONSIDER ===")
print("If assumptions are violated:")
print("  1. Ridge/Lasso Regression - handles multicollinearity")
print("  2. Robust Regression - handles outliers and non-normality")
print("  3. ARIMA/SARIMA - handles autocorrelation in time series")
print("  4. Random Forest - non-parametric, no assumptions required")
print("  5. Gradient Boosting - handles complex non-linear patterns")

In [None]:
# Final diagnostic plots (4-in-1)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# 1. Residuals vs Fitted
axes[0, 0].scatter(predictions, residuals, alpha=0.5)
axes[0, 0].axhline(y=0, color='r', linestyle='--')
axes[0, 0].set_xlabel('Fitted Values')
axes[0, 0].set_ylabel('Residuals')
axes[0, 0].set_title('Residuals vs Fitted')

# 2. Q-Q Plot
stats.probplot(residuals, dist="norm", plot=axes[0, 1])
axes[0, 1].set_title('Normal Q-Q')

# 3. Scale-Location
axes[1, 0].scatter(predictions, np.sqrt(np.abs(residuals)), alpha=0.5)
axes[1, 0].set_xlabel('Fitted Values')
axes[1, 0].set_ylabel('√|Standardized Residuals|')
axes[1, 0].set_title('Scale-Location')

# 4. Residuals vs Leverage (Cook's Distance)
influence = model.get_influence()
leverage = influence.hat_matrix_diag
cooks_d = influence.cooks_distance[0]

axes[1, 1].scatter(leverage, residuals, alpha=0.5)
axes[1, 1].axhline(y=0, color='r', linestyle='--')
axes[1, 1].set_xlabel('Leverage')
axes[1, 1].set_ylabel('Residuals')
axes[1, 1].set_title('Residuals vs Leverage')

# Highlight high Cook's distance points
high_cooks = cooks_d > 4 / len(cooks_d)
if high_cooks.sum() > 0:
    axes[1, 1].scatter(leverage[high_cooks], residuals.values[high_cooks], 
                       color='red', s=100, label='High influence')
    axes[1, 1].legend()

plt.suptitle('Regression Diagnostic Plots', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

print(f"\nInfluential observations (Cook's D > 4/n): {high_cooks.sum()}")