In [1]:
import re, unicodedata, chinese_converter
from glob import glob
from lxml import etree


In [2]:
#text loading
corpus_files = glob("The-spoken-L1-corpus-main/L1-L1 transcripts/*.txt")
CM_txt = ""
for file in corpus_files:
    with open(file, "r", encoding='utf-8-sig') as f:
        CM_txt += f.read()

In [3]:
#pre-process corpus texts
processed_corpus0 = []
de_space = re.sub(" ", "\n", CM_txt)
de_mark = re.sub(r"<.+> ?", "", de_space) #remove codes in < > as it's coded for speaker ID, depersonalizing naming, and marks of uncertain transcripts
ta_variant = re.sub(r"她", "他", de_mark) #"she" is pronounced the same as "he"; therefore, here all "she" are turned into "he"
TA_char = re.sub(r"TA", "他", ta_variant) #the author of the corpus coded ambiguous he vs she as "TA". All "TA" turned into "he"
de_filler = re.sub(r"(?<![a-zA-Z])eng|erm?(?![a-zA-Z])", "\n", TA_char) #remove backchanneling, "eng", "er", "erm"
de_er2 = re.sub(r"儿(?!子|童)", "", de_filler) #remove 儿 due to it being mostly for phonemic markin#g
sents = de_er2.split("\n")
for sent in sents:
    if re.search(r"[\u4e00-\u9fff]+", sent): 
        processed_corpus0.append(sent)
#len(processed_corpus): 8483
#repeated bigram: 2114; repeatd trigram: 283

In [4]:
processed_corpus = []
for utt in processed_corpus0: #reduce repeated characters to one
    rep = re.findall(r"(.)\1+?", utt)
    if len(rep) == 1: #only one rep 1598x vs more than one rep 523x
        de_rep = re.sub(r"(.)\1+?", rep[0], utt)
        processed_corpus.append(de_rep)
    else:
        processed_corpus.append(utt)
    

In [5]:
#ngrams
def n_dict(n, corpus):
    n_dict = {}
    for sent in corpus:
        sent_len = len(sent)
        if sent_len >= n:
            for num in range(sent_len):
                if num+n <= sent_len:
                    n_gram = sent[num:num+n]
                    if n>1 and n_gram == sent[num]*n:
                        continue
                    else:
                        if n_gram in n_dict:
                            n_dict[n_gram] +=1
                        else:
                            n_dict[n_gram] =1
                else:
                    continue
    return n_dict
CM_fourdict = n_dict(4, processed_corpus)
CM_tridict = n_dict(3, processed_corpus)
CM_bidict = n_dict(2, processed_corpus)
CM_unidict = n_dict(1, processed_corpus)
print(len(CM_tridict), len(CM_bidict), len(CM_unidict)) #n-gram type count

104564 41635 2312


In [6]:
#sort dict by frequnecy into tuples
def sort_dict2tup(ndict, cutoff):
    tup_lst = []
    for key in ndict:
        if ndict[key] < 5: #skip extremely low frequency ngrams
            continue
        else:
            tup_lst.append((key,ndict[key]))
    tup_lst.sort(key = lambda x: x[1], reverse = True)
    return tup_lst[:cutoff+1]

#CM sorted tuples
CM_sort_fourtup = sort_dict2tup(CM_fourdict, len(CM_fourdict))
CM_sort_tritup = sort_dict2tup(CM_tridict, len(CM_tridict))
CM_sort_bitup = sort_dict2tup(CM_bidict, len(CM_bidict))
CM_sort_unitup = sort_dict2tup(CM_unidict, len(CM_unidict))


In [7]:
#remove nonword or phrase bigrams
CM_top_unigram = ['是', '就', '的', '我', '那', '有', '他', '不'] 
#['是', '就', '的', '我', '个', '那', '后']
CM_except_bigrams = ["可是", "然后", "还是", "什么"]

def rm_nonwords_bitup(ntup): #remove bigrams that contains CM_top_unigram--most likely non words or phrases
    cp_ntup = ntup.copy() #copy preserves the original ntup, making code edits easier
    for tp in cp_ntup[:]:
        if tp[0][0] in CM_top_unigram or tp[0][1] in CM_top_unigram: #any bigrams that has any unigrams from CM_top_unigram
            if tp[0] in CM_except_bigrams:
                continue
            else:
                cp_ntup.remove(tp)
        else:
            continue
    return cp_ntup
CM_bitup = rm_nonwords_bitup(CM_sort_bitup)


In [8]:
#remove nonword or phrase trigrams
CM_top_bigram = [tp[0] for tp in CM_bitup[:40]] #TM selected top 50 bigrams. Overall CM bigram inventory is ~80% of TM bigram inventory. Therefore, CM selects top 50*80%=40 bigrams
#['然后', '这个', '什么', '觉得', '一个', '因为', '可能', '时候', '特别', '其实', '所以', '比较', '还是', '这种', '自己', '这样', '一些', '知道', '现在', '怎么', '当时', '感觉', '可以', '反正', '了一', '很多', '之后', '或者', '而且', '也不', '应该', '好像', '一下', '这边', '你们', '地方', '后来', '时间', '个人', '去了', '东西']

def overlap_unigram(trigram, CM_top_unigram): #if more than 1 CM_top_unigram in the trigram, mark these trigrams to later reject them as these are likely nonwords
    count = 0
    for char in trigram:
        if char in CM_top_unigram:
            count += 1
    if count > 1:
        return True
    else:
        return False

CM_except_bigrams = ["为什么", "对不起", "新西兰", "怎么样"]
def rm_nonwords_tritup(ntup): #remove trigrams that has CM_top_bigram--most likely nonwords or phrases
    cp_ntup = ntup.copy()
    for tp in cp_ntup[:]:
        if tp[0][:2] in CM_top_bigram or tp[0][1:3] in CM_top_bigram:
            if tp[0] in CM_except_bigrams: #exclude exception(s) that are legitimate real words
                continue
            else:
                cp_ntup.remove(tp)
        elif overlap_unigram(tp[0], CM_top_unigram): #trigrams containing more than one common unigrams are likely nonwords/phrases
            if tp[0] == "有没有" or tp[0] == "是不是": #exclude exception(s) that are legitimate real words
                continue
            else:
                cp_ntup.remove(tp)
        else:
            continue
    return cp_ntup
CM_tritup = rm_nonwords_tritup(CM_sort_tritup)


In [9]:
# remove overlapping higher gram counts to avoid inflated counts
sort_tup_list = [CM_sort_unitup, CM_bitup, CM_tritup, CM_sort_fourtup]
def correct_sort_ntup(n, cutoff): # n<3
    sort_ntup = sort_tup_list[n-1]
    sort_n1tup = sort_tup_list[n]
    correct_ndict = {} 
    for ngram, value in sort_ntup:
        correct_ndict[ngram] = value #collect ngram texts and freq into dictionary for easier frequency editing
        for n1gram, vl in sort_n1tup[:int(len(sort_tup_list[n])*0.0025)]: #if the concurrent ngram in most frequent n+1gram, this needs to be deducted to avoid inflated counts, eg., "什麼(what)" in "為什麼(why)"
            if ngram in n1gram:
                correct_ndict[ngram] -= vl
    correct_sort_ntup = sort_dict2tup(correct_ndict, cutoff) #sort the new dictionary results after the correction
    return correct_sort_ntup
x = 0.02
CM_top_tritup = CM_tritup[:int(len(CM_tritup)*x)]
CM_top_bitup = correct_sort_ntup(2, int(len(CM_bidict)*0.375*x))
sort_tup_list.pop(1)
sort_tup_list.append(CM_top_bitup)
#replace the old n-tuple with the corrected one. This is only meaningful for uni-tuple counts
CM_top_unitup = correct_sort_ntup(1, int(len(CM_unidict)*(10*x)))


In [10]:
#write the ngrams (text only) into txt file
with open("CM_trigram.txt", "w") as f:
    for tup in CM_top_tritup:
        f.write(tup[0] + "\n")
with open("CM_bigram.txt", "w") as f:
    for tup in CM_top_bitup:
        f.write(tup[0] + "\n")
with open("CM_unigram.txt", "w") as f:
    for tup in CM_top_unitup:
        f.write(tup[0] + "\n")

In [11]:
#read unique CM grams in simplified Chinese and break into list of strings
with open("TM_trigram.txt", "r") as f:
    TM_trigram_txt = chinese_converter.to_simplified(f.read())
with open("TM_bigram.txt", "r") as f:
    TM_bigram_txt = chinese_converter.to_simplified(f.read())
with open("TM_unigram.txt", "r") as f:
    TM_unigram_txt = chinese_converter.to_simplified(f.read())

TM_trigram = TM_trigram_txt.split("\n")[:-1]
TM_bigram = TM_bigram_txt.split("\n")[:-1]
TM_unigram = TM_unigram_txt.split("\n")[:-1]


In [12]:
#uniq TM frequent words
def uniq_gram_count(n, TM_ngram, cutoff):
    CM_ntup_lst = [CM_sort_unitup, CM_bitup, CM_tritup]
    uniq_TM = []
    uniq_TM_count = 0
    CM_frequent_ngram = []
    for g, f in CM_ntup_lst[n-1][:int(cutoff*len(CM_ntup_lst[n-1]))]:
        CM_frequent_ngram.append(g)
    for ng in TM_ngram:
        if ng not in CM_frequent_ngram:
            if re.search(r"[a-zA-Z]+", ng):
                continue
            else:
                uniq_TM.append(ng)
                uniq_TM_count += 1
    return uniq_TM, uniq_TM_count

c = 0.9
print(uniq_gram_count(3, TM_trigram, c))
print(uniq_gram_count(2, TM_bigram, c))
print(uniq_gram_count(1, TM_unigram, c))


(['真的喔', '对不对', '跟你讲', '好不好', '跟他讲', '个礼拜', '跟我讲', '是怎样', '在干嘛', '譬如说', '没有啦', '超好笑', '很好笑', '我同学'], 14)
(['对对', '嗯嗯', '喔喔', '好笑', '讲说', '哪里', '妈妈', '好啦', '喔对', '夸张', '好好', '很像', '重点', '超好', '讲话', '很久', '刚好', '哪一', '台北', '可怕', '看看', '看起', '譬如', '在干', '恐怖', '后后', '国中', '么啊', '通常', '等一', '韩国', '刚刚', '头发', '很可', '部都', '爸爸', '原本', '可爱', '法律'], 39)
(['喔', '嗯', '耶', '-', '婆', '夸', '姊', '姑', '吵', '噢', '韩', '.', '烂', '恐', '夜', '李', '爆', '鬼', '譬', '帅', '怖', '赚', '扣', '赛', '1', '鞋', '薪', '群'], 28)


In [13]:
"""def to_trad_chin(lst): #translate to TM for documentation purpose
    convt_lst = []
    for word in lst:
        convt_lst.append(chinese_converter.to_traditional(word))
    return convt_lst
t = uniq_gram_count(3, TM_trigram, c)[0]
b = uniq_gram_count(2, TM_bigram, c)[0]
u = uniq_gram_count(1, TM_unigram, c)[0]
print(to_trad_chin(t))
print(to_trad_chin(b))
print(to_trad_chin(u))"""

'def to_trad_chin(lst): #translate to TM for documentation purpose\n    convt_lst = []\n    for word in lst:\n        convt_lst.append(chinese_converter.to_traditional(word))\n    return convt_lst\nt = uniq_gram_count(3, TM_trigram, c)[0]\nb = uniq_gram_count(2, TM_bigram, c)[0]\nu = uniq_gram_count(1, TM_unigram, c)[0]\nprint(to_trad_chin(t))\nprint(to_trad_chin(b))\nprint(to_trad_chin(u))'

In [14]:
sent_end = {}
for utter in processed_corpus:
    if utter[-1] in sent_end:
        sent_end[utter[-1]] +=1
    else:
        sent_end[utter[-1]] =1
sent_end_tup = []
for end in sent_end:
    sent_end_tup.append((end, sent_end[end]))

sent_end_tup.sort(key = lambda x: x[1], reverse = True)
freq_SFP = ['啊','吗','了','吧','嘛','呀','呢','哦','呗']
common_SFP = ['啊','吗','了','吧','嘛','呀','呢','哦']
def token_count(tuplst, lst): 
    count = 0
    for tp in tuplst:
        if tp[0] in lst:
            count += tp[1]
    return count
print(token_count(sent_end_tup, freq_SFP))
print(token_count(sent_end_tup, common_SFP)) #token frequency for the 8 common SFPs



2260
2229


In [36]:
common_SFP_omit_le = ['啊','吗','吧','嘛','呀','呢','哦']
def SFP_count(chi_char):
    SFP_count = 0
    for utt in processed_corpus:
        for char in utt:
            if char == chi_char:
                SFP_count += 1
    return SFP_count
print(SFP_count("了"))

2806


In [32]:
def prob_SFP(corpus, SFP_lst):
    count = 0
    corpus_len = 0
    for utt in corpus:
        corpus_len += len(utt)
        for char in utt:
            if char in SFP_lst:
                count += 1
    return count/(corpus_len-SFP_count("了")), count, corpus_len-SFP_count("了")
print(prob_SFP(processed_corpus, common_SFP_omit_le))

(0.02501605681786449, 5375, 214862)
