# SciTeX DSP Module - Digital Signal Processing

The `scitex.dsp` module provides tools for digital signal processing commonly used in scientific research, particularly in neuroscience and time series analysis.

In [None]:
import scitex as stx
import numpy as np
import pandas as pd
from scipy import signal
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq

# Set random seed
np.random.seed(42)

# Define sampling parameters
fs = 1000  # Sampling frequency (Hz)
duration = 5  # Duration (seconds)
t = np.linspace(0, duration, fs * duration, endpoint=False)

## 1. Signal Generation and Basic Processing

In [None]:
# Generate complex signal with multiple components
# Component frequencies
freq1, amp1 = 10, 1.0   # 10 Hz oscillation
freq2, amp2 = 25, 0.5   # 25 Hz oscillation
freq3, amp3 = 60, 0.3   # 60 Hz noise

# Generate signal components
signal1 = amp1 * np.sin(2 * np.pi * freq1 * t)
signal2 = amp2 * np.sin(2 * np.pi * freq2 * t)
signal3 = amp3 * np.sin(2 * np.pi * freq3 * t)
noise = 0.2 * np.random.randn(len(t))

# Composite signal
composite_signal = signal1 + signal2 + signal3 + noise

# Visualize signals
fig, axes = stx.plt.subplots(3, 1, figsize=(12, 8))

# Time domain - full signal
ax = axes[0]
ax.plot(t, composite_signal, 'b-', linewidth=0.5)
ax.set_xyt('Time (s)', 'Amplitude', 'Composite Signal')
ax.grid(True, alpha=0.3)

# Time domain - zoomed
ax = axes[1]
zoom_idx = t < 0.5
ax.plot(t[zoom_idx], signal1[zoom_idx], 'r-', label=f'{freq1} Hz', linewidth=2)
ax.plot(t[zoom_idx], signal2[zoom_idx], 'g-', label=f'{freq2} Hz', linewidth=2)
ax.plot(t[zoom_idx], composite_signal[zoom_idx], 'b-', label='Composite', alpha=0.7)
ax.set_xyt('Time (s)', 'Amplitude', 'Signal Components (Zoomed)')
ax.legend()
ax.grid(True, alpha=0.3)

# Frequency domain
ax = axes[2]
freqs = fftfreq(len(t), 1/fs)[:len(t)//2]
fft_vals = np.abs(fft(composite_signal))[:len(t)//2] * 2/len(t)
ax.plot(freqs, fft_vals, 'b-')
ax.set_xyt('Frequency (Hz)', 'Amplitude', 'Frequency Spectrum')
ax.set_xlim(0, 100)
ax.grid(True, alpha=0.3)

# Mark component frequencies
for freq, label in [(freq1, '10 Hz'), (freq2, '25 Hz'), (freq3, '60 Hz')]:
    ax.axvline(freq, color='red', linestyle='--', alpha=0.5)
    ax.text(freq+1, ax.get_ylim()[1]*0.9, label, rotation=90, va='top')

fig.tight_layout()
stx.io.save(fig, './dsp/signal_generation.png')
stx.plt.show()

## 2. Filtering

In [None]:
# Design different types of filters
filter_configs = [
    ('Low-pass (30 Hz)', 'lowpass', 30),
    ('High-pass (5 Hz)', 'highpass', 5),
    ('Band-pass (8-30 Hz)', 'bandpass', [8, 30]),
    ('Band-stop (55-65 Hz)', 'bandstop', [55, 65])
]

fig, axes = stx.plt.subplots(2, 2, figsize=(15, 10))
axes = axes.flatten()

for ax, (name, ftype, freq) in zip(axes, filter_configs):
    # Design filter
    if ftype in ['lowpass', 'highpass']:
        sos = signal.butter(4, freq, ftype, fs=fs, output='sos')
    else:
        sos = signal.butter(4, freq, ftype, fs=fs, output='sos')
    
    # Apply filter
    filtered_signal = signal.sosfiltfilt(sos, composite_signal)
    
    # Plot original and filtered in time domain
    time_window = t < 1  # Show first second
    ax2 = ax.twinx()
    
    # Time domain
    ax.plot(t[time_window], composite_signal[time_window], 'b-', alpha=0.5, label='Original')
    ax.plot(t[time_window], filtered_signal[time_window], 'r-', linewidth=2, label='Filtered')
    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Amplitude', color='b')
    ax.tick_params(axis='y', labelcolor='b')
    ax.set_title(f'{name} Filter', fontsize=12)
    ax.grid(True, alpha=0.3)
    
    # Frequency response
    w, h = signal.sosfreqz(sos, worN=2000, fs=fs)
    ax2.plot(w, 20 * np.log10(abs(h)), 'g--', alpha=0.7, linewidth=1)
    ax2.set_ylabel('Gain (dB)', color='g')
    ax2.tick_params(axis='y', labelcolor='g')
    ax2.set_ylim(-60, 5)
    
    # Add legend
    lines1, labels1 = ax.get_legend_handles_labels()
    ax.legend(lines1, labels1, loc='upper left')

fig.tight_layout()
stx.io.save(fig, './dsp/filtering_examples.png')
stx.plt.show()

## 3. Spectral Analysis

In [None]:
# Different spectral analysis methods
fig, axes = stx.plt.subplots(2, 2, figsize=(15, 10))

# 1. Periodogram
ax = axes[0, 0]
f_perio, Pxx_perio = signal.periodogram(composite_signal, fs)
ax.semilogy(f_perio, Pxx_perio)
ax.set_xyt('Frequency (Hz)', 'PSD', 'Periodogram')
ax.set_xlim(0, 100)
ax.grid(True, alpha=0.3)

# 2. Welch's method
ax = axes[0, 1]
f_welch, Pxx_welch = signal.welch(composite_signal, fs, nperseg=1024)
ax.semilogy(f_welch, Pxx_welch)
ax.set_xyt('Frequency (Hz)', 'PSD', "Welch's Method")
ax.set_xlim(0, 100)
ax.grid(True, alpha=0.3)

# 3. Spectrogram
ax = axes[1, 0]
f_spec, t_spec, Sxx = signal.spectrogram(composite_signal, fs, nperseg=256)
im = ax.pcolormesh(t_spec, f_spec, 10 * np.log10(Sxx), shading='gouraud', cmap='viridis')
ax.set_ylabel('Frequency (Hz)')
ax.set_xlabel('Time (s)')
ax.set_title('Spectrogram')
ax.set_ylim(0, 100)
plt.colorbar(im, ax=ax, label='Power (dB)')

# 4. Multi-taper method (if available)
ax = axes[1, 1]
# Simulate multi-taper with multiple Welch estimates
window_sizes = [256, 512, 1024]
for i, nperseg in enumerate(window_sizes):
    f_mt, Pxx_mt = signal.welch(composite_signal, fs, nperseg=nperseg)
    ax.semilogy(f_mt, Pxx_mt, label=f'Window: {nperseg}', alpha=0.7)
ax.set_xyt('Frequency (Hz)', 'PSD', 'Multiple Window Sizes')
ax.set_xlim(0, 100)
ax.legend()
ax.grid(True, alpha=0.3)

fig.tight_layout()
stx.io.save(fig, './dsp/spectral_analysis.png')
stx.plt.show()

## 4. Time-Frequency Analysis

In [None]:
# Generate chirp signal (frequency changes over time)
t_chirp = np.linspace(0, 2, 2 * fs)
chirp = signal.chirp(t_chirp, f0=5, f1=50, t1=2, method='linear')
chirp += 0.2 * np.random.randn(len(chirp))

fig, axes = stx.plt.subplots(3, 1, figsize=(12, 10))

# Time domain
ax = axes[0]
ax.plot(t_chirp, chirp)
ax.set_xyt('Time (s)', 'Amplitude', 'Chirp Signal (5-50 Hz)')
ax.grid(True, alpha=0.3)

# Short-Time Fourier Transform (STFT)
ax = axes[1]
f_stft, t_stft, Zxx = signal.stft(chirp, fs, nperseg=256)
im = ax.pcolormesh(t_stft, f_stft, np.abs(Zxx), shading='gouraud', cmap='hot')
ax.set_ylabel('Frequency (Hz)')
ax.set_xlabel('Time (s)')
ax.set_title('Short-Time Fourier Transform')
ax.set_ylim(0, 60)
plt.colorbar(im, ax=ax, label='Magnitude')

# Continuous Wavelet Transform
ax = axes[2]
widths = np.arange(1, 128)
cwt_matrix = signal.cwt(chirp, signal.ricker, widths)
# Convert scale to frequency
freqs_cwt = fs / (2 * widths)
im = ax.pcolormesh(t_chirp, freqs_cwt, np.abs(cwt_matrix), shading='gouraud', cmap='hot')
ax.set_ylabel('Frequency (Hz)')
ax.set_xlabel('Time (s)')
ax.set_title('Continuous Wavelet Transform (Ricker Wavelet)')
ax.set_ylim(0, 60)
plt.colorbar(im, ax=ax, label='Magnitude')

fig.tight_layout()
stx.io.save(fig, './dsp/time_frequency_analysis.png')
stx.plt.show()

## 5. Hilbert Transform and Analytic Signal

In [None]:
# Generate amplitude-modulated signal
t_am = np.linspace(0, 1, fs)
carrier_freq = 40  # Hz
modulation_freq = 5  # Hz

# Carrier and modulation
carrier = np.sin(2 * np.pi * carrier_freq * t_am)
modulation = 1 + 0.5 * np.sin(2 * np.pi * modulation_freq * t_am)
am_signal = modulation * carrier

# Compute analytic signal
analytic_signal = signal.hilbert(am_signal)
amplitude_envelope = np.abs(analytic_signal)
instantaneous_phase = np.unwrap(np.angle(analytic_signal))
instantaneous_frequency = np.diff(instantaneous_phase) / (2.0 * np.pi) * fs

fig, axes = stx.plt.subplots(4, 1, figsize=(12, 10))

# Original signal
ax = axes[0]
ax.plot(t_am, am_signal, 'b-', linewidth=0.5)
ax.plot(t_am, modulation, 'r--', linewidth=2, label='Modulation')
ax.set_xyt('Time (s)', 'Amplitude', 'AM Signal')
ax.legend()
ax.grid(True, alpha=0.3)

# Amplitude envelope
ax = axes[1]
ax.plot(t_am, am_signal, 'b-', alpha=0.5, linewidth=0.5)
ax.plot(t_am, amplitude_envelope, 'r-', linewidth=2, label='Envelope')
ax.plot(t_am, -amplitude_envelope, 'r-', linewidth=2)
ax.set_xyt('Time (s)', 'Amplitude', 'Amplitude Envelope')
ax.legend()
ax.grid(True, alpha=0.3)

# Instantaneous phase
ax = axes[2]
ax.plot(t_am, instantaneous_phase)
ax.set_xyt('Time (s)', 'Phase (rad)', 'Instantaneous Phase')
ax.grid(True, alpha=0.3)

# Instantaneous frequency
ax = axes[3]
ax.plot(t_am[:-1], instantaneous_frequency)
ax.axhline(carrier_freq, color='r', linestyle='--', label=f'Carrier: {carrier_freq} Hz')
ax.set_xyt('Time (s)', 'Frequency (Hz)', 'Instantaneous Frequency')
ax.set_ylim(carrier_freq - 10, carrier_freq + 10)
ax.legend()
ax.grid(True, alpha=0.3)

fig.tight_layout()
stx.io.save(fig, './dsp/hilbert_transform.png')
stx.plt.show()

## 6. Phase-Amplitude Coupling (PAC)

In [None]:
# Generate synthetic signal with PAC
t_pac = np.linspace(0, 10, 10 * fs)

# Low frequency phase-providing signal (theta, 6 Hz)
phase_freq = 6
phase_signal = np.sin(2 * np.pi * phase_freq * t_pac)

# High frequency amplitude-modulated signal (gamma, 40 Hz)
amp_freq = 40
# Modulate gamma amplitude by theta phase
modulation_index = 0.5
amplitude_modulation = 1 + modulation_index * np.sin(2 * np.pi * phase_freq * t_pac)
amp_signal = amplitude_modulation * np.sin(2 * np.pi * amp_freq * t_pac)

# Combine signals
pac_signal = phase_signal + 0.5 * amp_signal + 0.1 * np.random.randn(len(t_pac))

# Extract phase and amplitude
# Filter for phase (4-8 Hz)
sos_phase = signal.butter(4, [4, 8], 'bandpass', fs=fs, output='sos')
phase_filtered = signal.sosfiltfilt(sos_phase, pac_signal)
phase = np.angle(signal.hilbert(phase_filtered))

# Filter for amplitude (30-50 Hz)
sos_amp = signal.butter(4, [30, 50], 'bandpass', fs=fs, output='sos')
amp_filtered = signal.sosfiltfilt(sos_amp, pac_signal)
amplitude = np.abs(signal.hilbert(amp_filtered))

# Compute PAC
n_bins = 18
phase_bins = np.linspace(-np.pi, np.pi, n_bins + 1)
phase_centers = (phase_bins[:-1] + phase_bins[1:]) / 2

# Compute mean amplitude per phase bin
amp_by_phase = np.zeros(n_bins)
for i in range(n_bins):
    mask = (phase >= phase_bins[i]) & (phase < phase_bins[i + 1])
    if np.sum(mask) > 0:
        amp_by_phase[i] = np.mean(amplitude[mask])

# Visualization
fig, axes = stx.plt.subplots(3, 2, figsize=(15, 10))

# Time series
ax = axes[0, 0]
time_window = (t_pac >= 1) & (t_pac <= 2)
ax.plot(t_pac[time_window], pac_signal[time_window], 'k-', linewidth=0.5)
ax.set_xyt('Time (s)', 'Amplitude', 'PAC Signal')
ax.grid(True, alpha=0.3)

# Phase signal
ax = axes[0, 1]
ax.plot(t_pac[time_window], phase_filtered[time_window], 'b-')
ax.set_xyt('Time (s)', 'Amplitude', 'Phase Signal (4-8 Hz)')
ax.grid(True, alpha=0.3)

# Amplitude signal
ax = axes[1, 0]
ax.plot(t_pac[time_window], amp_filtered[time_window], 'r-', linewidth=0.5)
ax.plot(t_pac[time_window], amplitude[time_window], 'r-', linewidth=2)
ax.set_xyt('Time (s)', 'Amplitude', 'Amplitude Signal (30-50 Hz)')
ax.grid(True, alpha=0.3)

# Phase-amplitude plot
ax = axes[1, 1]
ax.scatter(phase[::100], amplitude[::100], alpha=0.1, s=1)
ax.set_xyt('Phase (rad)', 'Amplitude', 'Phase-Amplitude Relationship')
ax.grid(True, alpha=0.3)

# Mean amplitude by phase
ax = axes[2, 0]
ax.bar(phase_centers, amp_by_phase, width=phase_bins[1]-phase_bins[0], alpha=0.7)
ax.set_xyt('Phase (rad)', 'Mean Amplitude', 'PAC: Amplitude by Phase')
ax.set_xticks([-np.pi, -np.pi/2, 0, np.pi/2, np.pi])
ax.set_xticklabels(['-π', '-π/2', '0', 'π/2', 'π'])
ax.grid(True, alpha=0.3, axis='y')

# Polar plot
ax = fig.add_subplot(3, 2, 6, projection='polar')
ax.bar(phase_centers, amp_by_phase, width=phase_bins[1]-phase_bins[0], alpha=0.7)
ax.set_title('PAC: Polar Plot', pad=20)

fig.tight_layout()
stx.io.save(fig, './dsp/phase_amplitude_coupling.png')
stx.plt.show()

## 7. Event-Related Analysis

In [None]:
# Simulate event-related data
n_trials = 50
trial_length = 2  # seconds
baseline_length = 0.5  # seconds before event

# Generate trials with event-related response
t_trial = np.linspace(-baseline_length, trial_length - baseline_length, 
                      int(trial_length * fs))
trials = []

for i in range(n_trials):
    # Baseline activity
    baseline = 0.5 * np.random.randn(len(t_trial))
    
    # Event-related response (varies by trial)
    response_time = 0.1 + 0.05 * np.random.randn()  # Variable latency
    response_amp = 2 + 0.5 * np.random.randn()      # Variable amplitude
    
    # Gaussian response
    response = response_amp * np.exp(-(t_trial - response_time)**2 / (2 * 0.05**2))
    
    # Add oscillatory response
    osc_response = 0.5 * np.sin(2 * np.pi * 10 * t_trial) * \
                   np.exp(-(t_trial - 0.2)**2 / (2 * 0.1**2))
    
    trial_signal = baseline + response + osc_response
    trials.append(trial_signal)

trials = np.array(trials)

# Compute ERP (Event-Related Potential)
erp = np.mean(trials, axis=0)
erp_std = np.std(trials, axis=0)

# Time-frequency analysis of trials
freq_bins = np.arange(1, 30, 1)
time_bins = t_trial[::10]  # Downsample time
tf_power = np.zeros((len(freq_bins), len(time_bins)))

for i, freq in enumerate(freq_bins):
    # Filter trials at this frequency
    sos = signal.butter(4, [freq-1, freq+1], 'bandpass', fs=fs, output='sos')
    filtered_trials = signal.sosfiltfilt(sos, trials, axis=1)
    # Compute power
    power = np.abs(signal.hilbert(filtered_trials, axis=1))**2
    # Average across trials and downsample
    tf_power[i, :] = np.mean(power[:, ::10], axis=0)

# Baseline normalize
baseline_idx = time_bins < 0
baseline_power = np.mean(tf_power[:, baseline_idx], axis=1, keepdims=True)
tf_power_norm = 10 * np.log10(tf_power / baseline_power)

# Visualization
fig, axes = stx.plt.subplots(2, 2, figsize=(15, 10))

# Single trials
ax = axes[0, 0]
for i in range(min(10, n_trials)):
    ax.plot(t_trial, trials[i, :], alpha=0.3, linewidth=0.5)
ax.plot(t_trial, erp, 'k-', linewidth=3, label='ERP')
ax.axvline(0, color='red', linestyle='--', alpha=0.5)
ax.set_xyt('Time (s)', 'Amplitude', 'Single Trials and ERP')
ax.legend()
ax.grid(True, alpha=0.3)

# ERP with confidence interval
ax = axes[0, 1]
ax.plot(t_trial, erp, 'b-', linewidth=2)
ax.fill_between(t_trial, erp - 1.96*erp_std/np.sqrt(n_trials), 
                erp + 1.96*erp_std/np.sqrt(n_trials), alpha=0.3)
ax.axvline(0, color='red', linestyle='--', alpha=0.5)
ax.axhline(0, color='black', linestyle='-', alpha=0.3)
ax.set_xyt('Time (s)', 'Amplitude', 'ERP with 95% CI')
ax.grid(True, alpha=0.3)

# Time-frequency representation
ax = axes[1, 0]
im = ax.pcolormesh(time_bins, freq_bins, tf_power_norm, 
                   shading='gouraud', cmap='RdBu_r', vmin=-3, vmax=3)
ax.axvline(0, color='black', linestyle='--', alpha=0.5)
ax.set_xyt('Time (s)', 'Frequency (Hz)', 'Event-Related Spectral Perturbation')
plt.colorbar(im, ax=ax, label='Power (dB)')

# Average power in specific bands
ax = axes[1, 1]
bands = {'Delta (1-4 Hz)': (1, 4), 'Theta (4-8 Hz)': (4, 8), 
         'Alpha (8-12 Hz)': (8, 12), 'Beta (12-25 Hz)': (12, 25)}

for band_name, (f_low, f_high) in bands.items():
    band_idx = (freq_bins >= f_low) & (freq_bins < f_high)
    band_power = np.mean(tf_power_norm[band_idx, :], axis=0)
    ax.plot(time_bins, band_power, linewidth=2, label=band_name)

ax.axvline(0, color='red', linestyle='--', alpha=0.5)
ax.axhline(0, color='black', linestyle='-', alpha=0.3)
ax.set_xyt('Time (s)', 'Power Change (dB)', 'Band Power Dynamics')
ax.legend()
ax.grid(True, alpha=0.3)

fig.tight_layout()
stx.io.save(fig, './dsp/event_related_analysis.png')
stx.plt.show()

## 8. Cross-Correlation and Coherence

In [None]:
# Generate two related signals
t_corr = np.linspace(0, 10, 10 * fs)
lag_samples = int(0.05 * fs)  # 50 ms lag

# Signal 1: Multiple frequency components
sig1 = (np.sin(2 * np.pi * 10 * t_corr) + 
        0.5 * np.sin(2 * np.pi * 25 * t_corr) + 
        0.2 * np.random.randn(len(t_corr)))

# Signal 2: Delayed and partially correlated
sig2 = np.zeros_like(sig1)
sig2[lag_samples:] = 0.8 * sig1[:-lag_samples]  # Delayed version
sig2 += 0.3 * np.sin(2 * np.pi * 15 * t_corr)  # Independent component
sig2 += 0.2 * np.random.randn(len(t_corr))     # Noise

# Compute cross-correlation
correlation = signal.correlate(sig1, sig2, mode='same')
lags = signal.correlation_lags(len(sig1), len(sig2), mode='same')
lag_time = lags / fs

# Find peak correlation
peak_idx = np.argmax(np.abs(correlation))
peak_lag = lag_time[peak_idx]

# Compute coherence
f_coh, Cxy = signal.coherence(sig1, sig2, fs, nperseg=1024)

# Compute cross-spectrum phase
f_csd, Pxy = signal.csd(sig1, sig2, fs, nperseg=1024)
phase_diff = np.angle(Pxy)

# Visualization
fig, axes = stx.plt.subplots(2, 2, figsize=(15, 10))

# Time series
ax = axes[0, 0]
time_window = (t_corr >= 1) & (t_corr <= 2)
ax.plot(t_corr[time_window], sig1[time_window], 'b-', label='Signal 1')
ax.plot(t_corr[time_window], sig2[time_window], 'r-', label='Signal 2')
ax.set_xyt('Time (s)', 'Amplitude', 'Time Series')
ax.legend()
ax.grid(True, alpha=0.3)

# Cross-correlation
ax = axes[0, 1]
ax.plot(lag_time * 1000, correlation / np.max(np.abs(correlation)))
ax.axvline(peak_lag * 1000, color='red', linestyle='--', 
           label=f'Peak lag: {peak_lag*1000:.1f} ms')
ax.set_xyt('Lag (ms)', 'Correlation', 'Cross-Correlation')
ax.set_xlim(-200, 200)
ax.legend()
ax.grid(True, alpha=0.3)

# Coherence
ax = axes[1, 0]
ax.plot(f_coh, Cxy)
ax.axhline(0.5, color='red', linestyle='--', alpha=0.5)
ax.set_xyt('Frequency (Hz)', 'Coherence', 'Frequency Coherence')
ax.set_xlim(0, 50)
ax.set_ylim(0, 1)
ax.grid(True, alpha=0.3)

# Phase difference
ax = axes[1, 1]
ax.plot(f_coh, phase_diff)
ax.set_xyt('Frequency (Hz)', 'Phase Difference (rad)', 'Cross-Spectrum Phase')
ax.set_xlim(0, 50)
ax.set_ylim(-np.pi, np.pi)
ax.set_yticks([-np.pi, -np.pi/2, 0, np.pi/2, np.pi])
ax.set_yticklabels(['-π', '-π/2', '0', 'π/2', 'π'])
ax.grid(True, alpha=0.3)

fig.tight_layout()
stx.io.save(fig, './dsp/cross_correlation_coherence.png')
stx.plt.show()

print(f"Peak cross-correlation at lag: {peak_lag*1000:.1f} ms")
print(f"Expected lag: {lag_samples/fs*1000:.1f} ms")

## 9. Practical DSP Pipeline Example

In [None]:
def dsp_pipeline(signal_data, fs, config):
    """
    Complete DSP pipeline for signal analysis.
    
    Parameters:
    - signal_data: Input signal
    - fs: Sampling frequency
    - config: Analysis configuration
    """
    results = {}
    
    # 1. Preprocessing
    # Remove DC offset
    signal_data = signal_data - np.mean(signal_data)
    
    # Apply notch filter for line noise
    if config['remove_line_noise']:
        for freq in config['line_noise_freqs']:
            b, a = signal.iirnotch(freq, Q=30, fs=fs)
            signal_data = signal.filtfilt(b, a, signal_data)
    
    # 2. Filtering
    if config['filter']['apply']:
        sos = signal.butter(config['filter']['order'], 
                          config['filter']['freqs'], 
                          config['filter']['type'], 
                          fs=fs, output='sos')
        filtered_signal = signal.sosfiltfilt(sos, signal_data)
    else:
        filtered_signal = signal_data
    
    results['filtered_signal'] = filtered_signal
    
    # 3. Spectral Analysis
    f_welch, psd = signal.welch(filtered_signal, fs, 
                               nperseg=config['spectral']['nperseg'])
    results['psd'] = {'frequencies': f_welch, 'power': psd}
    
    # 4. Time-frequency Analysis
    f_spec, t_spec, Sxx = signal.spectrogram(filtered_signal, fs, 
                                            nperseg=config['tf']['nperseg'])
    results['spectrogram'] = {'f': f_spec, 't': t_spec, 'Sxx': Sxx}
    
    # 5. Feature Extraction
    features = {}
    
    # Band powers
    for band_name, (f_low, f_high) in config['bands'].items():
        band_idx = (f_welch >= f_low) & (f_welch < f_high)
        band_power = np.trapz(psd[band_idx], f_welch[band_idx])
        features[f'{band_name}_power'] = band_power
    
    # Peak frequency
    peak_idx = np.argmax(psd)
    features['peak_frequency'] = f_welch[peak_idx]
    features['peak_power'] = psd[peak_idx]
    
    # Signal statistics
    features['rms'] = np.sqrt(np.mean(filtered_signal**2))
    features['variance'] = np.var(filtered_signal)
    features['skewness'] = stats.skew(filtered_signal)
    features['kurtosis'] = stats.kurtosis(filtered_signal)
    
    results['features'] = features
    
    return results

# Configuration
pipeline_config = {
    'remove_line_noise': True,
    'line_noise_freqs': [50, 100],  # 50 Hz and harmonic
    'filter': {
        'apply': True,
        'type': 'bandpass',
        'freqs': [1, 45],
        'order': 4
    },
    'spectral': {
        'nperseg': 1024
    },
    'tf': {
        'nperseg': 256
    },
    'bands': {
        'delta': (1, 4),
        'theta': (4, 8),
        'alpha': (8, 13),
        'beta': (13, 30),
        'gamma': (30, 45)
    }
}

# Apply pipeline
results = dsp_pipeline(composite_signal, fs, pipeline_config)

# Visualize results
fig, axes = stx.plt.subplots(2, 2, figsize=(15, 10))

# Filtered signal
ax = axes[0, 0]
ax.plot(t[:fs], composite_signal[:fs], 'b-', alpha=0.5, label='Original')
ax.plot(t[:fs], results['filtered_signal'][:fs], 'r-', label='Filtered')
ax.set_xyt('Time (s)', 'Amplitude', 'Preprocessing Result')
ax.legend()
ax.grid(True, alpha=0.3)

# PSD
ax = axes[0, 1]
ax.semilogy(results['psd']['frequencies'], results['psd']['power'])
ax.axvline(results['features']['peak_frequency'], color='red', 
           linestyle='--', label=f"Peak: {results['features']['peak_frequency']:.1f} Hz")
ax.set_xyt('Frequency (Hz)', 'PSD', 'Power Spectral Density')
ax.set_xlim(0, 50)
ax.legend()
ax.grid(True, alpha=0.3)

# Band powers
ax = axes[1, 0]
bands = list(pipeline_config['bands'].keys())
powers = [results['features'][f'{band}_power'] for band in bands]
bars = ax.bar(bands, powers, alpha=0.7)
ax.set_xyt('Frequency Band', 'Power', 'Band Powers')
ax.grid(True, alpha=0.3, axis='y')

# Feature summary
ax = axes[1, 1]
ax.axis('off')
feature_text = "Extracted Features:\n\n"
for key, value in results['features'].items():
    if isinstance(value, float):
        feature_text += f"{key}: {value:.3f}\n"
ax.text(0.1, 0.9, feature_text, transform=ax.transAxes, 
        fontsize=10, verticalalignment='top', fontfamily='monospace')

fig.tight_layout()
stx.io.save(fig, './dsp/pipeline_results.png')
stx.plt.show()

# Save results
stx.io.save(results, './dsp/analysis_results.pkl')
print("Pipeline complete! Results saved.")

## Summary

This notebook covered essential DSP techniques using SciTeX:

1. **Signal Generation**: Creating synthetic signals for testing
2. **Filtering**: Low-pass, high-pass, band-pass, and band-stop filters
3. **Spectral Analysis**: FFT, periodogram, Welch's method, spectrograms
4. **Time-Frequency Analysis**: STFT, wavelet transforms
5. **Hilbert Transform**: Envelope detection, instantaneous frequency
6. **Phase-Amplitude Coupling**: Cross-frequency coupling analysis
7. **Event-Related Analysis**: ERP, time-frequency decomposition
8. **Cross-Correlation/Coherence**: Signal relationships
9. **Complete Pipeline**: Practical DSP workflow

### Key DSP Best Practices:

- **Always visualize** your data before and after processing
- **Check filter responses** to ensure desired behavior
- **Use appropriate window sizes** for spectral analysis
- **Consider edge effects** when filtering
- **Validate results** with known signals
- **Document parameters** for reproducibility

In [None]:
# Cleanup
print("DSP analysis complete!")
print("\nFiles created:")
if Path('./dsp').exists():
    for f in sorted(Path('./dsp').glob('*')):
        print(f"  - {f.name}")