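### Wavelet Packet (WPT) Scalogram Generation

# This notebook provides a function that generates a scalogram-like image from a 1D EEG
# signal. It applies a Wavelet Packet Transform (WPT; `db4` wavelet, level 5 by default),
# plots the coefficient magnitudes of every frequency node from 0 Hz up to the Nyquist
# frequency (the full band, not only the 70-100 Hz gamma band), and saves the result as
# an image file.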

In [1]:
import os
import numpy as np
import pandas as pd
import pywt
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

In [2]:
# Cell 2
def save_wpt_scalogram(signal, sampling_rate, output_path, wavelet='db4', maxlevel=5):
    """
    Render a Wavelet Packet Transform (WPT) coefficient map of a 1-D signal
    and save it as an image.

    The signal is decomposed with ``pywt.WaveletPacket`` down to ``maxlevel``.
    The frequency-ordered nodes of that level are stacked into a 2-D array
    (one row per node), converted to absolute values, and drawn with
    ``imshow`` on an axis-free 1.28 x 1.28 inch figure saved at 100 dpi.

    Parameters
    ----------
    signal : array-like
        Non-empty, one-dimensional sequence of samples.
    sampling_rate : float
        Sampling rate in Hz. Only used to set the vertical extent of the
        image (0 .. sampling_rate / 2).
    output_path : str
        Destination path handed to ``Figure.savefig``.
    wavelet : str, optional
        Wavelet name passed to ``pywt.WaveletPacket`` (default ``'db4'``).
    maxlevel : int, optional
        Decomposition level whose nodes are plotted (default ``5``).

    Returns
    -------
    dict
        ``{'success': True, 'output_path': output_path}`` when the image was
        written, otherwise ``{'success': False, 'error': <message>}``.
        Exceptions are reported through the dict rather than raised.
    """
    fig = None
    try:
        # Validate up front so bad input yields a clear message instead of an
        # opaque error from the decomposition or plotting code further down.
        signal = np.asarray(signal)
        if signal.ndim != 1:
            raise ValueError(
                f"signal must be 1-D, got an array with {signal.ndim} dimension(s)"
            )
        if signal.size == 0:
            raise ValueError("signal is empty")

        # Wavelet Packet Decomposition
        wp = pywt.WaveletPacket(data=signal, wavelet=wavelet, maxlevel=maxlevel)

        # Get all nodes at the chosen level, ordered by frequency band
        nodes = wp.get_level(maxlevel, order='freq')
        freqs = np.linspace(0, sampling_rate / 2, len(nodes))  # Approx frequency mapping

        # Magnitude of the coefficients (energy-like representation), one row per node
        coeff_matrix = np.abs(np.array([node.data for node in nodes]))

        # Create and save plot.
        # NOTE(review): with imshow's default origin the first row (presumably the
        # lowest-frequency node) is drawn at the top of the image. Left unchanged so
        # regenerated images match previously generated ones -- confirm intent.
        fig, ax = plt.subplots(figsize=(1.28, 1.28))
        ax.imshow(coeff_matrix, aspect='auto', cmap='viridis',
                  extent=[0, len(signal), freqs[0], freqs[-1]])
        ax.axis('off')
        # Save through the figure object rather than pyplot's implicit current figure.
        fig.savefig(output_path, bbox_inches='tight', pad_inches=0, dpi=100)

        return {'success': True, 'output_path': output_path}

    except Exception as e:
        return {'success': False, 'error': str(e)}

    finally:
        # Always release the figure, including when plotting or saving failed;
        # otherwise every failed file would leave an open figure behind.
        if fig is not None:
            plt.close(fig)

In [3]:
# Cell 3
# --- Step 2: Setup Paths ---
# Raw strings keep backslashes literal, so each separator is written exactly once
# (doubling them inside r'...' would put "\\" into the actual path).
DATASET_ROOT = r'D:\VIT\IV-Year\PJT-I\Speech Imagery Decoding\Inner_Speech_Dataset\Dataset'

# Input: directory tree that is searched for .npy signal files.
normalized_data_dir = os.path.join(DATASET_ROOT, 'filtered_data_regex')
# Output: root directory under which the scalogram images are written.
scalogram_output_dir = os.path.join(DATASET_ROOT, 'scalogram')
os.makedirs(scalogram_output_dir, exist_ok=True)

SAMPLING_RATE = 256  # Hz

In [4]:
# Cell 4
# --- Step 3: Collect Files ---
# Walk the input tree and build one (source .npy path, destination .png path,
# sampling rate) tuple per signal file. Destination layout:
#   <scalogram_output_dir>/<trial>/<channel>.png
file_metadata_list = []
for root, _, files in os.walk(normalized_data_dir):
    for file in files:
        # Only NumPy array files are of interest.
        if not file.endswith('.npy'):
            continue

        # The file stem is split at its LAST underscore: everything before it
        # names the trial folder, everything after it names the channel.
        stem = os.path.splitext(file)[0]
        trial_folder_name, separator, channel_name = stem.rpartition('_')
        if not separator:
            # No underscore in the stem, so trial and channel cannot be separated.
            print(f"Warning: Could not parse trial and channel from filename: {file}. Skipping this file.")
            continue

        # Make sure the per-trial output folder exists before recording the job.
        output_folder = os.path.join(scalogram_output_dir, trial_folder_name)
        os.makedirs(output_folder, exist_ok=True)

        file_metadata_list.append((
            os.path.join(root, file),
            os.path.join(output_folder, channel_name + '.png'),
            SAMPLING_RATE,
        ))

print(f"Found {len(file_metadata_list)} files to process...")

Found 43120 files to process...


In [5]:
# Cell 5
# --- Step 4: Sequential Processing (No multiprocessing) ---
# One result dict is appended per entry of file_metadata_list, in the same order,
# so the two lists can later be paired up positionally.
results = []
progress = tqdm(file_metadata_list, desc="Scalograms", unit="file")
for source_path, output_path, sampling_rate in progress:
    try:
        loaded = np.load(source_path)
        # Anything that is not already 1-D is collapsed into a single 1-D vector.
        samples = loaded if loaded.ndim == 1 else loaded.flatten()
        outcome = save_wpt_scalogram(samples, sampling_rate, output_path)
    except Exception as e:
        # Loading/flattening problems are recorded instead of stopping the run.
        outcome = {'success': False, 'error': f"Failed {source_path}: {e}"}
    results.append(outcome)

Scalograms:   0%|          | 0/43120 [00:00<?, ?file/s]

In [6]:
# Cell 6
# --- Step 5: Collect Metadata ---
# Tally outcomes: an entry counts as a success when its 'success' value is truthy.
success_count = len([entry for entry in results if entry['success']])
fail_count = len(results) - success_count

# One row per input file: the source/destination paths first, then every key of the
# matching result dict (result keys win when a key appears in both).
processed_metadata = [
    {'file_path': source_path, 'output_path': output_path, **result}
    for (source_path, output_path, _), result in zip(file_metadata_list, results)
]

processed_df = pd.DataFrame(processed_metadata)

# --- Step 6: Report ---
print("\n=== PROCESSING COMPLETE ===")
print(f"✓ Successfully processed: {success_count} files")
print(f"✗ Failed: {fail_count} files")
# Skip the percentage when nothing was processed, to avoid dividing by zero.
if (success_count + fail_count) > 0:
    print(f"Success rate: {(success_count / (success_count + fail_count)) * 100:.1f}%")


=== PROCESSING COMPLETE ===
✓ Successfully processed: 43120 files
✗ Failed: 0 files
Success rate: 100.0%
