In [1]:
import json

In [2]:
# Read the syllable-level training file. The context manager closes the file
# handle deterministically, and the explicit UTF-8 encoding keeps the
# Vietnamese text readable regardless of the platform's default locale.
with open('/workspace/nlplab/kienvt/PhoNER_COVID19_implement/data'
          '/syllable/train_syllable.json', 'r', encoding='utf-8') as train_file:
    data = json.load(train_file)

In [3]:
# Preview only the first two records instead of dumping the whole dataset
# into the cell output.
data[:2]

[{'words': ['Đồng',
   'thời',
   ',',
   'bệnh',
   'viện',
   'tiếp',
   'tục',
   'thực',
   'hiện',
   'các',
   'biện',
   'pháp',
   'phòng',
   'chống',
   'dịch',
   'bệnh',
   'COVID',
   '-',
   '19',
   'theo',
   'hướng',
   'dẫn',
   'của',
   'Bộ',
   'Y',
   'tế',
   '.'],
  'tags': ['O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'O',
   'B-ORGANIZATION',
   'I-ORGANIZATION',
   'I-ORGANIZATION',
   'O']},
 {'words': ['"',
   'Số',
   'bệnh',
   'viện',
   'có',
   'thể',
   'tiếp',
   'nhận',
   'bệnh',
   'nhân',
   'bị',
   'sốt',
   'cao',
   'và',
   'khó',
   'thở',
   'đang',
   'giảm',
   'dần',
   '"',
   ',',
   'thông',
   'cáo',
   'có',
   'đoạn',
   ',',
   'cảnh',
   'báo',
   'những',
   'bệnh',
   'nhân',
   'này',
   'thay',
   'vào',
   'đó',
   'được',
   'chuyển',
   'tới',
   'các',
   'phòng',
   'khám',
   'khẩn',
   'cấ

In [4]:
from collections import Counter

In [5]:
# Empty frequency tables: `cnt` for word forms, `tag_cnt` for tag labels.
cnt, tag_cnt = Counter(), Counter()

In [6]:
# Start from fresh counters so that re-running this cell recomputes the
# frequencies instead of adding every record a second time.
cnt = Counter()
tag_cnt = Counter()
for item in data:
    # Words are counted case-insensitively; tags are counted verbatim.
    words = [word.lower() for word in item['words']]
    tags = list(item['tags'])
    cnt.update(words)
    tag_cnt.update(tags)

In [7]:
list(tag_cnt.keys())

['O',
 'B-ORGANIZATION',
 'I-ORGANIZATION',
 'B-SYMPTOM_AND_DISEASE',
 'I-SYMPTOM_AND_DISEASE',
 'B-LOCATION',
 'I-LOCATION',
 'B-DATE',
 'B-PATIENT_ID',
 'B-AGE',
 'B-NAME',
 'I-DATE',
 'B-JOB',
 'I-JOB',
 'B-TRANSPORTATION',
 'B-GENDER',
 'I-GENDER',
 'I-TRANSPORTATION',
 'I-NAME',
 'I-AGE',
 'I-PATIENT_ID']

In [8]:
import string
def clean_word(word: str, special_sep='"&\'()*+-;?'):
    """Split a single token into leading affixes, core pieces and trailing affixes.

    Leading and trailing ASCII punctuation characters are peeled off one at a
    time, and runs of adjacent ASCII digits on either edge are grouped into a
    single piece. What remains (the core) is then cut around every character
    contained in ``special_sep``; each such character becomes its own piece.
    Punctuation inside the core that is not in ``special_sep`` stays attached.

    Args:
        word: Token to split.
        special_sep: Characters that are isolated when they occur in the core.

    Returns:
        A list of pieces in their original left-to-right order, so that
        ``''.join(result) == word``. For the empty string the input itself
        (``''``) is returned rather than a list.
    """
    if word == '': return word
    # Explicit ASCII sets: str.isdigit() would also accept non-ASCII digits.
    digits = set('0123456789')
    punctuation = set(string.punctuation)

    # Peel punctuation / digit runs off the front.
    prev = []
    while word:
        ch = word[0]
        if ch in punctuation:
            prev.append(ch)
        elif ch in digits:
            if prev and prev[-1][-1] in digits:
                prev[-1] += ch
            else:
                prev.append(ch)
        else:
            break
        word = word[1:]

    # Peel punctuation / digit runs off the back. Pieces are collected while
    # scanning right-to-left, so each digit is prepended to its run and the
    # list is reversed afterwards to restore the original order.
    post = []
    while word:
        ch = word[-1]
        if ch in punctuation:
            post.append(ch)
        elif ch in digits:
            if post and post[-1][0] in digits:
                post[-1] = ch + post[-1]
            else:
                post.append(ch)
        else:
            break
        word = word[:-1]
    post.reverse()

    # Split the core: a separator character always starts a new piece, and so
    # does the first character following a separator.
    words = []
    for ch in word:
        if ch not in special_sep and words and words[-1][-1] not in special_sep:
            words[-1] += ch
        else:
            words.append(ch)
    return prev + words + post

In [9]:
from typing import Optional

class Vocab:
    """Two-way mapping between string tokens and integer ids.

    Id layout produced by ``__init__``:

    * ``start_with_special_tokens=True``: the pad token gets id 0, the unknown
      token (when given) gets id 1, and the counter's keys are numbered from 2
      in iteration order. Id 1 stays unused when no unknown token is given.
    * ``start_with_special_tokens=False``: the counter's keys are numbered
      from 0, followed by the pad token and then the unknown token (if any).

    Without an unknown token, ``unk_token`` is ``''`` and ``unk_token_id`` is -1.
    """

    def __init__(
            self,
            counter: Optional[Counter] = None,
            pad_token: str = '<pad>',
            unk_token: Optional[str] = None,
            start_with_special_tokens: Optional[bool] = True,
    ):
        """Build the vocabulary from the keys of ``counter``.

        Args:
            counter: Mapping whose keys become the vocabulary entries; ``None``
                is treated as an empty vocabulary.
            pad_token: Token used for padding.
            unk_token: Token returned for out-of-vocabulary lookups, or ``None``
                to have no unknown token.
            start_with_special_tokens: Whether the special tokens take the
                lowest ids (see the class docstring).
        """
        word_start_idx = 2 if start_with_special_tokens else 0
        # Guard against the declared default of None, which has no .keys().
        words = list(counter.keys()) if counter is not None else []
        self.word2index = {w: i for i, w in enumerate(words, start=word_start_idx)}
        self.pad_token = pad_token
        self.pad_token_id = 0 if start_with_special_tokens else len(self.word2index)
        self.word2index[pad_token] = self.pad_token_id
        if unk_token is not None:
            self.unk_token = unk_token
            self.unk_token_id = 1 if start_with_special_tokens else len(self.word2index)
            self.word2index[unk_token] = self.unk_token_id
        else:
            self.unk_token = ''
            self.unk_token_id = -1
        self._refresh_derived()

    def _refresh_derived(self):
        """Recompute the attributes that are derived from ``word2index``."""
        self.index2word = {v: k for k, v in self.word2index.items()}
        self.punc_tokens = [p for p in string.punctuation if p in self.word2index]

    def __len__(self):
        """Return the number of entries in ``word2index`` (special tokens included)."""
        return len(self.word2index)

    @property
    def vocab_size(self):
        """Return ``len(self)`` plus one when an unknown token is configured.

        NOTE(review): the unknown token is already an entry of ``word2index``,
        so it is counted twice here; and when no unknown token is set while the
        special tokens come first, the largest id equals ``len(self)``. Confirm
        what callers expect before relying on this as an embedding size.
        """
        return len(self) + int(self.unk_token != '')

    @property
    def vocab(self):
        """Return the token-to-id dictionary itself (not a copy)."""
        return self.word2index

    def stoi(self, w: str):
        """Return the id of token ``w``, or ``unk_token_id`` if it is unknown."""
        return self.word2index.get(w, self.unk_token_id)

    def itos(self, i: int):
        """Return the token with id ``i``, or ``unk_token`` if the id is unknown."""
        return self.index2word.get(i, self.unk_token)

    def tokenize(
            self,
            text: "str | list[str]",
            max_length: Optional[int] = None,
            do_lower: bool = True,
            do_clean: bool = True,
        ):
        """Convert text into a list of token ids.

        Args:
            text: A string (split on whitespace) or an already split list of words.
            max_length: When given, the result is padded with ``pad_token_id``
                and truncated to exactly this many ids.
            do_lower: Lower-case every word before the lookup.
            do_clean: Further split every word with ``clean_word``.

        Returns:
            The list of ids; words missing from the vocabulary map to
            ``unk_token_id``.
        """
        # str.split() without arguments already collapses whitespace runs and
        # ignores leading/trailing whitespace.
        words = text.split() if isinstance(text, str) else text
        words = [w for w in words if w != '']
        if do_lower:
            words = [w.lower() for w in words]
        if do_clean:
            cleaned_words = []
            for w in words:
                cleaned_words += clean_word(w)
        else:
            cleaned_words = words
        token_ids = [self.stoi(w) for w in cleaned_words]
        if max_length is not None:
            token_ids.extend([self.pad_token_id] * (max_length - len(token_ids)))
        return token_ids[:max_length]

    def detokenize(
            self,
            ids: list[int],
            add_special_tokens: bool = True,
            unk_token_alternative: Optional[str] = None,
    ):
        """Convert ids back into a list of tokens.

        Args:
            ids: Token ids to convert.
            add_special_tokens: When false, pad and unknown ids are dropped
                before the conversion.
            unk_token_alternative: When given, every resulting token equal to
                ``unk_token`` is replaced by this string.

        Returns:
            The list of tokens (not joined into a string).
        """
        if not add_special_tokens:
            special_ids = (self.pad_token_id, self.unk_token_id)
            ids = [i for i in ids if i not in special_ids]
        text = [self.itos(i) for i in ids]
        if unk_token_alternative is not None:
            text = [unk_token_alternative if t == self.unk_token else t for t in text]
        return text

    def save(self, fp):
        """Write the mapping and the special-token settings to ``fp`` as JSON."""
        data = {
            'word2index' : self.word2index,
            'pad_token' : self.pad_token,
            'pad_token_id' : self.pad_token_id,
            'unk_token' : self.unk_token,
            'unk_token_id' : self.unk_token_id
        }
        with open(fp, 'w', encoding='utf-8') as f:
            json.dump(data, f)

    def load(self, fp):
        """Replace this vocabulary's state with the one stored at ``fp``.

        The reverse mapping and the punctuation list are rebuilt from the
        loaded ``word2index`` so that ``itos``/``detokenize`` reflect the
        loaded vocabulary instead of the one the instance was created with.

        Returns:
            ``self``, so that construction and loading can be chained.
        """
        with open(fp, 'r', encoding='utf-8') as f:
            data = json.load(f)
        self.word2index = data['word2index']
        self.pad_token = data['pad_token']
        self.pad_token_id = data['pad_token_id']
        self.unk_token = data['unk_token']
        self.unk_token_id = data['unk_token_id']
        self._refresh_derived()
        return self

In [10]:
# Word vocabulary: has an unknown token for out-of-vocabulary words.
tokenizer = Vocab(counter=cnt, pad_token='<pad>', unk_token='<unk>')
# Tag vocabulary: built without an unknown token.
tag_vocab = Vocab(counter=tag_cnt, pad_token='<pad>', unk_token=None)

In [11]:
tag_vocab.vocab

{'O': 2,
 'B-ORGANIZATION': 3,
 'I-ORGANIZATION': 4,
 'B-SYMPTOM_AND_DISEASE': 5,
 'I-SYMPTOM_AND_DISEASE': 6,
 'B-LOCATION': 7,
 'I-LOCATION': 8,
 'B-DATE': 9,
 'B-PATIENT_ID': 10,
 'B-AGE': 11,
 'B-NAME': 12,
 'I-DATE': 13,
 'B-JOB': 14,
 'I-JOB': 15,
 'B-TRANSPORTATION': 16,
 'B-GENDER': 17,
 'I-GENDER': 18,
 'I-TRANSPORTATION': 19,
 'I-NAME': 20,
 'I-AGE': 21,
 'I-PATIENT_ID': 22,
 '<pad>': 0}

In [12]:
tokenizer.tokenize('Tôi là học sinh trường đại học, Công Nghệ')

[915, 113, 465, 268, 122, 530, 465, 4, 159, 636]

In [13]:
tokenizer.detokenize([915, 113, 465, 268, 122, 530, 465, 4, 159, 636])

['tôi', 'là', 'học', 'sinh', 'trường', 'đại', 'học', ',', 'công', 'nghệ']

In [14]:
def encode(text: list[str], tags: list[str], tokenizer: Vocab, tag_vocab: Vocab, max_length=20):
    """Turn one sentence and its tags into two id lists of length ``max_length``.

    Words are looked up as given (no lower-casing or cleaning) through
    ``tokenizer.stoi`` and tags through ``tag_vocab.stoi``. Each list is
    truncated to ``max_length`` and then right-padded with the respective
    vocabulary's ``pad_token_id``.

    Returns:
        A tuple ``(input_ids, target_tags)``.
    """
    def _fit(ids, pad_id):
        # Cut to the target length, then fill any shortfall with padding.
        fitted = ids[:max_length]
        fitted.extend([pad_id] * (max_length - len(fitted)))
        return fitted

    input_ids = _fit([tokenizer.stoi(token) for token in text], tokenizer.pad_token_id)
    target_tags = _fit([tag_vocab.stoi(tag) for tag in tags], tag_vocab.pad_token_id)
    return (input_ids, target_tags)

In [15]:
print(data[0])

{'words': ['Đồng', 'thời', ',', 'bệnh', 'viện', 'tiếp', 'tục', 'thực', 'hiện', 'các', 'biện', 'pháp', 'phòng', 'chống', 'dịch', 'bệnh', 'COVID', '-', '19', 'theo', 'hướng', 'dẫn', 'của', 'Bộ', 'Y', 'tế', '.'], 'tags': ['O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-ORGANIZATION', 'I-ORGANIZATION', 'I-ORGANIZATION', 'O']}


In [16]:
print(encode(data[0]['words'], data[0]['tags'], tokenizer, tag_vocab, 50))

([1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 5, 1, 18, 19, 20, 21, 22, 23, 1, 1, 26, 27, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 4, 4, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])


In [17]:
tokenizer_ = Vocab(Counter())

In [18]:
tokenizer_.load('/workspace/nlplab/kienvt/transformers_implement_from_scratch/tokenizer.json')

<__main__.Vocab at 0x7f1f7652f250>

In [19]:
tokenizer_.unk_token_id

1

In [21]:
# Vocab.load() returns the instance itself, so an empty Vocab can be created
# and populated from the saved JSON file in one expression.
tagger = Vocab(Counter()).load('/workspace/nlplab/kienvt/transformers_implement_from_scratch/tagger.json')
# Display the loaded token-to-id mapping.
tagger.vocab

{'O': 0,
 'B-ORGANIZATION': 1,
 'I-ORGANIZATION': 2,
 'B-SYMPTOM_AND_DISEASE': 3,
 'I-SYMPTOM_AND_DISEASE': 4,
 'B-LOCATION': 5,
 'I-LOCATION': 6,
 'B-DATE': 7,
 'B-PATIENT_ID': 8,
 'B-AGE': 9,
 'B-NAME': 10,
 'I-DATE': 11,
 'B-JOB': 12,
 'I-JOB': 13,
 'B-TRANSPORTATION': 14,
 'B-GENDER': 15,
 'I-GENDER': 16,
 'I-TRANSPORTATION': 17,
 'I-NAME': 18,
 'I-AGE': 19,
 'I-PATIENT_ID': 20,
 '<pad>': 21}