In [None]:
%cd /data/codes/prep_ps_pykaldi

import pandas as pd
import os
from glob import glob
import json
from pandarallel import pandarallel
import random
import re

# Start pandarallel with 10 worker processes and a progress bar; the
# `.parallel_apply` calls in the cells below rely on this initialisation.
pandarallel.initialize(nb_workers=10, progress_bar=True)

In [None]:
# Directory holding the submitted .wav files and the CSV describing the test set.
audio_dir = "/data/audio_data/prep_submission_audio/12"
metadata_path = "/data/audio_data/pronunciation_scoring_result/dataset/info_out_domain_long_sentence_testset.csv"

# Read the table and discard every row that has a missing value in any column.
metadata = pd.read_csv(metadata_path).dropna()
metadata.head(20)

In [None]:
def check_audio_is_exist(audio_id):
    """Tell whether the recording for ``audio_id`` is present on disk.

    The path checked is ``<audio_dir>/<audio_id>.wav``, where ``audio_dir``
    is the notebook-level variable.

    Returns:
        bool: True when the path exists, False otherwise.
    """
    wav_path = os.path.join(audio_dir, f'{audio_id}.wav')
    return os.path.exists(wav_path)

# Flag each row by whether its .wav file exists, then keep only flagged rows.
# The shape is printed before and after to show how many rows were removed.
metadata["is_exist"] = metadata["id"].parallel_apply(check_audio_is_exist)
print(metadata.shape)
metadata = metadata.loc[metadata["is_exist"] == True]
print(metadata.shape)

In [None]:
def get_decision(json_path):
    """Collect the ``decision`` value of every word in a JSON result file.

    Only the first entry of the file's ``"utterance"`` list is inspected.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        list | None: One ``decision`` value per word, in file order, or
        ``None`` when the top-level object has no ``"api_version"`` key.
    """
    with open(json_path, "r") as f:
        sample = json.load(f)

    if "api_version" not in sample:
        return None

    first_utterance = sample["utterance"][0]
    return [entry["decision"] for entry in first_utterance["words"]]
    
# Histogram of the per-word decision labels pooled over all rows of `metadata`.
json_dir = "/data/audio_data/pronunciation_scoring_result/marking_data/12"
decisions = (
    metadata["id"]
    .parallel_apply(lambda audio_id: get_decision(os.path.join(json_dir, f"{audio_id}.json")))
    .explode()  # one row per label instead of one list per recording
)
decisions.hist()

In [None]:
def get_decision(json_path):
    """Collect the ``decision`` value of every phoneme in a JSON result file.

    Only the first entry of the file's ``"utterance"`` list is inspected;
    phonemes are visited word by word, in file order.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        list | None: A flat list with one ``decision`` value per phoneme, or
        ``None`` when the top-level object has no ``"api_version"`` key.
    """
    with open(json_path, "r") as f:
        sample = json.load(f)

    if "api_version" not in sample:
        return None

    first_utterance = sample["utterance"][0]
    return [
        unit["decision"]
        for entry in first_utterance["words"]
        for unit in entry["phonemes"]
    ]
    
# Histogram of the per-phoneme decision labels pooled over all rows of `metadata`.
json_dir = "/data/audio_data/pronunciation_scoring_result/marking_data/12"
decisions = (
    metadata["id"]
    .parallel_apply(lambda audio_id: get_decision(os.path.join(json_dir, f"{audio_id}.json")))
    .explode()  # one row per label instead of one list per recording
)
decisions.hist()

In [None]:
def get_phoneme_error(json_path):
    """Collect a coarse error label for every phoneme in a JSON result file.

    Only the first entry of the file's ``"utterance"`` list is inspected.
    A ``phoneme_error`` value containing ``"-"`` is reported as ``"other"``;
    any other value (including ``"normal"``) is reported unchanged.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        list | None: One label per phoneme, or ``None`` when the file has no
        ``"api_version"`` key or when reading/parsing it fails for any
        ordinary reason (missing file, invalid JSON, unexpected structure).
    """
    try:
        with open(json_path, "r") as f:
            raw_sample = json.load(f)

        if "api_version" not in raw_sample:
            return None

        utterance = raw_sample["utterance"][0]
        phoneme_errors = []

        for word in utterance["words"]:
            for phoneme in word["phonemes"]:
                label = phoneme["phoneme_error"]
                # "normal" never contains "-", so a single test covers all cases.
                phoneme_errors.append("other" if "-" in label else label)

        return phoneme_errors
    except Exception:
        # Best-effort: a broken sample is skipped. `Exception` (rather than a
        # bare `except`) lets KeyboardInterrupt / SystemExit propagate so the
        # run can still be interrupted.
        return None
    
# Histogram of the coarse phoneme-error labels pooled over all rows of `metadata`.
json_dir = "/data/audio_data/pronunciation_scoring_result/marking_data/12"
phoneme_errors = (
    metadata["id"]
    .parallel_apply(lambda audio_id: get_phoneme_error(os.path.join(json_dir, f"{audio_id}.json")))
    .explode()  # one row per label instead of one list per recording
)
phoneme_errors.hist()

In [None]:
def is_valid_phoneme(phoneme):
    """Reject phonemes whose error annotation ends in a multi-token part.

    Args:
        phoneme: Mapping with a ``"phoneme_error_arpabet"`` string.

    Returns:
        bool: True when the annotation is ``"normal"`` or when the text after
        the last ``" - "`` separator is a single space-delimited token;
        False when that text contains two or more tokens.
    """
    annotation = phoneme["phoneme_error_arpabet"]
    if annotation == "normal":
        return True

    realised = annotation.split(" - ")[-1]
    return len(realised.split(" ")) < 2

def extract_user_trans(phoneme):
    """Split a phoneme annotation into its two sides and check consistency.

    Args:
        phoneme: Mapping with ``"phoneme_error_arpabet"`` and (normally)
            ``"trans_arpabet"`` entries.

    Returns:
        tuple: ``(arpa, trans, valid)``.
        * Annotation ``"normal"``: both ``arpa`` and ``trans`` are
          ``phoneme["trans_arpabet"]`` and ``valid`` is True.
        * Otherwise the annotation is split on ``" - "`` into ``arpa`` and
          ``trans``; ``valid`` is True only when ``arpa`` equals
          ``phoneme["trans_arpabet"]`` (a missing key counts as a mismatch).

    Raises:
        ValueError: If a non-"normal" annotation does not split into exactly
            two parts on ``" - "``.
    """
    annotation = phoneme["phoneme_error_arpabet"]
    if annotation == "normal":
        canonical = phoneme["trans_arpabet"]
        return canonical, canonical, True

    arpa, trans = annotation.split(" - ")
    # Plain comparison instead of `assert` inside try/except: assertions are
    # removed under `python -O`, which would make every phoneme look valid.
    # `.get` keeps the previous outcome (invalid) when the key is absent.
    valid = arpa == phoneme.get("trans_arpabet")
    return arpa, trans, valid
        
            
def parse_metadata_data(json_path):
    """Condense one JSON result file into a JSON string of texts, transcriptions and scores.

    The file must contain exactly one utterance. For every word the function
    records its text, its ``trans_arpabet`` value, the space-joined
    transcriptions of its phonemes (with ``"SCHWA"`` replaced by ``"AH"``)
    and its ``nativeness_score``. For every phoneme it records the text, the
    two sides returned by ``extract_user_trans`` and the ``nativeness_score``.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        str | None: ``json.dumps`` of a dict with keys ``"words"`` (list of
        word records), ``"phonemes"`` (one list of phoneme records per word)
        and ``"utterance"`` (the utterance's ``nativeness_score``). ``None``
        when the file has no ``"api_version"`` key, when any phoneme fails
        ``is_valid_phoneme``, or when ``extract_user_trans`` flags a phoneme
        as not valid.

    Raises:
        AssertionError: If the ``"utterance"`` list does not have length 1.
    """
    with open(json_path, "r") as f:
        sample = json.load(f)

    if "api_version" not in sample:
        return None

    assert len(sample["utterance"]) == 1
    utterance = sample["utterance"][0]

    word_records, phoneme_groups = [], []
    for word in utterance["words"]:
        phoneme_records, spoken = [], []
        for phoneme in word["phonemes"]:
            # A single unusable phoneme discards the whole sample.
            if not is_valid_phoneme(phoneme):
                return None

            arpa, trans, is_consistent = extract_user_trans(phoneme)
            if not is_consistent:
                return None

            phoneme_records.append({
                "text": phoneme["text"],
                "trans": trans,
                "arpa": arpa,
                "score": phoneme["nativeness_score"],
            })
            spoken.append(trans)
        phoneme_groups.append(phoneme_records)

        word_records.append({
            "text": word["text"],
            "arpa": word["trans_arpabet"],
            "trans": " ".join(spoken).replace("SCHWA", "AH"),
            "score": word["nativeness_score"],
        })

    summary = {
        "words": word_records,
        "phonemes": phoneme_groups,
        "utterance": utterance["nativeness_score"],
    }
    return json.dumps(summary, ensure_ascii=False)
    
# Attach the condensed JSON summary to each row, then drop rows for which
# parse_metadata_data produced nothing. Shapes are printed before and after.
json_dir = "/data/audio_data/pronunciation_scoring_result/marking_data/12"
metadata["score"] = metadata["id"].parallel_apply(
    lambda audio_id: parse_metadata_data(os.path.join(json_dir, f"{audio_id}.json"))
)

print(metadata.shape)
metadata = metadata.loc[metadata["score"].notna()]
print(metadata.shape)

# metadata.head(100000).id.apply(lambda x: parse_metadata_data(os.path.join(json_dir, f'{x}.json')))

In [None]:
def get_phone_score(score):
    """Flatten all phoneme scores stored in a JSON summary string.

    Args:
        score: JSON text whose ``"phonemes"`` entry is a list of lists of
            records, each record carrying a ``"score"`` value.

    Returns:
        list: Every phoneme ``"score"``, in the stored order.
    """
    summary = json.loads(score)
    return [
        record["score"]
        for group in summary["phonemes"]
        for record in group
    ]

# Distribution of phoneme-level scores over all rows of `metadata`.
tmp = metadata["score"].apply(get_phone_score).explode()
tmp.hist(bins=100)

In [None]:
def get_word_score(score):
    """List the word scores stored in a JSON summary string.

    Args:
        score: JSON text whose ``"words"`` entry is a list of records, each
            carrying a ``"score"`` value.

    Returns:
        list: Every word ``"score"``, in the stored order.
    """
    summary = json.loads(score)
    return [record["score"] for record in summary["words"]]

# Distribution of word-level scores over all rows of `metadata`.
tmp = metadata["score"].parallel_apply(get_word_score).explode()
tmp.hist(bins=100)

In [None]:
def get_sent_score(score):
    """Return the ``"utterance"`` value stored in a JSON summary string.

    Args:
        score: JSON text with an ``"utterance"`` entry.

    Returns:
        The decoded ``"utterance"`` value.
    """
    summary = json.loads(score)
    return summary["utterance"]

# Distribution of utterance-level scores over all rows of `metadata`.
tmp = metadata["score"].parallel_apply(get_sent_score)
tmp.hist(bins=100)

In [None]:
def preprocess_text(text):
    """Normalise a prompt: strip punctuation, upper-case, trim the ends.

    Each of the characters ``, . ; : ! ?`` is replaced by a single space
    (so runs of spaces may remain inside the text), the result is
    upper-cased, and leading/trailing whitespace is removed.

    Args:
        text: The string to normalise.

    Returns:
        str: The normalised text.
    """
    # Raw string with a plain character class: the escaped, non-raw form
    # ("\,", "\;", ...) relies on invalid string escapes, which newer Python
    # versions warn about. The set of matched characters is unchanged.
    text = re.sub(r"[,.;:!?]", " ", text)
    return text.upper().strip()
# Overwrite the prompt column with its normalised form.
metadata["question_content"] = metadata["question_content"].apply(preprocess_text)

In [None]:
# lexicon_path = "resources/lexicon.txt"
# lexicon = pd.read_csv(lexicon_path, sep="\t", names=["word", "arpa"])
# lexicon.head()

# vocab = {}
# for index in lexicon.index:
#     word = lexicon["word"][index]
#     arpa = lexicon["arpa"][index]

#     if word not in vocab:
#         vocab[word] = [arpa, ]
#     else:
#         vocab[word].append(arpa)


In [None]:
# Build `vocab`: word -> pronunciation string (phones joined by single spaces).
lexicon_path = "resources/lexicon.txt"
vocab = {}
with open(lexicon_path, "r", encoding="utf-8") as f:
    # Walk the file bottom-up so that, for a word listed more than once,
    # the entry nearest the top of the file is the one that ends up stored.
    entries = [raw_line.strip().split() for raw_line in reversed(f.readlines())]

for fields in entries:
    vocab[fields[0]] = " ".join(fields[1:])
        

In [None]:
def filter_data(text, words, lexicon=None):
    """Check that a prompt and its scored words agree with the lexicon.

    Args:
        text: Prompt text; it is upper-cased and split on whitespace.
        words: JSON string whose ``"words"`` entry is a list of records,
            each with an ``"arpa"`` pronunciation string.
        lexicon: Optional mapping from word to a pronunciation string or to
            a list/tuple/set of pronunciation strings. Defaults to the
            notebook-level ``vocab``.

    Returns:
        bool: True only when the number of tokens equals the number of word
        records and, position by position, the token is in the lexicon and
        the record's ``"arpa"`` is exactly one of that token's
        pronunciations. False otherwise.
    """
    if lexicon is None:
        lexicon = vocab

    word_records = json.loads(words)["words"]
    tokens = text.upper().split()
    if len(tokens) != len(word_records):
        return False

    for token, record in zip(tokens, word_records):
        if token not in lexicon:
            return False

        pronunciations = lexicon[token]
        # A bare string must be compared as a whole: `in` on a string is a
        # substring test, which would accept partial or empty pronunciations.
        if isinstance(pronunciations, str):
            pronunciations = (pronunciations,)
        if record["arpa"] not in pronunciations:
            return False

    return True
# Keep only the rows that filter_data accepts; print the shape before and after.
metadata["is_selected"] = metadata.apply(
    lambda row: filter_data(row["question_content"], row["score"]), axis=1
)
print(metadata.shape)
metadata = metadata.loc[metadata["is_selected"] == True].reset_index()
# Every surviving row has is_selected == True, so the frame's own shape is
# exactly the shape of the selected subset.
print(metadata.shape)


In [None]:
# metadata[["user_id", "id", "question_content", "score", "question_id"]].to_csv("/data/codes/prep_ps_pykaldi/prep_data/raw/info_out_domain_short_sentence_testset.csv")
# Write the selected columns of the filtered table to the output CSV.
export_columns = ["user_id", "id", "question_content", "score", "question_id"]
metadata[export_columns].to_csv("/data/codes/prep_ps_pykaldi/prep_data/raw/info_out_domain_long_sentence_testset_old.csv")