Este notebook toma los segmentos de audio de audio_segments/, los procesa y guarda los espectrogramas finales en specs/ como archivos .npy

In [None]:
import os, sys
import pandas as pd
import numpy as np
import torch

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))
from utils.data_processing import reduce_noise_seg, create_single_spectrogram_npy, save_test_audio, plot_summary, load_audio_segments_from_disk, clean_dir

In [None]:
def create_spectrograms_from_segments_npy(segments, spectrogram_dir, output_csv_path,
                                        test_audios_dir=None, mels=224, hoplen=512,
                                        nfft=2048, noise_reduce=False):
    """
    Create spectrograms from extracted audio segments and save as .npy files.

    Each segment is optionally noise-reduced, handed to
    ``create_single_spectrogram_npy`` and, when that call returns a record,
    collected into a metadata table that is written to ``output_csv_path``.
    Segments for which the helper returns ``None`` are counted as skipped.

    Args:
        segments (list): List of segment dictionaries. This function reads the
            keys 'audio_data', 'sr', 'original_filename' and 'class_id' when
            ``noise_reduce`` is enabled; the helpers may read more.
        spectrogram_dir (str): Directory to save spectrogram .npy files
            (created if missing).
        output_csv_path (str): Path to save the output CSV. Its parent
            directory is created if missing.
        test_audios_dir (str, optional): Directory to save test audio samples.
            Only the first 10 successfully processed segments are exported.
        mels (int): Number of mel bands for spectrogram
        hoplen (int): Hop length for spectrogram
        nfft (int): FFT window size
        noise_reduce (bool): Whether to apply noise reduction

    Returns:
        pd.DataFrame: One row per record returned by
        ``create_single_spectrogram_npy`` (empty if nothing succeeded).

    Notes:
        The input ``segments`` are never modified. With ``noise_reduce=True``
        the denoised audio lives in a per-iteration copy of the segment dict,
        so re-running the cell does not denoise already-denoised audio.
    """
    max_test_audios = 10  # cap on audio samples exported for manual listening

    print(f"Creating spectrograms from {len(segments)} segments...")

    os.makedirs(spectrogram_dir, exist_ok=True)
    if test_audios_dir:
        os.makedirs(test_audios_dir, exist_ok=True)

    # to_csv does not create missing folders; dirname is '' for a bare filename,
    # and os.makedirs('') would raise, hence the guard.
    csv_parent = os.path.dirname(output_csv_path)
    if csv_parent:
        os.makedirs(csv_parent, exist_ok=True)

    spectrogram_records = []
    saved_test_audios = 0
    skipped_count = 0

    for segment_info in segments:
        # Apply noise reduction if requested, on a shallow copy so the caller's
        # segment keeps its original audio.
        if noise_reduce:
            denoised_audio = reduce_noise_seg(
                segment_info['audio_data'],
                srate=segment_info['sr'],
                filename=segment_info['original_filename'],
                class_id=segment_info['class_id']
            )
            segment_info = {**segment_info, 'audio_data': denoised_audio}

        # Create spectrogram as .npy file
        record = create_single_spectrogram_npy(segment_info, spectrogram_dir, mels, hoplen, nfft)

        if record is None:
            skipped_count += 1
            continue

        # Save test audio if requested (first few successful segments only)
        if test_audios_dir and saved_test_audios < max_test_audios:
            save_test_audio(segment_info, test_audios_dir)
            saved_test_audios += 1

        spectrogram_records.append(record)

    # Create and save final DataFrame
    final_df = pd.DataFrame(spectrogram_records)
    final_df.to_csv(output_csv_path, index=False)

    # Print summary
    print("Spectrogram generation summary:")
    print(f"  Total processed: {len(segments)}")
    print(f"  Successfully created: {len(spectrogram_records)}")
    print(f"  Skipped due to errors: {skipped_count}")

    plot_summary(final_df, output_csv_path)
    return final_df

In [None]:
# Paths used throughout the notebook, all relative to the current working directory.
_database_dir = os.path.join('..', 'database')
_meta_dir = os.path.join(_database_dir, 'meta')

# Inputs: the audio segments and their metadata CSV (the loading cell's error
# message names the AudioExtracting notebook as their producer).
segments_dir = os.path.join(_database_dir, 'audio_segments')
segments_csv_path = os.path.join(_meta_dir, 'audio_segments.csv')

# Outputs: .npy spectrograms, their metadata CSV, and a few audio samples for listening.
specs_dir = os.path.join(_database_dir, 'specs')
output_csv = os.path.join(_meta_dir, 'final_specs.csv')
test_audios_dir = os.path.join(_database_dir, 'test_audios')

In [None]:
# Load the segments CSV and plot distribution of samples per class
import matplotlib.pyplot as plt

segments_df = pd.read_csv(segments_csv_path)

# Count samples per class, ordered by class id so bars appear in id order
class_counts = segments_df['class_id'].value_counts().sort_index()

# Explicit figure/axes instead of the implicit pyplot state machine
fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(class_counts.index, class_counts.values)
ax.set_xlabel('Class ID')
ax.set_ylabel('Number of Samples')
ax.set_title('Distribution of Audio Segments per Class')
ax.set_xticks(class_counts.index)
ax.grid(True, alpha=0.3)

# Add value labels just above each bar
for class_id, count in class_counts.items():
    ax.text(class_id, count + 1, str(count), ha='center', va='bottom', fontsize=8)

fig.tight_layout()
plt.show()

print(f"Total classes: {len(class_counts)}")
print(f"Total segments: {len(segments_df)}")
# An empty CSV yields zero classes; report that instead of dividing by zero.
if len(class_counts) > 0:
    print(f"Average segments per class: {len(segments_df) / len(class_counts):.1f}")
else:
    print("Average segments per class: n/a (no classes found)")

In [None]:
# Preview files in segments_dir
print(f"Checking directory: {segments_dir}")
# isdir (not exists): a regular file at this path would make os.listdir raise.
if os.path.isdir(segments_dir):
    # os.listdir returns entries in arbitrary order; sort for a stable preview.
    files = sorted(os.listdir(segments_dir))
    print(f"Total files in segments_dir: {len(files)}")
    print("\nFirst 10 files:")
    for file in files[:10]:
        print(f"  {file}")
    if len(files) > 10:
        print("  ...")
else:
    print("Directory does not exist!")

In [None]:
# Load the segments described by the metadata CSV.
# sr=32000 is presumably the sample rate used when loading — confirm in
# load_audio_segments_from_disk.
print("Loading audio segments from disk...")
segments = load_audio_segments_from_disk(segments_csv_path, segments_dir, sr=32000)

if segments:
    print(f"Loaded {len(segments)} segments from disk")
    # Show which fields each segment carries
    print(f"First segment keys: {list(segments[0].keys())}")
else:
    print("No segments loaded! Make sure AudioExtracting notebook has been run first.")

In [None]:
# Reset both output folders before regenerating (clean_dir presumably removes
# earlier outputs — confirm in utils.data_processing).
for output_dir in (specs_dir, test_audios_dir):
    clean_dir(output_dir)

print("Creating spectrograms from loaded segments as .npy files...")
specs = create_spectrograms_from_segments_npy(
    segments=segments,
    spectrogram_dir=specs_dir,
    output_csv_path=output_csv,
    test_audios_dir=test_audios_dir,
    mels=224,
    hoplen=512,
    nfft=2048,
    noise_reduce=False,
)

print("Spectrogram generation complete!")

Acá reviso la forma (shape), el dtype y el rango de valores de una muestra aleatoria de los archivos .npy generados

In [None]:
import random

amount = 20  # upper bound on how many files to inspect

# Every .npy file currently in specs_dir
spec_files = list(filter(lambda name: name.endswith('.npy'), os.listdir(specs_dir)))

# Draw a random subset, never asking for more files than exist
sample_size = min(amount, len(spec_files))
random_files = random.sample(spec_files, sample_size)

# Report shape, dtype and value range of each sampled spectrogram
for fname in random_files:
    spec_path = os.path.join(specs_dir, fname)
    spec_array = np.load(spec_path)
    lowest, highest = spec_array.min(), spec_array.max()
    print(f"{fname}: {spec_array.shape}, dtype: {spec_array.dtype}, range: [{lowest:.3f}, {highest:.3f}]")

In [None]:
# Derive SpecAugment settings for the generated spectrograms (.npy format)
from utils.specaugment import (
    SpecAugment,
    get_recommended_params,
    visualize_specaugment,
    test_specaugment_on_random_spec,
)

# Dataset size: spectrogram files found on disk
num_specs = len(spec_files)
print(f"Total spectrograms available: {num_specs}")

# Number of classes: taken from the per-class counts of the segments CSV
num_classes = len(class_counts)
print(f"Total classes available: {num_classes}")

samples_per_class = num_specs / num_classes
print(f"Average samples per class: {samples_per_class:.1f}")

# Ask the helper for parameters suited to this dataset size
spec_input_size = (224, 313)  # height, width
recommended_params = get_recommended_params(
    num_samples=num_specs,
    num_classes=num_classes,
    input_size=spec_input_size,
)

print("Recommended SpecAugment parameters for your dataset:")
for param_name, param_value in recommended_params.items():
    print(f"  {param_name}: {param_value}")

In [None]:
# Smoke-test SpecAugment with the recommended parameters at the same
# (height, width) that was passed to get_recommended_params.
random_spec_shape = (224, 313)
print("Testing SpecAugment on random spectrogram...")
test_specaugment_on_random_spec(shape=random_spec_shape, **recommended_params)

In [None]:
# Test SpecAugment on actual generated spectrograms (.npy format)
max_examples = 3  # number of spectrograms to visualize

if len(specs) > 0:
    print("\nTesting SpecAugment on actual spectrograms...")

    # Sort before slicing: os.listdir order is arbitrary, so an unsorted slice
    # could show different files on every run. The slice already limits the
    # loop, so no extra break is needed.
    test_files = sorted(
        f for f in os.listdir(specs_dir) if f.endswith('.npy')
    )[:max_examples]

    for filename in test_files:
        print(f"\nTesting on {filename}")

        # Load spectrogram from .npy file
        spec_path = os.path.join(specs_dir, filename)
        # presumably already normalized to 0-1 — confirm in create_single_spectrogram_npy
        spec_array = np.load(spec_path)
        spec_tensor = torch.tensor(spec_array, dtype=torch.float32)

        # Apply SpecAugment with the recommended parameters
        augmenter = SpecAugment(**recommended_params)
        augmented_spec = augmenter(spec_tensor)

        # Visualize original vs augmented
        visualize_specaugment(
            spec_tensor,
            augmented_spec,
            title=f"SpecAugment Test - {filename}"
        )
else:
    print("No spectrograms available for testing. Run spectrogram generation first.")