In [2]:
from datasets import load_dataset
import torchaudio
import torch
import os
import pandas as pd
from sklearn.model_selection import train_test_split

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
dataset = load_dataset("Jzuluaga/atco2_corpus_1h")["test"].train_test_split(test_size=0.2)

In [4]:
# Config
main_dir = "STT/data"
audio_dir = os.path.join(main_dir, "audio")
ref_dir = os.path.join(main_dir, "ref")
audio_test_dir = os.path.join(main_dir, "audio_test")
ref_test_dir = os.path.join(main_dir, "ref_test")
sample_rate = 16000
test_ratio = 0.2  # Adjust as needed

In [None]:
import os
import torch
import torchaudio
from sklearn.model_selection import train_test_split

# === CONFIG ===
vad_audio_base = "VAD_Input/Audio"
vad_ground_dir = "VAD_Input/Ground"
sample_rate = 16000
test_ratio = 0.2
dev_ratio = 0.1  # portion of train set that goes to DEV

# Ensure directories exist
for d in [os.path.join(vad_audio_base, split) for split in ["TRAIN", "DEV", "TEST"]] + [vad_ground_dir]:
    os.makedirs(d, exist_ok=True)

# Combine dataset entries
if "all" in dataset:
    all_items = list(dataset["all"])
else:
    all_items = list(dataset["train"]) + list(dataset["test"])

# Filter valid items
valid_items = [
    item for item in all_items
    if all(k in item for k in ["id", "audio", "text", "segment_start_time", "segment_end_time"])
]

# Deduplicate by ID
unique_items = {str(item["id"]): item for item in valid_items}

# === SPLITTING ===
all_ids = list(unique_items.keys())
train_ids, test_ids = train_test_split(all_ids, test_size=test_ratio, random_state=42)
train_ids, dev_ids = train_test_split(train_ids, test_size=dev_ratio, random_state=42)

# Ensure disjoint sets
train_set = set(train_ids)
dev_set = set(dev_ids)
test_set = set(test_ids)

overlap = (train_set & dev_set) | (train_set & test_set) | (dev_set & test_set)
if overlap:
    raise ValueError(f"❌ Overlapping IDs across splits: {overlap}")

# === HELPER FUNCTION ===
def save_pair(item, split):
    split_dir = os.path.join(vad_audio_base, split.upper())
    os.makedirs(split_dir, exist_ok=True)

    item_id = str(item["id"])
    audio_path = os.path.join(split_dir, f"{item_id}.wav")
    ref_path = os.path.join(vad_ground_dir, f"{item_id}.txt")

    # Process audio data
    audio_array = item["audio"]["array"]
    
    # Ensure audio data is in the correct range [-1, 1]
    if audio_array.max() > 1.0 or audio_array.min() < -1.0:
        audio_array = audio_array / max(abs(audio_array.max()), abs(audio_array.min()))
    
    # Convert to correct format for torchaudio
    waveform = torch.tensor(audio_array, dtype=torch.float32).unsqueeze(0)
    
    # Validate waveform
    if torch.isnan(waveform).any():
        print(f"Warning: NaN values found in waveform for {item_id}")
        return
    
    if waveform.size(1) == 0:
        print(f"Warning: Empty waveform for {item_id}")
        return

    try:
        # Save audio with explicit format settings
        torchaudio.save(
            audio_path, 
            waveform, 
            sample_rate=sample_rate,
            encoding='PCM_S', 
            bits_per_sample=16
        )
        
        # Verify the saved file
        try:
            verification = torchaudio.load(audio_path)
            if verification[0].size() != waveform.size():
                print(f"Warning: Size mismatch in saved file for {item_id}")
        except Exception as e:
            print(f"Warning: Could not verify saved file for {item_id}: {e}")

        # Save text with STT-compatible format
        with open(ref_path, "w") as f:
            f.write(f"{item['segment_start_time']}\t{item['segment_end_time']}\t{item['text']}\n")
    
    except Exception as e:
        print(f"Error saving {item_id}: {e}")

# === WRITE TO DISK ===
for item_id in train_ids:
    save_pair(unique_items[item_id], "TRAIN")

for item_id in dev_ids:
    save_pair(unique_items[item_id], "DEV")

for item_id in test_ids:
    save_pair(unique_items[item_id], "TEST")

print(f"✅ STT-compatible VAD export complete:")
print(f"  TRAIN: {len(train_ids)}")
print(f"  DEV:   {len(dev_ids)}")
print(f"  TEST:  {len(test_ids)}")


✅ STT-compatible VAD export complete:
  TRAIN: 626
  DEV:   70
  TEST:  175


In [8]:
import os
from pydub import AudioSegment
import soundfile as sf
import numpy as np
from pathlib import Path

def verify_file_matching():
    """
    Verify that all audio files in DEV, TEST, and TRAIN have matching ground truth files.
    Returns dictionaries of matched and unmatched files.
    """
    vad_audio_base = "VAD_Input/Audio"
    vad_ground_dir = "VAD_Input/Ground"
    splits = ["DEV", "TEST", "TRAIN"]
    
    results = {
        'matched': {},
        'unmatched': {}
    }
    
    # Get all ground truth files
    ground_truth_files = set(f.stem for f in Path(vad_ground_dir).glob('*.txt'))
    
    # Check each split
    for split in splits:
        audio_dir = Path(vad_audio_base) / split
        
        # Initialize results for this split
        results['matched'][split] = []
        results['unmatched'][split] = []
        
        # Check each audio file
        for audio_file in audio_dir.glob('*.wav'):
            if audio_file.stem in ground_truth_files:
                results['matched'][split].append(audio_file.name)
            else:
                results['unmatched'][split].append(audio_file.name)
    
    # Print summary
    print("=== File Matching Summary ===")
    for split in splits:
        matched = len(results['matched'][split])
        unmatched = len(results['unmatched'][split])
        total = matched + unmatched
        
        print(f"\n{split}:")
        print(f"  Total files: {total}")
        print(f"  Matched: {matched}")
        print(f"  Unmatched: {unmatched}")
        
        if unmatched > 0:
            print("\n  Unmatched files:")
            for file in results['unmatched'][split]:
                print(f"    - {file}")
    
    return results

# Run the verification
matching_results = verify_file_matching()

=== File Matching Summary ===

DEV:
  Total files: 70
  Matched: 70
  Unmatched: 0

TEST:
  Total files: 175
  Matched: 175
  Unmatched: 0

TRAIN:
  Total files: 626
  Matched: 626
  Unmatched: 0


In [13]:
from pathlib import Path
import random
from pydub import AudioSegment
import soundfile as sf
import numpy as np
import matplotlib.pyplot as plt
from datetime import timedelta

def test_segmentation_random_files(n_files=5):
    """
    Test segmentation on n random files and display results for manual verification.
    """
    # Setup paths
    base_dir = Path("VAD_Input")
    splits = ["DEV", "TEST", "TRAIN"]
    
    # Collect all audio files
    all_audio_files = []
    for split in splits:
        audio_dir = base_dir / "Audio" / split
        all_audio_files.extend(list(audio_dir.glob("*.wav")))
    
    # Randomly select n files
    test_files = random.sample(all_audio_files, min(n_files, len(all_audio_files)))
    
    print(f"=== Testing {len(test_files)} Random Files ===\n")
    
    for audio_file in test_files:
        gt_file = base_dir / "Ground" / f"{audio_file.stem}.txt"
        
        print(f"\nFile: {audio_file.name}")
        print("-" * 80)
        
        # Load audio
        audio = AudioSegment.from_file(str(audio_file))
        duration = len(audio) / 1000.0  # Convert to seconds
        
        # Read ground truth
        segments = []
        with open(gt_file, 'r') as f:
            for line in f:
                start, end, text = line.strip().split('\t')
                segments.append({
                    'start': float(start),
                    'end': float(end),
                    'text': text
                })
        
        # Print timeline
        print(f"Total duration: {timedelta(seconds=duration)}")
        print("\nTimeline:")
        current_time = 0.0
        
        for i, seg in enumerate(segments):
            # If there's a gap before this segment, it's non-speech
            if current_time < seg['start']:
                gap_duration = seg['start'] - current_time
                print(f"  {timedelta(seconds=current_time)} → {timedelta(seconds=seg['start'])} "
                      f"({gap_duration:.2f}s): NON-SPEECH")
            
            # Print speech segment
            print(f"  {timedelta(seconds=seg['start'])} → {timedelta(seconds=seg['end'])} "
                  f"({seg['end']-seg['start']:.2f}s): SPEECH")
            print(f"    Text: {seg['text']}")
            
            current_time = seg['end']
        
        # If there's remaining time after last segment
        if current_time < duration:
            gap_duration = duration - current_time
            print(f"  {timedelta(seconds=current_time)} → {timedelta(seconds=duration)} "
                  f"({gap_duration:.2f}s): NON-SPEECH")
        
        print("\nTo verify this file:")
        print(f"1. Open audio file: {audio_file}")
        print(f"2. Open ground truth: {gt_file}")
        print(f"3. Listen to the audio and check if segments match the timeline above")
        print("-" * 80)

if __name__ == "__main__":
    random.seed(42)  # For reproducibility
    test_segmentation_random_files()

=== Testing 5 Random Files ===


File: atco2_test-set-1h_LSZH_ZURICH_ApronN_121_85MHz_20210414_081643-A__000007-000263.wav
--------------------------------------------------------------------------------
Total duration: 0:00:02.560000

Timeline:
  0:00:00 → 0:00:00.070000 (0.07s): NON-SPEECH
  0:00:00.070000 → 0:00:02.630000 (2.56s): SPEECH
    Text: turkish two seven xray tower one one eight one bye bye

To verify this file:
1. Open audio file: VAD_Input\Audio\TRAIN\atco2_test-set-1h_LSZH_ZURICH_ApronN_121_85MHz_20210414_081643-A__000007-000263.wav
2. Open ground truth: VAD_Input\Ground\atco2_test-set-1h_LSZH_ZURICH_ApronN_121_85MHz_20210414_081643-A__000007-000263.txt
3. Listen to the audio and check if segments match the timeline above
--------------------------------------------------------------------------------

File: atco2_test-set-1h_LSGS_SION_Ground_Control_121_7MHz_20210502_164218-A__000189-000834.wav
--------------------------------------------------------------------------

In [17]:
from pathlib import Path
from pydub import AudioSegment
import soundfile as sf
import numpy as np
from datetime import timedelta

def process_audio_file(audio_file, gt_file, speech_dir, non_speech_dir):
    """
    Process a single audio file and split it into speech/non-speech segments.
    Returns the processed segments and their metadata.
    """
    # Load audio file
    audio = AudioSegment.from_file(str(audio_file))
    total_duration = len(audio) / 1000.0
    
    # Read and sort ground truth segments
    speech_segments = []
    with open(gt_file, 'r') as f:
        for line in f:
            start, end, text = line.strip().split('\t')
            speech_segments.append({
                'start': float(start),
                'end': float(end),
                'text': text
            })
    speech_segments.sort(key=lambda x: x['start'])
    
    # Process speech segments
    combined_speech = AudioSegment.empty()
    speech_metadata = []
    for seg in speech_segments:
        start_ms = int(seg['start'] * 1000)
        end_ms = int(seg['end'] * 1000)
        segment = audio[start_ms:end_ms]
        combined_speech += segment
        speech_metadata.append(seg)
    
    # Save speech segments
    speech_output = speech_dir / f"{audio_file.stem}_speech.wav"
    combined_speech.export(str(speech_output), format="wav", parameters=["-ac", "1"])
    
    # Process non-speech segments
    current_time = 0.0
    combined_non_speech = AudioSegment.empty()
    non_speech_metadata = []
    
    for seg in speech_segments:
        if current_time < seg['start']:
            start_ms = int(current_time * 1000)
            end_ms = int(seg['start'] * 1000)
            non_speech_segment = audio[start_ms:end_ms]
            combined_non_speech += non_speech_segment
            non_speech_metadata.append({
                'start': current_time,
                'end': seg['start']
            })
        current_time = seg['end']
    
    # Handle final non-speech segment
    if current_time < total_duration:
        start_ms = int(current_time * 1000)
        end_ms = int(total_duration * 1000)
        non_speech_segment = audio[start_ms:end_ms]
        combined_non_speech += non_speech_segment
        non_speech_metadata.append({
            'start': current_time,
            'end': total_duration
        })
    
    # Save non-speech segments
    non_speech_output = non_speech_dir / f"{audio_file.stem}_non_speech.wav"
    combined_non_speech.export(str(non_speech_output), format="wav", parameters=["-ac", "1"])
    
    return {
        'total_duration': total_duration,
        'speech_segments': speech_metadata,
        'non_speech_segments': non_speech_metadata,
        'speech_file': speech_output,
        'non_speech_file': non_speech_output
    }

def print_metadata(audio_file, metadata):
    """Print formatted metadata for the processed file."""
    print("\n=== Metadata ===")
    print(f"Original file: {audio_file.name}")
    print(f"Original duration: {timedelta(seconds=metadata['total_duration'])}")
    
    print("\nSpeech segments:")
    for seg in metadata['speech_segments']:
        print(f"[{timedelta(seconds=seg['start'])} → {timedelta(seconds=seg['end'])}] "
              f"Text: {seg['text']}")
    
    print("\nNon-speech segments:")
    for seg in metadata['non_speech_segments']:
        print(f"[{timedelta(seconds=seg['start'])} → {timedelta(seconds=seg['end'])}]")
    
    print(f"\nOutput files:")
    print(f"Speech file: {metadata['speech_file'].name}")
    print(f"Non-speech file: {metadata['non_speech_file'].name}")

def test_single_file_split(audio_path, gt_path, output_dir="test_output"):
    """Test splitting functionality on a single audio/ground truth pair."""
    try:
        # Setup paths
        audio_file = Path(audio_path)
        gt_file = Path(gt_path)
        output_dir = Path(output_dir)
        speech_dir = output_dir / "speech"
        non_speech_dir = output_dir / "non_speech"
        
        # Create output directories
        speech_dir.mkdir(parents=True, exist_ok=True)
        non_speech_dir.mkdir(parents=True, exist_ok=True)
        
        print("\n=== Processing Single File ===")
        print(f"Audio file: {audio_file.name}")
        print(f"Ground truth: {gt_file.name}")
        
        # Process the audio file
        metadata = process_audio_file(audio_file, gt_file, speech_dir, non_speech_dir)
        
        # Print metadata after processing is complete
        print_metadata(audio_file, metadata)
        
        return True
        
    except Exception as e:
        print(f"Error processing file: {str(e)}")
        return False

if __name__ == "__main__":
    # Example usage
    audio_file = "VAD_Input/Audio/DEV/atco2_test-set-1h_LKPR_RUZYNE_Radar_120_520MHz_20201026_145941-G__001153-001308.wav"
    gt_file = "VAD_Input/Ground/atco2_test-set-1h_LKPR_RUZYNE_Radar_120_520MHz_20201026_145941-G__001153-001308.txt"
    
    success = test_single_file_split(audio_file, gt_file)
    if not success:
        print("Processing failed!")


=== Processing Single File ===
Audio file: atco2_test-set-1h_LKPR_RUZYNE_Radar_120_520MHz_20201026_145941-G__001153-001308.wav
Ground truth: atco2_test-set-1h_LKPR_RUZYNE_Radar_120_520MHz_20201026_145941-G__001153-001308.txt
Error processing file: [WinError 2] The system cannot find the file specified
Processing failed!
