<a href="https://vigneashpandiyan.github.io/publications/Codes/" target="_blank" rel="noopener noreferrer">
  <img src="https://vigneashpandiyan.github.io/images/Link.png"
       style="max-width: 800px; width: 100%; height: auto;">
</a>

# Synthetic Time-Series (2 Classes) + Feature Engineering + Visualizations

This notebook:
- Generates **2 synthetic classes** (500 examples/class)
- Extracts:
  - **7 time-domain features**
  - **7 frequency-domain features**
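  - **3 wavelet features** (energies of the D1–D3 detail bands; FFT band energies are used instead if `pywt` is not installed)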
- Visualizes:
  - Example waveforms
  - FFT (single + mean)
  - STFT spectrogram (librosa)
  - Mel-spectrogram (librosa)
  - Wavelet scalogram (pywt, if installed)
  - Feature distributions
  - Correlation heatmaps (all samples + Δ = Class B − Class A)
- Saves:
  - `synthetic_timeseries_features.csv`
  - `synthetic_timeseries_signals.npy`
  - `synthetic_timeseries_labels.npy`


## Install dependencies (run once)

If you're running this in a fresh environment, install the packages below.


In [None]:
# Run this cell only if the packages are missing (Jupyter):
# %pip is preferred over !pip because it installs into the environment of the running kernel.
%pip install -q numpy pandas matplotlib scipy scikit-learn pywavelets librosa


## Imports

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.stats import skew, kurtosis
from scipy.signal import get_window

# Librosa for STFT + mel-spectrogram visualization
import librosa
import librosa.display

# Optional dependency: PyWavelets is used for the wavelet features and the CWT scalogram.
# HAVE_PYWT records whether the import succeeded so later cells can branch on it.
try:
    import pywt
    HAVE_PYWT = True
except ImportError:
    HAVE_PYWT = False
    # Without pywt the scalogram is skipped and only the features have a fallback.
    print("[WARN] PyWavelets not installed. Wavelet scalogram will be skipped; "
          "wavelet features will use an FFT band-energy fallback.")
    print("       Install with: %pip install pywavelets")


## Configuration

In [None]:
# ------------------------------------------------------------
# Notebook-wide settings used by the cells below
# ------------------------------------------------------------

# Class layout: integer labels, their display names, and how many signals to make per class
classes = [0, 1]
label_names = {0: "Class A", 1: "Class B"}
n_per_class = 500

# Sampling grid
fs = 1000.0             # sampling rate in Hz
N = 1024                # number of samples in each signal
t = np.arange(N) / fs   # time stamp of every sample, in seconds

# Seeded generator, so the random draws are repeatable from a fresh start
RNG = np.random.default_rng(42)

# Per-class plot styling
colors = {0: "tab:blue", 1: "tab:orange"}
markers = {0: "o", 1: "^"}  # (unused by default; handy for scatter plots)


## 1) Data generation

In [None]:
# ============================================================
# 1) Data Generation
# ============================================================

def generate_signal(label: int,
                    rng: np.random.Generator,
                    fs: float,
                    N: int) -> np.ndarray:
    """
    Draw one random synthetic signal for the requested class.

    The signal is the sum of two sinusoidal tones plus a weak tone at twice
    the first tone's frequency. That sum is multiplied by a slow sinusoidal
    envelope, and Gaussian noise is added afterwards.

    Label 0 draws its tones around 55 Hz and 135 Hz; any other label draws
    them around 85 Hz and 210 Hz, with different amplitude and noise ranges.

    Args:
        label: Class label (0 selects the first parameter set, anything else
            the second).
        rng: NumPy random generator; every call consumes draws from it.
        fs: Sampling rate in Hz.
        N: Number of samples to generate.

    Returns:
        1-D float32 array of length N.
    """
    two_pi = 2 * np.pi
    time_axis = np.arange(N) / fs

    def tone(amplitude, freq, phase):
        # One sinusoid sampled on the time axis.
        return amplitude * np.sin(two_pi * freq * time_axis + phase)

    # Per class: (mean, std) of both tone frequencies, then (low, high) of both
    # tone amplitudes and of the noise level.
    if label == 0:
        f1_spec, f2_spec = (55, 4), (135, 8)
        a1_range, a2_range = (0.9, 1.3), (0.2, 0.5)
        noise_range = (0.10, 0.18)
    else:
        f1_spec, f2_spec = (85, 5), (210, 10)
        a1_range, a2_range = (0.6, 1.0), (0.35, 0.7)
        noise_range = (0.10, 0.22)

    # The order of the random draws below is fixed on purpose: changing it
    # would change the signals obtained from a given seed.
    f1 = rng.normal(*f1_spec)
    f2 = rng.normal(*f2_spec)
    A1 = rng.uniform(*a1_range)
    A2 = rng.uniform(*a2_range)
    noise = rng.uniform(*noise_range)

    # Slow envelope: 0.5-2.5 Hz, depth 0.05-0.25, random phase
    am_freq = rng.uniform(0.5, 2.5)
    am_depth = rng.uniform(0.05, 0.25)
    am_phase = rng.uniform(0, two_pi)
    envelope = 1.0 + tone(am_depth, am_freq, am_phase)

    # Independent random phases for the two tones and the harmonic
    phase1 = rng.uniform(0, two_pi)
    phase2 = rng.uniform(0, two_pi)
    phase3 = rng.uniform(0, two_pi)

    carrier = tone(A1, f1, phase1) + tone(A2, f2, phase2) + tone(0.15, 2*f1, phase3)

    # Modulate first, then add noise, so the noise itself is not modulated
    noisy = envelope * carrier + noise * rng.standard_normal(N)

    return noisy.astype(np.float32)


def make_dataset(classes: list,
                 n_per_class: int,
                 rng: np.random.Generator,
                 fs: float,
                 N: int):
    """
    Build a labelled dataset of synthetic signals.

    For each label in `classes`, in order, `n_per_class` signals are generated
    one after another from the shared generator `rng`.

    Returns:
        (signals, labels): the signals stacked into one array along a new
        first axis, and an array with the matching label for each row.
    """
    # Both comprehensions walk the classes in the same order, so row i of the
    # stacked signals belongs to labels[i].
    signals = [generate_signal(y, rng, fs, N)
               for y in classes
               for _ in range(n_per_class)]
    labels = [y for y in classes for _ in range(n_per_class)]
    return np.stack(signals), np.array(labels)


## 2) Feature extraction

In [None]:
# ============================================================
# 2) Feature Extraction
# ============================================================

def zero_crossing_rate(x: np.ndarray) -> float:
    """Return the share of neighbouring sample pairs whose signs differ.

    Samples that are exactly zero are counted as positive.
    """
    signs = np.where(x == 0, 1, np.sign(x))
    flipped = signs[1:] != signs[:-1]
    return float(np.mean(flipped))


def time_domain_features(x: np.ndarray) -> dict:
    """Return the 7 time-domain features of one signal as a name -> float dict.

    Features: mean, sample standard deviation (ddof=1), RMS, skewness,
    excess (Fisher) kurtosis, peak-to-peak range and zero-crossing rate.
    """
    names = ("td_mean", "td_std", "td_rms", "td_skew",
             "td_kurtosis", "td_peak2peak", "td_zcr")
    values = (
        np.mean(x),
        np.std(x, ddof=1),
        np.sqrt(np.mean(np.square(x))),
        skew(x),
        kurtosis(x, fisher=True),
        np.ptp(x),
        zero_crossing_rate(x),
    )
    # Plain Python floats keep the resulting table free of NumPy scalar types.
    return {name: float(value) for name, value in zip(names, values)}


def spectral_entropy(P: np.ndarray, eps: float = 1e-12) -> float:
    """Return the Shannon entropy, in bits, of a power spectrum.

    The spectrum is first normalised to sum to (almost) one; `eps` guards both
    the division and the logarithm against zeros.
    """
    total = np.sum(P) + eps
    prob = P / total
    weighted_logs = prob * np.log2(prob + eps)
    return float(-np.sum(weighted_logs))


def rfft_power_spectrum(x: np.ndarray, fs: float, window: str = "hann"):
    """Return (freqs, power) of the one-sided FFT of the windowed signal.

    Args:
        x: 1-D signal.
        fs: Sampling rate in Hz; sets the spacing of the returned frequencies.
        window: Window name handed to `scipy.signal.get_window`.

    Returns:
        freqs: Bin frequencies in Hz from `np.fft.rfftfreq`.
        power: Squared magnitude of each RFFT bin (no further scaling).
    """
    n_samples = len(x)
    taper = get_window(window, n_samples)
    spectrum = np.fft.rfft(x * taper)
    power = np.abs(spectrum)**2
    return np.fft.rfftfreq(n_samples, d=1/fs), power


def freq_domain_features(x: np.ndarray, fs: float) -> dict:
    """Compute 7 frequency-domain features from the Hann-windowed power spectrum.

    Args:
        x: 1-D signal.
        fs: Sampling rate in Hz.

    Returns:
        Dict with total power, dominant frequency, spectral centroid,
        spectral bandwidth (power-weighted spread around the centroid),
        85 % roll-off frequency, spectral flatness (geometric / arithmetic
        mean of the power) and spectral entropy in bits.
    """
    eps = 1e-12
    freqs, P = rfft_power_spectrum(x, fs, window="hann")
    P_sum = np.sum(P) + eps  # eps keeps the divisions below finite for a silent signal

    dom_freq = float(freqs[int(np.argmax(P))])

    centroid = float(np.sum(freqs * P) / P_sum)
    bandwidth = float(np.sqrt(np.sum(((freqs - centroid) ** 2) * P) / P_sum))

    cdf = np.cumsum(P) / P_sum
    # If the cumulative power never reaches 0.85 (e.g. an all-zero spectrum),
    # searchsorted returns len(cdf); clamp so the lookup stays in range and
    # the roll-off falls back to the highest frequency bin.
    rolloff_idx = min(int(np.searchsorted(cdf, 0.85)), len(freqs) - 1)
    rolloff = float(freqs[rolloff_idx])

    geo_mean = np.exp(np.mean(np.log(P + eps)))
    arith_mean = np.mean(P)
    flatness = float(geo_mean / (arith_mean + eps))

    return {
        "fd_total_power": float(P_sum),
        "fd_dom_freq": dom_freq,
        "fd_centroid": centroid,
        "fd_bandwidth": bandwidth,
        "fd_rolloff_85": rolloff,
        "fd_flatness": flatness,
        "fd_spec_entropy": spectral_entropy(P),
    }


def wavelet_features(x: np.ndarray, wavelet: str = "db4", level: int = 3,
                     sampling_rate=None) -> dict:
    """Compute 3 wavelet features (energies of D1/D2/D3).

    With PyWavelets available, the signal is decomposed with `pywt.wavedec`
    and the energy (sum of squares) of the three finest detail bands is
    returned. Without it, the energy of three fixed FFT bands
    (200-350 Hz, 80-200 Hz, 0-80 Hz) is returned under the same keys.

    Args:
        x: 1-D signal.
        wavelet: Wavelet name for `pywt.wavedec`.
        level: Decomposition depth; must be at least 3 so that D1-D3 exist.
            Only used when PyWavelets is installed.
        sampling_rate: Sampling rate in Hz for the FFT fallback. When None,
            the module-level `fs` is used, which was the only behaviour
            before this parameter existed.

    Returns:
        Dict with keys "wv_energy_D1", "wv_energy_D2", "wv_energy_D3".

    Raises:
        ValueError: If PyWavelets is used and `level` is smaller than 3.
    """
    if HAVE_PYWT:
        if level < 3:
            raise ValueError(f"level must be >= 3 to obtain D1-D3, got {level}")
        coeffs = pywt.wavedec(x, wavelet=wavelet, level=level)
        # wavedec returns [cA_n, cD_n, ..., cD2, cD1]: the finest details come
        # last, so taking them from the end works for any level >= 3.
        cD3, cD2, cD1 = coeffs[-3:]
        return {
            "wv_energy_D1": float(np.sum(cD1**2)),
            "wv_energy_D2": float(np.sum(cD2**2)),
            "wv_energy_D3": float(np.sum(cD3**2)),
        }

    # Fallback without PyWavelets: band energies of the windowed power spectrum.
    rate = fs if sampling_rate is None else sampling_rate
    freqs, P = rfft_power_spectrum(x, rate, window="hann")

    def band_energy(f_lo, f_hi):
        # Power summed over the half-open band [f_lo, f_hi).
        in_band = (freqs >= f_lo) & (freqs < f_hi)
        return float(np.sum(P[in_band]))

    return {
        "wv_energy_D1": band_energy(200, 350),
        "wv_energy_D2": band_energy(80, 200),
        "wv_energy_D3": band_energy(0, 80),
    }


def extract_features(signals: np.ndarray, labels: np.ndarray, fs: float) -> pd.DataFrame:
    """Extract all features for each signal and return them as a DataFrame.

    Each row holds the time-domain, frequency-domain and wavelet features of
    one signal, followed by its integer label in the "label" column.

    Args:
        signals: Array of signals, one per row.
        labels: Label of each signal, indexed like `signals`.
        fs: Sampling rate in Hz; passed to the frequency-domain features and
            to the FFT fallback of the wavelet features.
    """
    rows = []
    for i, x in enumerate(signals):
        rows.append({
            **time_domain_features(x),
            **freq_domain_features(x, fs),
            # Pass the rate explicitly so the fallback does not depend on the global fs.
            **wavelet_features(x, sampling_rate=fs),
            "label": int(labels[i]),
        })

    return pd.DataFrame(rows)


## 3) Plotting helpers

In [None]:
# ============================================================
# 3) Plotting Helpers
# ============================================================

def plot_example_waveforms(signals: np.ndarray, labels: np.ndarray, fs: float, n_examples_per_class: int = 3) -> None:
    """Plot the first few raw waveforms of each class, one class per row.

    Args:
        signals: 2-D array, one signal per row.
        labels: Label of each row of `signals`.
        fs: Sampling rate in Hz, used to build the time axis.
        n_examples_per_class: Number of columns, i.e. waveforms shown per class.

    Reads the module-level `classes`, `colors` and `label_names`.
    """
    n_samples = signals.shape[1]
    tt = np.arange(n_samples) / fs

    plt.figure(figsize=(12, 7), dpi=130)

    for row, cls in enumerate(classes):
        idx = np.where(labels == cls)[0][:n_examples_per_class]
        for col, j in enumerate(idx):
            # Slot is derived from (row, col) rather than a running counter, so
            # a class with fewer samples than columns leaves blanks in its own
            # row instead of pushing the next class into the wrong row.
            plt.subplot(len(classes), n_examples_per_class,
                        row * n_examples_per_class + col + 1)
            plt.plot(tt, signals[j], color=colors[cls], linewidth=1.0)
            plt.title(f"{label_names[cls]} (sample {j})")
            plt.xlabel("Time (s)")
            plt.ylabel("Amp")
            plt.grid(True, alpha=0.25)

    plt.tight_layout()
    plt.show()


def plot_fft_single_and_mean(signals: np.ndarray,
                             labels: np.ndarray,
                             fs: float,
                             max_freq: float = 400.0,
                             n_mean: int = 200) -> None:
    """Draw two figures comparing the spectra of the classes.

    Figure 1 shows the FFT magnitude of the first signal of each class.
    Figure 2 shows, in dB, the power spectrum averaged over the first
    `n_mean` signals of each class. Both x-axes stop at `max_freq` Hz.

    Reads the module-level `classes`, `colors` and `label_names`.
    """
    tiny = 1e-12  # keeps sqrt / log10 away from exact zeros

    # --- Figure 1: one representative spectrum per class ---
    plt.figure(figsize=(12, 5), dpi=140)
    for cls in classes:
        first = np.where(labels == cls)[0][0]
        freqs, power = rfft_power_spectrum(signals[first], fs)
        plt.plot(freqs, np.sqrt(power + tiny), color=colors[cls], linewidth=1.5,
                 label=f"{label_names[cls]} (single)")

    plt.xlim(0, max_freq)
    plt.title("FFT Magnitude (sqrt(power)) — Single Example per Class")
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Magnitude (a.u.)")
    plt.grid(True, alpha=0.25)
    plt.legend()
    plt.tight_layout()
    plt.show()

    # --- Figure 2: class-average power spectrum in dB ---
    plt.figure(figsize=(12, 5), dpi=140)
    for cls in classes:
        members = np.where(labels == cls)[0][:n_mean]
        spectra = [rfft_power_spectrum(signals[j], fs) for j in members]
        mean_power = np.mean(np.stack([p for _, p in spectra]), axis=0)
        freqs = spectra[-1][0]

        plt.plot(freqs, 10*np.log10(mean_power + tiny), color=colors[cls], linewidth=2.0,
                 label=f"{label_names[cls]} (mean of {len(members)})")

    plt.xlim(0, max_freq)
    plt.title("Mean Power Spectrum per Class (dB)")
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Power (dB, relative)")
    plt.grid(True, alpha=0.25)
    plt.legend()
    plt.tight_layout()
    plt.show()


def plot_stft_librosa(signal: np.ndarray,
                      fs: float,
                      title: str,
                      n_fft: int = 256,
                      hop_length: int = 64) -> None:
    """Show the STFT magnitude of one signal as a dB spectrogram.

    The STFT uses a Hann window without centre padding; magnitudes are
    converted to dB relative to the largest value.
    """
    samples = signal.astype(float)

    spectrum = librosa.stft(samples, n_fft=n_fft, hop_length=hop_length,
                            window="hann", center=False)
    magnitude_db = librosa.amplitude_to_db(np.abs(spectrum) + 1e-12, ref=np.max)

    fig, ax = plt.subplots(figsize=(10, 4), dpi=140)
    mesh = librosa.display.specshow(magnitude_db, sr=fs, hop_length=hop_length,
                                    x_axis="time", y_axis="hz", ax=ax)
    fig.colorbar(mesh, ax=ax, label="dB")
    ax.set_title(title)
    fig.tight_layout()
    plt.show()


def plot_melspectrogram_librosa(signal: np.ndarray,
                                fs: float,
                                title: str,
                                n_fft: int = 256,
                                hop_length: int = 64,
                                n_mels: int = 64,
                                fmin: float = 0.0,
                                fmax=None) -> None:
    """Show the mel-scaled power spectrogram of one signal in dB.

    Args:
        signal: 1-D signal.
        fs: Sampling rate in Hz.
        title: Figure title.
        n_fft, hop_length: STFT frame size and step, in samples.
        n_mels: Number of mel bands.
        fmin: Lowest frequency of the mel filter bank, in Hz.
        fmax: Highest frequency of the mel filter bank; fs / 2 when None.
    """
    samples = signal.astype(float)
    top_freq = fs / 2.0 if fmax is None else fmax

    mel_power = librosa.feature.melspectrogram(
        y=samples,
        sr=fs,
        n_fft=n_fft,
        hop_length=hop_length,
        window="hann",
        center=False,
        n_mels=n_mels,
        fmin=fmin,
        fmax=top_freq,
        power=2.0,
    )
    mel_db = librosa.power_to_db(mel_power + 1e-12, ref=np.max)

    fig, ax = plt.subplots(figsize=(10, 4), dpi=140)
    mesh = librosa.display.specshow(mel_db, sr=fs, hop_length=hop_length,
                                    x_axis="time", y_axis="mel",
                                    fmax=top_freq, ax=ax)
    fig.colorbar(mesh, ax=ax, label="dB")
    ax.set_title(title)
    fig.tight_layout()
    plt.show()


def plot_wavelet_scalogram(signal: np.ndarray, fs: float, title: str) -> None:
    """Wavelet scalogram (CWT magnitude) using PyWavelets.

    Runs a Morlet CWT over scales 1..127 and draws |coefficient| against
    time and the frequency that corresponds to each scale. Prints a notice
    and returns without plotting when PyWavelets is not installed.

    Args:
        signal: 1-D signal.
        fs: Sampling rate in Hz.
        title: Figure title.
    """
    if not HAVE_PYWT:
        print("[SKIP] Wavelet scalogram: PyWavelets not installed.")
        return

    scales = np.arange(1, 128)
    coef, freqs = pywt.cwt(signal.astype(float), scales=scales, wavelet="morl", sampling_period=1/fs)

    # pywt.cwt returns one row per scale, and frequency falls as scale grows
    # (roughly as 1/scale, so the rows are not evenly spaced in Hz). Sort the
    # rows by ascending frequency and plot against the real coordinates,
    # otherwise the frequency axis is upside-down and mislabelled.
    order = np.argsort(freqs)
    tt = np.arange(coef.shape[1]) / fs

    plt.figure(figsize=(10, 4), dpi=140)
    plt.pcolormesh(tt, freqs[order], np.abs(coef)[order], shading="auto")
    plt.colorbar(label="|CWT coef|")
    plt.title(title)
    plt.xlabel("Time (s)")
    plt.ylabel("Frequency (Hz)")
    plt.tight_layout()
    plt.show()


def plot_feature_distributions(df: pd.DataFrame, feature_cols: list, label_col: str = "label") -> None:
    """Draw one panel per feature with the class histograms overlaid.

    Panels are laid out four per row. Each histogram is density-normalised,
    and the legend is drawn on the first panel only.

    Reads the module-level `classes`, `colors` and `label_names`.
    """
    ncols = 4
    nrows = int(np.ceil(len(feature_cols) / ncols))

    fig = plt.figure(figsize=(16, 3.5 * nrows), dpi=140)

    for position, feat in enumerate(feature_cols, start=1):
        ax = fig.add_subplot(nrows, ncols, position)

        for cls in classes:
            in_class = df[label_col] == cls
            ax.hist(df.loc[in_class, feat].values, bins=30, density=True,
                    alpha=0.5, color=colors[cls], label=label_names[cls])

        ax.set_title(feat)
        ax.grid(True, alpha=0.2)
        if position == 1:
            ax.legend()

    fig.tight_layout()
    plt.show()


def plot_correlation_heatmaps(df: pd.DataFrame, feature_cols: list) -> None:
    """Draw two Pearson-correlation heatmaps of the features.

    The first uses every row of `df`. The second shows the difference between
    the correlation matrix of the rows with label 1 and that of the rows with
    label 0.
    """
    ticks = range(len(feature_cols))

    def draw(matrix: pd.DataFrame, heading: str, bar_label: str) -> None:
        # One heatmap figure with the feature names on both axes.
        plt.figure(figsize=(10, 8), dpi=150)
        plt.imshow(matrix.values, aspect="auto")
        plt.title(heading)
        plt.colorbar(label=bar_label)
        plt.xticks(ticks, feature_cols, rotation=90, fontsize=7)
        plt.yticks(ticks, feature_cols, fontsize=7)
        plt.tight_layout()
        plt.show()

    draw(df[feature_cols].corr(method="pearson"),
         "Feature Correlation (All Samples) — Pearson r", "r")

    per_class = {
        cls: df[df.label == cls][feature_cols].corr(method="pearson")
        for cls in (0, 1)
    }
    draw(per_class[1] - per_class[0],
         "Δ Correlation Heatmap (Class B − Class A)", "Δr")


## 4) Run everything

In [None]:
# ============================================================
# 4) Main
# ============================================================

def main() -> None:
    """Run the whole pipeline: generate, featurise, plot, then save to disk.

    Uses the module-level configuration (`classes`, `n_per_class`, `RNG`,
    `fs`, `N`) and writes one CSV and two .npy files into the working directory.
    """
    # --- Data ---
    signals, labels = make_dataset(classes, n_per_class, RNG, fs, N)
    print("Signals shape:", signals.shape, "Labels shape:", labels.shape)

    # --- Features ---
    df = extract_features(signals, labels, fs)
    feature_cols = [name for name in df.columns if name != "label"]
    print("Total features:", len(feature_cols))
    display(df.head())

    # --- Dataset-level plots: raw waveforms, then single + mean spectra ---
    plot_example_waveforms(signals, labels, fs, n_examples_per_class=3)
    plot_fft_single_and_mean(signals, labels, fs, max_freq=400.0, n_mean=200)

    # --- Time-frequency views of the first signal of each class ---
    representatives = (
        (signals[np.where(labels == 0)[0][0]], "A"),
        (signals[np.where(labels == 1)[0][0]], "B"),
    )

    for rep, tag in representatives:
        plot_stft_librosa(rep, fs, title=f"STFT Spectrogram — Class {tag} (librosa)")

    for rep, tag in representatives:
        plot_melspectrogram_librosa(rep, fs, title=f"Mel-Spectrogram — Class {tag} (librosa)",
                                    n_mels=64, fmax=fs/2)

    for rep, tag in representatives:
        plot_wavelet_scalogram(rep, fs, title=f"Wavelet Scalogram (CWT) — Class {tag}")

    # --- Feature-level plots: per-feature histograms, then correlations ---
    plot_feature_distributions(df, feature_cols, label_col="label")
    plot_correlation_heatmaps(df, feature_cols)

    # --- Persist the feature table and the raw arrays ---
    csv_name = "synthetic_timeseries_features.csv"
    signals_name = "synthetic_timeseries_signals.npy"
    labels_name = "synthetic_timeseries_labels.npy"

    df.to_csv(csv_name, index=False)
    np.save(signals_name, signals)
    np.save(labels_name, labels)

    print("\nSaved:")
    for written in (csv_name, signals_name, labels_name):
        print(f" - {written}")

main()
