source /workspace/tts_env/bin/activate


In [5]:
import torch

## TIME BASED FRAGMENTATION

#### German language IPA logic

In [2]:
import re

def german_to_ipa(text):
    ipa_mapping = {
        "sch": "ʃ", "ch": "x", "z": "ts", "j": "j", "r": "ʁ", "ng": "ŋ",
        "au": "aʊ̯", "ei": "aɪ̯", "eu": "ɔʏ̯", "äu": "ɔʏ̯", "sp": "ʃp", "st": "ʃt",
        "ä": "ɛ", "ö": "œ", "ü": "ʏ", "ß": "s", "ph": "f", "qu": "kv"
    }

    words = text.lower().split()
    ipa_words = []

    for word in words:
        ipa_word = word
        for key, val in ipa_mapping.items():
            ipa_word = ipa_word.replace(key, val)

        # Insert syllable boundaries and primary stress for the first syllable
        ipa_word = re.sub(r"([aeiouäöüɪɛœʏ])", r"\1.", ipa_word)  # syllables after vowels
        ipa_word = re.sub(r"\.$", "", ipa_word)  # remove trailing syllable
        ipa_word = "ˈ" + ipa_word  # add primary stress

        ipa_words.append(ipa_word)

    return " ".join(ipa_words)

# Test the function
sentences = [
    "Wollen wir das Ganze einfach mal ausführen",
    "Das ist ein Test",
    "Können wir den Text ins IPA umwandeln"
]

for sentence in sentences:
    ipa = german_to_ipa(sentence.lower())
    print(f"{sentence} → {ipa}")


Wollen wir das Ganze einfach mal ausführen → ˈwo.lle.n ˈwi.ʁ ˈda.s ˈga.ntse ˈa.ɪ.̯nfa.x ˈma.l ˈa.ʊ̯sfʏ.hʁe.n
Das ist ein Test → ˈda.s ˈi.ʃt ˈa.ɪ.̯n ˈte.ʃt
Können wir den Text ins IPA umwandeln → ˈkœ.nne.n ˈwi.ʁ ˈde.n ˈte.xt ˈi.ns ˈi.pa ˈu.mwa.nde.ln


#### converts second column to IPA format

In [3]:
import csv

def convert_metadata_to_ipa(input_file='audio_dataset/metadata.csv', output_file='audio_dataset/metadata_ipa.csv'):
    """Converts the second column of metadata.csv to IPA and saves it"""
    with open(input_file, 'r', encoding='utf-8') as infile, open(output_file, 'w', encoding='utf-8', newline='') as outfile:
        reader = csv.reader(infile, delimiter='|')
        writer = csv.writer(outfile, delimiter='|')

        for row in reader:
            if len(row) < 3:
                continue

            original_text = row[1]
            ipa_text = german_to_ipa(original_text)

            new_row = [row[0], f"/{ipa_text}/", row[2]]
            writer.writerow(new_row)

    print(f"✅ IPA conversion completed. Saved to {output_file}")


# Run the conversion
convert_metadata_to_ipa()


✅ IPA conversion completed. Saved to audio_dataset_2/metadata_ipa.csv


In [6]:
import torch
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()

#### check audio speed

In [12]:
import librosa

file_path = "org.wav"
y, sr = librosa.load(file_path, sr=None)  # sr=None ensures original sample rate is loaded
print(f"Sample rate (Hz): {sr}")


Sample rate (Hz): 44100


#### Segment mp4 as 4, 7, 10 mixed seconds lengths audio wav files

In [4]:
import os
import librosa
import soundfile as sf

input_audio = "cleaned_audio.wav"
output_dir = "audio_dataset/wavs"
os.makedirs(output_dir, exist_ok=True)

def segment_audio(input_audio, output_dir, segment_durations=[4, 6, 8]):
    print("🎯 Segmenting audio into chunks...")
    y, sr = librosa.load(input_audio, sr=22050)
    total_duration = librosa.get_duration(y=y, sr=sr)

    current_position = 0
    segment_count = 0

    while current_position < total_duration:
        duration = segment_durations[segment_count % len(segment_durations)]
        end_position = min(current_position + duration, total_duration)

        # Extract chunk
        chunk = y[int(current_position * sr):int(end_position * sr)]
        file_path = os.path.join(output_dir, f"segment_{segment_count:03d}.wav")

        # Save chunk
        sf.write(file_path, chunk, sr)
        print(f"✅ Saved: {file_path} ({duration}s)")
        current_position = end_position
        segment_count += 1

    print("🎯 Audio segmentation completed!")

segment_audio(input_audio, output_dir)


🎯 Segmenting audio into chunks...
✅ Saved: audio_dataset/wavs/segment_000.wav (4s)
✅ Saved: audio_dataset/wavs/segment_001.wav (6s)
✅ Saved: audio_dataset/wavs/segment_002.wav (8s)
✅ Saved: audio_dataset/wavs/segment_003.wav (4s)
✅ Saved: audio_dataset/wavs/segment_004.wav (6s)
✅ Saved: audio_dataset/wavs/segment_005.wav (8s)
✅ Saved: audio_dataset/wavs/segment_006.wav (4s)
✅ Saved: audio_dataset/wavs/segment_007.wav (6s)
✅ Saved: audio_dataset/wavs/segment_008.wav (8s)
✅ Saved: audio_dataset/wavs/segment_009.wav (4s)
✅ Saved: audio_dataset/wavs/segment_010.wav (6s)
✅ Saved: audio_dataset/wavs/segment_011.wav (8s)
✅ Saved: audio_dataset/wavs/segment_012.wav (4s)
✅ Saved: audio_dataset/wavs/segment_013.wav (6s)
✅ Saved: audio_dataset/wavs/segment_014.wav (8s)
✅ Saved: audio_dataset/wavs/segment_015.wav (4s)
✅ Saved: audio_dataset/wavs/segment_016.wav (6s)
✅ Saved: audio_dataset/wavs/segment_017.wav (8s)
✅ Saved: audio_dataset/wavs/segment_018.wav (4s)
✅ Saved: audio_dataset/wavs/segment

#### Transcribe wavs with whisper and save as metadata.csv

In [1]:
import os
import whisper

model = whisper.load_model("medium", download_root="/home/ahmet/.cache/whisper")
input_dir = "audio_dataset/wavs"
metadata_path = "audio_dataset/metadata.csv"

with open(metadata_path, 'w', encoding='utf-8') as f_meta:
    for file in sorted(os.listdir(input_dir)):
        if file.endswith(".wav"):
            file_path = os.path.join(input_dir, file)
            print(f"🎙️ Transcribing {file}...")

            try:
                result = model.transcribe(file_path)
                text = result['text'].strip()
                f_meta.write(f"{file}|{text}|{text}\n")
                print(f"✅ {file}: {text[:50]}...")
            except Exception as e:
                print(f"⚠️ Failed to transcribe {file}: {e}")

print("🎯 Transcription completed!")


  checkpoint = torch.load(fp, map_location=device)


🎙️ Transcribing segment_000.wav...
✅ segment_000.wav: Hallo und herzlich willkommen zu diesem React.js C...
🎙️ Transcribing segment_001.wav...
✅ segment_001.wav: JavaScript Library React bekommen wolltest, bist d...
🎙️ Transcribing segment_002.wav...
✅ segment_002.wav: die kriegen wir das beliebte CSS Framework Tailwin...
🎙️ Transcribing segment_003.wav...
✅ segment_003.wav: nutzen kann. Es handelt sich nämlich um eine sogen...
🎙️ Transcribing segment_004.wav...
✅ segment_004.wav: Elemente anlegen, lesen, updaten und löschen könne...
🎙️ Transcribing segment_005.wav...
✅ segment_005.wav: Tudolist, in der wir keine Tudus haben. Wir können...
🎙️ Transcribing segment_006.wav...
✅ segment_006.wav: eins davon ist eben offen und was können wir jetzt...
🎙️ Transcribing segment_007.wav...
✅ segment_007.wav: Wort machen, das ist ein weiteres To Do und jetzt ...
🎙️ Transcribing segment_008.wav...
✅ segment_008.wav: können diese abhaken dann sehen wir wir haben jetz...
🎙️ Transcribing segment_009.

In [2]:
!wget https://opendata.iisys.de/opendata/Datasets/HUI-Auido-Corpus-German/dataset_clean/others_Clean.zip

--2025-02-15 17:25:19--  https://opendata.iisys.de/opendata/Datasets/HUI-Auido-Corpus-German/dataset_clean/others_Clean.zip
Resolving opendata.iisys.de (opendata.iisys.de)... 194.95.60.121
Connecting to opendata.iisys.de (opendata.iisys.de)|194.95.60.121|:443... connected.
HTTP request sent, awaiting response... 404 Not Found
2025-02-15 17:25:19 ERROR 404: Not Found.



## WHISPER TIMESTAMPS BASED FRAGMENTATION

In [16]:
import json

def dynamic_audio_segmentation(input_json, char_limit=120):
    """
    Dynamically segments audio based on character counts from Whisper JSON.
    
    Logic:
    1. Iterate through each segment.
    2. Accumulate character counts until exceeding `char_limit`.
    3. When exceeded, finalize the previous segment and start a new one.
    4. Output JSON segments with accurate start and end times.

    Args:
        input_json (str): Path to Whisper JSON file.
        char_limit (int): Max character limit for each segment.

    Returns:
        list: Segmented JSON ready for audio processing.
    """
    with open(input_json, 'r') as f:
        data = json.load(f)

    segments = []
    current_segment = []
    current_length = 0
    start_time = data[0]["start_time"]

    for item in data:
        text_length = len(item["text"])
        if current_length + text_length <= char_limit:
            current_segment.append(item)
            current_length += text_length
        else:
            # Save the current segment
            end_time = current_segment[-1]["end_time"]
            segments.append({
                "start_time": start_time,
                "end_time": end_time,
                "text": "".join(seg["text"] for seg in current_segment)
            })
            # Start a new segment
            current_segment = [item]
            current_length = text_length
            start_time = item["start_time"]

    # Add the last segment
    if current_segment:
        end_time = current_segment[-1]["end_time"]
        segments.append({
            "start_time": start_time,
            "end_time": end_time,
            "text": "".join(seg["text"] for seg in current_segment)
        })

    # Save the output JSON
    with open("segmented_output_2.json", "w") as f:
        json.dump(segments, f, indent=4, ensure_ascii=False)

    print(f"✅ Segmentation completed! Output saved to segmented_output.json")

# Usage
dynamic_audio_segmentation("subtitles-original.json", char_limit=150)


✅ Segmentation completed! Output saved to segmented_output.json


In [10]:
import os
import json
import librosa
import soundfile as sf

# Load the segmented JSON file with char lengths
with open('segmented_output.json', 'r') as f:
    segments = json.load(f)

# Load original audio
input_audio = 'org.wav'
audio, sr = librosa.load(input_audio, sr=22050)

output_dir = 'audio_dataset/wavs'
os.makedirs(output_dir, exist_ok=True)

metadata = []

for idx, segment in enumerate(segments):
    start_sample = int((segment['start_time'] / 1000.0) * sr)
    end_sample = int((segment['end_time'] / 1000.0) * sr)

    chunk = audio[start_sample:end_sample]

    output_file = f'segment_{idx:03d}.wav'
    sf.write(os.path.join(output_dir, output_file), chunk, sr)

    metadata.append(f"{output_file}|{segment['text']}|{segment['text']}")

# Write metadata.csv
with open('audio_dataset/metadata.csv', 'w') as f:
    f.write("\n".join(metadata))

print("🎯 Audio segments created and metadata.csv generated!")


🎯 Audio segments created and metadata.csv generated!


#### remove .wav extensions

In [11]:
import csv

# Paths
input_csv = "audio_dataset/metadata.csv"
output_csv = "audio_dataset/metadata_fixed.csv"

# Fix .wav extensions
with open(input_csv, "r", encoding="utf-8") as infile, open(output_csv, "w", encoding="utf-8", newline="") as outfile:
    reader = csv.reader(infile, delimiter="|")
    writer = csv.writer(outfile, delimiter="|")

    for row in reader:
        # Remove '.wav' from first column if it exists
        filename = row[0].replace(".wav", "")
        writer.writerow([filename, row[1], row[2]])

print("✅ Fixed metadata.csv! Saved as metadata_fixed.csv")

# Optional: Replace the old file with the new one
import os
os.replace(output_csv, input_csv)

✅ Fixed metadata.csv! Saved as metadata_fixed.csv


#### Remove silence from audios

In [15]:
import librosa
import soundfile as sf
import webrtcvad
import numpy as np

# Function to remove silence using WebRTC VAD
def remove_silence_with_vad(input_audio, output_audio, frame_duration_ms=30):
    # Load audio at 16kHz for VAD
    audio, sr = librosa.load(input_audio, sr=16000)

    vad = webrtcvad.Vad(3)  # Aggressive mode
    frame_size = int(sr * frame_duration_ms / 1000)  # Frame size in samples

    # Convert audio to 16-bit PCM
    pcm_audio = (audio * 32767).astype(np.int16).tobytes()

    voiced_frames = []
    for i in range(0, len(pcm_audio), 2 * frame_size):
        frame = pcm_audio[i:i + 2 * frame_size]
        if len(frame) == 2 * frame_size and vad.is_speech(frame, sr):
            voiced_frames.append(frame)

    # Combine voiced frames
    voiced_pcm = b''.join(voiced_frames)
    voiced_audio = np.frombuffer(voiced_pcm, dtype=np.int16).astype(np.float32) / 32767

    # Resample to 22050Hz for XTTS
    final_audio = librosa.resample(voiced_audio, orig_sr=16000, target_sr=22050)

    # Save cleaned audio
    sf.write(output_audio, final_audio, 22050)
    print(f"✅ Cleaned audio saved at 22050Hz: {output_audio}")

# Example usage
input_dir = "audio_dataset/wavs"
for file in sorted(os.listdir(input_dir)):
        if file.endswith(".wav"):
            file_path = os.path.join(input_dir, file)
            print(f"🎙️ Transcribing {file}...")
            remove_silence_with_vad(file_path, file_path, frame_duration_ms=30)


🎙️ Transcribing segment_000.wav...
✅ Cleaned audio saved at 22050Hz: audio_dataset/wavs/segment_000.wav
🎙️ Transcribing segment_001.wav...
✅ Cleaned audio saved at 22050Hz: audio_dataset/wavs/segment_001.wav
🎙️ Transcribing segment_002.wav...
✅ Cleaned audio saved at 22050Hz: audio_dataset/wavs/segment_002.wav
🎙️ Transcribing segment_003.wav...
✅ Cleaned audio saved at 22050Hz: audio_dataset/wavs/segment_003.wav
🎙️ Transcribing segment_004.wav...
✅ Cleaned audio saved at 22050Hz: audio_dataset/wavs/segment_004.wav
🎙️ Transcribing segment_005.wav...
✅ Cleaned audio saved at 22050Hz: audio_dataset/wavs/segment_005.wav
🎙️ Transcribing segment_006.wav...
✅ Cleaned audio saved at 22050Hz: audio_dataset/wavs/segment_006.wav
🎙️ Transcribing segment_007.wav...
✅ Cleaned audio saved at 22050Hz: audio_dataset/wavs/segment_007.wav
🎙️ Transcribing segment_008.wav...
✅ Cleaned audio saved at 22050Hz: audio_dataset/wavs/segment_008.wav
🎙️ Transcribing segment_009.wav...
✅ Cleaned audio saved at 2205

In [1]:
import csv
import re

def clean_metadata(input_csv, output_csv):
    cleaned_rows = []
    with open(input_csv, 'r', encoding='utf-8') as infile:
        reader = csv.reader(infile, delimiter='|')
        for row in reader:
            cleaned_text = re.sub(r'[^\w\s.,!?-]', '', row[1].strip())  # Remove symbols except basic punctuation
            cleaned_transcript = re.sub(r'[^\w\s.,!?-]', '', row[2].strip())
            cleaned_rows.append([row[0], cleaned_text, cleaned_transcript])

    with open(output_csv, 'w', encoding='utf-8') as outfile:
        writer = csv.writer(outfile, delimiter='|')
        writer.writerows(cleaned_rows)

    print("✅ Metadata cleaned successfully!")

# Example usage
clean_metadata('audio_dataset/metadata.csv', 'audio_dataset/cleaned_metadata.csv')


✅ Metadata cleaned successfully!
