In [2]:
# %pip install git+https://github.com/m-bain/whisperx.git

In [None]:
import re

# Lowercase -> uppercase pairs for the letters whose case mapping is
# Turkish-specific (dotted / dotless i) or lies outside ASCII.
turkish_upper_chars = {"ı": "I", "i": "İ", "ş": "Ş", "ğ": "Ğ", "ü": "Ü", "ö": "Ö", "ç": "Ç"}

# Circumflexed ("hatted") vowels -> their plain counterparts.
turkish_hatted_chars = {
    "â": "a",
    "Â": "A",
    "î": "i",
    # The capital of dotted "i" is dotted "İ" in Turkish. Mapping "Î" to the
    # dotless "I" would make turkish_lower() produce "ı", so "MİLLÎ" and
    # "millî" would normalize to different strings.
    "Î": "İ",
    "û": "u",
    "Û": "U",
    "ô": "o",  # Seen in some texts; included even though it is not part of the standard.
    "Ô": "O"
}
# Inverse of turkish_upper_chars: uppercase -> lowercase.
turkish_lower_chars = {v: k for k, v in turkish_upper_chars.items()}

def replace_hatted_characters(s):
    """Return `s` with every key of `turkish_hatted_chars` replaced by its value.

    Replacements are applied one key at a time, in the dictionary's
    iteration order.
    """
    result = s
    for hatted in turkish_hatted_chars:
        plain = turkish_hatted_chars[hatted]
        result = result.replace(hatted, plain)
    return result

def turkish_lower(s):
    """Lowercase `s` character by character using Turkish casing rules.

    Characters listed in `turkish_lower_chars` (e.g. "I" -> "ı", "İ" -> "i")
    use that table; every other character falls back to `str.lower()`.
    """
    lowered = []
    for ch in s:
        if ch in turkish_lower_chars:
            lowered.append(turkish_lower_chars[ch])
        else:
            lowered.append(ch.lower())
    return "".join(lowered)

def normalize_text(text: str) -> str:
    """
    Apply normalization to text described in Moonshine: Speech Recognition for Live Transcription and Voice Commands
    https://arxiv.org/html/2410.15608v2

    Handle Turkish specific characters, use helper functions defined earlier.

    Steps, in order:
      1. Map the typographic apostrophe (U+2019) and the modifier-letter
         apostrophe (U+02BC) to "'" so that "Cape’de" and "Cape'de"
         normalize identically.
      2. Re-attach / drop apostrophes so suffixes join their word
         ("Prusya'daki" -> "Prusyadaki").
      3. Strip circumflexes and lowercase with Turkish casing rules.
      4. Replace everything except a-z and "çğıöşü" (digits included)
         with a space.
      5. Collapse every run of whitespace to a single space and trim.

    Args:
        text: Raw reference or hypothesis text.

    Returns:
        The normalized string; empty if no letters remain.
    """
    # The opening quote U+2018 is deliberately NOT mapped: it only occurs as
    # a quotation mark, and step 2 would glue the quoted word onto its
    # neighbour.
    for apostrophe in ("\u2019", "\u02bc"):
        text = text.replace(apostrophe, "'")
    text = text.replace(" '", "'").replace("' ", " ").replace("'", "")
    text = replace_hatted_characters(text)
    text = turkish_lower(text)
    text = re.sub(r'[^a-zçğıöşü]', ' ', text)
    # A single .replace("  ", " ") pass leaves runs of 3+ spaces behind;
    # split/join collapses runs of any length and strips both ends.
    return " ".join(text.split())

# Quick visual check: run the same sample sentence through the full
# normalizer and through the circumflex-stripping helper alone.
_demo_text = "âîôû Çok iyi ve nazik biriydi. Prusya'daki ilk karşılaşmamızda onu konuşturmayı başarmıştım. Bana o yaz North Cape’de bulunduğunu ve Nijni Novgorod panayırına gitmeyi çok istediğini anlatmıştı.,;)([-*])"
for _demo_fn in (normalize_text, replace_hatted_characters):
    print(_demo_fn(_demo_text))

In [None]:
import whisperx

# import gc
import json
import time
import random
from pathlib import Path
from evaluate import load


# Word-error-rate metric, loaded by name through `evaluate.load`.
wer_metric = load("wer")
SAMPLE_RATE = 16000  # Sample rate in Hz
# Not referenced by any cell visible in this notebook excerpt.
NUM_SAMPLES_TO_TRIM = 0  # 4 * SAMPLE_RATE  # Number of samples in 4 seconds
# Not referenced by any cell visible in this notebook excerpt.
SYNT_LABELS = False
# Device string handed to whisperx.load_model below.
device = "cuda"
batch_size = 1  # reduce if low on GPU mem
# compute_type = "int8"  # change to "int8" if low on GPU mem (may reduce accuracy)
compute_type = "float32"

# Decoding options forwarded to whisperx.load_model via `asr_options`.
asr_options = {
    # "temperatures": 0.0,
    "beam_size": 5,
    "condition_on_previous_text": False,
    # "initial_prompt": ""
    "hotwords": None,
    "multilingual": False,
}

# Checkpoint selection. Exactly one load_model call is active; the
# commented-out calls are alternative checkpoints kept as an experiment log.
# NOTE(review): the trailing numbers look like recorded evaluation results
# (presumably average WER % and average normalized distance) — confirm.
# model = whisperx.load_model("N:/models/faster/ysdede/whisper-large-v3-turbo", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)   # 15.54      0.067
# model = whisperx.load_model("N:/models/faster/ysdede/whisper-base-turkish-0", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)   # 19.8   0.066              / 27
# model = whisperx.load_model("N:/models/faster/ysdede/whisper-base-turkish-1", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)   # 15.82       0.058              / 45
# model = whisperx.load_model("N:/models/faster/ysdede/whisper-small-turkish-0", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)    # 11.9   0.04    / 18
# model = whisperx.load_model("N:/models/faster/ysdede/whisper-tiny-turkish-0", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)   #   22.37     0.076
model = whisperx.load_model("N:/models/faster/ysdede/whisper-base-turkish-1.1", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)

# model = whisperx.load_model("Systran/faster-whisper-tiny", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)                        # 50     0.170
# model = whisperx.load_model("N:/models/faster/Systran/faster-whisper-base", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)  #
# model = whisperx.load_model("N:/models/faster/Systran/faster-whisper-small", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)  #

# model = whisperx.load_model(r"N:\models\faster\ysdede\whisper-large-v3-turbo-med-tr-30k", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)  # Very good. First fine-tune: frozen, dropout active, all parameters present. It turned out the earlier poor results came from the CTranslate2 conversion generating the tokenizer etc. JSON files incorrectly.
# model = whisperx.load_model(r"N:\models\faster\ysdede\whisper-large-v3-turbo-med-tr-2-40k", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)  # Good; the encoder should be frozen. Switched to this one after the first run was left unfinished and inference had been run incorrectly. Has not fully learned abbreviations. The one above is better.
# model = whisperx.load_model("N:/models/faster/ysdede/whisper-khanacademy-large-v3-turbo-tr", device, compute_type=compute_type, download_root="n:\\whisperx_models", language="tr", asr_options=asr_options)  # Works well; does not know some terms and abbreviations.

In [5]:
def get_largest_files(dataset_dir, n=25, file_type="opus"):
    """Return the `n` largest files with the given extension under `dataset_dir`.

    The directory is searched recursively. The total number of matching
    files is printed first, then one "<path>: <size> KB" line per selected
    file.

    Args:
        dataset_dir: Root directory to search (str or Path).
        n: Maximum number of files to return.
        file_type: File extension without the leading dot.

    Returns:
        list[Path]: Matching files ordered by size, largest first.
    """
    root = Path(dataset_dir)
    candidates = [path for path in root.rglob(f'*.{file_type}')]
    print(len(candidates))

    candidates.sort(key=lambda path: path.stat().st_size, reverse=True)
    largest = candidates[:n]

    # Collect all sizes before printing, mirroring a "gather then report" flow.
    sizes = [path.stat().st_size for path in largest]
    for path, size in zip(largest, sizes):
        print(f"{str(path)}: {size / 1024:.0f} KB")
    return largest

In [6]:
def get_random_files(dataset_dir, n=25, file_type="opus", seed=42):
    """Return a reproducible pseudo-random sample of files under `dataset_dir`.

    The directory is searched recursively for `*.{file_type}`. The matches
    are sorted before shuffling: directory traversal order depends on the
    filesystem, so shuffling the raw listing with a fixed seed would not
    select the same files on another machine (or after files are re-copied).
    One "<path>: <size> KB" line is printed per selected file.

    Args:
        dataset_dir: Root directory to search (str or Path).
        n: Maximum number of files to return.
        file_type: File extension without the leading dot.
        seed: Seed for the shuffle; the default keeps the previously
            hard-coded value.

    Returns:
        list[Path]: Up to `n` files in shuffled order.
    """
    dataset_dir = Path(dataset_dir)
    # Sorting gives the seeded shuffle a stable starting order.
    candidates = sorted(dataset_dir.rglob(f'*.{file_type}'))
    random.Random(seed).shuffle(candidates)
    selected = candidates[:n]

    for file in selected:
        print(f"{file}: {file.stat().st_size / 1024:.0f} KB")
    return selected

In [7]:
def transcript(audio_path, normalize=True):
    """Transcribe one audio file with the globally loaded whisperx `model`.

    Args:
        audio_path: Path of the audio file, passed to `whisperx.load_audio`.
        normalize: Accepted for call compatibility but not used; the
            returned text is never normalized here.

    Returns:
        tuple: (text, elapsed, duration) where
            text     - segment texts joined with spaces, stripped,
            elapsed  - wall-clock seconds, rounded to 2 decimals, measured
                       from before audio loading until transcription ends,
            duration - number of samples / SAMPLE_RATE, rounded to 3 decimals.
    """
    started_at = time.time()
    waveform = whisperx.load_audio(audio_path)

    # The first dimension of the loaded array is the sample count.
    duration_sec = round(waveform.shape[0] / SAMPLE_RATE, 3)

    output = model.transcribe(waveform, batch_size=batch_size, print_progress=False, language="tr")
    elapsed_sec = round(time.time() - started_at, 2)

    joined = " ".join(segment["text"] for segment in output["segments"])
    return joined.strip(), elapsed_sec, duration_sec


In [8]:
def levenshtein_distance(str1: str, str2: str) -> int:
    """
    Compute the Levenshtein (edit) distance between two strings.

    Uses the classic dynamic-programming recurrence but keeps only the
    previous and the current row of the table.

    Args:
        str1: First string
        str2: Second string

    Returns:
        int: The minimum number of single-character insertions, deletions
        and substitutions needed to turn str1 into str2
    """
    # Row 0: turning "" into str2[:j] takes j insertions.
    previous = list(range(len(str2) + 1))

    for row, ch1 in enumerate(str1, start=1):
        # Column 0: turning str1[:row] into "" takes `row` deletions.
        current = [row]
        for col, ch2 in enumerate(str2, start=1):
            if ch1 == ch2:
                cost = previous[col - 1]
            else:
                cost = 1 + min(
                    previous[col],       # deletion
                    current[col - 1],    # insertion
                    previous[col - 1],   # substitution
                )
            current.append(cost)
        previous = current

    return previous[-1]


In [9]:
def normalized_levenshtein_distance(str1: str, str2: str) -> float:
    """
    Calculate the normalized Levenshtein distance between two strings.
    Returns a value between 0.0 (identical) and 1.0 (completely different).

    The raw edit distance is divided by the length of the longer string and
    rounded to 3 decimals.

    Args:
        str1: First string
        str2: Second string

    Returns:
        float: Normalized distance between 0.0 (identical) and 1.0 (completely different)
    """
    max_length = max(len(str1), len(str2))

    # max_length == 0 means both strings are empty, i.e. identical. (The
    # former "else 1.0" branch could never be taken.) Returning early also
    # avoids dividing by zero.
    if max_length == 0:
        return 0.0

    return round(levenshtein_distance(str1, str2) / max_length, 3)

In [None]:
# Single-file sanity check: transcribe one known clip and normalize both the
# hand-written label and the prediction. `label` and `prediction` are read
# again by the next cell.
audio_file = r"N:\dataset_v3\khanacademy-tr\Other\Sosyoloji___Khan_Academy\Kulturel_Gecikme_ve_Kultur_oku_Sosyoloji_Toplum_ve_Kultur-[DpEl50Dpw7Y]\chunk_0.mp3"
# The literal opens with `"""` immediately followed by a `"` that belongs to
# the text itself.
label = """"Kültürel Gecikme" ya da "Kültürel Boşluk" terimi; kültürün, teknolojik gelişmeleri yakalamasının vakit almasını ve bunun, toplumsal sorunlara yol açmasını tanımlar."""
label = normalize_text(label)
# `normalize` is accepted by transcript() but not used by it, so the
# prediction is normalized explicitly on the next line.
prediction, transcription_time, audio_length = transcript(audio_file, normalize=False)
prediction = normalize_text(prediction)
# Seconds of audio processed per second of wall-clock time (higher is faster).
speed = round(audio_length / transcription_time, 2)
print(prediction, "\n", transcription_time, audio_length, speed)

In [None]:

# Character-level comparison of the normalized `label` and `prediction`
# globals: raw edit distance first, then the length-normalized value.
distance = levenshtein_distance(label, prediction)
print(label)
print(prediction)
print(distance)
normalized_distance = normalized_levenshtein_distance(label, prediction)
print(normalized_distance)


In [None]:
# from ytk.normalizer import fill_dates
# from ytk.utils import turkish_capitalize
from webvtt import read as read_vtt

SYNT_LABELS = False
# Upper bound on the number of audio files to evaluate.
n = 9999

# Dataset root. Alternatives are kept commented out for quick switching.
# INPUT_DIR = r"N:\dataset_v3\YENI_SPLIT_LQ_NOISY"
# INPUT_DIR = r"N:\dataset_v3\YENI_SPLIT"
# INPUT_DIR = r"N:\dataset_v3\commonvoice_17_tr\commonvoice_17_tr_fixed\test"
# INPUT_DIR = r"N:\dataset_v3\tr-med-audio"
# INPUT_DIR = r"N:\dataset_v3\YENI_SPLITTEN ARTAN"
# INPUT_DIR = r"N:\dataset_v3"
# Raw string: "\d" and "\M" are invalid escape sequences in a normal string
# literal (a warning today, slated to become an error). The value itself is
# unchanged.
INPUT_DIR = r"N:\dataset_v4\MediaSpeech_TR"

# file_list = get_random_files(INPUT_DIR, n, file_type="mp3")
file_list = get_random_files(INPUT_DIR, n, file_type="wav")
# file_list = get_largest_files(INPUT_DIR, n, file_type="mp3")
print(len(file_list))

# Running totals / averages updated by the evaluation loop.
total_wer = 0
avg_wer = 0
processed_files = 0
# One (wer_percent_str, normalized_distance, raw_reference, raw_prediction)
# tuple per evaluated file.
label_pairs = []
total_dist = 0
avg_dist = 0

def get_text_from_file(file_path):
    """Read a reference transcript from a `.vtt` or plain-text file.

    A `.vtt` suffix (case-insensitive) is parsed with `read_vtt` and the
    caption texts are joined with spaces; any other file is read as UTF-8
    text. Line breaks are replaced with spaces in both cases.

    Args:
        file_path: Path object of the transcript file (its `.suffix` is used).

    Returns:
        The stripped transcript text, or None if anything goes wrong
        (the error is printed, not raised).
    """
    try:
        is_vtt = file_path.suffix.lower() == '.vtt'
        if not is_vtt:
            # Anything that is not VTT is treated as a plain text file.
            with open(file_path, 'r', encoding='utf-8') as handle:
                raw_text = handle.read()
            text = raw_text.replace('\n', ' ')
        else:
            cues = read_vtt(str(file_path))
            flattened = [cue.text.replace('\n', ' ') for cue in cues]
            text = ' '.join(flattened).strip()

        return text.strip()
    except Exception as e:
        print(f"Error reading file {file_path}: {str(e)}")
        return None

# Evaluate every audio file that has a sibling transcript (.vtt preferred,
# then .txt): transcribe, normalize both sides, update running WER /
# distance averages and remember the raw texts for the TSV dump below.
for audio_path in file_list:
    try:
        # Try VTT first, then TXT
        vtt_file = audio_path.with_suffix('.vtt')
        txt_file = audio_path.with_suffix('.txt')

        if vtt_file.exists():
            reference = get_text_from_file(vtt_file)
            transcript_file = vtt_file
        elif txt_file.exists():
            reference = get_text_from_file(txt_file)
            transcript_file = txt_file
        else:
            print(f"No transcript file found for {audio_path}")
            continue

        # Unreadable (None) or very short references are not scored.
        if not reference or len(reference) < 5:
            continue

        prediction, transcription_time, audio_length = transcript(audio_path)
        # prediction = prediction.replace(" x ", "x")

        if "No active speech found in audio" in prediction:
            continue

        # Keep the raw texts for the report; score the normalized ones.
        rf, pre = reference, prediction

        prediction = normalize_text(prediction)
        reference = normalize_text(reference)

        # A reference made only of digits / punctuation normalizes to "".
        # NOTE(review): the WER backend is expected to reject empty
        # references — skip explicitly instead of relying on the generic
        # error handler below.
        if not reference:
            print(f"Reference is empty after normalization, skipping {audio_path}")
            continue

        wer = wer_metric.compute(references=[reference], predictions=[prediction])
        lev_dist = normalized_levenshtein_distance(reference, prediction)

        # Real-time factor = processing time / audio duration (lower is
        # faster). Computed before the totals are touched so a zero-length
        # clip cannot raise after the statistics were already updated.
        rtf = transcription_time / audio_length if audio_length else float("nan")

        total_wer += wer
        processed_files += 1
        avg_wer = total_wer / processed_files
        total_dist += lev_dist
        avg_dist = total_dist / processed_files

        label_pairs.append((f"{wer * 100:0.02f}", lev_dist, rf, pre))

        print(rf)
        print(pre)
        # This ratio was previously labelled "Speed", although it is the
        # inverse of a speed-up factor; the value itself is unchanged.
        print(f"{processed_files}/{len(file_list)} Avg WER: {avg_wer * 100:0.02f}%, WER: {wer * 100:0.02f}%, Avg Dist: {avg_dist:0.03f}, Distance: {lev_dist}, {audio_path} - Duration: {audio_length} sec, time: {transcription_time}, RTF: {rtf:0.02f}")

    except Exception as e:
        # Best effort: report the failing file and move on to the next one.
        print(f"Error processing {audio_path}: {e}")


def _tsv_field(value):
    """Flatten tabs and line breaks so the value stays inside one TSV cell."""
    return str(value).replace("\t", " ").replace("\r", " ").replace("\n", " ")


# Columns: WER (%), normalized distance, raw reference, raw prediction.
with open("labels-base-new.tsv", "w", encoding="utf-8") as lf:
    for wer, lev_dist, r, p in label_pairs:
        lf.write(f"{wer}\t{lev_dist}\t{_tsv_field(r)}\t{_tsv_field(p)}\n")



In [12]:
# !pip install webvtt-py