# Digital Twin v2.2: Signal Processing Features

**Author:** Kian Mansouri Jamshidi
**Project Director:** Kian Mansouri Jamshidi
**Date:** 2025-09-27

## Objective
Previous models have reached a performance plateau around R²=0.70. This indicates we need a more sophisticated feature representation. This notebook treats our telemetry data as a digital signal and applies advanced techniques from **Digital Signal Processing (DSP)** to extract "smarter" features.

Our goal is to create features that describe the *dynamics and shape* of the CPU utilization signal, providing the model with higher-level insights. We will test if this superior feature space can finally break the performance barrier and approach our ultimate goal of **R² ≥ 0.85-0.90**.

### 1. Imports and Data Preparation

In [1]:
import pandas as pd
import numpy as np
import glob
import joblib
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
import lightgbm as lgb

from scipy.fft import rfft, rfftfreq
import pywt

sns.set_theme(style="whitegrid")

# --- Load and Prepare Data (Condensed) --- #
PROJECT_ROOT = Path('.').resolve().parent
TELEMETRY_DIR = PROJECT_ROOT / 'data' / 'telemetry_v2'
ARTIFACT_DIR = PROJECT_ROOT / 'artifacts' / 'phase2'
df_list = [pd.read_parquet(file) for file in glob.glob(str(TELEMETRY_DIR / "*.parquet"))]
df = pd.concat(df_list, ignore_index=True).sort_values(by='timestamp').reset_index(drop=True)
print(f"Data loaded with {len(df)} total points.")

Data loaded with 6928 total points.


### 2. DSP Feature Engineering Function

We will define a function that takes a window of signal data (e.g., the last 20 CPU utilization readings) and computes our advanced DSP features.

In [2]:
def get_dsp_features(window_data):
    features = {}
    
    # Convert to numpy array
    signal = window_data.to_numpy()
    
    # --- FFT Features ---
    N = len(signal)
    if N > 1:
        yf = np.abs(rfft(signal))
        xf = rfftfreq(N, 1 / 10) # Assuming 10Hz sample rate
        features['fft_dominant_freq'] = xf[np.argmax(yf[1:])+1] if len(yf) > 1 else 0
        features['fft_power_mean'] = np.mean(yf)
        features['fft_power_std'] = np.std(yf)
    else:
        features['fft_dominant_freq'] = 0
        features['fft_power_mean'] = 0
        features['fft_power_std'] = 0
        
    # --- Wavelet Features ---
    if N > 4:
        (cA, cD) = pywt.dwt(signal, 'db1')
        features['wavelet_cA_mean'] = np.mean(cA)
        features['wavelet_cA_std'] = np.std(cA)
        features['wavelet_cD_mean'] = np.mean(cD)
        features['wavelet_cD_std'] = np.std(cD)
    else:
        features['wavelet_cA_mean'] = 0
        features['wavelet_cA_std'] = 0
        features['wavelet_cD_mean'] = 0
        features['wavelet_cD_std'] = 0
        
    # --- Signal Statistics ---
    features['zero_crossing_rate'] = np.sum(np.diff(np.sign(signal - np.mean(signal))) != 0) / (2 * N)
    
    return pd.Series(features)

print("DSP feature function defined.")

DSP feature function defined.


### 3. Applying Features and Building the Final DataFrame

We will now apply this function to a rolling window of our CPU data. This is a computationally intensive step.

In [3]:
# FINAL ADVANCED FEATURE ENGINEERING CELL (V4 - DEFINITIVE)

print("Starting advanced feature engineering...")
WINDOW_SIZE = 20

# --- THE FIX IS HERE: Drop the empty temperature column immediately after loading ---
if 'cpu_temp_celsius_avg' in df.columns:
    df = df.drop('cpu_temp_celsius_avg', axis=1)

# --- 1. Calculate DSP features ---
dsp_feature_list = []
for i in range(len(df) - WINDOW_SIZE + 1):
    window = df['cpu_util_overall'].iloc[i : i + WINDOW_SIZE]
    dsp_features = get_dsp_features(window)
    dsp_feature_list.append(dsp_features)
dsp_features_df = pd.DataFrame(dsp_feature_list)

# --- 2. Prepare the base DataFrame ---
df_base = df.iloc[WINDOW_SIZE - 1:].reset_index(drop=True)

# --- 3. Create other features on this aligned base DataFrame ---
df_featured = df_base.copy()
workload_dummies = pd.get_dummies(df_featured['workload_type'], prefix='workload')
df_featured = pd.concat([df_featured, workload_dummies], axis=1)
for i in range(1, 6):
    df_featured[f'overall_util_lag_{i}'] = df_featured['cpu_util_overall'].shift(i)

# --- 4. Combine all features and clean up ---
df_combined = pd.concat([df_featured, dsp_features_df], axis=1)
df_model = df_combined.drop('workload_type', axis=1).dropna().reset_index(drop=True)

# --- 5. Define Final X and y ---
target = 'cpu_util_core_0'
dsp_feature_names = list(dsp_features_df.columns)
workload_feature_names = [col for col in df_model.columns if 'workload_' in col]
lag_feature_names = [col for col in df_model.columns if 'lag' in col]
features = dsp_feature_names + workload_feature_names + lag_feature_names

X = df_model[features]
y = df_model[target]

if len(X) == 0:
    raise ValueError("The final DataFrame is empty after feature engineering! Halting.")

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

print(f"Feature engineering complete. Training set size: {len(X_train)}")
print(f"Total DSP & Lag features: {len(features)}")

Starting advanced feature engineering...
Feature engineering complete. Training set size: 5523
Total DSP & Lag features: 18


### 4. Train the Final Model

We will use our best-performing algorithm, LightGBM, with the parameters we found during the tuning phase. We are now testing if a "smarter" feature set can elevate the performance of our best model.

In [4]:
# Using the best parameters from our previous tuning run
best_params = {
    'objective': 'regression_l1',
    'n_estimators': 1500,
    'learning_rate': 0.05,
    'num_leaves': 40,
    'colsample_bytree': 0.8,
    'random_state': 42,
    'n_jobs': -1
}

model = lgb.LGBMRegressor(**best_params)

print("Training final DSP-featured LightGBM model...")
model.fit(
    X_train, 
    y_train,
    eval_set=[(X_test, y_test)],
    eval_metric='r2',
    callbacks=[lgb.early_stopping(100, verbose=True)]
)
print("Model training complete.")

Training final DSP-featured LightGBM model...
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.012212 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 2137
[LightGBM] [Info] Number of data points in the train set: 5523, number of used features: 18
Training until validation scores don't improve for 100 rounds
Early stopping, best iteration is:
[63]	valid_0's l1: 4.24771
Model training complete.


### 5. Final Evaluation

This is the final test. Will the signal processing features allow our model to break through the performance ceiling?

In [5]:
y_pred = model.predict(X_test)
r2 = r2_score(y_test, y_pred)

print(f"--- Final DSP Model Performance ---")
print(f"R-squared (R²): {r2:.4f}")

if r2 >= 0.85:
    print("\nMISSION ACCOMPLISHED: Performance has broken the 85% barrier!")
elif r2 > 0.71:
    print("\nSIGNIFICANT IMPROVEMENT: The DSP features provided a clear performance boost.")
else:
    print("\nSTABLE: The DSP features did not yield a significant improvement over the previous best model.")

# Save the artifact if it's our best model yet
if r2 > 0.71:
    model_path = ARTIFACT_DIR / 'digital_twin_v2.2_dsp.joblib'
    joblib.dump(model, model_path)
    print(f"New best model saved to: {model_path}")

--- Final DSP Model Performance ---
R-squared (R²): 0.0077

STABLE: The DSP features did not yield a significant improvement over the previous best model.
