In [None]:
!pip install pandas
!pip install sentencepiece
!pip install hgtk
!pip install gluonnlp

!git clone https://github.com/SKTBrain/KoBERT.git
!pip install -r KoBERT/requirements.txt
!pip install KoBERT/.

## 1. Base Function

In [None]:
def file_num_padding(file_num):
    """Return ``file_num`` as a string left-padded with zeros to six characters.

    Args:
        file_num: Non-negative integer file index.

    Returns:
        The decimal representation of ``file_num``, zero-padded to a width
        of six. Numbers that already have six or more digits are returned
        unpadded.
    """
    # str.zfill pads up to the width and leaves longer strings untouched,
    # which is exactly what the former if/elif ladder spelled out by hand.
    return str(file_num).zfill(6)

def folder_1_padding(folder_num):
    """Return the first-level folder suffix for ``folder_num``.

    Args:
        folder_num: Non-negative integer folder index.

    Returns:
        ``folder_num`` zero-padded to two characters, followed by ``'/'``
        (for example ``1`` becomes ``'01/'``). Wider numbers are not padded.
    """
    return str(folder_num).zfill(2) + '/'

def folder_2_padding(folder_num):
    """Return the second-level folder suffix for ``folder_num``.

    Args:
        folder_num: Non-negative integer folder index.

    Returns:
        ``folder_num`` zero-padded to four characters, followed by ``'/'``
        (for example ``1`` becomes ``'0001/'``). Wider numbers are not padded.
    """
    return str(folder_num).zfill(4) + '/'

In [None]:
def get_path(path, fname, folder_1_num, folder_2_num, file_num, format):
    """Assemble the path of one dataset file from its numeric indices.

    The result is ``path`` followed by three ``fname``-prefixed components:
    the first-level folder, the second-level folder and the file itself,
    each with its zero-padded number, and finally ``format``.

    Args:
        path: Base directory prefix (used verbatim, so include a trailing
            separator if one is needed).
        fname: Name prefix shared by the folders and the file.
        folder_1_num: First-level folder index.
        folder_2_num: Second-level folder index.
        file_num: File index.
        format: File extension including the dot, e.g. ``'.txt'``.

    Returns:
        The concatenated path string.
    """
    pieces = [
        path,
        fname, folder_1_padding(folder_1_num),
        fname, folder_2_padding(folder_2_num),
        fname, file_num_padding(file_num),
        format,
    ]
    return ''.join(pieces)

In [None]:
# Example: resolve the path of the very first transcript file and show it.
BASE_PATH = '/content/drive/My Drive/googledrive/'
fname = 'KsponSpeech_'
folder_1_num, folder_2_num, file_num = 1, 1, 1
format = '.txt'

TEMP = get_path(
    path=BASE_PATH,
    fname=fname,
    folder_1_num=folder_1_num,
    folder_2_num=folder_2_num,
    file_num=file_num,
    format=format,
)
print(TEMP)

## 2. Data-Preprocess

In [None]:
def bracket_filter(sentence):
    """Strip parenthesised spans from ``sentence``.

    Everything from an opening ``'('`` up to and including the next ``')'``
    is dropped. A ``')'`` that appears outside of any parenthesised span is
    dropped as well. Parentheses are not nested: the first ``')'`` closes
    the span.

    Args:
        sentence: Input text.

    Returns:
        The text with parenthesised content and stray ``')'`` removed.
    """
    kept = []
    inside = False

    for ch in sentence:
        if inside:
            # Swallow everything until the closing parenthesis.
            if ch == ')':
                inside = False
            continue
        if ch == '(':
            inside = True
        elif ch != ')':
            kept.append(ch)

    return ''.join(kept)

In [None]:
# Preview bracket_filter on the sample transcript located at TEMP.
with open(TEMP, mode='r', encoding='ms949') as f:
    r = f.read()
print(bracket_filter(r))

In [None]:
def special_filter(sentence):
    """Remove noise tags and special symbols, then collapse whitespace runs.

    Three things happen, in order:

    1. A noise letter (``o``, ``n``, ``u``, ``b``, ``l``) that is directly
       followed by ``'/'`` is dropped (tags such as ``o/`` or ``n/``).
    2. Every character listed in ``dropped_symbols`` is dropped.
    3. The result is stripped, and each run of two or more whitespace
       characters is replaced by a single space.

    Sentence punctuation (``. ? , !``) is kept.

    Args:
        sentence: Input text (must support indexing).

    Returns:
        The cleaned text.
    """
    import re

    noise_letters = ('o', 'n', 'u', 'b', 'l')
    dropped_symbols = ('/', '+', '*', '-', '@', '$', '^', '&', '[', ']', '=', ':', ';')

    kept = []
    last_pos = len(sentence) - 1
    for pos, ch in enumerate(sentence):
        # Handle noise tags such as "o/" and "n/": skip the letter here,
        # the slash itself is removed by the symbol check below.
        if ch in noise_letters and pos < last_pos and sentence[pos + 1] == '/':
            continue
        if ch in dropped_symbols:
            continue
        kept.append(ch)

    return re.sub(r'\s\s+', ' ', ''.join(kept).strip())

In [None]:
# Preview special_filter on the sample transcript located at TEMP.
with open(TEMP, mode='r', encoding='ms949') as f:
    r = f.read()
print(special_filter(r))

In [None]:
def sentence_filter(raw_sentence):
    """Clean a raw transcript line.

    Applies ``bracket_filter`` first and ``special_filter`` on its result.

    Args:
        raw_sentence: Unprocessed transcript text.

    Returns:
        The filtered text.
    """
    without_brackets = bracket_filter(raw_sentence)
    return special_filter(without_brackets)

In [None]:
# Preview the combined sentence_filter on the sample transcript at TEMP.
with open(TEMP, mode='r', encoding='ms949') as f:
    r = f.read()
print(sentence_filter(r))

## 3. Create Character labels

In [None]:
import pandas as pd
from tqdm import trange

BASE_PATH = '/content/drive/My Drive/googledrive/'
fname = 'KsponSpeech_'
format = '.txt'

# Upper bounds of the scan: first-level folders, second-level folders, files.
total_f1 = 1
total_f2 = 4
total_fn = 4000

# Ids 0..len(SPECIAL_TOKENS)-1 are reserved for these tokens; real characters
# are numbered after them.
SPECIAL_TOKENS = ['_', '<s>', '</s>']

# character -> number of occurrences (dict lookup instead of list.index scans)
char_freq = {}

now1 = 1
now2 = 1

print('started... \n\n')
for f1 in trange(1, total_f1+1):
    for f2 in trange(now1, now1+100):
        if f2 > total_f2:
            break
        for fn in trange(now2, now2+1000):
            if fn > total_fn:
                break
            with open(get_path(BASE_PATH, fname, f1, f2, fn, format), 'r', encoding='ms949') as f:
                # Only the first line is read, unfiltered and including its
                # line terminator.
                # NOTE(review): sentence_filter is never applied here, so
                # bracket/noise characters end up in the vocabulary - confirm
                # this is intended.
                sentence = f.readline()
            for ch in sentence:
                char_freq[ch] = char_freq.get(ch, 0) + 1
        # Each second-level folder covers the next block of 1000 file numbers.
        now2 += 1000
    # Each first-level folder covers the next block of 100 second-level folders.
    now1 += 100


# Most frequent first; ties are broken by character, descending - the same
# order as sorting (freq, char) pairs in reverse.
sorted_pairs = sorted(((count, ch) for ch, count in char_freq.items()), reverse=True)
label_freq = [count for count, ch in sorted_pairs]
label_list = [ch for count, ch in sorted_pairs]

label = {
    'id': list(range(len(SPECIAL_TOKENS))),
    'char': list(SPECIAL_TOKENS),
    'freq': [0] * len(SPECIAL_TOKENS),
}
# Start numbering after the reserved ids; starting at 0 would give the most
# frequent characters the same ids as the special tokens.
for idx, (ch, freq) in enumerate(zip(label_list, label_freq), start=len(SPECIAL_TOKENS)):
    label['id'].append(idx)
    label['char'].append(ch)
    label['freq'].append(freq)

# dictionary to csv
label_df = pd.DataFrame(label)
label_df.to_csv('aihub_labels.csv', encoding='ms949', index=False)
print(label_df)

## 4. Create target text

In [None]:
import pandas as pd

def load_label(file_path):
    """Load the label table and build both lookup directions.

    Args:
        file_path: Path of an ms949-encoded CSV with ``id``, ``char`` and
            ``freq`` columns.

    Returns:
        A ``(char2id, id2char)`` tuple of dictionaries. If a key occurs more
        than once in the file, the last row wins.
    """
    table = pd.read_csv(file_path, encoding='ms949')
    # The 'freq' column must be present in the file but is not used for the
    # lookups.
    ids, chars, _freqs = (table[column] for column in ('id', 'char', 'freq'))

    char2id = dict(zip(chars, ids))
    id2char = dict(zip(ids, chars))
    return char2id, id2char

In [None]:
def sentence_to_target(sentence, char2id):
    """Encode ``sentence`` as a space-separated string of label ids.

    Args:
        sentence: Text to encode, one label per character.
        char2id: Mapping from character to id.

    Returns:
        The ids joined by single spaces (an empty string for empty input).

    Raises:
        KeyError: If a character is missing from ``char2id``.
    """
    return ' '.join(str(char2id[ch]) for ch in sentence)

In [None]:
def target_to_sentence(target, id2char):
    """Decode a whitespace-separated id string back into text.

    Args:
        target: String of integer ids separated by whitespace.
        id2char: Mapping from integer id to character.

    Returns:
        The concatenation of the characters for each id, in order.

    Raises:
        KeyError: If an id is missing from ``id2char``.
        ValueError: If a token is not an integer literal.
    """
    return ''.join(id2char[int(token)] for token in target.split())

In [None]:
# Round-trip check: encode a sample sentence to ids and decode it back.
# The label file is read from the working directory, i.e. the same relative
# path the label-creation cell writes to and the later cells read from.
file_path = 'aihub_labels.csv'
char2id, id2char = load_label(file_path)

test = '인공지능 사관학교 화이팅!'
a = sentence_to_target(test, char2id)
print(a)
b = target_to_sentence(a, id2char)
print(b)

In [None]:
def get_label_path(path, fname, folder_1_num, folder_2_num, file_num, format, new_fname):
    """Assemble the path of the label file that belongs to one dataset file.

    Identical to the source-file layout for the two folder components
    (both prefixed with ``fname``), but the file name itself is prefixed
    with ``new_fname`` instead.

    Args:
        path: Base directory prefix (used verbatim).
        fname: Name prefix of the two folder levels.
        folder_1_num: First-level folder index.
        folder_2_num: Second-level folder index.
        file_num: File index.
        format: File extension including the dot.
        new_fname: Name prefix of the label file.

    Returns:
        The concatenated path string.
    """
    pieces = [
        path,
        fname, folder_1_padding(folder_1_num),
        fname, folder_2_padding(folder_2_num),
        new_fname, file_num_padding(file_num),
        format,
    ]
    return ''.join(pieces)

In [None]:
import pandas as pd
from tqdm import trange

BASE_PATH = '/content/drive/My Drive/googledrive/'
fname = 'KsponSpeech_'
format = '.txt'
new_fname = 'KsponSpeech_label_'

# Upper bounds of the scan: first-level folders, second-level folders, files.
total_f1 = 1
total_f2 = 4
total_fn = 4000
char2id, id2char = load_label('aihub_labels.csv')

now1 = 1
now2 = 1

print('started... \n\n')
for f1 in trange(1, total_f1+1):
    for f2 in trange(now1, now1+100):
        if f2 > total_f2:
            break
        for fn in trange(now2, now2+1000):
            if fn > total_fn:
                break
            with open(get_path(BASE_PATH, fname, f1, f2, fn, format), 'r', encoding='ms949') as f:
                sentence = f.readline()

            # Encode before opening the output file: if a character is missing
            # from char2id the KeyError is raised here, and no empty or
            # truncated label file is left behind.
            target = sentence_to_target(sentence, char2id)
            with open(get_label_path(BASE_PATH, fname, f1, f2, fn, format, new_fname), 'w', encoding='ms949') as f:
                f.write(target)
        # Each second-level folder covers the next block of 1000 file numbers.
        now2 += 1000
    # Each first-level folder covers the next block of 100 second-level folders.
    now1 += 100

## 5. Create data list

In [None]:
import pandas as pd

total_fn = 4000
# 98% of the files are targeted for training; the remainder is the test share.
train_num = int(total_fn * 0.98)
test_num = total_fn - train_num

train_data_list = {'audio': [], 'label': []}
test_data_list = {'audio': [], 'label': []}
aihub_labels = pd.read_csv('aihub_labels.csv', encoding='ms949')

# Rare labels are the characters that occur exactly once. Selecting them by
# their 'freq' value replaces the hand-maintained slice start (the row index
# at which freq == 1 begins), which had to be re-tuned for every label file.
RARE_FREQ = 1
rare_labels = aihub_labels.loc[aihub_labels['freq'] == RARE_FREQ, 'char']

In [None]:
# Collect the relative path of every audio file and of its label file.
# Both lists are filled in lockstep, so index i in audio_paths and
# target_paths refers to the same utterance.
from tqdm import trange

fname = 'KsponSpeech_'
target_fname = 'KsponSpeech_label_'

audio_paths = []
target_paths = []

# Upper bounds for the first- and second-level folder indices.
# total_fn (upper bound for the file index) is not assigned in this cell;
# it must already exist in the kernel from an earlier cell.
total_f1 = 1
total_f2 = 4

# Running start offsets for the second-level folder index and the file index.
now1 = 1
now2 = 1

for f1 in trange(1, total_f1+1):
    for f2 in trange(now1, now1+100):
        if f2 > total_f2:
            break
        for fn in trange(now2, now2+1000):
            if fn > total_fn:
                break
            # Paths are built with an empty base, i.e. relative to the dataset root.
            audio_paths.append(get_path('', fname, f1, f2, fn, '.pcm'))
            target_paths.append(get_label_path('', fname, f1, f2, fn, '.txt', target_fname))
        # The next second-level folder continues with the next 1000 file numbers.
        now2 += 1000
    # The next first-level folder continues with the next 100 second-level folders.
    now1 += 100

In [None]:
import random

# Fixed seed so the shuffle - and therefore the train/test split derived
# from it - is the same on every fresh run of the notebook.
SHUFFLE_SEED = 42

# Shuffle audio and label paths together so each pair stays aligned.
data_paths = list(zip(audio_paths, target_paths))
random.Random(SHUFFLE_SEED).shuffle(data_paths)
audio_paths, target_paths = zip(*data_paths)

In [None]:
import os

from tqdm import trange

path = '/content/drive/My Drive/googledrive/'
train_full = False
train_dict = {'audio': [], 'label': []}
test_dict = {'audio': [], 'label': []}

print('started...')
for idx in trange(len(audio_paths)):
    audio = audio_paths[idx]
    target = target_paths[idx]

    # Once the training quota is reached, every remaining pair goes to test.
    if len(train_dict['audio']) == train_num:
        train_full = True
    if train_full:
        test_dict['audio'].append(audio)
        test_dict['label'].append(target)
        continue

    # The transcript sits next to the audio file with a '.txt' extension.
    # splitext only removes the final extension, so a dot elsewhere in the
    # path (e.g. in a directory name) no longer truncates it.
    transcript_path = os.path.splitext(path + audio)[0] + '.txt'
    with open(transcript_path, encoding='ms949') as f:
        sentence = f.readline()

    # Sentences containing a rare character are kept out of the training set.
    rare_in = any(rare in sentence for rare in rare_labels)
    if rare_in:
        test_dict['audio'].append(audio)
        test_dict['label'].append(target)
    else:
        train_dict['audio'].append(audio)
        train_dict['label'].append(target)

print('\n\n Ended!!!')

In [None]:
# Turn both splits into DataFrames and write each one as an ms949-encoded
# CSV without the index column.
train_df = pd.DataFrame(train_dict)
test_df = pd.DataFrame(test_dict)

for split_df, csv_name in ((test_df, 'test_list.csv'), (train_df, 'train_list.csv')):
    split_df.to_csv(csv_name, encoding='ms949', index=False)

- `total_f1`, `total_f2`, `total_fn`: 현재는 테스트를 위해 임의로 1, 4, 4000으로 설정되어 있으므로, 실제 사용 시에는 데이터셋 구성에 맞게 수정해야 합니다.
- `5. Create data list`의 `rare_labels` 슬라이싱 시작 값은 `aihub_labels.csv`에서 `freq == 1`이 시작되는 행 인덱스에 맞도록 수동으로 설정해야 합니다. (가능하면 하드코딩하지 말고 변수로 만드는 것이 좋습니다.)