# Feature Engineering and Preprocessing

This notebook prepares the analytics-ready dataset for machine learning modeling using a **hybrid PySpark–pandas workflow**.

PySpark is used for scalable data loading, joining fact and dimension tables, and handling operations that benefit from distributed processing. After these steps, the dataset is converted to pandas for feature engineering and preprocessing steps that integrate naturally with common machine learning libraries.

This design allows us to demonstrate the use of big data tools while maintaining compatibility with standard modeling workflows.

## Feature Engineering Workflow

The preprocessing pipeline in this notebook follows these stages:

1. Load fact and dimension tables using PySpark
2. Join fact and dimension tables into a single Spark DataFrame
3. Remove columns with extreme missingness using Spark operations
4. Validate schema and record counts
5. Convert the Spark DataFrame to pandas for downstream processing
6. Define the target variable for modeling
7. Handle missing values and data inconsistencies
8. Transform skewed numerical features
9. Encode categorical variables
10. Split the data into training and testing sets

Each step is documented to ensure transparency, reproducibility, and alignment with both big data processing and machine learning requirements.

In [2]:
import os
import sys
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType

In [3]:
spark = (
    SparkSession.builder
    .appName("FeatureEngineering")
    .getOrCreate()
)



To avoid duplicating path-handling logic across notebooks, a helper module (`data_utils.py`) is used.

In this notebook, the `data_dir()` function is used to reliably resolve the relative path to the raw data directory. This approach improves reproducibility and keeps environment-specific logic out of the analysis code.
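
The source of `data_utils.py` is not shown in this notebook, so the following is only a minimal sketch of how such a helper could work. The function name `find_data_dir`, its parameters, and the upward directory search are assumptions made for illustration; the real `data_dir()` returns a relative path and may be implemented differently.

```python
from pathlib import Path


def find_data_dir(start=None, relative=("data", "raw")):
    """Hypothetical stand-in for `data_utils.data_dir()`.

    Walks upward from `start` (default: current working directory) and
    returns the first `<ancestor>/data/raw` directory that exists.
    """
    here = Path(start) if start is not None else Path.cwd()
    here = here.resolve()
    # Check the starting directory first, then each parent in turn
    for candidate in (here, *here.parents):
        target = candidate.joinpath(*relative)
        if target.is_dir():
            return target
    raise FileNotFoundError(f"No {'/'.join(relative)} directory found above {here}")
```

Searching upward from the working directory means the same call works whether the notebook is launched from the project root or from a `notebooks/` subfolder.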


In [12]:
# Ensure workspace root is on sys.path so `src` package is importable
workspace_root = os.path.abspath('..')

if workspace_root not in sys.path:
    sys.path.insert(0, workspace_root)
    
import src.data_utils as du

# Resolve DATA_DIR from the package helper and expose as variable
DATA_DIR = du.data_dir()
DATA_DIR

'../data/raw'

Below, the raw fact and dimension tables are loaded from CSV files using **PySpark**.

Using Spark at this stage allows us to demonstrate scalable data ingestion and joining logic that would generalize to larger-than-memory datasets. These tables are then joined to recreate the integrated dataset used for feature engineering.


In [13]:
# Load the fact table
fact_sales = spark.read.option("header", True).option("inferSchema", True).csv(
    os.path.join(DATA_DIR, "FactInternetSales.csv")
)

# Load key dimension tables
dim_product = spark.read.option("header", True).option("inferSchema", True).csv(
    os.path.join(DATA_DIR, "DimProduct.csv")
)

dim_customer = spark.read.option("header", True).option("inferSchema", True).csv(
    os.path.join(DATA_DIR, "DimCustomer.csv")
)

dim_date = spark.read.option("header", True).option("inferSchema", True).csv(
    os.path.join(DATA_DIR, "DimDate.csv")
)

In [14]:
# Join fact table with dimension tables
sales_df = (
    fact_sales
    .join(dim_product, on="ProductKey", how="left")
    .join(dim_customer, on="CustomerKey", how="left")
    .join(dim_date, fact_sales.OrderDateKey == dim_date.DateKey, how="left")
)
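
Because left joins keep every fact row even when a key finds no match, a silent key mismatch shows up only as null dimension attributes. The sketch below illustrates the idea with pandas on made-up toy tables (the values are not taken from the real data); on the Spark DataFrame the analogous check would be to count rows where the dimension key, such as `DateKey`, is null after the join.

```python
import pandas as pd

# Toy stand-ins for the fact table and the date dimension (made-up values)
fact = pd.DataFrame({"OrderDateKey": [20101229, 20101230, 20101231],
                     "SalesAmount": [10.0, 20.0, 30.0]})
dim_date = pd.DataFrame({"DateKey": [20101229, 20101230],
                         "EnglishMonthName": ["December", "December"]})

# indicator=True adds a `_merge` column describing where each row came from
joined = fact.merge(dim_date, left_on="OrderDateKey", right_on="DateKey",
                    how="left", indicator=True)

# With unique dimension keys, a left join leaves the fact row count unchanged
assert len(joined) == len(fact)

# Share of fact rows whose key had no match in the dimension table
unmatched_share = (joined["_merge"] == "left_only").mean()
print(f"Unmatched fact rows: {unmatched_share:.1%}")
```

A high unmatched share usually points to a key type or format mismatch rather than genuinely missing dimension records.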

In [15]:
# Drop columns with extreme missingness (Spark-side)
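from pyspark.sql import functions as F

row_count = sales_df.count()

# Count nulls for all columns in one aggregation instead of one Spark job per column
null_counts = sales_df.select(
    [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in sales_df.columns]
).first().asDict()

missing_ratios = {c: n / row_count for c, n in null_counts.items()}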

cols_to_drop = [c for c, r in missing_ratios.items() if r >= 0.99]

sales_df = sales_df.drop(*cols_to_drop)


In [10]:
# Inspect schema and row count (Spark DataFrame)
sales_df.printSchema()
sales_df.count()


root
 |-- CustomerKey: integer (nullable = true)
 |-- ProductKey: integer (nullable = true)
 |-- OrderDateKey: integer (nullable = true)
 |-- DueDateKey: integer (nullable = true)
 |-- ShipDateKey: integer (nullable = true)
 |-- PromotionKey: integer (nullable = true)
 |-- CurrencyKey: integer (nullable = true)
 |-- SalesTerritoryKey: integer (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesOrderLineNumber: integer (nullable = true)
 |-- RevisionNumber: integer (nullable = true)
 |-- OrderQuantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- ExtendedAmount: double (nullable = true)
 |-- UnitPriceDiscountPct: integer (nullable = true)
 |-- DiscountAmount: integer (nullable = true)
 |-- ProductStandardCost: double (nullable = true)
 |-- TotalProductCost: double (nullable = true)
 |-- SalesAmount: double (nullable = true)
 |-- TaxAmt: double (nullable = true)
 |-- Freight: double (nullable = true)
 |-- CarrierTrackingNumber: string (n

60398

In [16]:
# Convert Spark DataFrame to pandas for downstream processing
sales_df = sales_df.toPandas()

## Dataset Reload Validation

After joining the fact and dimension tables using PySpark, the resulting Spark DataFrame is inspected to confirm schema correctness and row counts.

Once this validation step is complete, the dataset is converted to a pandas DataFrame. All subsequent feature engineering and preprocessing steps are performed in pandas to ensure compatibility with standard machine learning libraries.

## Feature Categorization — How We Decide


Before performing any transformations, features are grouped into categories based on their role in the business process and their availability at prediction time.

The guiding principle used for this categorization is:

> *A feature may only be used for prediction if it would be known at the time the prediction is made.*

Each category below documents **why** certain columns belong to it.

---

### 1. Identifier and Technical Columns

**Definition:**  
Columns that uniquely identify records or entities but do not carry predictive meaning.

**Examples:**  
- ProductKey  
- CustomerKey  
- SalesOrderNumber  

**Reasoning:**  
These columns act as database identifiers rather than informative attributes. Their numeric values are arbitrary, so they are unlikely to carry predictive signal, and including them risks the model fitting spurious patterns.

**Action:**  
Excluded from modeling.

---

### 2. Leakage-Prone (Post-Outcome) Columns

**Definition:**  
Columns that are calculated after the transaction completes or are derived directly from the target variable.

**Examples:**  
- ExtendedAmount  
- TotalProductCost  
- TaxAmt  

**Reasoning:**  
These values are only known *after* the sale has occurred and the final SalesAmount is determined. Including them would leak future information into the model, resulting in unrealistically high performance that would not generalize to real-world predictions.

**Action:**  
Excluded to prevent data leakage.

---

### 3. Numerical Predictive Features

**Definition:**  
Quantitative attributes that are known **before** or **at the time of the sale**.

**Examples:**  
- OrderQuantity  
- UnitPrice  
- UnitPriceDiscountPct  

**Reasoning:**  
These features directly influence the transaction outcome and are available at prediction time. They represent legitimate inputs that the model can reasonably use to estimate SalesAmount.

**Action:**  
Retained for modeling.

---

### 4. Categorical Descriptive Features

**Definition:**  
Non-numeric attributes describing products, customers, or time periods.

**Examples:**  
- ProductLine  
- EnglishMonthName  
- Gender  

**Reasoning:**  
Although not numerical, these features encode meaningful contextual information. They will be transformed into numerical representations using encoding techniques suitable for machine learning models.

**Action:**  
Retained and encoded in later steps.


## Target Variable Definition

The objective of this modeling task is to predict **SalesAmount** at the transaction (line-item) level based on product, customer, and time-related attributes.

`SalesAmount` is selected as the target variable because:
- It represents the primary business outcome (revenue)
- It is continuous and suitable for regression-based models
- It showed meaningful patterns and skewness during EDA
- It aligns naturally with retail and sales forecasting use cases

All remaining features will be evaluated as potential predictors of `SalesAmount`.


In [17]:
# Define target variable
TARGET_COL = "SalesAmount"

# Separate target variable
y = sales_df[TARGET_COL]

# Create initial feature set by dropping target
X = sales_df.drop(columns=[TARGET_COL])

# Basic validation
X.shape, y.shape

((60398, 77), (60398,))

## Feature Categorization — Applying the Decision Rules

The categories below illustrate how features are grouped based on the decision rules described above. At this stage, we identify representative examples rather than exhaustively classifying all columns. Final feature selection will be performed programmatically in subsequent steps.

To prepare the dataset for machine learning, features are grouped into the following categories:

### 1. Identifier and Technical Columns
These columns uniquely identify records or serve operational purposes and do not carry predictive value. Examples include:
- SalesOrderNumber
- SalesOrderLineNumber
- Surrogate keys (e.g., ProductKey, CustomerKey)

These will be excluded from modeling.

### 2. Leakage-Prone Columns
These columns are derived directly from or mathematically linked to the target variable (`SalesAmount`) and would cause data leakage if included. Examples include:
- ExtendedAmount
- TotalProductCost
- TaxAmt

These are excluded from the feature set prior to modeling.

### 3. Numerical Predictive Features
Continuous or discrete numerical attributes that may explain variations in sales, such as:
- OrderQuantity
- UnitPrice
- Discount-related fields
- Product cost attributes

These features may require transformation due to skewness or scale differences.

### 4. Categorical Features
Non-numerical attributes describing products, customers, and time, such as:
- Product categories
- Customer demographics
- Calendar attributes (Year, Month, Quarter)

These will be encoded into numerical form in later steps.

This structured categorization ensures transparent, defensible feature selection.
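
One way the grouping could be made programmatic is sketched below. The role lists and the `categorize_features` helper are hypothetical and built only from the example columns named above; the notebook itself performs the selection with explicit drop lists in later cells.

```python
import pandas as pd

# Hypothetical role lists, using the example column names from the text above
ID_COLS = ["SalesOrderNumber", "SalesOrderLineNumber", "ProductKey", "CustomerKey"]
LEAKAGE_COLS = ["ExtendedAmount", "TotalProductCost", "TaxAmt"]


def categorize_features(df, target="SalesAmount"):
    """Group the columns of `df` into the four categories described above."""
    excluded = set(ID_COLS) | set(LEAKAGE_COLS) | {target}
    candidates = df.drop(columns=[c for c in df.columns if c in excluded])
    return {
        "identifier": [c for c in df.columns if c in ID_COLS],
        "leakage": [c for c in df.columns if c in LEAKAGE_COLS],
        "numerical": candidates.select_dtypes(include="number").columns.tolist(),
        "categorical": candidates.select_dtypes(exclude="number").columns.tolist(),
    }


# Toy frame with made-up values, one or two columns per category
toy = pd.DataFrame({
    "ProductKey": [1, 2], "TaxAmt": [0.8, 1.6], "OrderQuantity": [1, 1],
    "UnitPrice": [10.0, 20.0], "Gender": ["F", "M"], "SalesAmount": [10.0, 20.0],
})
groups = categorize_features(toy)
```

Keeping the role lists in one place makes it easier to audit which columns were excluded and why.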


## Missing Value Analysis

Before applying any preprocessing steps, we assess the presence and extent of missing values across all features. This analysis helps determine whether missing data is negligible, requires imputation, or warrants column removal.

Understanding missingness patterns ensures that subsequent preprocessing decisions are data-driven and defensible.


In [18]:
# Calculate missing values per column
missing_counts = sales_df.isna().sum()

# Calculate percentage of missing values
missing_percent = (missing_counts / len(sales_df)) * 100

# Combine into a summary DataFrame
missing_summary = (
    pd.DataFrame({
        "missing_count": missing_counts,
        "missing_percent": missing_percent
    })
    .query("missing_count > 0")
    .sort_values(by="missing_percent", ascending=False)
)

missing_summary

| Column | missing_count | missing_percent |
|---|---|---|
| AddressLine2 | 59293 | 98.170469 |
| EndDate | 54970 | 91.012947 |
| WeightUnitMeasureCode | 45193 | 74.825325 |
| SizeUnitMeasureCode | 45193 | 74.825325 |
| Weight | 45193 | 74.825325 |
| Class | 38946 | 64.482268 |
| Size | 37549 | 62.169277 |
| Style | 36092 | 59.756946 |
| MiddleName | 25495 | 42.211663 |
| SpanishProductName | 13135 | 21.747409 |


## Missing Value Interpretation and Handling Strategy

The missing value analysis reveals distinct patterns driven by the data warehouse schema and business logic rather than random data quality issues.

Key observations:

- Columns with near-total missingness (99% or more), particularly operational fields and descriptive date attributes, were already removed on the Spark side and therefore do not appear in this summary. The sparsest remaining column is `AddressLine2` at about 98.2%.

- Product-related attributes such as size, weight, style, and class show high but systematic missingness, reflecting optional or category-specific characteristics. These features will be evaluated individually and either dropped or encoded with explicit missing indicators.

- A small number of attributes exhibit moderate missingness and may be retained through imputation or categorical encoding.

Based on these findings, features with extreme missingness will be removed prior to modeling, while others will be handled using controlled preprocessing techniques.
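
The "explicit missing indicator" idea mentioned above can be sketched as follows. The toy values are made up, and the `_missing` suffix is an arbitrary naming choice for illustration rather than something the notebook defines.

```python
import numpy as np
import pandas as pd

# Toy product attributes with systematic gaps (made-up values)
toy = pd.DataFrame({"Weight": [2.5, np.nan, 10.0], "Style": ["U", None, "W"]})

for name in ["Weight", "Style"]:
    # 1 where the original value was absent, 0 otherwise
    toy[f"{name}_missing"] = toy[name].isna().astype(int)
```

The indicator is created before any imputation so that the fact of absence survives even after the gaps are filled.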


In [19]:
# Drop columns with extremely high missingness (>= 99%)
missing_threshold = 0.99

cols_to_drop = (
    missing_summary[missing_summary["missing_percent"] >= missing_threshold * 100]
    .index
    .tolist()
)

# Drop from feature set only (not target)
X = X.drop(columns=cols_to_drop)

X.shape

(60398, 77)

## Handling Remaining Missing Values

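Applying the 99% threshold in pandas removes no further columns (the feature count stays at 77), because such columns were already dropped in Spark. The remaining missing values appear to reflect meaningful absence, such as optional or category-specific attributes, rather than data corruption.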

These missing values are handled differently depending on feature type:

- **Numerical features**: Missing values are imputed with the median, which is robust to skewed distributions and outliers.
- **Categorical features**: Missing values are treated as an explicit category ("Unknown") to preserve information about absence.

This approach ensures no rows are dropped while maintaining model compatibility and interpretability.


In [20]:
# Identify remaining missing values after column removal
remaining_missing = X.isna().sum()
remaining_missing = remaining_missing[remaining_missing > 0]

remaining_missing.sort_values(ascending=False)


AddressLine2             59293
EndDate                  54970
WeightUnitMeasureCode    45193
SizeUnitMeasureCode      45193
Weight                   45193
Class                    38946
Size                     37549
Style                    36092
MiddleName               25495
SpanishProductName       13135
FrenchProductName        13135
Status                    5428
dtype: int64

In [21]:
# Separate numerical and categorical columns with missing values
# (loop variable is `c` so it does not shadow pyspark's `col` imported above)
num_missing_cols = X.select_dtypes(include=[np.number]).columns
num_missing_cols = [c for c in num_missing_cols if X[c].isna().any()]

cat_missing_cols = X.select_dtypes(exclude=[np.number]).columns
cat_missing_cols = [c for c in cat_missing_cols if X[c].isna().any()]

num_missing_cols, cat_missing_cols

(['Weight'],
 ['WeightUnitMeasureCode',
  'SizeUnitMeasureCode',
  'SpanishProductName',
  'FrenchProductName',
  'Size',
  'Class',
  'Style',
  'EndDate',
  'Status',
  'MiddleName',
  'AddressLine2'])

### Missing Value Treatment Strategy

Remaining missing values fall into two categories:

**Numerical Features**
- Missingness is handled using median imputation.
- Median is preferred over mean due to skewed distributions observed in EDA.

**Categorical Features**
- Missing values are replaced with the category "Unknown".
- This preserves information that the value was absent rather than discarding records.

This strategy avoids data loss while maintaining statistical stability.
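
The notebook computes the median over all rows before the train-test split. A common variation, shown here only as an alternative sketch on made-up data, is to learn the fill value from the training rows and reuse it for the test rows, so that no test-set statistics influence preprocessing.

```python
import numpy as np
import pandas as pd

# Toy train/test frames (made-up values)
train = pd.DataFrame({"Weight": [1.0, 2.0, np.nan, 100.0],
                      "Class": ["H", None, "L", "M"]})
test = pd.DataFrame({"Weight": [np.nan, 5.0], "Class": [None, "H"]})

# Learn the fill value from the training rows only
weight_median = train["Weight"].median()

# Apply the same fill values to both splits
train_filled = train.assign(Weight=train["Weight"].fillna(weight_median),
                            Class=train["Class"].fillna("Unknown"))
test_filled = test.assign(Weight=test["Weight"].fillna(weight_median),
                          Class=test["Class"].fillna("Unknown"))
```

The median ignores the extreme value of 100, which is the robustness property that motivates preferring it over the mean.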


In [22]:
# Impute numerical features with median
for c in num_missing_cols:
    X[c] = X[c].fillna(X[c].median())

# Impute categorical features with explicit label
for c in cat_missing_cols:
    X[c] = X[c].fillna("Unknown")

# Validate no remaining missing values
X.isna().sum().sum()

np.int64(0)

In [23]:
# Drop identifier, surrogate key, and leakage-prone columns
cols_to_drop = [
    # Leakage-prone
    "ExtendedAmount",
    "TotalProductCost",
    "TaxAmt",

    # Identifiers / surrogate keys
    "ProductKey",
    "CustomerKey",
    "PromotionKey",
    "CurrencyKey",
    "SalesTerritoryKey",
    "SalesOrderLineNumber",

    # Date keys (handled via DimDate attributes instead)
    "OrderDateKey",
    "DueDateKey",
    "ShipDateKey"
]

X = X.drop(columns=[c for c in cols_to_drop if c in X.columns])

X.shape


(60398, 65)

## Numerical Feature Transformation

The numerical features are now examined for skewness, following up on the skewed distributions observed during exploratory data analysis (EDA).

To improve model performance and numerical stability, selected skewed numerical features are transformed using a logarithmic transformation.

Log transformation:

- Reduces skewness
- Compresses extreme values
- Improves compatibility with linear and distance-based models

The transformation uses `np.log1p`, i.e. log(1 + x), so zero values are handled without error; it is intended for non-negative features only. Identifier fields and leakage-prone variables dropped in the previous step are not part of this transformation.
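
As a small worked illustration of the effect described above, the sketch below applies `np.log1p` to a made-up right-skewed series that includes a zero, and shows that `np.expm1` inverts the transform. The numbers are invented for the example and are not drawn from the dataset.

```python
import numpy as np
import pandas as pd

# Made-up, right-skewed values including a zero
prices = pd.Series([0.0, 2.0, 5.0, 9.0, 35.0, 540.0, 3578.0])

logged = np.log1p(prices)      # log(1 + x), defined at x = 0
restored = np.expm1(logged)    # inverse transform, recovers the original scale

skew_before = prices.skew()
skew_after = logged.skew()
```

Having an exact inverse matters if a transformed quantity ever needs to be reported back in its original units.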


In [24]:
# Identify numerical features
numerical_cols = X.select_dtypes(include=[np.number]).columns

len(numerical_cols), numerical_cols

(21,
 Index(['RevisionNumber', 'OrderQuantity', 'UnitPrice', 'UnitPriceDiscountPct',
        'DiscountAmount', 'ProductStandardCost', 'Freight',
        'ProductSubcategoryKey', 'StandardCost', 'SafetyStockLevel',
        'ReorderPoint', 'ListPrice', 'Weight', 'DaysToManufacture',
        'DealerPrice', 'GeographyKey', 'YearlyIncome', 'TotalChildren',
        'NumberChildrenAtHome', 'HouseOwnerFlag', 'NumberCarsOwned'],
       dtype='object'))

In [25]:
# Calculate skewness of numerical features
skewness = X[numerical_cols].skew().sort_values(ascending=False)

skewness

ProductStandardCost      1.950547
StandardCost             1.950547
Freight                  1.927515
ListPrice                1.927515
UnitPrice                1.927515
DealerPrice              1.927515
NumberChildrenAtHome     1.281429
DaysToManufacture        1.144009
ReorderPoint             1.126946
SafetyStockLevel         1.126946
YearlyIncome             0.783957
GeographyKey             0.734562
Weight                   0.550952
TotalChildren            0.463057
NumberCarsOwned          0.402790
OrderQuantity            0.000000
RevisionNumber           0.000000
DiscountAmount           0.000000
UnitPriceDiscountPct     0.000000
ProductSubcategoryKey   -0.688218
HouseOwnerFlag          -0.823695
dtype: float64

### Skewness-Based Transformation Criteria

Numerical features with an absolute skewness greater than 1 are considered highly skewed and are selected for transformation.

This threshold balances correction of extreme distributions while preserving natural variability in approximately symmetric features.
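
For reference, pandas documents `skew()` as an unbiased (bias-adjusted) sample skewness. The sketch below reimplements that statistic by hand on made-up data and compares it with the pandas result; the helper name `sample_skewness` is introduced here purely for illustration.

```python
import numpy as np
import pandas as pd


def sample_skewness(values):
    """Adjusted Fisher-Pearson skewness: sqrt(n(n-1)) / (n-2) * m3 / m2**1.5."""
    x = np.asarray(values, dtype=float)
    n = x.size
    centered = x - x.mean()
    m2 = np.mean(centered ** 2)   # biased second central moment
    m3 = np.mean(centered ** 3)   # biased third central moment
    return np.sqrt(n * (n - 1)) / (n - 2) * m3 / m2 ** 1.5


# Made-up sample with one large value pulling the right tail
data = [1.0, 2.0, 2.0, 3.0, 10.0]
manual = sample_skewness(data)
from_pandas = pd.Series(data).skew()
```

A positive value indicates a longer right tail, which is the case the log transformation is meant to address.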


In [26]:
# Select highly skewed numerical features
skewed_cols = skewness[abs(skewness) > 1].index.tolist()

len(skewed_cols), skewed_cols

(10,
 ['ProductStandardCost',
  'StandardCost',
  'Freight',
  'ListPrice',
  'UnitPrice',
  'DealerPrice',
  'NumberChildrenAtHome',
  'DaysToManufacture',
  'ReorderPoint',
  'SafetyStockLevel'])

In [27]:
# Apply log1p transformation to skewed numerical features
for c in skewed_cols:
    X[c] = np.log1p(X[c])

In [28]:
# Recalculate skewness after transformation
post_skewness = X[skewed_cols].skew().sort_values(ascending=False)

post_skewness

Freight                 1.169011
DaysToManufacture       1.144009
StandardCost            0.807867
ProductStandardCost     0.807867
NumberChildrenAtHome    0.807117
DealerPrice             0.774139
UnitPrice               0.740682
ListPrice               0.740682
ReorderPoint            0.296094
SafetyStockLevel        0.289817
dtype: float64
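
In the output above, `DaysToManufacture` shows the same skewness (1.144009) before and after the transformation. One possible explanation, which this notebook does not verify, is that the column takes only two distinct values: skewness is then unchanged by any strictly increasing transform. The sketch below demonstrates that property on a made-up two-valued series; checking `X["DaysToManufacture"].nunique()` would confirm or rule out this explanation for the real column.

```python
import numpy as np
import pandas as pd

# Made-up feature that takes only two distinct values
two_valued = pd.Series([0, 0, 0, 0, 0, 0, 0, 4, 4, 4])

before = two_valued.skew()
after = np.log1p(two_valued).skew()

# For a two-valued feature the two numbers agree up to floating-point error
print(before, after)
```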

## Categorical Feature Encoding

After completing Spark-based data preparation and pandas-based feature engineering, categorical features are encoded into numerical form so they can be used by machine learning models.

Encoding strategy:

- Categorical features are one-hot encoded using pandas
- High-cardinality identifiers, raw date strings, and free-text fields are removed before encoding
- Missing values have already been handled using explicit categories

This approach preserves categorical information while avoiding unintended ordinal relationships.
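
The width of the encoded matrix depends on how many distinct values each categorical column has. The sketch below, on made-up toy data, estimates the number of new columns before encoding and then applies `pd.get_dummies` with `drop_first=True`, the same call used later in this notebook.

```python
import pandas as pd

# Toy frame with two categorical columns and one numerical column (made-up values)
toy = pd.DataFrame({
    "Color": ["Red", "Blue", "Red", "Black"],
    "Gender": ["F", "M", "M", "F"],
    "OrderQuantity": [1, 1, 1, 1],
})

# With drop_first=True each column contributes (distinct values - 1) dummy columns
cardinality = toy[["Color", "Gender"]].nunique()
expected_new_cols = int((cardinality - 1).sum())

encoded = pd.get_dummies(toy, columns=["Color", "Gender"], drop_first=True)
```

Running the same `nunique()` check on the real categorical columns shows which ones contribute most to the final feature count.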

In [29]:
# Identify categorical features
categorical_cols = X.select_dtypes(exclude=[np.number]).columns

len(categorical_cols), categorical_cols


(44,
 Index(['SalesOrderNumber', 'CarrierTrackingNumber', 'CustomerPONumber',
        'OrderDate', 'DueDate', 'ShipDate', 'ProductAlternateKey',
        'WeightUnitMeasureCode', 'SizeUnitMeasureCode', 'EnglishProductName',
        'SpanishProductName', 'FrenchProductName', 'FinishedGoodsFlag', 'Color',
        'Size', 'SizeRange', 'ProductLine', 'Class', 'Style', 'ModelName',
        'EnglishDescription', 'StartDate', 'EndDate', 'Status',
        'CustomerAlternateKey', 'FirstName', 'MiddleName', 'LastName',
        'NameStyle', 'BirthDate', 'MaritalStatus', 'Gender', 'EmailAddress',
        'EnglishEducation', 'SpanishEducation', 'FrenchEducation',
        'EnglishOccupation', 'SpanishOccupation', 'FrenchOccupation',
        'AddressLine1', 'AddressLine2', 'Phone', 'DateFirstPurchase',
        'CommuteDistance'],
       dtype='object'))

In [30]:
# Drop high-cardinality identifiers, raw date strings, and free-text or personal fields
cols_to_drop_cat = [
    "SalesOrderNumber",
    "ProductAlternateKey",
    "CustomerAlternateKey",
    "EmailAddress",
    "Phone",
    "OrderDate",
    "DueDate",
    "ShipDate",
    "StartDate",
    "EndDate",
    "BirthDate",
    "DateFirstPurchase",
    "EnglishDescription",
    "ModelName",
    "AddressLine1",
    "AddressLine2",
    "FirstName",
    "MiddleName",
    "LastName"
]

X = X.drop(columns=cols_to_drop_cat)
X.shape

(60398, 46)

In [31]:
# Re-identify categorical columns after cleanup
categorical_cols = X.select_dtypes(exclude=[np.number]).columns

len(categorical_cols), categorical_cols


(25,
 Index(['CarrierTrackingNumber', 'CustomerPONumber', 'WeightUnitMeasureCode',
        'SizeUnitMeasureCode', 'EnglishProductName', 'SpanishProductName',
        'FrenchProductName', 'FinishedGoodsFlag', 'Color', 'Size', 'SizeRange',
        'ProductLine', 'Class', 'Style', 'Status', 'NameStyle', 'MaritalStatus',
        'Gender', 'EnglishEducation', 'SpanishEducation', 'FrenchEducation',
        'EnglishOccupation', 'SpanishOccupation', 'FrenchOccupation',
        'CommuteDistance'],
       dtype='object'))

In [32]:
# One-hot encode categorical features
X_encoded = pd.get_dummies(
    X,
    columns=categorical_cols,
    drop_first=True
)

X_encoded.shape

(60398, 464)

## Train–Test Split

The final step in preprocessing is to split the dataset into training and testing subsets.

Evaluating on held-out data gives an estimate of performance on unseen records and makes overfitting detectable.

A fixed random state is used to ensure reproducibility.
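
The expected split sizes can be worked out by hand. The sketch below assumes scikit-learn's documented behaviour of rounding the test share up and assigning the remaining rows to training when only `test_size` is given.

```python
import math

n_rows = 60398          # row count reported earlier in this notebook
test_size = 0.2

# The test share is rounded up; the remainder goes to training
n_test = math.ceil(test_size * n_rows)
n_train = n_rows - n_test
```

These values can be compared against the shapes printed by the split cell as a quick sanity check.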

In [33]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X_encoded,
    y,
    test_size=0.2,
    random_state=42
)

X_train.shape, X_test.shape, y_train.shape, y_test.shape

((48318, 464), (12080, 464), (48318,), (12080,))

## Persist Preprocessed Data

To ensure reproducibility and separation of concerns, the final training and testing datasets are saved to disk and reloaded in the modeling notebook.


In [34]:
# Create processed data directory if it doesn't exist
processed_dir = os.path.join(DATA_DIR, "..", "processed")
os.makedirs(processed_dir, exist_ok=True)

# Save train-test splits
X_train.to_csv(os.path.join(processed_dir, "X_train.csv"), index=False)
X_test.to_csv(os.path.join(processed_dir, "X_test.csv"), index=False)
y_train.to_csv(os.path.join(processed_dir, "y_train.csv"), index=False)
y_test.to_csv(os.path.join(processed_dir, "y_test.csv"), index=False)
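
The modeling notebook is not part of this document, so the following is only a sketch of how the saved files could be read back. It uses a temporary directory and made-up values instead of the real `processed` folder; the file names match those written above.

```python
import os
import tempfile

import pandas as pd

# Made-up miniature versions of the saved objects
X_demo = pd.DataFrame({"OrderQuantity": [1, 1], "Color_Red": [True, False]})
y_demo = pd.Series([3578.27, 699.10], name="SalesAmount")

with tempfile.TemporaryDirectory() as processed_dir:
    X_demo.to_csv(os.path.join(processed_dir, "X_train.csv"), index=False)
    y_demo.to_csv(os.path.join(processed_dir, "y_train.csv"), index=False)

    X_loaded = pd.read_csv(os.path.join(processed_dir, "X_train.csv"))
    # y was saved as a one-column CSV; select that column to get a Series back
    y_loaded = pd.read_csv(os.path.join(processed_dir, "y_train.csv"))["SalesAmount"]
```

Because the files are written with `index=False`, rows in the feature and target files line up by position rather than by index label.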

## Preprocessing Summary and Next Steps

This notebook prepared the raw sales data into a clean, model-ready feature matrix using a **hybrid big data and machine learning workflow**.

### Key preprocessing outcomes:

- **Big data processing**:
  - Data loaded and joined using PySpark
  - Columns with extreme missingness removed at the Spark level
- **Transition to pandas**:
  - Dataset converted to pandas for feature engineering and modeling compatibility
- **Target variable**:
  - `SalesAmount` (continuous, line-item revenue)
- **Feature selection**:
  - Removed identifier and surrogate key columns
  - Excluded leakage-prone, post-outcome variables
  - Removed high-cardinality and non-predictive text fields
- **Missing value handling**:
  - Numerical features imputed using median values
  - Categorical features imputed using an explicit `"Unknown"` category
  - No rows were dropped during preprocessing
- **Numerical transformations**:
  - Skewed numerical features transformed using `log1p`
- **Categorical encoding**:
  - One-hot encoding applied with `drop_first=True`
  - Final encoded feature space contains **464 features**
- **Train–test split**:
  - 80% training / 20% testing
  - Fixed random state for reproducibility

### Final dataset shapes:

- Training set: `(48318, 464)`
- Test set: `(12080, 464)`
