In [None]:
# https://commonvoice.mozilla.org/en/datasets

In [13]:
import pandas as pd
from tqdm import tqdm
import IPython

import argparse
import multiprocessing as mp
import re
from functools import partial
from typing import Dict, List

import pandas as pd
from pythainlp.tokenize import word_tokenize

In [None]:
# load common voice validated data
validated = pd.read_csv("/mnt/d/data/cv-corpus-15.0-2023-09-08/th/validated.tsv", delimiter="\t")

In [18]:
# filter only no down vote
validated = validated[validated["down_votes"] == 0].sort_values("client_id").reset_index(drop=True)[["client_id", "path", "sentence"]]

In [19]:
def format_repeat(text: str) -> str:
    """Format Thai sentence containing ๆ
    
    Arguments
    ---------
    text: str
        text to be processed

    Return
    ------
    formatted_text: str
        formatted text that got repeated by ๆ
    """
    # check whether sentence start with ๆ
    text = text.replace(" ", "");
    if text[0] == "ๆ":
        raise ValueError(f"ๆ must not be at the start of sentence: {text}");

    tokenized_text: List[str] = word_tokenize(text);
    formatted_text: List[str] = [];

    for i, word in enumerate(tokenized_text):
        if "ๆ" in word:
            splitted_word: List[str] = [x for x in re.split("(ๆ)", word) if x != ""];
            if splitted_word[0] == "ๆ":
                # if splitted words are all ๆ
                last_word: str = tokenized_text[i-1];
                for c in word:
                    if c == "ๆ":
                        formatted_text.append(last_word);
                formatted_text.append(word.replace("ๆ", ""));
            else:
                current_word = splitted_word[0];
                for w in splitted_word:
                    if w != "ๆ":
                        current_word: str = w;
                        formatted_text.append(w);
                    else:
                        formatted_text.append(current_word);
        else:
            formatted_text.append(word)
    return "".join(formatted_text);


def correct_sentence(sentence: str, custom_dict: Dict[str, str] = {}) -> None:
    """Correct misspell sentence according to the following rule
    1. check whether แ is spelled by เ + เ
    2. check whether ำ is spelled by  ํ + า
    3. check whether tonal mark ( ่,  ้,  ๊,  ๋ ) is followed after vowel ( ั, ำ, ุ, ู )
    and save it in output file

    Arguments
    ---------
    sentence: str
        Sentence to be corrected
    """
    tonal_marks: List[str] = ["่", "้", "๊", "๋"];
    vowel: List[str] = ["ั", "ุ", "ู", "ํ"];

    # replace custom dict
    for word, replace_word in custom_dict.items():
        if word in sentence:
            sentence = sentence.replace(word, replace_word);
            # print(f"CUSTOM DICT: Replace `{word}` => `{replace_word}`");

    if "เเ" in sentence:
        sentence = sentence.replace("เเ", "แ");  # correct เ + เ -> แ
        # print(f"Correct เ + เ => แ");
    if "ํา" in sentence:
        sentence = sentence.replace("ํา", "ำ");  # correct ํ + า -> ำ
        # print(f"Correct ํ + า => ำ");
    if "ๆ" in sentence:
        sentence = format_repeat(sentence);
        # print("ๆ Replaced")
    # correct #3
    corrected_sentence: str = sentence;
    for i in range(len(sentence) - 1):
        char: str = sentence[i];
        next_char: str = sentence[i+1];
        if char in tonal_marks and next_char in vowel:
                corrected_sentence: List[str] = list(corrected_sentence);
                corrected_sentence[i] = next_char;
                corrected_sentence[i+1] = char;
                corrected_sentence: str = str(corrected_sentence);
                # print(f"Corrected `{char}` + `{next_char}` => `{next_char}` + `{char}`");
        if char == "ํ" and next_char in tonal_marks and sentence[i+2] == "า":
            corrected_sentence = corrected_sentence.replace(f"ํ{next_char}า", f"{next_char}ำ");
            # print(f"Corrected `ํ` + `{next_char}` + `า` => `{next_char}` + `ำ`");
    
    return corrected_sentence

def remove_special_char(text: str) -> str:
    text = text.replace("!", "")
    text = text.replace('"', "")
    text = text.replace("'", "")
    text = text.replace(',', "")
    text = text.replace('-', "")
    text = text.replace('.', "")
    text = text.replace(':', "")
    text = text.replace(';', "")
    text = text.replace('?', "")
    text = text.replace('[', "")
    text = text.replace(']', "")
    text = text.replace(':', "")
    text = text.replace('_', "")
    text = text.replace('|', "")
    text = text.replace('~', "")
    text = text.replace('—', "")
    text = text.replace('‘', "")
    text = text.replace('’', "")
    text = text.replace('“', "")
    text = text.replace('”', "")
    text = text.replace('ฯ', "")
    return text.lower()
    
    
def get_char(texts: List[str]) -> List[str]:
    """Get unique char from list of documents"""
    return sorted(set([char for sent in texts for char in sent]))

In [21]:
validated["sentence"] = validated["sentence"].map(correct_sentence)
validated["sentence"] = validated["sentence"].map(format_repeat)
validated["sentence"] = validated["sentence"].map(remove_special_char)

In [None]:
# print unique character
get_char(validated["sentence"].values)

In [23]:
validated

Unnamed: 0,client_id,path,sentence
0,00086f5dc46f9038f13bbd829c4118fab3ac28688d5ee8...,common_voice_th_25695281.mp3,ใครเป็นผู้รับ
1,001373e40d26bbdd9c2416bd189496a707950947125144...,common_voice_th_28444375.mp3,พวกเราต้องทำให้แน่ใจว่ามันจะไม่เกิดขึ้นอีก
2,001373e40d26bbdd9c2416bd189496a707950947125144...,common_voice_th_28444373.mp3,ทางฝ่ายพระเจ้ากรุงจีน
3,001373e40d26bbdd9c2416bd189496a707950947125144...,common_voice_th_28443814.mp3,ที่ข้าพเจ้าได้เคยล่วงเกินท่านไว้
4,001373e40d26bbdd9c2416bd189496a707950947125144...,common_voice_th_28443793.mp3,เสื้อกันลมทำให้ตัวเขาแห้งสนิทถึงแม้มันจะเปียกฝน
...,...,...,...
129513,fffc52acec32b85f7fed484ce96120c469628b46f7d553...,common_voice_th_25678469.mp3,ฉันตื่นเต้นที่กำลังจะได้เริ่มงานใหม่
129514,fffc52acec32b85f7fed484ce96120c469628b46f7d553...,common_voice_th_25678473.mp3,สำหรับฉันแล้วดูเหมือนว่าอุตสาหกรรมของเขาผิดไปห...
129515,fffc52acec32b85f7fed484ce96120c469628b46f7d553...,common_voice_th_25678477.mp3,ในขณะเดียวกันหนังสือเล่มนี้ได้บังคับให้เขาหวนร...
129516,fffc52acec32b85f7fed484ce96120c469628b46f7d553...,common_voice_th_25678087.mp3,ในวิดีโอมีชายคนหนึ่งหยุดให้แมวข้ามถนน


In [24]:
import re

def contains_letters(text):
    pattern = re.compile('[a-zA-Z]')
    return bool(re.search(pattern, text))

In [28]:
# split train test validate
train_ratio = 0.9
dev_ratio = 0.03
test_ratio = 0.07

train_row = int(train_ratio*validated.shape[0])
dev_row = int(dev_ratio*validated.shape[0])
test_row = int(test_ratio*validated.shape[0])

train_client_set = set()
dev_client_set = set()
test_client_set = set()

train_data = pd.DataFrame([], columns=["path", "sentence"])
dev_data = pd.DataFrame([], columns=["path", "sentence"])
test_data = pd.DataFrame([], columns=["path", "sentence"])

for idx, row in tqdm(validated.iterrows(), total=validated.shape[0]):
    client_id = row["client_id"]
    path = row["path"]
    sentence = row["sentence"]

    # skip english sentence
    if contains_letters(sentence):
        continue

    # filter some wrong sentence
    if sentence == "เขาหยุดแล้วคว้ามือออกมาเฉียดตัวฉันไปเส้นยาแเงผ่าแปด":
        sentence = "เขาหยุดแล้วคว้ามือออกมาเฉียดตัวฉันไปเส้นยาแดงผ่าแปด"

    # we use leave one out method for evaluate, so the test and validate dataset won't have the same person id as train dataset
    if client_id in train_client_set:
        train_data = pd.concat([train_data, pd.DataFrame([[path, sentence]], columns=["path", "sentence"])])
    elif client_id in dev_client_set:
        dev_data = pd.concat([dev_data, pd.DataFrame([[path, sentence]], columns=["path", "sentence"])])
    elif client_id in test_client_set:
        test_data = pd.concat([test_data, pd.DataFrame([[path, sentence]], columns=["path", "sentence"])])
    elif train_data.shape[0] < train_row:
        train_data = pd.concat([train_data, pd.DataFrame([[path, sentence]], columns=["path", "sentence"])])
        train_client_set.add(client_id)
    elif dev_data.shape[0] < dev_row:
        dev_data = pd.concat([dev_data, pd.DataFrame([[path, sentence]], columns=["path", "sentence"])])
        dev_client_set.add(client_id)
    elif test_data.shape[0] < test_row:
        test_data = pd.concat([test_data, pd.DataFrame([[path, sentence]], columns=["path", "sentence"])])
        test_client_set.add(client_id)

100%|██████████████████████████████████████████████████████████████| 129518/129518 [04:17<00:00, 503.69it/s]


In [42]:
# export data
train_data.to_csv("cv15_train.csv", index=False)
dev_data.to_csv("cv15_dev.csv", index=False)
test_data.to_csv("cv15_test.csv", index=False)