In [None]:
!git clone https://github.com/kmounlp/NER.git
!pip install transformers
!pip install sentencepiece
!pip install 'git+https://github.com/SKTBrain/KoBERT.git#egg=kobert_tokenizer&subdirectory=kobert_hf'

Cloning into 'NER'...
remote: Enumerating objects: 1770, done.[K
remote: Counting objects: 100% (43/43), done.[K
remote: Compressing objects: 100% (42/42), done.[K
remote: Total 1770 (delta 0), reused 42 (delta 0), pack-reused 1727[K
Receiving objects: 100% (1770/1770), 20.69 MiB | 13.38 MiB/s, done.
Resolving deltas: 100% (146/146), done.
Updating files: 100% (1755/1755), done.
Collecting kobert_tokenizer
  Cloning https://github.com/SKTBrain/KoBERT.git to /tmp/pip-install-vlomiag4/kobert-tokenizer_4c1fb91b3aee49499019bba17b585201
  Running command git clone --filter=blob:none --quiet https://github.com/SKTBrain/KoBERT.git /tmp/pip-install-vlomiag4/kobert-tokenizer_4c1fb91b3aee49499019bba17b585201
  Resolved https://github.com/SKTBrain/KoBERT.git to commit 47a69af87928fc24e20f571fe10c3cc9dd9af9a3
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: kobert_tokenizer
  Building wheel for kobert_tokenizer (setup.py) ... [?25l[?25hdone
  Creat

In [None]:
!pip install seqeval

Collecting seqeval
  Downloading seqeval-1.2.2.tar.gz (43 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/43.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: seqeval
  Building wheel for seqeval (setup.py) ... [?25l[?25hdone
  Created wheel for seqeval: filename=seqeval-1.2.2-py3-none-any.whl size=16161 sha256=703f5aec0518632abee36aa7dc015e6bfe09d5f07d0df1efb24f2e68e81316eb
  Stored in directory: /root/.cache/pip/wheels/1a/67/4a/ad4082dd7dfc30f2abfe4d80a2ed5926a506eb8a972b4767fa
Successfully built seqeval
Installing collected packages: seqeval
Successfully installed seqeval-1.2.2


In [63]:
import os
import glob
from pathlib import Path
import re
import random
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from transformers import BertModel, AdamW, get_cosine_schedule_with_warmup
from kobert_tokenizer import KoBERTTokenizer
import torch.nn as nn
from seqeval.metrics import accuracy_score, classification_report

# 데이터 파일을 읽어와서 ORG, MNY, PET 태그만 살리고 나머지는 O로 처리하는 함수
def read_file(file_list):
    token_docs = []
    tag_docs = []

    for file_path in file_list:
        file_path = Path(file_path)
        raw_text = file_path.read_text().strip()
        raw_docs = re.split(r'\n\t?\n', raw_text)

        for doc in raw_docs:
            tokens = []
            tags = []
            for line in doc.split('\n'):
                if line.startswith(("$", ";", "##")):
                    continue
                try:
                    token = line.split('\t')[0]
                    tag = line.split('\t')[3]

                    # ORG, MNY, PET 태그 처리
                    if tag in ['B-ORG', 'I-ORG', 'B-MNY', 'I-MNY', 'B-PER', 'I-PER']:
                        pass
                    else:
                        tag = 'O'

                    for i, syllable in enumerate(token):
                        tokens.append(syllable)
                        if i == 0:
                            tags.append(tag)
                        else:
                            modi_tag = 'I' + tag[1:] if tag.startswith('B') else tag
                            tags.append(modi_tag)
                except:
                    continue
            if len(tokens) == len(tags):
                token_docs.append(tokens)
                tag_docs.append(tags)

    return token_docs, tag_docs

# CSV 파일에서 회사명 데이터를 읽어와서 ORG 태그로 변환하는 함수
def read_company_names(file_path):
    df = pd.read_csv(file_path)
    company_names = df['회사명'].tolist()

    token_docs = []
    tag_docs = []

    for name in company_names:
        tokens = list(name.strip())
        tags = ['B-ORG'] + ['I-ORG'] * (len(tokens) - 1)
        token_docs.append(tokens)
        tag_docs.append(tags)

    return token_docs, tag_docs

# 실제 파일 경로로 설정
file_list = []

for x in os.walk('/content/NER/말뭉치 - 형태소_개체명'):
    for y in glob.glob(os.path.join(x[0], '*_NER.txt')):
        file_list.append(y)

file_list = sorted(file_list)

# 데이터 전처리
texts, tags = read_file(file_list)

# CSV 파일에서 회사명 데이터 추가
company_texts, company_tags = read_company_names('/content/drive/MyDrive/프로젝트/증권 뉴스 분류 및 개체명 인식/상장법인목록.csv')
texts.extend(company_texts)
tags.extend(company_tags)

# 데이터를 train과 test로 분할
train_texts, test_texts, train_tags, test_tags = train_test_split(texts, tags, test_size=0.2, random_state=42)

print(f"Train samples: {len(train_texts)}, Test samples: {len(test_texts)}")

Train samples: 17464, Test samples: 4367


In [64]:
# 음절 단위 토크나이저 함수 정의
def ner_tokenizer(sent, max_seq_length):
    pre_syllable = "-"
    input_ids = [0] * (max_seq_length - 1)
    attention_mask = [0] * (max_seq_length - 1)
    token_type_ids = [0] * max_seq_length
    sent = sent[:max_seq_length - 2]

    for i, syllable in enumerate(sent):
        if syllable == '-':
            pre_syllable = syllable
        if pre_syllable != "-":
            syllable = '##' + syllable
        pre_syllable = syllable

        input_ids[i] = tokenizer.convert_tokens_to_ids(syllable)
        attention_mask[i] = 1

    input_ids = [2] + input_ids
    input_ids[len(sent) + 1] = 3
    attention_mask = [1] + attention_mask
    attention_mask[len(sent) + 1] = 1

    return {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'token_type_ids': token_type_ids
    }

In [65]:
# 태그 인코딩 함수 정의
def encode_tags(tags, tag_to_id, max_seq_length):
    tags = tags[:max_seq_length - 2]
    labels = [tag_to_id[tag] for tag in tags]
    labels = [tag_to_id['O']] + labels + [tag_to_id['O']]

    padding_length = max_seq_length - len(labels)
    labels += [tag_to_id['O']] * padding_length

    return labels

In [66]:
# 데이터셋 클래스 정의
class NERDataset(torch.utils.data.Dataset):
    def __init__(self, texts, tags, tokenizer, tag_to_id, max_len):
        self.texts = texts
        self.tags = tags
        self.tokenizer = tokenizer
        self.tag_to_id = tag_to_id
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        tags = self.tags[idx]

        # 음절 단위로 토큰화
        tokenized = ner_tokenizer(text, self.max_len)
        tokens = tokenized['input_ids']
        attention_mask = tokenized['attention_mask']
        token_type_ids = tokenized['token_type_ids']

        labels = encode_tags(tags, self.tag_to_id, self.max_len)

        return {
            'input_ids': torch.tensor(tokens, dtype=torch.long),
            'attention_mask': torch.tensor(attention_mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            'labels': torch.tensor(labels, dtype=torch.long)
        }

In [70]:
# KoBERT 토크나이저 로드
tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')

tag_to_id = {'B-ORG': 1, 'I-ORG': 2, 'B-MNY': 3, 'I-MNY': 4, 'B-PER': 5, 'I-PER': 6, 'O': 0}
max_len = 128

train_dataset = NERDataset(train_texts, train_tags, tokenizer, tag_to_id, max_len)
test_dataset = NERDataset(test_texts, test_tags, tokenizer, tag_to_id, max_len)

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'XLNetTokenizer'. 
The class this function is called from is 'KoBERTTokenizer'.


In [71]:
def collate_fn(batch):
    input_ids = torch.stack([item['input_ids'] for item in batch])
    attention_mask = torch.stack([item['attention_mask'] for item in batch])
    token_type_ids = torch.stack([item['token_type_ids'] for item in batch])
    labels = torch.stack([item['labels'] for item in batch])

    return {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'token_type_ids': token_type_ids,
        'labels': labels
    }

train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)

In [72]:
# 디버깅을 위해 첫 배치를 확인하는 코드 추
first_batch = next(iter(train_dataloader))
print(first_batch)

{'input_ids': tensor([[   2, 5561,    0,  ...,    0,    0,    0],
        [   2, 5907,    0,  ...,    0,    0,    0],
        [   2, 7005,    0,  ...,    0,    0,    0],
        ...,
        [   2, 7005,    0,  ...,    0,    0,    0],
        [   2,  285,    0,  ...,    0,    0,    0],
        [   2, 6521,    0,  ...,    0,    0,    0]]), 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]]), 'token_type_ids': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        ...,
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]]), 'labels': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 1, 2,  ..., 0, 0, 0],
        ...,
        [0, 1, 2,  ..., 0, 0, 0],
        [0, 1, 2,  ..., 0,

In [73]:
# KoBERT 모델 정의
class KoBERTNER(nn.Module):
    def __init__(self, num_labels):
        super(KoBERTNER, self).__init__()
        self.bert = BertModel.from_pretrained('skt/kobert-base-v1')
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(768, num_labels)

    def forward(self, input_ids, attention_mask, token_type_ids, labels=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.classifier.out_features), labels.view(-1))
            return loss, logits
        return logits

num_labels = len(tag_to_id)
model = KoBERTNER(num_labels)

# GPU 설정
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

# 옵티마이저 및 스케줄러 설정
optimizer = AdamW(model.parameters(), lr=1e-5)
total_steps = len(train_dataloader) * 10
scheduler = get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

loss_function = nn.CrossEntropyLoss()



In [74]:
def train(epoch, model, dataloader, optimizer, scheduler, device, loss_function):
    model.train()
    for _, data in enumerate(dataloader, 0):
        ids = data['input_ids'].to(device, dtype=torch.long)
        mask = data['attention_mask'].to(device, dtype=torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
        targets = data['labels'].to(device, dtype=torch.long)

        outputs = model(ids, mask, token_type_ids)
        loss = loss_function(outputs.view(-1, outputs.size(-1)), targets.view(-1))

        if _ % 500 == 0:
            print(f'Epoch: {epoch}, Loss:  {loss.item()}')

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

for epoch in range(5):  # 원하는 epoch 수로 변경 가능
    train(epoch, model, train_dataloader, optimizer, scheduler, device, loss_function)

Epoch: 0, Loss:  1.969158411026001
Epoch: 0, Loss:  0.16629494726657867
Epoch: 0, Loss:  0.30614638328552246
Epoch: 1, Loss:  0.12371194362640381
Epoch: 1, Loss:  0.21896202862262726
Epoch: 1, Loss:  0.13116100430488586
Epoch: 2, Loss:  0.1541145145893097
Epoch: 2, Loss:  0.18381766974925995
Epoch: 2, Loss:  0.11312593519687653
Epoch: 3, Loss:  0.17277130484580994
Epoch: 3, Loss:  0.21344032883644104
Epoch: 3, Loss:  0.11048656702041626
Epoch: 4, Loss:  0.10978903621435165
Epoch: 4, Loss:  0.1971770077943802
Epoch: 4, Loss:  0.1985599547624588


In [75]:
def validate(model, dataloader, device):
    model.eval()
    fin_targets = []
    fin_outputs = []
    with torch.no_grad():
        for _, data in enumerate(dataloader, 0):
            ids = data['input_ids'].to(device, dtype=torch.long)
            mask = data['attention_mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            targets = data['labels'].to(device, dtype=torch.long)

            outputs = model(ids, mask, token_type_ids)
            fin_outputs.extend(torch.argmax(outputs, dim=2).cpu().numpy().flatten())
            fin_targets.extend(targets.cpu().numpy().flatten())
    return fin_outputs, fin_targets

outputs, targets = validate(model, test_dataloader, device)
accuracy = accuracy_score(targets, outputs)
print(f'Validation Accuracy: {accuracy}')

Validation Accuracy: 0.9644152879551179


In [84]:
def predict_sentence(model, tokenizer, sentence, tag_to_id, id_to_tag, max_len, device):
    model.eval()

    # 음절 단위로 토큰화
    tokenized = ner_tokenizer(sentence, max_len)
    tokens = tokenized['input_ids']
    attention_mask = tokenized['attention_mask']
    token_type_ids = tokenized['token_type_ids']

    # 데이터를 텐서로 변환
    input_ids = torch.tensor(tokens, dtype=torch.long).unsqueeze(0).to(device)
    attention_mask = torch.tensor(attention_mask, dtype=torch.long).unsqueeze(0).to(device)
    token_type_ids = torch.tensor(token_type_ids, dtype=torch.long).unsqueeze(0).to(device)

    # 모델 예측
    with torch.no_grad():
        outputs = model(input_ids, attention_mask, token_type_ids)
        predictions = torch.argmax(outputs, dim=2).cpu().numpy().flatten()

    # 예측 결과 디코딩
    decoded_predictions = [id_to_tag[pred] for pred in predictions]

    return decoded_predictions

def extract_entities(sentence, tags):
    entities = []
    entity = ""
    current_tag = None

    for char, tag in zip(sentence, tags):
        if tag.startswith("B-"):
            if entity:
                entities.append(entity)
            entity = char
            current_tag = tag[2:]
        elif tag.startswith("I-") and current_tag == tag[2:]:
            entity += char
        else:
            if entity:
                entities.append(entity)
                entity = ""
            current_tag = None

    if entity:
        entities.append(entity)

    return entities

# 테스트할 문장
input_sentence = "이재용"

# 개체명 인식 수행
id_to_tag = {v: k for k, v in tag_to_id.items()}  # 태그 인덱스를 태그 이름으로 매핑
predicted_tags = predict_sentence(model, tokenizer, input_sentence, tag_to_id, id_to_tag, max_len, device)

# 예측 결과에서 개체명 추출
entities = extract_entities(input_sentence, predicted_tags)

# 예측 결과 출력
print(f"Input Sentence: {input_sentence}")
print(f"Predicted Entities: {entities}")

Input Sentence: 이재용
Predicted Entities: ['재용']


In [85]:
# 모델과 토크나이저 저장
MODEL_SAVE_PATH = "kobert_ner_model.pth"
TOKENIZER_SAVE_FOLDER = "kobert_tokenizer_ner"

if not os.path.exists(TOKENIZER_SAVE_FOLDER):
    os.makedirs(TOKENIZER_SAVE_FOLDER)

torch.save(model.state_dict(), MODEL_SAVE_PATH)
tokenizer.save_pretrained(TOKENIZER_SAVE_FOLDER)

('kobert_tokenizer_ner/tokenizer_config.json',
 'kobert_tokenizer_ner/special_tokens_map.json',
 'kobert_tokenizer_ner/spiece.model',
 'kobert_tokenizer_ner/added_tokens.json')