# Preparation steps

## Before Running the notebook

Before running the notebook, install the required packages using the `requirements.txt` file.


To do so, run the `pip install -r requirements.txt` command in the terminal.

In [None]:
import jiwer
#import batchalign
import os
import pathlib
from docx import Document
import re
import pandas as pd
import librosa
import soundfile as sf

from asr_intro_handler import preprocess_transcript_pair
from audio_quality_metrics import get_audio_metrics_dataframe

#pd.set_option("display.max_columns", 50) 
#pd.set_option("display.max_rows", 50)


# Data Preprocessing

## Setting up the Paths

In [None]:
# Setting up the paths and the run-wide configuration

# Root of the project: the resolved current working directory of the kernel.
path = pathlib.Path().resolve()

# How introductions are handled. The intro-handling cells pass this value on as
# preprocess_transcript_pair(..., method=mode) and the export cells embed it in
# the CSV file names, so it has to exist before any of them run on a fresh
# "Restart Kernel -> Run All".
# Options (as listed in the "Audio data analysis" section): "noop", "replace", "remove".
# NOTE: the "Audio data analysis" section assigns `mode` again; keep both
# assignments identical so the results match the exported file names.
mode = "replace"

# --- paths to data for analysis ---

# path to the directory with the Batchalign transcriptions IN POLISH
hypothesis_path_PL = path / "data" / "batchalign" / "PL"

# path to the directory with the Batchalign transcriptions IN ENGLISH
hypothesis_path_ENG = path / "data" / "batchalign" / "ENG"

# path to the directory with the human transcriptions IN POLISH
reference_path_PL = path / "data" / "czyszczone_język_polski" / "czyszczone_język_polski" / "czyszczone_transkrypcje"

# path to the directory with the human transcriptions IN ENGLISH
reference_path_ENG = path / "data" / "czyszczone_język_angielski" / "czyszczone_język_angielski" / "czyszczone_transkrypcje_a"

# --- paths to audio files ---

# path to the directory where the copy of the cleaned audio files is saved IN POLISH
used_audio_path_PL = path / "data" / "czyszczone_język_polski" / "czyszczone_język_polski" / "czyszczone_nagrania"

# path to the directory where the copy of the cleaned audio files is saved IN ENGLISH
used_audio_path_ENG = path / "data" / "czyszczone_język_angielski" / "czyszczone_język_angielski" / "czyszczone_nagrania_a"

# --- paths to backup files ---

# path to the directory where the copy of the human transcriptions is saved IN POLISH
reference_path_copy_PL = path / "data_copy" / "czyszczone_język_polski" / "czyszczone_język_polski" / "czyszczone_transkrypcje"

# path to the directory where the copy of the human transcriptions is saved IN ENGLISH
reference_path_copy_ENG = path / "data_copy" / "czyszczone_język_angielski" / "czyszczone_język_angielski" / "czyszczone_transkrypcje_a"

# path to the directory where the copy of the Batchalign transcriptions is saved IN POLISH
hypothesis_path_copy_PL = path / "data_copy" / "batchalign" / "PL"

# path to the directory where the copy of the Batchalign transcriptions is saved IN ENGLISH
hypothesis_path_copy_ENG = path / "data_copy" / "batchalign" / "ENG"

#### Script for resetting the files inside the reference paths

In [None]:
# #ONLY RUN THIS SCRIPT IF YOU WANT TO RESET THE REFERENCE PATH

# # to uncomment the code, select it and press Ctrl + / (or Cmd + / on Mac)
# # This script will delete all files in the reference_path and copy files from reference_path_copy_PL and reference_path_copy_ENG to reference_path_PL and reference_path_ENG respectively.
# import shutil

# # 1. Delete all files from reference_path_PL
# for file in reference_path_PL.iterdir():
#     if file.is_file():
#         file.unlink()

# # 2. Copy all files from reference_path_copy_PL to reference_path_PL
# for file in reference_path_copy_PL.iterdir():
#     if file.is_file():
#         target = reference_path_PL / file.name
#         shutil.copy2(file, target)
        
# # 3. Delete all files from reference_path_ENG
# for file in reference_path_ENG.iterdir():
#     if file.is_file():
#         file.unlink()   
        
# # 4. Copy all files from reference_path_copy_ENG to reference_path_ENG
# for file in reference_path_copy_ENG.iterdir():
#     if file.is_file():
#         target = reference_path_ENG / file.name
#         shutil.copy2(file, target)

#### Script for resetting the files inside the hypothesis paths

In [None]:
# #ONLY RUN THIS SCRIPT IF YOU WANT TO 'RESET' THE HYPOTHESIS PATH

# # to uncomment the code, select it and press Ctrl + / (or Cmd + / on Mac)
# # This script will delete all files in the hypothesis_path_PL and hypothesis_path_ENG and copy files from hypothesis_path_copy_PL and hypothesis_path_copy_ENG to hypothesis_path_PL and hypothesis_path_ENG respectively.

# import shutil

# # 1. Delete all files from hypothesis_path_PL
# for file in hypothesis_path_PL.iterdir():
#     if file.is_file():
#         file.unlink()

# # 2. Copy all files from hypothesis_path_copy_PL to hypothesis_path_PL
# for file in hypothesis_path_copy_PL.iterdir():
#     if file.is_file():
#         target = hypothesis_path_PL / file.name
#         shutil.copy2(file, target)

# # 3. Delete all files from hypothesis_path_ENG
# for file in hypothesis_path_ENG.iterdir():
#     if file.is_file():
#         file.unlink()

# # 4. Copy all files from hypothesis_path_copy_ENG to hypothesis_path_ENG
# for file in hypothesis_path_copy_ENG.iterdir():
#     if file.is_file():
#         target = hypothesis_path_ENG / file.name
#         shutil.copy2(file, target)

## Preprocessing the reference paths

### Converting the reference files to .txt

In [None]:
#converting all files in the reference_path to a .txt format

def convert_to_txt(file_path):
    """
    Convert a single .docx, .doc or .cha file into a UTF-8 .txt file next to it.

    !!!!!
    Keep a backup of the original files before running this function: the
    source file is deleted once the .txt copy has been written.
    !!!!!

    Parameters
    ----------
    file_path : pathlib.Path
        File to convert. The suffix is compared case-insensitively.

    Returns
    -------
    pathlib.Path
        The path of the new .txt file when the conversion succeeded. The
        unchanged ``file_path`` when the suffix is not supported or when the
        conversion failed (the error is printed, not raised).
    """
    suffix = file_path.suffix.lower()
    if suffix not in ('.docx', '.cha', '.doc'):
        # Nothing to convert: hand the path back instead of implicitly returning None,
        # so callers always receive a usable path.
        return file_path

    new_file_path = file_path.with_suffix('.txt')
    try:
        if suffix == '.docx':
            # One line of text per paragraph of the Word document.
            doc = Document(file_path)
            content = "\n".join(para.text for para in doc.paragraphs)
        else:
            # .cha and .doc are read as plain UTF-8 text.
            # NOTE(review): legacy .doc files are normally binary Word documents;
            # reading them as UTF-8 is expected to fail and land in the except
            # branch below, leaving the file untouched - confirm whether real
            # .doc support is needed.
            with open(file_path, 'r', encoding='utf-8') as f_in:
                content = f_in.read()

        with open(new_file_path, 'w', encoding='utf-8') as f_out:
            f_out.write(content)

        # Only remove the source after the .txt copy was written successfully.
        os.remove(file_path)
        print(f"Converted and removed: {file_path}")
        return new_file_path
    except Exception as e:
        # Best-effort conversion: report the problem and keep the original file.
        print(f"Error converting {file_path}: {e}")
        return file_path

#### Converting Polish files in the reference_path_PL

In [None]:
# Convert every .docx / .cha file found directly inside the Polish reference folder.
for entry_name in os.listdir(reference_path_PL):
    candidate = reference_path_PL / entry_name
    if not candidate.is_file():
        continue  # skip sub-directories and anything that is not a regular file
    if candidate.suffix.lower() in ('.docx', '.cha'):
        convert_to_txt(candidate)

#### Converting English files in the reference_path_ENG

In [None]:
# Convert every .docx / .cha file found directly inside the English reference folder.
for entry_name in os.listdir(reference_path_ENG):
    candidate = reference_path_ENG / entry_name
    if not candidate.is_file():
        continue  # skip sub-directories and anything that is not a regular file
    if candidate.suffix.lower() in ('.docx', '.cha'):
        convert_to_txt(candidate)

### Cleaning the reference files

In [None]:
def clean_transcript_file(file_path):
    """
    Strip header, comment and footer lines from a reference transcript, in place.

    Steps:
    - drop every line before the first one that starts with *CHI: or *EXP:
      (leading whitespace is ignored)
    - drop every remaining line that starts with %com: (case-insensitive)
    - drop trailing blank lines and trailing '@End' lines (case-insensitive)

    The file is only rewritten when a *CHI: / *EXP: line exists; otherwise a
    message is printed and the file is left untouched.
    """
    with open(file_path, 'r', encoding='utf-8') as src:
        raw_lines = src.readlines()

    # Locate the first speaker line; everything above it is header material.
    first_speaker = None
    for position, raw in enumerate(raw_lines):
        if raw.lstrip().startswith(("*CHI:", "*EXP:")):
            first_speaker = position
            break

    if first_speaker is None:
        print(f"No *CHI: or *EXP: found in {file_path}")
        return

    # Keep everything from the first speaker line on, minus %com: lines.
    kept = []
    for raw in raw_lines[first_speaker:]:
        if raw.lstrip().lower().startswith("%com:"):
            continue
        kept.append(raw)

    # Trim the footer: blank lines and '@End' markers at the very end.
    while kept and kept[-1].strip().lower() in ("", "@end"):
        kept.pop()

    with open(file_path, 'w', encoding='utf-8') as dst:
        dst.writelines(kept)
    print(f"Cleaned: {file_path}")

#### Cleaning the Polish reference files in reference_path_PL

In [None]:
# Clean every .txt transcript found directly inside the Polish reference folder.
for entry_name in os.listdir(reference_path_PL):
    candidate = reference_path_PL / entry_name
    is_txt_file = candidate.is_file() and candidate.suffix.lower() == '.txt'
    if is_txt_file:
        clean_transcript_file(candidate)

#### Cleaning the English reference files in reference_path_ENG

In [None]:
# Clean every .txt transcript found directly inside the English reference folder.
for entry_name in os.listdir(reference_path_ENG):
    candidate = reference_path_ENG / entry_name
    is_txt_file = candidate.is_file() and candidate.suffix.lower() == '.txt'
    if is_txt_file:
        clean_transcript_file(candidate)

## Preprocessing the hypothesis paths

### Converting the hypothesis files to .txt

In [None]:
def convert_to_txt(file_path):
    """
    Write a UTF-8 .txt copy of a .docx, .doc or .cha file and delete the source.

    !!!!!
    Make sure a backup of the original files exists before running this
    function: the original file is removed after a successful conversion.
    !!!!!

    .docx files are read paragraph by paragraph through ``Document``; .cha and
    .doc files are read as UTF-8 text. Returns the new .txt path on success and
    the unchanged ``file_path`` for unsupported suffixes or when the conversion
    raised (the error is printed).

    NOTE(review): .doc files are usually binary Word documents, so reading them
    as UTF-8 text presumably ends in the error branch - confirm.
    """
    def _docx_text(source):
        # Paragraph texts joined with newlines.
        return "\n".join([para.text for para in Document(source).paragraphs])

    def _plain_text(source):
        with open(source, 'r', encoding='utf-8') as f_in:
            return f_in.read()

    readers = {'.docx': _docx_text, '.cha': _plain_text, '.doc': _plain_text}
    read_source = readers.get(file_path.suffix.lower())
    if read_source is None:
        return file_path

    new_file_path = file_path.with_suffix('.txt')
    try:
        content = read_source(file_path)
        with open(new_file_path, 'w', encoding='utf-8') as f_out:
            f_out.write(content)
        os.remove(file_path)
        print(f"Converted and removed: {file_path}")
        return new_file_path
    except Exception as e:
        print(f"Error converting {file_path}: {e}")
        return file_path

#### Converting Polish files in the hypothesis_path_PL

In [None]:
# Convert every .cha file found directly inside the Polish hypothesis folder.
for entry_name in os.listdir(hypothesis_path_PL):
    candidate = hypothesis_path_PL / entry_name
    is_cha_file = candidate.is_file() and candidate.suffix.lower() == '.cha'
    if is_cha_file:
        convert_to_txt(candidate)

#### Converting English files in the hypothesis_path_ENG

In [None]:
# Convert every .cha file found directly inside the English hypothesis folder.
for entry_name in os.listdir(hypothesis_path_ENG):
    candidate = hypothesis_path_ENG / entry_name
    is_cha_file = candidate.is_file() and candidate.suffix.lower() == '.cha'
    if is_cha_file:
        convert_to_txt(candidate)

### Cleaning the hypothesis files

In [None]:
def clean_hypothesis_file(file_path):
    """
    Strip header, timestamps and footer from a hypothesis transcript, in place.

    Steps:
    - drop every line before the first one starting with *PAR (e.g. *PAR0:, *PAR1:);
      leading whitespace is ignored
    - delete timestamp markers of the form <0x15>digits_digits<0x15> from each line
    - drop trailing blank lines and trailing '@End' lines (case-insensitive)

    The file is only rewritten when a *PAR line exists; otherwise a message is
    printed and the file is left untouched.
    """
    with open(file_path, 'r', encoding='utf-8') as src:
        raw_lines = src.readlines()

    # Locate the first participant line; everything above it is header material.
    first_par = None
    for position, raw in enumerate(raw_lines):
        if raw.lstrip().startswith("*PAR"):
            first_par = position
            break

    if first_par is None:
        print(f"No *PAR found in {file_path}")
        return

    # Remove the \x15-delimited timing markers line by line.
    timestamp = re.compile(r"\x15\d+_\d+\x15")
    kept = []
    for raw in raw_lines[first_par:]:
        kept.append(timestamp.sub("", raw))

    # Trim the footer: blank lines and '@End' markers at the very end.
    while kept and kept[-1].strip().lower() in ("", "@end"):
        kept.pop()

    with open(file_path, 'w', encoding='utf-8') as dst:
        dst.writelines(kept)
    print(f"Cleaned: {file_path}")

#### Cleaning the Polish files in hypothesis_path_PL

In [None]:
# Clean every .txt transcript found directly inside the Polish hypothesis folder.
for entry_name in os.listdir(hypothesis_path_PL):
    candidate = hypothesis_path_PL / entry_name
    is_txt_file = candidate.is_file() and candidate.suffix.lower() == '.txt'
    if is_txt_file:
        clean_hypothesis_file(candidate)

#### Cleaning the English files in hypothesis_path_ENG

In [None]:
# Clean every .txt transcript found directly inside the English hypothesis folder.
for entry_name in os.listdir(hypothesis_path_ENG):
    candidate = hypothesis_path_ENG / entry_name
    is_txt_file = candidate.is_file() and candidate.suffix.lower() == '.txt'
    if is_txt_file:
        clean_hypothesis_file(candidate)

# Data analysis

### Function for removing speaker labels

In [None]:
def remove_speaker_labels(text: str) -> str:
    """
    Delete speaker labels such as *EXP:, *CHI:, *PAR0:, *PAR1: from a transcript.

    A label is an asterisk, one or more ASCII letters, optional digits and a
    colon. The label is removed together with any whitespace directly after it,
    wherever it occurs in ``text``.
    """
    speaker_label = re.compile(r'\*[A-Za-z]+\d*:\s*')
    return speaker_label.sub('', text)

## Polish Data

### Defining transformations used for Word Error Rate

In [None]:
# Normalisation pipeline applied to Polish transcripts before word-level scoring.
transforms_PL_WER = jiwer.Compose([
    # drop empty strings
    jiwer.RemoveEmptyStrings(),
    # lowercase everything
    jiwer.ToLowerCase(),
    # collapse every whitespace run (multiple spaces, \n, \t, ...) into one space
    jiwer.SubstituteRegexes({r"\s+": " "}),
    # trim leading and trailing spaces
    jiwer.Strip(),
    # drop punctuation marks (e.g. ., !, ?)
    jiwer.RemovePunctuation(),
    # final step: a list of lists of words
    jiwer.ReduceToListOfListOfWords(),
])

### Defining transformations used for Character Error Rate

In [None]:
# Normalisation pipeline applied to Polish transcripts before character-level scoring.
# Punctuation is removed BEFORE whitespace is collapsed and stripped: removing a
# stand-alone mark such as " ." afterwards would leave double or trailing spaces,
# and at character level every leftover space is counted.
transforms_PL_CER = jiwer.Compose(
    [
        jiwer.RemoveEmptyStrings(), # removes empty strings
        jiwer.ToLowerCase(), # converts all characters to lowercase
        jiwer.RemovePunctuation(), # removes punctuation marks (e.g., ., !, ?, etc.)
        jiwer.SubstituteRegexes({r"\s+": " "}), # replaces whitespace runs (multiple spaces, \n, \t, etc.) with a single space
        jiwer.Strip(), # removes leading and trailing spaces
        jiwer.ReduceToListOfListOfChars(), # reduces the transcription to a list of lists of characters
    ]
)

### Matching the data from reference and hypothesis datasets (Polish)

In [None]:
# Collect the names of the .txt files in the Polish reference and hypothesis folders
ref_files_PL = list(filter(lambda name: name.endswith('.txt'), os.listdir(reference_path_PL)))
hyp_files_PL = list(filter(lambda name: name.endswith('.txt'), os.listdir(hypothesis_path_PL)))

def extract_key(filename):
    """
    Return the recording key at the start of a Polish file name.

    The key is 2 letters, 3 or 4 digits, an underscore and 'NAp'
    (e.g. Ab123_NAp or Ab1234_NAp). When the file name does not start with
    such a key, the whole file name is returned as the fallback key.
    """
    key_pattern = re.compile(r"^[A-Za-z]{2}\d{3,4}_NAp")
    found = key_pattern.match(filename)
    return found.group(0) if found else filename

# One row per reference file and one per hypothesis file, each tagged with its key
df_ref_PL = pd.DataFrame({'key': list(map(extract_key, ref_files_PL)), 'ref_file': ref_files_PL})
df_hyp_PL = pd.DataFrame({'key': list(map(extract_key, hyp_files_PL)), 'hyp_file': hyp_files_PL})

# Joining on the key keeps only recordings present in both folders
data_polish = pd.merge(df_ref_PL, df_hyp_PL, on='key')

display(data_polish)

### Adding additional information to the dataframe

In [None]:
# Add the transcript texts and the intro-handling results to the DataFrame.

# Raw file contents
data_polish['ref_string_raw'] = data_polish['ref_file'].apply(
    lambda name: (reference_path_PL / name).read_text(encoding='utf-8')
)
data_polish['hyp_string_raw'] = data_polish['hyp_file'].apply(
    lambda name: (hypothesis_path_PL / name).read_text(encoding='utf-8')
)

# Raw contents without speaker labels
data_polish['ref_string_clean'] = data_polish['ref_string_raw'].apply(remove_speaker_labels)
data_polish['hyp_string_clean'] = data_polish['hyp_string_raw'].apply(remove_speaker_labels)

# Intro handling, one record per reference/hypothesis pair
intro_records = []
for ref_text, hyp_text in zip(data_polish['ref_string_clean'], data_polish['hyp_string_clean']):
    new_ref, new_hyp, meta = preprocess_transcript_pair(ref_text, hyp_text, method=mode)
    intro_records.append({
        'ref_string': new_ref,
        'hyp_string': new_hyp,
        'has_placeholder': meta['has_placeholder'],
        'placeholder_text': " | ".join(meta['placeholders']),
        'intro_extracted': int(meta['intro_extracted']),
        'intro_length_chars': len(meta['intro_text']),
        'intro_action': meta['action_taken'],
    })

# Copy each collected field into its own column, in a fixed order
for column in (
    'ref_string',
    'hyp_string',
    'has_placeholder',
    'placeholder_text',
    'intro_extracted',
    'intro_length_chars',
    'intro_action',
):
    data_polish[column] = [record[column] for record in intro_records]

### Applying WER- and CER-appropriate transformations to the data

In [None]:
# Word Error Rate (WER) transformations:
# the pipeline yields a list of word lists; the words are flattened and joined
# with single spaces to give one normalised string per transcript.
data_polish['ref_transformed_WER'] = data_polish['ref_string'].apply(lambda x: " ".join(sum(transforms_PL_WER(x), [])))
data_polish['hyp_transformed_WER'] = data_polish['hyp_string'].apply(lambda x: " ".join(sum(transforms_PL_WER(x), [])))

# Character Error Rate (CER) transformations:
# the pipeline yields a list of character lists. The characters are concatenated
# WITHOUT a separator - joining them with " " would insert an extra space between
# every pair of characters, and those spaces would be scored by jiwer.cer as
# characters of the transcript.
data_polish['ref_transformed_CER'] = data_polish['ref_string'].apply(lambda x: "".join(sum(transforms_PL_CER(x), [])))
data_polish['hyp_transformed_CER'] = data_polish['hyp_string'].apply(lambda x: "".join(sum(transforms_PL_CER(x), [])))

In [None]:
# Preview only the first rows: the frame holds the full text of every transcript.
data_polish.head()

### ***Optional*** Saving the transformed files locally

In [None]:
# # Create directories for transformed files if they don't exist
# transformed_ref_path_PL_WER = path / "transformed_files" / "reference" / "PL" / "WER"
# transformed_hyp_path_PL_WER = path / "transformed_files" / "hypothesis" / "PL" / "WER"
# transformed_ref_path_PL_WER.mkdir(parents=True, exist_ok=True)
# transformed_hyp_path_PL_WER.mkdir(parents=True, exist_ok=True)

# transformed_ref_path_PL_CER = path / "transformed_files" / "reference" / "PL" / "CER"
# transformed_hyp_path_PL_CER = path / "transformed_files" / "hypothesis" / "PL" / "CER"
# transformed_ref_path_PL_CER.mkdir(parents=True, exist_ok=True)
# transformed_hyp_path_PL_CER.mkdir(parents=True, exist_ok=True)


# # Save transformed reference files with WER transformations
# for _, row in data_polish.iterrows():
#     ref_output_path = transformed_ref_path_PL_WER / f"{row['key']}_ref_transformed.txt"
#     with open(ref_output_path, 'w', encoding='utf-8') as f:
#         f.write(row['ref_transformed_WER'])
    
#     # Save transformed hypothesis files
#     hyp_output_path = transformed_hyp_path_PL_WER / f"{row['key']}_hyp_transformed.txt"
#     with open(hyp_output_path, 'w', encoding='utf-8') as f:
#         f.write(row['hyp_transformed_WER'])

# # Save transformed reference files with CER transformations
# for _, row in data_polish.iterrows():
#     ref_output_path = transformed_ref_path_PL_CER / f"{row['key']}_ref_transformed.txt"
#     with open(ref_output_path, 'w', encoding='utf-8') as f:
#         f.write(row['ref_transformed_CER'])
    
#     # Save transformed hypothesis files with CER transformations
#     hyp_output_path = transformed_hyp_path_PL_CER / f"{row['key']}_hyp_transformed.txt"
#     with open(hyp_output_path, 'w', encoding='utf-8') as f:
#         f.write(row['hyp_transformed_CER'])


# print(f"Saved {len(data_polish)} transformed reference files to {transformed_ref_path_PL_WER}")
# print(f"Saved {len(data_polish)} transformed hypothesis files to {transformed_hyp_path_PL_WER}")
# print(f"Saved {len(data_polish)} transformed reference files to {transformed_ref_path_PL_CER}")
# print(f"Saved {len(data_polish)} transformed hypothesis files to {transformed_hyp_path_PL_CER}")

### Calculating WER and CER for each reference-hypothesis pair

In [None]:
# Score every matched pair on its already transformed strings:
# WER on the word-level strings, CER on the character-level strings.
results = [
    {
        'key': pair['key'],
        'ref_file': pair['ref_file'],
        'hyp_file': pair['hyp_file'],
        'WER': jiwer.wer(pair['ref_transformed_WER'], pair['hyp_transformed_WER']),
        'CER': jiwer.cer(pair['ref_transformed_CER'], pair['hyp_transformed_CER']),
    }
    for _, pair in data_polish.iterrows()
]

polish_results = pd.DataFrame(results)
print(polish_results)

### Calculating mean WER and CER for Polish data

In [None]:
# Average WER and CER over all Polish pairs
mean_WER, mean_CER = (polish_results[metric].mean() for metric in ('WER', 'CER'))
for label, value in (("WER", mean_WER), ("CER", mean_CER)):
    print(f"Mean {label}: {value:.4f}")

## English Data

### Defining transformations used for Word Error Rate

In [None]:
# Normalisation pipeline applied to English transcripts before word-level scoring.
# Lowercasing runs BEFORE the contraction expansion: the expansion rules are written
# in lowercase, so a capitalised "Can't" / "Won't" would otherwise be split into
# "Ca not" / "Wo not" instead of "can not" / "will not".
transforms_ENG_WER = jiwer.Compose(
    [
        jiwer.ToLowerCase(), # converts all characters to lowercase
        jiwer.ExpandCommonEnglishContractions(), # expands common English contractions like "don't" to "do not"
        jiwer.RemoveEmptyStrings(), # removes empty strings
        jiwer.SubstituteRegexes({r"\s+": " "}), # replaces whitespace runs (multiple spaces, \n, \t, etc.) with a single space
        jiwer.Strip(), # removes leading and trailing spaces
        jiwer.RemovePunctuation(), # removes punctuation marks (e.g., ., !, ?, etc.)
        jiwer.ReduceToListOfListOfWords(), # reduces the transcription to a list of lists of words
    ]
)

### Defining transformations used for Character Error Rate

In [None]:
# Normalisation pipeline applied to English transcripts before character-level scoring.
# - Lowercasing runs BEFORE the contraction expansion: the expansion rules are written
#   in lowercase, so a capitalised "Can't" would otherwise become "Ca not".
# - Punctuation is removed BEFORE whitespace is collapsed and stripped (but after the
#   contractions, which need their apostrophes): removing a stand-alone mark afterwards
#   would leave double or trailing spaces, which count as characters in CER.

transforms_ENG_CER = jiwer.Compose(
    [
        jiwer.ToLowerCase(), # converts all characters to lowercase
        jiwer.ExpandCommonEnglishContractions(), # expands common English contractions like "don't" to "do not"
        jiwer.RemoveEmptyStrings(), # removes empty strings
        jiwer.RemovePunctuation(), # removes punctuation marks (e.g., ., !, ?, etc.)
        jiwer.SubstituteRegexes({r"\s+": " "}), # replaces whitespace runs (multiple spaces, \n, \t, etc.) with a single space
        jiwer.Strip(), # removes leading and trailing spaces
        jiwer.ReduceToListOfListOfChars(), # reduces the transcription to a list of lists of characters
    ]
)

### Matching the data from reference and hypothesis datasets (English)

In [None]:
# Collect the names of the .txt files in the English reference and hypothesis folders
ref_files_ENG = list(filter(lambda name: name.endswith('.txt'), os.listdir(reference_path_ENG)))
hyp_files_ENG = list(filter(lambda name: name.endswith('.txt'), os.listdir(hypothesis_path_ENG)))

def extract_key(filename):
    """
    Return the recording key at the start of an English file name.

    The key is 2 letters, 3 or 4 digits, an underscore and 'NAe'
    (e.g. Ab123_NAe or Ab1234_NAe). When the file name does not start with
    such a key, the whole file name is returned as the fallback key.
    """
    key_pattern = re.compile(r"^[A-Za-z]{2}\d{3,4}_NAe")
    found = key_pattern.match(filename)
    return found.group(0) if found else filename

# One row per reference file and one per hypothesis file, each tagged with its key
df_ref_ENG = pd.DataFrame({'key': list(map(extract_key, ref_files_ENG)), 'ref_file': ref_files_ENG})
df_hyp_ENG = pd.DataFrame({'key': list(map(extract_key, hyp_files_ENG)), 'hyp_file': hyp_files_ENG})

# Joining on the key keeps only recordings present in both folders
data_english = pd.merge(df_ref_ENG, df_hyp_ENG, on='key')

display(data_english)

### Adding additional information to the dataframe

In [None]:
# Add the transcript texts and the intro-handling results to the DataFrame.

# Raw file contents
data_english['ref_string_raw'] = data_english['ref_file'].apply(
    lambda name: (reference_path_ENG / name).read_text(encoding='utf-8')
)
data_english['hyp_string_raw'] = data_english['hyp_file'].apply(
    lambda name: (hypothesis_path_ENG / name).read_text(encoding='utf-8')
)

# Raw contents without speaker labels
data_english['ref_string_clean'] = data_english['ref_string_raw'].apply(remove_speaker_labels)
data_english['hyp_string_clean'] = data_english['hyp_string_raw'].apply(remove_speaker_labels)

# Intro handling, one record per reference/hypothesis pair
intro_records = []
for ref_text, hyp_text in zip(data_english['ref_string_clean'], data_english['hyp_string_clean']):
    new_ref, new_hyp, meta = preprocess_transcript_pair(ref_text, hyp_text, method=mode)
    intro_records.append({
        'ref_string': new_ref,
        'hyp_string': new_hyp,
        'has_placeholder': meta['has_placeholder'],
        'placeholder_text': " | ".join(meta['placeholders']),
        'intro_extracted': int(meta['intro_extracted']),
        'intro_length_chars': len(meta['intro_text']),
        'intro_action': meta['action_taken'],
    })

# Copy each collected field into its own column, in a fixed order
for column in (
    'ref_string',
    'hyp_string',
    'has_placeholder',
    'placeholder_text',
    'intro_extracted',
    'intro_length_chars',
    'intro_action',
):
    data_english[column] = [record[column] for record in intro_records]

### Applying WER- and CER-appropriate transformations to the data

In [None]:
# Word Error Rate (WER) transformations:
# the pipeline yields a list of word lists; the words are flattened and joined
# with single spaces to give one normalised string per transcript.
data_english['ref_transformed_WER'] = data_english['ref_string'].apply(lambda x: " ".join(sum(transforms_ENG_WER(x), [])))
data_english['hyp_transformed_WER'] = data_english['hyp_string'].apply(lambda x: " ".join(sum(transforms_ENG_WER(x), [])))

# Character Error Rate (CER) transformations:
# the pipeline yields a list of character lists. The characters are concatenated
# WITHOUT a separator - joining them with " " would insert an extra space between
# every pair of characters, and those spaces would be scored by jiwer.cer as
# characters of the transcript.
data_english['ref_transformed_CER'] = data_english['ref_string'].apply(lambda x: "".join(sum(transforms_ENG_CER(x), [])))
data_english['hyp_transformed_CER'] = data_english['hyp_string'].apply(lambda x: "".join(sum(transforms_ENG_CER(x), [])))

### ***Optional*** Saving the transformed files locally

In [None]:
# # Create directories for transformed files if they don't exist
# transformed_ref_path_ENG_WER = path / "transformed_files" / "reference" / "ENG" / "WER"
# transformed_hyp_path_ENG_WER = path / "transformed_files" / "hypothesis" / "ENG" / "WER"
# transformed_ref_path_ENG_WER.mkdir(parents=True, exist_ok=True)
# transformed_hyp_path_ENG_WER.mkdir(parents=True, exist_ok=True)

# transformed_ref_path_ENG_CER = path / "transformed_files" / "reference" / "ENG" / "CER"
# transformed_hyp_path_ENG_CER = path / "transformed_files" / "hypothesis" / "ENG" / "CER"
# transformed_ref_path_ENG_CER.mkdir(parents=True, exist_ok=True)
# transformed_hyp_path_ENG_CER.mkdir(parents=True, exist_ok=True)


# # Save transformed reference files with WER transformations
# for _, row in data_english.iterrows():
#     ref_output_path = transformed_ref_path_ENG_WER / f"{row['key']}_ref_transformed.txt"
#     with open(ref_output_path, 'w', encoding='utf-8') as f:
#         f.write(row['ref_transformed_WER'])
    
#     # Save transformed hypothesis files
#     hyp_output_path = transformed_hyp_path_ENG_WER / f"{row['key']}_hyp_transformed.txt"
#     with open(hyp_output_path, 'w', encoding='utf-8') as f:
#         f.write(row['hyp_transformed_WER'])

# # Save transformed reference files with CER transformations
# for _, row in data_english.iterrows():
#     ref_output_path = transformed_ref_path_ENG_CER / f"{row['key']}_ref_transformed.txt"
#     with open(ref_output_path, 'w', encoding='utf-8') as f:
#         f.write(row['ref_transformed_CER'])
    
#     # Save transformed hypothesis files with CER transformations
#     hyp_output_path = transformed_hyp_path_ENG_CER / f"{row['key']}_hyp_transformed.txt"
#     with open(hyp_output_path, 'w', encoding='utf-8') as f:
#         f.write(row['hyp_transformed_CER'])


# print(f"Saved {len(data_english)} transformed reference files to {transformed_ref_path_ENG_WER}")
# print(f"Saved {len(data_english)} transformed hypothesis files to {transformed_hyp_path_ENG_WER}")
# print(f"Saved {len(data_english)} transformed reference files to {transformed_ref_path_ENG_CER}")
# print(f"Saved {len(data_english)} transformed hypothesis files to {transformed_hyp_path_ENG_CER}")

### Calculating WER and CER for each reference-hypothesis pair

In [None]:
# Score every matched pair on its already transformed strings:
# WER on the word-level strings, CER on the character-level strings.
results = [
    {
        'key': pair['key'],
        'ref_file': pair['ref_file'],
        'hyp_file': pair['hyp_file'],
        'WER': jiwer.wer(pair['ref_transformed_WER'], pair['hyp_transformed_WER']),
        'CER': jiwer.cer(pair['ref_transformed_CER'], pair['hyp_transformed_CER']),
    }
    for _, pair in data_english.iterrows()
]

english_results = pd.DataFrame(results)
print(english_results)

### Calculating mean WER and CER for English data

In [None]:
# Average WER and CER over all English pairs
mean_WER, mean_CER = (english_results[metric].mean() for metric in ('WER', 'CER'))
for label, value in (("WER", mean_WER), ("CER", mean_CER)):
    print(f"Mean {label}: {value:.4f}")

## Audio data analysis

In [None]:
# HOW TO HANDLE INTRO
# Choose how to handle introductions. The intro-handling cells pass this value to
# preprocess_transcript_pair(..., method=mode), and the export cells embed it in
# the CSV file names (ASR_results_{mode}_PL.csv / ASR_results_{mode}_ENG.csv).
# NOTE(review): the intro-handling cells are located earlier in the notebook than
# this cell, so `mode` must already be defined when they run; after changing the
# value here, re-run them so the results match the exported file names.
# Options:
# "noop" - ignores intro handling
# "replace" - replaces placeholder in Human transcript with Batchalign2 generated intro
# "remove" - deletes intro from Batchalign2
mode = "replace"

In [None]:
from audio_quality_metrics import extract_audio_quality_metrics

In [None]:
# Audio quality metrics for the Polish recordings, joined onto the WER/CER
# results by recording key and exported as CSV (file name carries the intro mode).

audio_metric_df_PL = get_audio_metrics_dataframe(used_audio_path_PL)
final_df_PL = pd.merge(
    polish_results,
    audio_metric_df_PL,
    left_on="key",
    right_on="key",
)

output_name = f"ASR_results_{mode}_PL.csv"
final_df_PL.to_csv(output_name, index=False)
print("Saved:", output_name)


In [None]:
# Audio quality metrics for the English recordings, joined onto the WER/CER
# results by recording key and exported as CSV (file name carries the intro mode).

audio_metric_df_ENG = get_audio_metrics_dataframe(used_audio_path_ENG)
final_df_ENG = pd.merge(
    english_results,
    audio_metric_df_ENG,
    left_on="key",
    right_on="key",
)

output_name = f"ASR_results_{mode}_ENG.csv"
final_df_ENG.to_csv(output_name, index=False)
print("Saved:", output_name)
