In [27]:
import os
import glob
import re
import json
import string
import itertools


from sacremoses import MosesTokenizer, MosesDetokenizer

# Root directory under which save_preprocessed() creates its output folders.
OUTPUT_DIR = "preprocessed_data/elitr"

# Input corpus locations (relative paths); the split names below are joined onto these.
ELITR_EN_DIR = "../../datasets/ELITR Minuting Corpus/ELITR-minuting-corpus/elitr-minuting-corpus-en"
ELITR_AUTOMIN_2023_DIR = "../../datasets/automin-2023-data/Task-A"

# Split names. Each one is used twice in this notebook: as the input
# sub-directory that is read, and as the base name of the JSON file written.
TRAIN_DIR = "train"
DEV_DIR = "dev"
TEST_DIR = "test"
TEST2_DIR = "test2"
# Sub-directory of ELITR_AUTOMIN_2023_DIR (the other splits live under ELITR_EN_DIR).
AUTOMIN_EN_DIR = "test2023-en"

In [28]:
def read_transcripts(meetings_dir):
    """Load the raw transcript of every meeting stored under ``meetings_dir``.

    Every sub-directory of ``meetings_dir`` is treated as one meeting and its
    name becomes the meeting id. Entries that are not directories are ignored.
    When a meeting holds several ``transcript_*.txt`` files, the first one in
    sorted order is used so the choice is reproducible.

    Args:
        meetings_dir: Path of a directory containing one sub-directory per meeting.

    Returns:
        dict mapping meeting id -> list of transcript lines (without line
        terminators), inserted in sorted meeting-id order.

    Raises:
        FileNotFoundError: If a meeting directory has no ``transcript_*.txt`` file.
    """
    transcripts = {}

    for meeting_id in sorted(os.listdir(meetings_dir)):
        meeting_dir = os.path.join(meetings_dir, meeting_id)

        # Stray files next to the meeting folders are not meetings.
        if not os.path.isdir(meeting_dir):
            continue

        # Escape the directory part so characters such as "[" in a path are
        # matched literally; sort because glob gives no ordering guarantee.
        pattern = os.path.join(glob.escape(meeting_dir), "transcript_*.txt")
        transcript_files = sorted(glob.glob(pattern))

        if not transcript_files:
            raise FileNotFoundError(f"No transcript_*.txt file found in {meeting_dir!r}")

        with open(transcript_files[0], "r", encoding="utf-8") as f:
            transcripts[meeting_id] = f.read().splitlines()

    return transcripts

In [29]:
# Load the four ELITR splits (read in the order train, dev, test, test2) ...
en_train, en_dev, en_test, en_test2 = (
    read_transcripts(os.path.join(ELITR_EN_DIR, split_dir))
    for split_dir in (TRAIN_DIR, DEV_DIR, TEST_DIR, TEST2_DIR)
)

# ... and the AutoMin 2023 set, which lives under a different root directory.
en_automin2023 = read_transcripts(
    os.path.join(ELITR_AUTOMIN_2023_DIR, AUTOMIN_EN_DIR)
)

In [30]:
def parse_transcript_by_speaker(transcript):
    """Group raw transcript lines into speaker turns.

    A line of the form ``(PERSON<digits>) text`` opens a new turn. Every other
    line is stripped and appended to the turn that is currently open; lines
    that appear before the first speaker tag are dropped.

    Args:
        transcript: Iterable of transcript lines (strings).

    Returns:
        list of dicts ``{"role": <speaker tag>, "utterance": [<line>, ...]}``
        in transcript order.
    """
    # Any number of digits is accepted after PERSON. A line that starts with
    # "(PERSON" but does not have this exact shape simply fails to match and is
    # handled as a continuation line instead of raising on a None match.
    speaker_line = re.compile(r"\((PERSON\d*)\)(.*)")
    parsed_transcript = []

    for line in transcript:
        match = speaker_line.match(line)
        if match:
            parsed_transcript.append(
                {"role": match.group(1), "utterance": [match.group(2).strip()]}
            )
        elif parsed_transcript:
            parsed_transcript[-1]["utterance"].append(line.strip())

    return parsed_transcript

In [31]:
def remove_asr_errors(tokens):
    """Drop filler tokens and repair cut-off words in a token list.

    Two passes are made:

    1. Tokens whose lower-cased form fully matches a filler pattern (or two
       different filler patterns joined by ``-``, paired in list order) are
       removed.
    2. A token ending in ``-`` (other than a bare ``-``) is dropped when the
       next token starts with it, case-insensitively; otherwise only its
       trailing ``-`` is removed.

    Args:
        tokens: List of string tokens.

    Returns:
        A new list of tokens.
    """
    filler_patterns = [
        r"u+h+m*-?",
        r"m*h+m+-?",
        r"u+m+-?",
        r"e+h+m*-?",
        r"e*m+-?",
        r"e+r+m+-?",
        r"a+h+",
        r"u+h+n+-?",
        r"h+u+(h|m)+-?",
    ]
    paired_patterns = [
        f"{first}-{second}"
        for first, second in itertools.combinations(filler_patterns, 2)
    ]
    all_patterns = filler_patterns + paired_patterns

    def is_filler(word):
        lowered = word.lower()
        for pattern in all_patterns:
            if re.fullmatch(pattern, lowered):
                return True
        return False

    # Pass 1: discard fillers.
    kept = [word for word in tokens if not is_filler(word)]

    # Pass 2: handle tokens that end with a hyphen.
    result = []
    last_pos = len(kept) - 1

    for pos, word in enumerate(kept):
        if word != "-" and word.endswith("-"):
            stem = word[:-1]
            restarted_by_next = (
                pos != last_pos and kept[pos + 1].lower().startswith(stem.lower())
            )
            # A stem that the following token repeats is dropped entirely.
            if not restarted_by_next:
                result.append(stem)
        else:
            result.append(word)

    return result

def remove_tags(text):
    """Strip ``<...>`` tags and all round/square bracket characters from ``text``.

    The substitutions are applied one after another, in the order listed;
    each match is replaced with the empty string.
    """
    patterns_in_order = (
        r"<.*?>",   # shortest span from "<" to the next ">"
        r"\(?\)",   # ")" with an optional "(" directly before it
        r"\[",
        r"]",
        r"\(",
        r"\)",
    )

    for pattern in patterns_in_order:
        text = re.sub(pattern, "", text)

    return text

def is_punct(str):
    """Return True when every character of ``str`` is ASCII punctuation or "–".

    An empty string yields True, since there is no character to reject.
    """
    allowed = string.punctuation + "–"

    for char in str:
        if char not in allowed:
            return False

    return True

# Shared tokenizer/detokenizer pair, filled on first use so the objects are
# built once instead of once per sentence.
_MOSES_TOOLS = {}


def _get_moses_tools():
    """Return the shared English (tokenizer, detokenizer) pair, creating it on first use."""
    if not _MOSES_TOOLS:
        _MOSES_TOOLS["tokenizer"] = MosesTokenizer(lang="en")
        _MOSES_TOOLS["detokenizer"] = MosesDetokenizer(lang="en")
    return _MOSES_TOOLS["tokenizer"], _MOSES_TOOLS["detokenizer"]


def normalize_text(text):
    """Clean one transcript sentence and return it as a detokenized string.

    Steps, in order:

    1. Strip tags and brackets (``remove_tags``).
    2. Tokenize, then drop fillers and cut-off words (``remove_asr_errors``).
    3. Drop punctuation-only tokens from the start of the sentence.
    4. Collapse consecutive tokens that are equal ignoring case.
    5. Within a run of punctuation-only tokens keep only the last one.
    6. Upper-case the first character of the first token.
    7. Append "." when the last token does not end with punctuation.

    Args:
        text: One raw sentence/line of an utterance.

    Returns:
        The normalized sentence. When nothing but punctuation or fillers is
        left, the token list is empty and its detokenization is returned.
    """
    # Remove tags
    text = remove_tags(text)

    # Tokenizer and detokenizer (shared across calls)
    tokenizer, detokenizer = _get_moses_tools()

    # Tokenize and remove ASR errors
    tokens = tokenizer.tokenize(text)
    tokens = remove_asr_errors(tokens)

    # Remove punctuation at the start of sentence
    first_non_punct_idx = next(
        (idx for idx, token in enumerate(tokens) if not is_punct(token)), None
    )
    tokens = [] if first_non_punct_idx is None else tokens[first_non_punct_idx:]

    if len(tokens) > 0:
        # Remove consecutive duplicates (case-insensitive)
        tokens = [
            token
            for idx, token in enumerate(tokens)
            if idx == 0 or token.lower() != tokens[idx - 1].lower()
        ]

        # Remove consecutive punctuation: a punctuation token is dropped when
        # the token right after it is punctuation too.
        tokens = [
            token
            for idx, token in enumerate(tokens)
            if idx == len(tokens) - 1 or not is_punct(tokens[idx]) or not is_punct(tokens[idx + 1])
        ]

        # Start sentence with uppercase
        tokens[0] = tokens[0][0].upper() + tokens[0][1:]

        # End sentence with punctuation
        if not is_punct(tokens[-1][-1]):
            tokens.append(".")

    # Detokenize
    return detokenizer.detokenize(tokens)

In [32]:
def preprocess_transcript(transcript):
    """Normalize every speaker turn and collect the non-empty ones.

    Each sentence of a turn is passed through ``normalize_text``; the
    non-empty results are joined with single spaces. Turns whose joined text
    is empty are skipped together with their role.

    Args:
        transcript: List of dicts with keys ``"role"`` and ``"utterance"``
            (a list of sentences).

    Returns:
        dict ``{"roles": [...], "utterances": [...]}`` with two parallel lists.
    """
    kept_roles, kept_utterances = [], []

    for turn in transcript:
        cleaned = [normalize_text(raw_sentence) for raw_sentence in turn["utterance"]]
        merged = " ".join(filter(None, cleaned))

        if not merged:
            continue

        kept_roles.append(turn["role"])
        kept_utterances.append(merged)

    assert len(kept_roles) == len(kept_utterances)
    return {"roles": kept_roles, "utterances": kept_utterances}

In [33]:
def preprocess_transcripts(transcripts):
    """Run the full cleaning pipeline over a collection of meetings.

    For every meeting the raw lines are first grouped into speaker turns
    (``parse_transcript_by_speaker``) and then normalized
    (``preprocess_transcript``).

    Args:
        transcripts: dict mapping meeting id -> list of raw transcript lines.

    Returns:
        dict mapping meeting id -> result of ``preprocess_transcript``.
    """
    return {
        meeting_id: preprocess_transcript(parse_transcript_by_speaker(raw_lines))
        for meeting_id, raw_lines in transcripts.items()
    }

In [34]:
# Clean every split with the same pipeline. map() walks the input tuple in
# order, so each result is bound to the matching *_preprocessed name.
(
    en_train_preprocessed,
    en_dev_preprocessed,
    en_test_preprocessed,
    en_test2_preprocessed,
    en_automin2023_preprocessed,
) = map(
    preprocess_transcripts,
    (en_train, en_dev, en_test, en_test2, en_automin2023),
)

In [35]:
def save_preprocessed(preprocessed, output_dir, output_file):
    """Write ``preprocessed`` as indented JSON to ``OUTPUT_DIR/<output_dir>/<output_file>.json``.

    The target directory is created when it does not exist yet; an existing
    file with the same name is overwritten.

    Args:
        preprocessed: JSON-serializable object to store.
        output_dir: Sub-directory of ``OUTPUT_DIR`` to write into.
        output_file: File name without the ``.json`` extension.
    """
    target_dir = os.path.join(OUTPUT_DIR, output_dir)
    os.makedirs(target_dir, exist_ok=True)

    # ensure_ascii=False writes non-ASCII characters verbatim, so the encoding
    # is fixed to UTF-8 instead of being left to the platform default.
    with open(os.path.join(target_dir, f"{output_file}.json"), "w", encoding="utf-8") as f:
        json.dump(preprocessed, f, ensure_ascii=False, indent=4)

In [36]:
# Write each preprocessed split to OUTPUT_DIR/en/<split name>.json.
for _split_data, _split_name in (
    (en_train_preprocessed, TRAIN_DIR),
    (en_dev_preprocessed, DEV_DIR),
    (en_test_preprocessed, TEST_DIR),
    (en_test2_preprocessed, TEST2_DIR),
    (en_automin2023_preprocessed, AUTOMIN_EN_DIR),
):
    save_preprocessed(_split_data, "en", _split_name)