In [2]:
# !pip install git+https://git@github.com/SKTBrain/KoBERT.git@master

In [2]:
import pandas as pd

import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import gluonnlp as nlp
import numpy as np
from tqdm.notebook import tqdm

from kobert import get_tokenizer
from kobert import get_pytorch_kobert_model

from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup

In [3]:
# 학습에 사용할 머신 선택: cpu or gpu(cuda)
# device = torch.device("cpu")
device = torch.device("cuda:1")

In [4]:
# KoBERT 모델 및 사전 로딩
bertmodel, vocab = get_pytorch_kobert_model(cachedir=".cache")

using cached model. /home/sks/dsba/lge/lge_project_bert/.cache/kobert_v1.zip
using cached model. /home/sks/dsba/lge/lge_project_bert/.cache/kobert_news_wiki_ko_cased-1087f8699e.spiece


## 데이터셋 로드

In [5]:
dataset = pd.read_excel('dataset/textdata.xlsx', index_col=0, engine='openpyxl').reset_index(drop=True)
dataset.columns = ['manager', '_company', 'company', 'date', '_building', 'building', 'building_check', 'prepro1', '_content', '_content_sub', 'content']

In [6]:
dataset.head()

Unnamed: 0,manager,_company,company,date,_building,building,building_check,prepro1,_content,_content_sub,content
0,김인후,영선반,영선반,2017-08-29 00:00:00,U,U,,,"1층 화장실 천정 시수,공수,FCU 배관 보온 미비 부분의 결로수 발생 및 낙하로 ...",,".1층 화장실 천정 시수,공수,FCU 배관 보온 미비 부분의 결로수 발생 및 낙하로..."
1,김인후,영선반,영선반,2017-08-29 00:00:00,R1,R1,,,공조기 #6호기 RF 모터 BRG 교체,,.공조기 #6호기 RF 모터 BRG 교체
2,김인후,영선반,영선반,2017-08-29 00:00:00,R1,R1,,,공조기 #15호기 RF 모터 BRG 교체,,.공조기 #15호기 RF 모터 BRG 교체
3,김인후,원동실,원동실,2017-08-29 00:00:00,공장,공장,,,매주 화요일 가스 정압실 점검 및 기록지 교체,,.매주 화요일 가스 정압실 점검 및 기록지 교체
4,김인후,원동실,원동실,2017-08-29 00:00:00,"U,M,Y",각동,,,화재 안전 점검 실시,,.화재 안전 점검 실시


In [5]:
needed_columns = ['manager', 'company', 'date', 'building', 'content']
dataset = dataset[needed_columns]

In [6]:
dataset.head()

Unnamed: 0,manager,company,date,building,content
0,김인후,영선반,2017-08-29 00:00:00,U,".1층 화장실 천정 시수,공수,FCU 배관 보온 미비 부분의 결로수 발생 및 낙하로..."
1,김인후,영선반,2017-08-29 00:00:00,R1,.공조기 #6호기 RF 모터 BRG 교체
2,김인후,영선반,2017-08-29 00:00:00,R1,.공조기 #15호기 RF 모터 BRG 교체
3,김인후,원동실,2017-08-29 00:00:00,공장,.매주 화요일 가스 정압실 점검 및 기록지 교체
4,김인후,원동실,2017-08-29 00:00:00,각동,.화재 안전 점검 실시


In [7]:
np.unique(list(map(str, dataset.building.values)))

array(['F1', 'F2', 'F3', 'G1', 'G2', 'G3', 'M', 'P2', 'P3', 'P4', 'P5',
       'R', 'R1', 'R2', 'U', 'Y', '각동', '공동구', '공장', '관제', '규격동', '금형동',
       '기숙사', '미분류', '복지관', '제시연', '주차장'], dtype='<U3')

In [9]:
# label 추가(가상)
# 각동, 기숙사는 0(중요X)으로, 나머지는 1(중요))
_not_important = ['공장', '미분류', '주차장', '기숙사', '각동']
dataset['label'] = np.where(np.logical_not(dataset['building'].isin(_not_important)), 1, 0)
dataset.head()

Unnamed: 0,manager,_company,company,date,_building,building,building_check,prepro1,_content,_content_sub,content,label
0,김인후,영선반,영선반,2017-08-29 00:00:00,U,U,,,"1층 화장실 천정 시수,공수,FCU 배관 보온 미비 부분의 결로수 발생 및 낙하로 ...",,".1층 화장실 천정 시수,공수,FCU 배관 보온 미비 부분의 결로수 발생 및 낙하로...",1
1,김인후,영선반,영선반,2017-08-29 00:00:00,R1,R1,,,공조기 #6호기 RF 모터 BRG 교체,,.공조기 #6호기 RF 모터 BRG 교체,1
2,김인후,영선반,영선반,2017-08-29 00:00:00,R1,R1,,,공조기 #15호기 RF 모터 BRG 교체,,.공조기 #15호기 RF 모터 BRG 교체,1
3,김인후,원동실,원동실,2017-08-29 00:00:00,공장,공장,,,매주 화요일 가스 정압실 점검 및 기록지 교체,,.매주 화요일 가스 정압실 점검 및 기록지 교체,0
4,김인후,원동실,원동실,2017-08-29 00:00:00,"U,M,Y",각동,,,화재 안전 점검 실시,,.화재 안전 점검 실시,0


In [10]:
# train/val/test split
from sklearn.model_selection import train_test_split
train_x, test_x, train_y, test_y = train_test_split(dataset.content.values, dataset.label.values, test_size=0.2, random_state=42)
#train_x, val_x, train_y, val_y = train_test_split(train_x, train_y, test_size=0.2, random_state=42)

In [11]:
train_x[:5]

array(['.보일러, 온수 대류/순환 펌프 교번 운전 실시',
       '.2. R1동 진동낙하시험실 공사관련 환기 실시(08:00 ~ 17:00)',
       '.5. R1동 지하1층 카페 에어컨 신설공사 지원 - 하이엠', '.2. 각동 청정도 측정',
       '3. CEO 현장 방문관련 점검 및 공조,냉방 실시.- R1동 2층 Demo room/실생활 Test room'],
      dtype=object)

In [12]:
dataset_train = list(zip(train_x, train_y))
dataset_test = list(zip(test_x, test_y))

In [13]:
# BERT에 입력하기 위한 데이터셋 로더 정의
class BERTDataset(Dataset):
    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, max_len, pad, pair):
        transform = nlp.data.BERTSentenceTransform(
            bert_tokenizer, # 버트 토크나이저 사용
            max_seq_length=max_len, # 배치로 입력할 텍스트 데이터의 최대 길이 설정: 만약 64로 설정 시, 그보다 짧은 문장의 경우 나머지를 padding 수행하여 길이를 맞춰줌
            pad=pad,
            pair=pair
        )
        
        self.sentences = [transform([i[sent_idx]]) for i in dataset]
        self.labels = [np.int32(i[label_idx]) for i in dataset]

    def __getitem__(self, i):
        return (self.sentences[i] + (self.labels[i], ))

    def __len__(self):
        return (len(self.labels))

In [None]:
# BERT 토크나이저 로딩
tokenizer = get_tokenizer()
tok = nlp.data.BERTSPTokenizer(tokenizer, vocab, lower=False)

In [14]:
# 하이퍼파라미터 세팅
max_len = 64
batch_size = 64
warmup_ratio = 0.1
num_epochs = 10
max_grad_norm = 1
log_interval = 20
learning_rate = 5e-5

# 데이터셋(Loader) 정의 (학습/테스트셋)
data_train = BERTDataset(
    dataset=dataset_train,
    sent_idx=0,
    label_idx=1,
    bert_tokenizer=tok,
    max_len=max_len,
    pad=True,
    pair=False
)

data_test = BERTDataset(
    dataset=dataset_test,
    sent_idx=0,
    label_idx=1,
    bert_tokenizer=tok,
    max_len=max_len,
    pad=True,
    pair=False
)

NameError: name 'tok' is not defined

In [16]:
train_dataloader = torch.utils.data.DataLoader(
    data_train,
    batch_size=batch_size,
    num_workers=8
)

test_dataloader = torch.utils.data.DataLoader(
    data_test,
    batch_size=batch_size,
    num_workers=8
)

In [17]:
class BERTClassifier(nn.Module):
    '''
    Arguments
        bert: 버트 모델 (KoBERT)
        hidden_size: 버트가 사용하는 임베딩 벡터들의 차원
        num_classes: 결과물의 클래스 수(긍/부정 분류의 경우 2가지이므로 2)
        dr_rate: Drop-out Rate
        params:
    '''
    def __init__(
        self,
        bert,
        hidden_size=768,
        num_classes=2,
        dr_rate=None
    ):
        
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
                 
        self.classifier = nn.Linear(hidden_size , num_classes) # 분류기
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)
    
    def gen_attention_mask(self, token_ids, valid_length):
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, token_ids, valid_length, segment_ids):
        attention_mask = self.gen_attention_mask(token_ids, valid_length)
        
        _, pooler = self.bert(input_ids = token_ids, token_type_ids = segment_ids.long(), attention_mask = attention_mask.float().to(token_ids.device))
        
        if self.dr_rate:
            pooler = self.dropout(pooler)
            
        out = self.classifier(pooler)
            
        return out

In [18]:
# model 정의 및 설정된 device로 옮김
model = BERTClassifier(
    bertmodel,
    dr_rate=0.5
).to(device)

In [19]:
# 옵티마이저 설정: bias 및 Layer Normalization Layer의 경우 weight_decay 사용하지 않음
no_decay = ['bias', 'LayerNorm.weight']
optimizer_grouped_parameters = [
    {'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
    {'params': [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
]

optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate)

# Loss 함수 설정: 분류기 학습을 위한 크로스-엔트로피 Loss 설정
loss_fn = nn.CrossEntropyLoss()



In [20]:
# 설정된 warmup_ratio에 따른 Warmup 횟수 설정
# Warmup이 끝난 후에는 원래대로의 Learning Rate 스케쥴러를 따라 학습률이 설정됨
t_total = len(train_dataloader) * num_epochs
warmup_step = int(t_total * warmup_ratio)

In [21]:
# Learning Rate 스케쥴러
scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_step,
    num_training_steps=t_total
)

In [22]:
# 정확도 계산 함수
def calc_accuracy(X,Y):
    max_vals, max_indices = torch.max(X, 1)
    train_acc = (max_indices == Y).sum().data.cpu().numpy()/max_indices.size()[0]
    return train_acc

In [24]:
# 학습 수행
for e in range(num_epochs):
    train_acc = 0.0
    test_acc = 0.0
    model.train()
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(train_dataloader):
        
        optimizer.zero_grad()
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        valid_length= valid_length
        label = label.long().to(device)
        
        out = model(token_ids, valid_length, segment_ids) # 분류 결과
        loss = loss_fn(out, label) # Loss 계산
        
        loss.backward()
        
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        
        optimizer.step()
        scheduler.step() # Update learning rate schedule
        
        train_acc += calc_accuracy(out, label)
        
        if batch_id % log_interval == 0:
            print(f"Epoch [{e+1}] Batch [{batch_id}/{len(train_dataloader)}] Loss {loss.data.cpu().numpy():.4f} Train Acc. {train_acc / (batch_id + 1):.4f}")

    print(f"Epoch [{e+1}] Train Acc. {train_acc / (batch_id + 1):.4f}")
    
    # Evaluation 수행
    model.eval()
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(test_dataloader):
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        valid_length= valid_length
        label = label.long().to(device)
        out = model(token_ids, valid_length, segment_ids)
        test_acc += calc_accuracy(out, label)
        
    print(f"Epoch [{e+1}] Test Acc. {test_acc / (batch_id+1):.4f}\n")

Epoch [1] Batch [0/106] Loss 0.6891 Train Acc. 0.5469
Epoch [1] Batch [20/106] Loss 0.6447 Train Acc. 0.6302
Epoch [1] Batch [40/106] Loss 0.5157 Train Acc. 0.6810
Epoch [1] Batch [60/106] Loss 0.4791 Train Acc. 0.7008
Epoch [1] Batch [80/106] Loss 0.4653 Train Acc. 0.7220
Epoch [1] Batch [100/106] Loss 0.3841 Train Acc. 0.7399
Epoch [1] Train Acc. 0.7461
Epoch [1] Test Acc. 0.8662

Epoch [2] Batch [0/106] Loss 0.3432 Train Acc. 0.8594
Epoch [2] Batch [20/106] Loss 0.2451 Train Acc. 0.8497
Epoch [2] Batch [40/106] Loss 0.3523 Train Acc. 0.8655
Epoch [2] Batch [60/106] Loss 0.4400 Train Acc. 0.8658
Epoch [2] Batch [80/106] Loss 0.4055 Train Acc. 0.8754
Epoch [2] Batch [100/106] Loss 0.3735 Train Acc. 0.8810
Epoch [2] Train Acc. 0.8839
Epoch [2] Test Acc. 0.9051

Epoch [3] Batch [0/106] Loss 0.1875 Train Acc. 0.9375
Epoch [3] Batch [20/106] Loss 0.2006 Train Acc. 0.9055
Epoch [3] Batch [40/106] Loss 0.2791 Train Acc. 0.9101
Epoch [3] Batch [60/106] Loss 0.3214 Train Acc. 0.9096
Epoch [3]

In [25]:
# 학습된 모델의 가중치 저장하기
torch.save(model.state_dict(), 'ckpt/kobert_sentiment_building.pt')

### 학습된 모델 가중치 사용하기

In [36]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import gluonnlp as nlp
import numpy as np
from tqdm.notebook import tqdm

from kobert import get_tokenizer
from kobert import get_pytorch_kobert_model

from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup

In [37]:
# 모델 구축하기 1 (BASE)
bertmodel, vocab = get_pytorch_kobert_model(cachedir=".cache")

using cached model. /home/sks/dsba/lge/lge_project_bert/.cache/kobert_v1.zip
using cached model. /home/sks/dsba/lge/lge_project_bert/.cache/kobert_news_wiki_ko_cased-1087f8699e.spiece


In [38]:
# 모델 구축하기 2 (Classifier)
class BERTClassifier(nn.Module):
    '''
    Arguments
        bert: 버트 모델 (KoBERT)
        hidden_size: 버트가 사용하는 임베딩 벡터들의 차원
        num_classes: 결과물의 클래스 수(긍/부정 분류의 경우 2가지이므로 2)
        dr_rate: Drop-out Rate
        params:
    '''
    def __init__(
        self,
        bert,
        hidden_size=768,
        num_classes=2,
        dr_rate=None
    ):
        
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        
        self.classifier = nn.Linear(hidden_size , num_classes) # 분류기
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)
    
    def gen_attention_mask(self, token_ids, valid_length):
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, token_ids, valid_length, segment_ids):
        attention_mask = self.gen_attention_mask(token_ids, valid_length)
        
        _, out = self.bert(input_ids=token_ids, token_type_ids=segment_ids.long(), attention_mask=attention_mask.float().to(token_ids.device))
        
        if self.dr_rate:
            out = self.dropout(out)
            
        out = self.classifier(out)
            
        return out

In [39]:
# model 정의 및 설정된 device로 옮김
model = BERTClassifier(
    bertmodel
)

In [40]:
# 저장해 둔 학습된 가중치 불러오기 (cpu로)
saved_weights = torch.load('ckpt/kobert_sentiment_building.pt', map_location='cpu')

In [41]:
model.load_state_dict(saved_weights)
model.eval()

BERTClassifier(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(8002, 768, padding_idx=1)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0): BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True

### 추론하기

In [42]:
# BERT 토크나이저 로딩
tokenizer = get_tokenizer()
tok = nlp.data.BERTSPTokenizer(tokenizer, vocab, lower=False)

using cached model. /home/sks/dsba/lge/lge_project_bert/.cache/kobert_news_wiki_ko_cased-1087f8699e.spiece


In [43]:
text_transformer = nlp.data.BERTSentenceTransform(
            tok,
            max_seq_length=64,
            pad=True,
            pair=False
        )

In [44]:
# _not_important = ['공장', '미분류', '주차장', '기숙사', '각동']

In [45]:
dataset.content.values[3]

'.매주 화요일 가스 정압실 점검 및 기록지 교체'

In [46]:
text = '화재 안전 점검 실시'
text_infer = (text, '')

token_ids, valid_length, segment_ids = map(lambda l: torch.tensor(l), text_transformer(text_infer))

token_ids = token_ids.long().unsqueeze(0)
valid_length = valid_length.unsqueeze(0)
segment_ids = segment_ids.long().unsqueeze(0)

In [47]:
pred = model(token_ids, valid_length, segment_ids).squeeze(0)
pred = torch.argmax(torch.nn.functional.softmax(pred, dim=-1)).item()

print("중요함") if pred == 1 else print("중요하지 않음")

중요하지 않음


## CAPTUM

In [167]:
from kobert_tokenizer import KoBertTokenizer

In [53]:
import captum
import torch
import matplotlib.pyplot as plt
import glob 
from transformers import BertTokenizer, BertForSequenceClassification, BertConfig
from captum.attr import visualization as viz
from captum.attr import IntegratedGradients, LayerConductance, LayerIntegratedGradients
from captum.attr import configure_interpretable_embedding_layer, remove_interpretable_embedding_layer

from collections import OrderedDict

In [54]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [66]:
# 모델 구축하기 1 (BASE)
bertmodel, vocab = get_pytorch_kobert_model(cachedir=".cache")

using cached model. /home/sks/dsba/lge/lge_project_bert/.cache/kobert_v1.zip
using cached model. /home/sks/dsba/lge/lge_project_bert/.cache/kobert_news_wiki_ko_cased-1087f8699e.spiece


In [67]:
# 모델 구축하기 2 (Classifier)
class BERTClassifier(nn.Module):
    '''
    Arguments
        bert: 버트 모델 (KoBERT)
        hidden_size: 버트가 사용하는 임베딩 벡터들의 차원
        num_classes: 결과물의 클래스 수(긍/부정 분류의 경우 2가지이므로 2)
        dr_rate: Drop-out Rate
        params:
    '''
    def __init__(
        self,
        bert,
        hidden_size=768,
        num_classes=2,
        dr_rate=None
    ):
        
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        
        self.classifier = nn.Linear(hidden_size , num_classes) # 분류기
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)
    
    def gen_attention_mask(self, token_ids, valid_length):
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, token_ids, valid_length, segment_ids):
        attention_mask = self.gen_attention_mask(token_ids, valid_length)
        
        _, out = self.bert(input_ids=token_ids, token_type_ids=segment_ids.long(), attention_mask=attention_mask.float().to(token_ids.device))
        
        if self.dr_rate:
            out = self.dropout(out)
            
        out = self.classifier(out)
            
        return out

In [68]:
# model 정의 및 설정된 device로 옮김
model = BERTClassifier(
    bertmodel
)

In [69]:
# 저장해 둔 학습된 가중치 불러오기 (cpu로)
saved_weights = torch.load('ckpt/kobert_sentiment_building.pt', map_location='cpu')

In [70]:
model.load_state_dict(saved_weights)
model.eval()

BERTClassifier(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(8002, 768, padding_idx=1)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0): BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True

In [84]:
tokenizer = get_tokenizer()
tokenizer = nlp.data.BERTSPTokenizer(tokenizer, vocab, lower=False)

using cached model. /home/sks/dsba/lge/lge_project_bert/.cache/kobert_news_wiki_ko_cased-1087f8699e.spiece


In [87]:
ref_token_id = tokenizer.vocab.token_to_idx['[PAD]'] # A token used for generating token reference
sep_token_id = tokenizer.vocab.token_to_idx['[SEP]']
cls_token_id = tokenizer.vocab.token_to_idx['[CLS]'] # A token used for prepending to the concatenated question-text word sequence

In [156]:
def predict(inputs):
    return model(inputs)

def construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id):
    #text_ids = tokenizer.encode(text, add_special_tokens=False)
    token_ids, valid_length, segment_ids = map(lambda l: torch.tensor(l), text_transformer(text))

    #input_ids = [cls_token_id] + text_ids + [sep_token_id]
    input_ids = token_ids
    ref_input_ids = [cls_token_id] + [ref_token_id] * (len(token_ids)-2) + [sep_token_id]
    return torch.tensor(input_ids, device=device), torch.tensor(ref_input_ids, device=device), len(token_ids) - 2, valid_length, segment_ids

def construct_input_ref_token_type_pair(input_ids, sep_ind=0):
    seq_len = input_ids.size(1)
    token_type_ids = torch.tensor([[0 if i <= sep_ind else 1 for i in range(seq_len)]], device=device)
    ref_token_type_ids = torch.zeros_like(token_type_ids, device=device)# * -1
    return token_type_ids, ref_token_type_ids

def construct_input_ref_pos_id_pair(input_ids):
    seq_length = input_ids.size(1)
    position_ids = torch.arange(seq_length, dtype=torch.long, device=device)
    # we could potentially also use random permutation with `torch.randperm(seq_length, device=device)`
    ref_position_ids = torch.zeros(seq_length, dtype=torch.long, device=device)

    position_ids = position_ids.unsqueeze(0).expand_as(input_ids)
    ref_position_ids = ref_position_ids.unsqueeze(0).expand_as(input_ids)
    return position_ids, ref_position_ids
    
def construct_attention_mask(input_ids):
    return torch.ones_like(input_ids)

def custom_forward(inputs):
    # for negative attribution, torch.softmax(preds, dim = 1)[:, 1] <- for positive attribution
    preds = predict(inputs)
    return torch.softmax(preds, dim = 1)[:, 1] 

In [157]:
text_transformer = nlp.data.BERTSentenceTransform(
            tok,
            max_seq_length=64,
            pad=False,
            pair=False,
        )

In [158]:
text = '화재 안전 점검 실시'
text_infer = (text, '')

token_ids, valid_length, segment_ids = map(lambda l: torch.tensor(l), text_transformer(text_infer))

token_ids = token_ids.long().unsqueeze(0)
valid_length = valid_length.unsqueeze(0)
segment_ids = segment_ids.long().unsqueeze(0)

In [159]:
pred = model(token_ids, valid_length, segment_ids).squeeze(0)
pred = torch.argmax(torch.nn.functional.softmax(pred, dim=-1)).item()

print("중요함") if pred == 1 else print("중요하지 않음")

중요하지 않음


In [160]:
lig = LayerIntegratedGradients(custom_forward, model.bert.embeddings)

In [165]:
vis_data_records_ig = []
label_vocab = {0: 'negative', 1: 'positive'}

def interpret_sentence(model, sentence, label):       
    model.zero_grad()
    input_ids, ref_input_ids, sep_id, valid_length, segment_ids = construct_input_ref_pair(sentence, ref_token_id, sep_token_id, cls_token_id)
    print(input_ids, ref_input_ids, sep_id)
    #input_tokens = tokenizer.decode(input_ids[0], special_tokens=False) # input_ids.tokens[:params['max_len']]
    input_tokens = sentence

    # 단일 문장에 대한 예측 작업 수행
    pred = model(input_ids.unsqueeze(0), valid_length, segment_ids) #predict(input_ids)
    pred = torch.nn.functional.softmax(pred, dim=-1)
    pred_ind = torch.argmax(pred, dim=-1).item()
    pred = pred.cpu().detach().numpy()

    # LayerIntegratedGradients 모듈 활용해 개별 단어의 속성값 및 델타값 근사치 계산
    attributions_ig, delta = lig.attribute(inputs=input_ids, baselines=ref_input_ids, return_convergence_delta=True)
    print(f'pred: {label_vocab[pred_ind]} ({pred[pred_ind].item()}) ')
    add_attributions_to_visualizer(attributions_ig, input_tokens, pred, pred_ind, label, delta, vis_data_records_ig)

def add_attributions_to_visualizer(attributions, text, pred, pred_ind, label, delta, vis_data_records):
    attributions = attributions.sum(dim=2).squeeze(0)
    attributions = attributions / torch.norm(attributions)
    attributions = attributions.cpu().detach().numpy()
    
    # 시각화 위해 샘플을 리스트에 추가
    vis_data_records.append(viz.VisualizationDataRecord(
                                    attributions,
                                    pred[pred_ind].item(), # class prob
                                    label_vocab[pred_ind], # pred
                                    label_vocab[label], # true
                                    label_vocab[1], # attribution label
                                    attributions.sum(),       
                                    text.split(),
                                    delta
                                )
                           )

In [166]:
interpret_sentence(model, '화재 안전 점검 실시', label=0)

tensor([   2, 5112,    3], device='cuda:0', dtype=torch.int32) tensor([2, 1, 3], device='cuda:0') 1
tensor([[   2, 5112,    3]], device='cuda:0', dtype=torch.int32)


  return torch.tensor(input_ids, device=device), torch.tensor(ref_input_ids, device=device), len(token_ids) - 2, valid_length, segment_ids


TypeError: iteration over a 0-d tensor