## Sensor Data Frequency Analysis Toolkit

- Author: Rui Pestana (NESTER), Pedro Costa (ISEL)
- Project: NESTER
- Date: 15/05/2025
- Copyright © 2025 Rui Pestana (NESTER), Pedro Costa (ISEL) - MIT License

Description:
Performs comprehensive frequency domain analysis of sensor data including:
- Fundamental frequency estimation via autocorrelation
- Harmonic analysis using FFT
- Autocorrelation Function (ACF) visualization

Key Functionalities:
1. Automated fundamental frequency detection
2. Harmonic component identification
3. Spectral leakage reduction through windowing
4. Confidence intervals for ACF analysis
5. Multi-sensor comparative analysis

Inputs:
- CSV file with columns: ["ABS_MU", "ABS_AM1", "ABS_AM2", "ABS_DM1"]
- Time-series data sampled at 15-minute intervals

Outputs:
- Autocorrelation plots with peak markers
- FFT spectra with harmonic annotations
- ACF plots with 95% confidence bands
- Console summary of fundamental frequencies

Parameters:
- Temporal:
  - T = 900s (15-minute sampling interval)
  - Analysis window = full dataset
- Spectral:
  - Padding factor = 8×
  - Nyquist frequency = 0.0005556 Hz (1/(2*900s))
  - Harmonic search range = ±0.00001 Hz
- Statistical:
  - ACF lags = 100
  - Confidence level = 95%

Dependencies:
- pandas >=1.3.0
- numpy >=1.21.0
- matplotlib >=3.5.0
- scipy >=1.9.0
- statsmodels >=0.13.0

Usage:
1. Configure file path and columns
2. Execute script
3. Review generated plots
4. Check console for frequency summary

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
from scipy.signal import find_peaks
from statsmodels.tsa.stattools import acf
from matplotlib import pyplot as plt  # Redundant import (already imported as plt)

# === DATA LOADING ===
# Configure data source and columns
file_path = r".\Dados\Dados_DL1\all_variables-DL1-rp1_A1P-REF-ABS.csv"
columns = ["ABS_MU", "ABS_AM1", "ABS_AM2", "ABS_DM1"]  # Sensor measurement columns

# Read and preprocess data
df = pd.read_csv(file_path)
# Clean data by removing missing values and resetting index
data = df[columns].dropna().reset_index(drop=True)

# === ANALYSIS PARAMETERS ===
T = 900  # Sampling interval in seconds (15 minutes)
N = len(data)  # Total number of samples
padding_factor = 8  # Zero-padding multiplier for FFT resolution
nyquist_freq = 1 / (2 * T)  # Nyquist frequency in Hz
frequencies = {}  # Dictionary to store fundamental frequencies

# === FUNDAMENTAL FREQUENCY ESTIMATION ===
def estimate_fundamental_frequency(signal, label):
    """Estimate fundamental frequency using autocorrelation peak detection.
    
    Args:
        signal: Input time series data
        label: Sensor name for plot labeling
    
    Returns:
        f0: Estimated fundamental frequency in Hz
    """
    N = len(signal)
    # Signal preprocessing
    signal = signal - np.mean(signal)  # Remove DC offset
    signal = signal * np.hamming(N)    # Apply window function to reduce leakage

    # Compute autocorrelation manually
    autocorr = [np.dot(signal[:-lag], signal[lag:]) for lag in range(1, N // 2)]
    autocorr = np.array(autocorr)

    # Peak detection parameters
    peaks, _ = find_peaks(autocorr, distance=5)  # Minimum 5-sample peak separation
    threshold = 0.5 * np.max(autocorr)  # 50% of max amplitude threshold

    # Find first significant peak above threshold
    for peak in peaks:
        if autocorr[peak] >= threshold:
            lag_peak = peak + 1  # Add 1 to account for 1-based indexing
            break
    else:  # Fallback if no peaks meet threshold
        lag_peak = peaks[0] + 1 if len(peaks) > 0 else np.argmax(autocorr) + 1

    # Calculate fundamental frequency
    f0 = 1 / (lag_peak * T)  # Convert lag to frequency

    # Plot autocorrelation results
    plt.figure(figsize=(10, 4))
    plt.plot(range(1, len(autocorr) + 1), autocorr, label="Autocorrelation (Hamming)")
    plt.axvline(x=lag_peak, color='red', linestyle='--', label=f"Peak lag = {lag_peak}")
    plt.title(f"Autocorrelation – {label}")
    plt.xlabel("Lag (samples)")
    plt.ylabel("Sum of products")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    return f0

# === FREQUENCY SPECTRUM ANALYSIS ===
def plot_fft_with_harmonics(signal, label, f0):
    """Plot FFT spectrum with annotated harmonics of fundamental frequency.
    
    Args:
        signal: Input time series data
        label: Sensor name for plot title
        f0: Fundamental frequency for harmonic detection
    """
    # Zero-padding for improved frequency resolution
    padded_len = padding_factor * len(signal)
    signal_padded = np.pad(signal, (0, padded_len - len(signal)), mode='constant')

    # Compute FFT and normalize amplitude
    fft_result = fft(signal_padded)
    freqs = fftfreq(padded_len, T)[:padded_len // 2]  # Frequency bins
    amplitude = 2.0 / len(signal) * np.abs(fft_result[:padded_len // 2])

    # Create spectrum DataFrame and filter
    spectrum = pd.DataFrame({"Frequency (Hz)": freqs, "Amplitude": amplitude})
    spectrum = spectrum[spectrum["Frequency (Hz)"] > 0]  # Remove DC component
    filtered = spectrum[spectrum["Frequency (Hz)"] <= 0.001].reset_index(drop=True)  # Low-pass filter
    
    # Remove first 3 frequency bins to eliminate low-frequency noise
    filtered = filtered.iloc[3:].reset_index(drop=True)

    # Harmonic detection parameters
    harmonics = []
    delta = 0.00001  # Frequency search window (±0.00001 Hz)
    n = 1  # Harmonic multiplier

    # Detect harmonics up to 0.001 Hz
    while f0 * n <= 0.001:
        f_h = f0 * n  # Expected harmonic frequency
        # Find spectral components near expected harmonic
        window = filtered[(filtered["Frequency (Hz)"] >= f_h - delta) &
                        (filtered["Frequency (Hz)"] <= f_h + delta)]
        
        if not window.empty:
            # Select peak amplitude in search window
            idx_peak = window["Amplitude"].idxmax()
            f_match = filtered.loc[idx_peak, "Frequency (Hz)"]
            a_match = filtered.loc[idx_peak, "Amplitude"]
            harmonics.append((f_match, a_match, n))
        n += 1

    # Plot spectrum with harmonics
    plt.figure(figsize=(10, 5))
    plt.plot(filtered["Frequency (Hz)"], filtered["Amplitude"], 
             label="FFT Spectrum", color='blue')
    
    # Annotate harmonics
    for f, a, n in harmonics:
        color = 'red' if n == 1 else 'orange'
        label_h = "Fundamental" if n == 1 else None
        plt.scatter(f, a, color=color, s=40, label=label_h, zorder=5)
    
    plt.axvline(x=nyquist_freq, color='red', linestyle='--', label="Nyquist Frequency")

    # Set dynamic y-axis limit based on data distribution
    upper_limit = np.percentile(filtered["Amplitude"], 99) * 1.2
    plt.ylim(0, upper_limit)

    plt.title(f"Frequency Spectrum – {label}")
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Amplitude")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

# === AUTOCORRELATION FUNCTION ANALYSIS ===
def plot_acf_series(df, columns, lags=100):
    """Plot Autocorrelation Function (ACF) with confidence intervals.
    
    Args:
        df: DataFrame containing time series data
        columns: List of columns to analyze
        lags: Number of lags to display (default: 100)
    """
    for col in columns:
        fig, ax = plt.subplots(figsize=(12, 6))
        values = df[col].dropna().values  # Clean data
        
        # Compute ACF with 95% confidence intervals
        acf_values, confint = acf(values, nlags=lags, alpha=0.05)
        
        # Create stem plot with confidence interval shading
        ax.stem(range(1, lags + 1), acf_values[1:], 
                linefmt='b-', markerfmt='bo', basefmt='k-')
        ax.fill_between(range(1, lags + 1),
                        confint[1:, 0] - acf_values[1:],
                        confint[1:, 1] - acf_values[1:],
                        color='blue', alpha=0.2, label="95% Confidence Interval")
        
        # Format plot
        ax.axhline(y=0, color='black', linestyle='--')
        ax.set_ylim(-1, 1)  # ACF range
        ax.set_xlim(0, lags + 1)
        ax.set_title(f"Autocorrelation Function (ACF) – {col}", fontsize=14)
        ax.set_xlabel("Lag (samples)")
        ax.set_ylabel("Autocorrelation")
        ax.grid(True, linestyle=':', linewidth=0.7)
        ax.legend()
        plt.tight_layout()
        plt.show()

# === MAIN PROCESSING PIPELINE ===
# Analyze each sensor channel
for col in columns:
    signal = data[col].to_numpy()
    print(f"\n🔍 Processing: {col}")
    
    # Fundamental frequency estimation
    f0 = estimate_fundamental_frequency(signal, col)
    frequencies[col] = f0
    
    # Spectral analysis with harmonic detection
    plot_fft_with_harmonics(signal, col, f0)

# === RESULTS SUMMARY ===
print("\n📋 Estimated fundamental frequencies:")
for meter, f in frequencies.items():
    print(f" - {meter}: {f:.6e} Hz")

# === AUTOCORRELATION ANALYSIS ===
plot_acf_series(data, columns, lags=100)