In [None]:
'''
해당 코드는 Postprocess.ipynb 파일 코드들을 압축한 것
'''
from tqdm import tqdm
from konlpy.tag import Mecab
from soynlp.hangle import jamo_levenshtein
from kspon_preprocess import special_filter, bracket_filter

import json
import pandas as pd
import numpy as np
import re

# vocab 생성

In [None]:
'''
vocab에 입력할 단어 생성
- 온갖 특수 기호 및 영어, 숫자, 퍼센트 등 한글 외의 데이터는 무시
- 성능과 별개로, 속도를 위해 가장 빠른 정제 속도를 가진 mecab 을 사용
- text_path_list : vocab으로 사용할 모든 텍스트 데이터들의 경로를 담은 자료형 변수
'''
mecab = Mecab()
remove_re = '[a-zA-Z0-9%]'
raw_texts = list()
text_list = list()
for texts in tqdm(text_path_list):
    try:
        with open(texts, 'r', encoding='cp949') as f:
            text = f.read()
    except:
        with open(texts, 'r') as f:
            text = f.read()
    # ksponspeech 데이터셋 전용 정제 모듈인 special_filter, bracket_filter 을 사용
    text = special_filter(bracket_filter(text))
    if re.findall(remove_re, text) == []:
        text_list.append(text)
        text = mecab.pos(text)
        for tt in text:
            # mecab.pos를 통해 추출한 품사들 중 품사명의 가장 맨 앞(ex. tt[1].find("N")==0) 부분이 N or V 일 경우, 명사, 형용사, 동사 판단하고 이들만 저장
            if (tt[1].find("N") == 0) or (tt[1].find("V") == 0):
                raw_texts.append(tt[0])

raw_words = list(set(raw_texts))

print(len(raw_words))
print(raw_words[:5])

In [None]:
def build_vocabulary(word_list:list) -> list:
    '''
    리스트에 포함된 모든 단어들을 dict 형태의 vocab으로 변환
    '''
    word_list.sort()
    
    vocab_dict = {}
    for index, word in enumerate(word_list):
        vocab_dict[word] = index
        
    return vocab_dict

vocab_path = "/wav2vec2/s-kr/fine-tune/transformer/post_vocab.json"

## vocab 파일 로드
with open(vocab_path, 'r') as f:
    origin_vocab_dict = json.load(f)

## vocab 파일 생성
# with open(vocab_path, 'w') as vocab_file:
#     json.dump(vocab_dict, vocab_file)

print(len(origin_vocab_dict))

# vocab 동시등장행렬 구축

In [None]:
'''
데이터프레임으로 기본적인 동시등장행렬 생성
vocab size에 따른 행, 열 개수 결정됨
'''
co_ocurrence_vectors = pd.DataFrame(
    np.zeros([len(origin_vocab_dict), len(origin_vocab_dict)]),
    index = origin_vocab_dict.keys(),
    columns = origin_vocab_dict.keys()
)

co_ocurrence_vectors.head()

In [None]:
pages = text_list

'''
- 11~12시간 소요..(vocab 54000개 기준)
- vocab에 사용한 모든 텍스트 원문들을 다시 mecab.pos를 통해 명사, 형용사, 동사만 추출
- co_ocurrence_vectors.loc를 통해 각 단어쌍이 등장하면 해당 값에 +1 을 해줌
'''
for page in tqdm(pages):
    elements_list = [elements for elements in mecab.pos(page) if (elements[1].find("N") == 0) or (elements[1].find("V") == 0)]
    for idx1, element1 in enumerate(elements_list):
        for idx2, element2 in enumerate(elements_list):
            # if idx1 < idx2:
                try:
                    co_ocurrence_vectors.loc[element1[0], element2[0]] = (co_ocurrence_vectors.loc[element1[0], element2[0]]+1)
                except:
                    pass
                
co_ocurrence_vectors.head()

In [None]:
# 약 10GB 크기의 csv 파일 생성(vocab 54000개 기준)
co_ocurrence_vectors.to_csv("/wav2vec2/s-kr/fine-tune/transformer/co_occur_df.csv")

In [None]:
# co_ocurrence_vectors csv 파일 로드 / 대략 1시간 30분 정도 소요(vocab 54000개 기준)
co_ocurrence_vectors = pd.read_csv("/wav2vec2/s-kr/fine-tune/transformer/co_occur_df.csv")
co_ocurrence_vectors = co_ocurrence_vectors.set_index(keys=['Unnamed: 0'], inplace=False, drop=True)
co_ocurrence_vectors.head()

# 추론 데이터셋 구축

In [None]:
infer_dataset_file = "/wav2vec2/s-kr/fine-tune/transformer/Korean_corpus_2021.tsv"

re_string1 = '[^A-Za-z0-9가-힣]'
re_string2 = '[^A-Za-z0-9가-힣 ]'

df = pd.read_csv(infer_dataset_file, sep='\t', index_col=0)

df.drop(['id', 'original_form'], axis=1, inplace=True)
test_df = df.copy()

## lambda를 활용하여 모든 데이터프레임의 문장에서 영어, 숫자, 한글(음절)을 제외한 기호를 전부 제거
test_df['form'] = test_df['form'].apply(lambda x: re.sub(re_string1, '', str(x)))
test_df['corrected_form'] = test_df['corrected_form'].apply(lambda x: re.sub(re_string1, '', str(x)))
## form과 corrected_form 데이터의 차이가 특수기호 유무로만 나뉘는 데이터들을 제외하기 위해 != 를 사용
new_df = test_df[test_df['form'] != test_df['corrected_form']]
## 너무 짧은 문장은 동시등장행렬로 구축할 때 수가 너무 적어 코사인 유사도 계산이 안되므로 최소 길이 4를 설정 후 다시 != 로 정제
new_df = new_df.apply(lambda x: x if len(x) > 4 else " ")
new_df = new_df[new_df['form'] != new_df['corrected_form']]
new_df.head()

In [None]:
'''
- 정확하게 form과 corrected_form 데이터의 차이가 음절 오탈자로 인해 생긴 데이터만 남김
- random을 자유롭게 사용함으로써 랜덤 추출을 할 지, 데이터를 지정할 지 결정 가능
'''
import random

refine_idx = list(new_df.index)
random.shuffle(refine_idx)
refine_idx = refine_idx[:1000]
## 원본 데이터 df에서 사용할 데이터셋을 refine_idx로 뽑아냄
refine_df = df.loc[refine_idx]
refine_df = refine_df.reset_index(drop=True)
## 다시 한 번 쓸데없는 기호들을 제거
refine_df['form'] = refine_df['form'].apply(lambda x: re.sub(re_string2, '', str(x)))
refine_df['corrected_form'] = refine_df['corrected_form'].apply(lambda x: re.sub(re_string2, '', str(x)))
## 빠른 추론 확인을 위해 우선 100개만 사용하기로 결정
refine_df = refine_df[:100]
refine_df.head()

In [None]:
error_text = list(refine_df['form'][0:50])
correct_text = list(refine_df['corrected_form'][50:])
## 추론에 사용할 데이터 리스트 생성 완료
f1_text_list = error_text + correct_text

# 추론 및 성능 확인

In [None]:
def cos_sim(a: list, b: list) -> float:
    '''
    입력받은 a,b 가 0 으로만 구성된 리스트일 경우,
    norm(a) or norm(b) 연산 과정에서 0.0 이 나올 우려가 있으므로
    이를 위해 0으로 구성된 리스트에게 0.1을 더하여 분모값이 0.0이 되는 것을 방지한다.
    '''
    try:
        if (a == [.0] * len(a)) or (b == [.0] * len(b)):
            raise RuntimeWarning
        cosine = dot(a, b)/(norm(a)*norm(b))
    except:
        if (a == [.0] * len(a)) or (b != [.0] * len(b)):
            cosine = dot(a, b)/((norm(a)+0.01)*(norm(b)))
        elif (a != [.0] * len(a)) or (b == [.0] * len(b)):
            cosine = dot(a, b)/(norm(a)*(norm(b)+0.01))
        else:
            cosine = dot(a, b)/((norm(a)+0.01)*(norm(b)+0.01))
    finally:
        return cosine
    

def co_occurence_matrix(input_elements, input_vector: pd.DataFrame, vocab_vector=co_ocurrence_vectors: pd.DataFrame) -> pd.DataFrame:
    for element1 in input_elements:
        for element2 in input_elements:
            try:
                input_vector.loc[element1, element2] = vocab_vector.loc[element1, element2]
            except:
                input_vector.loc[element1, element2] = 0
    
    return input_vector


def refine_ed_word(typo_word_list: list, vocab_dict=origin_vocab_dict: dict, ed_score=0.5: float) -> dict:
    ed_dict = dict()
    vocab_list = list(vocab_dict.keys())
    for typo in typo_word_list:
        ed_list = list()
        for vocab in vocab_list:
            ed_score = jamo_levenshtein(typo, vocab)
            # 편집거리 임계치 설정
            if ed_score < 0.5:
                ed_list.append(vocab)
        # 각 오탈자 별 대체 단어 후보들을 입력
        ed_dict[typo] = ed_list
    
    return ed_dict
    
    
output_text_list = list()
error_idx = []

In [None]:
for tp_idx, tp_text in enumerate(tqdm(f1_text_list)):
    ## 오탈자 문장마다 명사, 동사, 형용사 단어만 추출
    input_elements = [elements[0] for elements in mecab.pos(tp_text) if (elements[1].find("N") == 0) or (elements[1].find("V") == 0)]
    ## 각 문장마다 동시등장행렬 생성을 위해 입력받은 문장으로 vocab 생성
    vocab_dict = build_vocabulary(input_elements)
    ## 입력받은 문장에 대한 base 행렬을 생성
    input_co_ocur_vector = pd.DataFrame(
        np.zeros([len(vocab_dict), len(vocab_dict)]),
        index = vocab_dict.keys(),
        columns = vocab_dict.keys()
    )
    ## loc를 활용하여 단어쌍이 등장하는 횟수만큼 1씩 덧셈하며 단어쌍이 vocab 동시등장행렬에 없을 경우는 그냥 0 을 입력
    input_co_ocur_vector = co_occurence_matrix(input_elements, input_co_ocur_vector)

    ## 단어별 코사인 유사도를 계산 후 평균값을 계산, 단 같은 값끼리는 의미가 없으므로 != 을 통해 pass 하도록 한다
    typo_word = list()
    input_co_occur_list = input_co_ocur_vector.values.tolist()
    for idx, input_list1 in enumerate(input_co_occur_list):
        if len(input_co_occur_list) > 1:
            total_cosine = 0.0
            for input_list2 in input_co_occur_list:
                if input_list1 != input_list2:
                    total_cosine += cos_sim(input_list1, input_list2)

                # input_list1 에 대한 평균 코사인 유사도를 계산
            avg_cosine = total_cosine / (len(input_list1) - 1)
                # 코사인 유사도 임계치 설정
            if avg_cosine < 0.20:
                # typo_word 에 입력된 단어는 '오탈자'로 확정 인식
                typo_word.append(input_co_ocur_vector.index[idx])
        else:
            pass
    
    ## 모든 vocab 과 입력받은 문장의 단어끼리 하나하나 편집거리 계산
    ed_dict = refine_ed_word(typo_word)

    ## 이후 오탈자를 대체 단어로 바꿔주면서 동시등장행렬, 코사인 유사도, 편집거리 계산을 동일하게 진행
    results_list = list()
    for origin_word in list(ed_dict.keys()):
        og_words = list()
        replace_words = list()
        cosine_value = list()

        for ed_word in ed_dict[origin_word]:
            # 오탈자 컬럼명을 대체 단어 컬럼명 및 인덱스명으로 교체
            ed_vector = input_co_ocur_vector.rename(columns={origin_word: ed_word}, index={origin_word: ed_word}, inplace=False)
            # 데이터 값들 전부 0으로 리셋
            ed_vector = pd.DataFrame(
                np.zeros([len(ed_vector), len(ed_vector)]),
                index = ed_vector.index,
                columns = ed_vector.columns
            )

            ed_vector = co_occurence_matrix(ed_vector.columns, ed_vector)

            ed_co_occur_list = ed_vector.values.tolist()
            total_cosine = 0.0
            ed_idx = list(ed_vector.columns).index(ed_word)
            for input_list1 in ed_co_occur_list:
                if ed_co_occur_list[ed_idx] != input_list1:
                    total_cosine += cos_sim(ed_co_occur_list[ed_idx], input_list1)

            avg_cosine = total_cosine / (len(ed_co_occur_list[ed_idx]) - 1)
            # 기존 오탈자, 대체 단서, 평균 코사인 유사도 계산 값을 각각 리스트에 넣는다
            og_words.append(origin_word)
            replace_words.append(ed_word)
            cosine_value.append(avg_cosine)
        if cosine_value != []:
        # 가장 유사도가 높은 값이 오탈자를 대체할 단어로 지정된다
            cs_idx = cosine_value.index(max(cosine_value))
            rp_word = replace_words[cs_idx]
            # 최종적으로 오탈자와 확정된 대체 단어를 리스트에 같이 입력
            results_list.append([origin_word, rp_word])
            
    if results_list != []:
    ## 각 오탈자를 대체 단어로 교체하면서 결과 텍스트를 최종적으로 리스트에 입력한다.
        output_text = tp_text
        for results in results_list:
            output_text = output_text.replace(results[0], results[1])
        output_text_list.append(output_text)
        # 성능 확인을 위해 오탈자 수정 작업이 이뤄진 인덱스 위치값을 저장
        error_idx.append(tp_idx)
    else:
        output_text_list.append(tp_text)

print(len(output_text_list))
print(output_text_list[:5])