# **Project: Retail Sales Trend Analysis & Forecasting Using German Federal Bank Data**

# Notebook 02 – Data Cleaning & Feature Engineering

# **Objective**

Transform raw Bundesbank retail time series into a clean, analysis-ready dataset with features for downstream analysis and forecasting.

# **1️⃣ Imports & Config**




In [14]:
import pandas as pd
import numpy as np
import logging
import sys

# Notebook-wide logger; records are rendered as "timestamp | level | message".
# NOTE(review): logging.basicConfig does nothing when the root logger already
# has handlers (hosted notebook runtimes often pre-install one) — confirm that
# INFO records actually appear, or consider passing force=True.
logger = logging.getLogger("Notebook02_Cleaning")
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)


# **2️⃣ Load Raw Dataset**

In [15]:
# Read the source CSV; the "date" column is parsed to datetimes at load time.
df_raw = pd.read_csv(
    "/content/drive/MyDrive/retaildataanalysis/dataset/cleaned_data.csv",
    parse_dates=["date"],
)
logger.info(f"Raw dataset loaded: {df_raw.shape[0]} rows, {df_raw.shape[1]} columns")


In [16]:
# Show the dtype pandas assigned to each loaded column.
print(df_raw.dtypes)

date            datetime64[ns]
retail_index           float64
flags                   object
dtype: object


In [17]:
# Preview the first ten rows of the loaded frame.
df_raw.head(10)

Unnamed: 0,date,retail_index,flags
0,1994-01-01,77.0,
1,1994-02-01,74.0,
2,1994-03-01,85.5,
3,1994-04-01,82.5,
4,1994-05-01,81.3,
5,1994-06-01,78.5,
6,1994-07-01,80.5,
7,1994-08-01,79.4,
8,1994-09-01,82.4,
9,1994-10-01,86.8,


# **3️⃣ Cleaning Pipeline**

In [18]:
def cleaning_pipeline(df_raw, logger=None):
    """Clean the raw retail series into a sorted frame with one row per date.

    Steps, in order:
      1. Coerce ``date`` to datetime and ``retail_index`` to numeric;
         values that cannot be converted become missing.
      2. Drop rows where either of those two core columns is missing.
      3. Collapse duplicate dates: ``retail_index`` is averaged, every other
         column keeps its first non-null value for that date.
      4. Sort by ``date`` and reset the row index.

    Negative ``retail_index`` values are only reported, never modified.

    Args:
        df_raw: DataFrame with at least ``date`` and ``retail_index``
            columns. It is copied, not modified.
        logger: Logger that receives the data-quality messages. Defaults to
            the logger named after this module.

    Returns:
        A new DataFrame with the same columns, in the same order, as
        ``df_raw``.

    Raises:
        KeyError: If ``date`` or ``retail_index`` is not a column of
            ``df_raw``.
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    df = df_raw.copy()

    # Convert date
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    invalid_dates = df["date"].isna().sum()
    if invalid_dates > 0:
        logger.warning("%s rows have invalid dates", invalid_dates)

    # Convert retail_index
    df["retail_index"] = pd.to_numeric(df["retail_index"], errors="coerce")
    invalid_index = df["retail_index"].isna().sum()
    if invalid_index > 0:
        logger.warning(
            "%s retail_index values could not be converted", invalid_index
        )

    # Drop invalid core values (ENFORCEMENT)
    before = len(df)
    df = df.dropna(subset=["date", "retail_index"])
    dropped = before - len(df)
    if dropped > 0:
        logger.info("Dropped %s rows due to invalid core values", dropped)

    # 🔹 Enforce unique monthly index
    dup_count = df["date"].duplicated().sum()
    if dup_count > 0:
        logger.warning(
            "%s duplicate dates detected — aggregating by mean", dup_count
        )

        # Aggregate every column, not just retail_index, so the returned
        # schema does not depend on whether duplicates happened to exist.
        column_order = list(df.columns)
        how = {
            col: ("mean" if col == "retail_index" else "first")
            for col in column_order
            if col != "date"
        }
        df = df.groupby("date", as_index=False).agg(how)[column_order]

    # Sort and check monotonicity
    df = df.sort_values("date").reset_index(drop=True)

    # Sanity guard only: the sort above should already guarantee this.
    if not df["date"].is_monotonic_increasing:
        logger.error("Dates are not monotonic after sorting")

    # Negative values check (log only, do not auto-fix)
    if (df["retail_index"] < 0).any():
        logger.warning("Negative retail_index values detected")

    return df


# **4️⃣ Feature Engineering Pipeline**

In [19]:
def feature_engineering(df):
    """Return a copy of ``df`` extended with calendar, trend and growth columns.

    Added columns, in this order:
      * ``year``, ``month``, ``month_name``, ``quarter``, ``day_of_week``:
        calendar attributes read from ``date``.
      * ``rolling_12m_avg``: mean over a window of up to 12 rows (partial
        windows are allowed at the start of the series).
      * ``ema_12m``: exponentially weighted mean with ``span=12`` and
        ``adjust=False``.
      * ``yoy_growth_pct`` / ``mom_growth_pct``: percentage change of
        ``retail_index`` against the value 12 rows / 1 row earlier.

    Args:
        df: DataFrame with a datetime ``date`` column and a numeric
            ``retail_index`` column. It is not modified.

    Returns:
        A new DataFrame holding the original columns followed by the
        features listed above.
    """
    enriched = df.copy()
    stamps = enriched["date"].dt
    series = enriched["retail_index"]

    # Dict insertion order fixes the order of the appended columns.
    features = {
        # Time features
        "year": stamps.year,
        "month": stamps.month,
        "month_name": stamps.month_name(),
        "quarter": stamps.quarter,
        "day_of_week": stamps.day_name(),
        # Trend smoothing
        "rolling_12m_avg": series.rolling(12, min_periods=1).mean(),
        "ema_12m": series.ewm(span=12, adjust=False).mean(),
        # Growth metrics; the shift is row-based, not calendar-based
        "yoy_growth_pct": series.pct_change(12) * 100,
        "mom_growth_pct": series.pct_change(1) * 100,
    }
    for column_name, column_values in features.items():
        enriched[column_name] = column_values

    return enriched


# **5️⃣ Run Pipelines**

In [20]:
# Clean the raw frame first, then derive the features from the cleaned result.
df_clean = cleaning_pipeline(df_raw, logger)
df_features = feature_engineering(df_clean)
logger.info("Cleaning and feature engineering completed")



# **6️⃣ HTML Validation Report**

In [21]:
def export_validation_report(df, path="/content/drive/MyDrive/retaildataanalysis/dataset/outputs/validation_report.html"):
    """Write a per-column data-quality summary of ``df`` as an HTML table.

    The report has one row per column of ``df`` with the number of missing
    values and, for numeric columns, the number of negative values
    (non-numeric columns are reported as 0). A confirmation is logged through
    the module-level ``logger``.

    Args:
        df: DataFrame to summarise.
        path: Destination handed to ``DataFrame.to_html``. When it is a
            filesystem path, missing parent directories are created first.
    """
    import os
    from pathlib import Path

    negative_counts = [
        (df[col] < 0).sum() if pd.api.types.is_numeric_dtype(df[col]) else 0
        for col in df.columns
    ]
    report = pd.DataFrame({
        "column": df.columns,
        "missing_values": df.isna().sum(),
        "negative_values": negative_counts,
    })

    # On a fresh environment the output folder may not exist yet, which would
    # make to_html fail; non-path destinations (buffers) are left untouched.
    if isinstance(path, (str, os.PathLike)):
        Path(path).parent.mkdir(parents=True, exist_ok=True)

    report.to_html(path, index=False)
    logger.info(f"Validation report saved to {path}")


export_validation_report(df_features)


# **7️⃣ Unit Tests**

In [22]:
def test_cleaned_data(df):
    """Assert that the core columns of ``df`` are complete, non-negative and typed.

    Raises:
        AssertionError: If ``retail_index`` has missing or negative values,
            or ``date`` is not a datetime column.
    """
    assert df['retail_index'].notna().all(), "Retail index has missing values"
    assert (df['retail_index'] >= 0).all(), "Negative retail index values detected"
    # Check the dtype family rather than the exact string "datetime64[ns]",
    # so datetime columns stored at another resolution are accepted too.
    assert pd.api.types.is_datetime64_any_dtype(df['date']), "Date column is not datetime"


def test_features(df):
    """Assert that every engineered trend/growth column exists in ``df``.

    Raises:
        AssertionError: Naming the first expected column that is absent.
    """
    for col in ['rolling_12m_avg', 'ema_12m', 'yoy_growth_pct', 'mom_growth_pct']:
        assert col in df.columns, f"{col} not found in DataFrame"


def test_unique_dates(df):
    """Assert that no value of ``date`` occurs more than once in ``df``.

    Raises:
        AssertionError: If any date is duplicated.
    """
    assert not df["date"].duplicated().any(), "Duplicate dates detected"


# Run all checks against the engineered dataset; any failure raises
# AssertionError before the success message is logged.
test_cleaned_data(df_features)
test_features(df_features)
test_unique_dates(df_features)
logger.info("All unit tests passed ✅")


# **8️⃣ Save Analysis-Ready Dataset**

In [23]:
# Persist the engineered dataset as CSV, leaving out the positional row index.
df_features.to_csv(
    "/content/drive/MyDrive/retaildataanalysis/dataset/outputs/retail_analysis_ready.csv",
    index=False,
)
logger.info("Analysis-ready dataset saved for KPI analysis and forecasting")
