# import modules
import datetime
import time
import logging
import random
# for data analysis and wrangling
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import re
import urllib.request
import easydict
from tqdm import tqdm, notebook
# machine learning
import torch
from torch import nn
from torch.utils.data import Dataset, TensorDataset, DataLoader, RandomSampler, SequentialSampler
import torch.nn.functional as F
import torch.optim as optim
from transformers import DataCollatorWithPadding
from transformers import BertTokenizer
from transformers import BertForSequenceClassification, AdamW, BertConfig
from transformers import get_linear_schedule_with_warmup
from transformers import BertModel
from transformers.optimization import get_cosine_schedule_with_warmup
from sklearn.model_selection import train_test_split
import gluonnlp as nlp
# kobert
from kobert_tokenizer import KoBERTTokenizer

def preprocessing(df):
    """Keep only Korean characters and spaces, then drop rows left empty."""
    logging.info(f"Before preprocessing df's shape: {df.shape}")
    # strip everything except Korean jamo/syllables and spaces; regex=True must
    # be passed explicitly since pandas 2.0 defaults str.replace to literal mode
    df['document'] = df['document'].str.replace("[^ㄱ-ㅎㅏ-ㅣ가-힣 ]", "", regex=True)
    df['document'] = df['document'].str.replace('^ +', "", regex=True)
    # rows reduced to the empty string become NaN and are dropped
    df['document'] = df['document'].replace('', np.nan)
    df = df.dropna(how='any')
    logging.info(f"After preprocessing df's shape: {df.shape}")
    return df
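
# A minimal usage sketch for preprocessing(), kept commented out so importing
# this module stays side-effect free; the sample rows are illustrative only:
#
#   toy_df = pd.DataFrame({'document': ['영화 최고!!', 'abc 123', '  굿'],
#                          'label': [1, 0, 1]})
#   toy_df = preprocessing(toy_df)  # 'abc 123' is emptied out and dropped
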
# TODO: can the parameters below be collected into the easydict config as well?
class ModelInit():
    def __init__(self, model_name):
        self.args = easydict.EasyDict({'bert_model': model_name,
                                       'n_class': 2, 'max_token_len': 512})
        self.tokenizer = None
        self.vocab = None
        self.model = None
        if model_name == "bert-base-multilingual-cased":
            self.tokenizer = BertTokenizer.from_pretrained(self.args.bert_model)
            self.model = BertModel.from_pretrained(self.args.bert_model, num_labels=self.args.n_class, return_dict=False)
        elif model_name == "skt/kobert-base-v1":
            self.tokenizer = KoBERTTokenizer.from_pretrained(self.args.bert_model)
            self.model = BertModel.from_pretrained(self.args.bert_model, num_labels=self.args.n_class, return_dict=False)
            # KoBERT ships a SentencePiece vocab file that gluonnlp can load directly
            self.vocab = nlp.vocab.BERTVocab.from_sentencepiece(self.tokenizer.vocab_file, padding_token='[PAD]')
        else:
            raise ValueError(f"Unsupported model_name: {model_name}")
        self.train_batch_size = 32
        self.test_batch_size = 16
        self.no_decay = None
        self.optimizer_grouped_parameters = None
        self.optimizer = None
        self.loss_fn = None
        self.t_total = None
        self.warmup_step = None
        self.scheduler = None
        self.train_loss_per_epoch = []
        self.validation_loss_per_epoch = []
        self.train_accuracy_per_epoch = []
        self.validation_accuracy_per_epoch = []
        self.accuracy = None
        self.train_dataloader = None
        self.test_dataloader = None
        self.epochs = None

    def define_hyperparameters(self, model, train_dataloader, epochs):
        # exclude bias and LayerNorm weights from weight decay
        self.no_decay = ['bias', 'LayerNorm.weight']
        self.optimizer_grouped_parameters = [
            {'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in self.no_decay)], 'weight_decay': 0.01},
            {'params': [p for n, p in model.named_parameters() if any(nd in n for nd in self.no_decay)], 'weight_decay': 0.0}
        ]
        # pass the grouped parameters (not raw model.parameters()), otherwise
        # the weight-decay grouping above has no effect
        self.optimizer = AdamW(self.optimizer_grouped_parameters,
                               lr=5e-5,  # learning rate
                               eps=1e-8  # epsilon for AdamW to avoid division by zero
                               )
        self.loss_fn = nn.CrossEntropyLoss()
        # total optimization steps, with the first 10% used for warmup
        self.t_total = len(train_dataloader) * epochs
        self.warmup_step = int(self.t_total * 0.1)
        self.scheduler = get_cosine_schedule_with_warmup(self.optimizer, num_warmup_steps=self.warmup_step, num_training_steps=self.t_total)
        self.epochs = epochs
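
# A hedged wiring sketch for ModelInit (commented out; train_loader is an
# assumed, pre-built DataLoader, not defined in this module):
#
#   model_init = ModelInit("skt/kobert-base-v1")
#   classifier = BERTClassifier(model_init.model, dr_rate=0.5)
#   model_init.define_hyperparameters(classifier, train_loader, epochs=5)
#   # model_init.optimizer, model_init.loss_fn and model_init.scheduler are
#   # now ready to drive a standard training loop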

class BERTDataset(Dataset):
    def __init__(self, dataset, feat_col, label_col, bert_tokenizer):
        # tokenize the whole feature column up front, padded and truncated to 512 tokens
        self.sentences = bert_tokenizer(
            list(dataset[feat_col]),
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
            add_special_tokens=True
        )
        self.label = dataset[label_col].values

    def __getitem__(self, idx):
        item = {key: value[idx].clone().detach() for key, value in self.sentences.items()}
        item["label"] = torch.tensor(self.label[idx])
        return item

    def __len__(self):
        return len(self.label)
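
# A small sketch of batching with BERTDataset; column names and batch size
# are assumptions for illustration:
#
#   train_set = BERTDataset(train_df, 'document', 'label', model_init.tokenizer)
#   train_loader = DataLoader(train_set, batch_size=32, shuffle=True)
#   # each batch is a dict of the tokenizer's tensors plus a 'label' tensor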

class BERTClassifier(nn.Module):
    def __init__(self,
                 bert,
                 hidden_size=768,
                 num_classes=2,
                 dr_rate=None,
                 params=None):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        self.classifier = nn.Linear(hidden_size, num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)

    def gen_attention_mask(self, token_ids, valid_length):
        # build a 0/1 mask that marks the first valid_length tokens of each row
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, input_ids, token_type_ids, attention_mask, bert_model):
        if bert_model == "bert-base-multilingual-cased":
            # with return_dict=False the model returns (sequence_output, pooled_output);
            # classify from the [CLS] position of the sequence output
            pooler = self.bert(input_ids=input_ids, token_type_ids=token_type_ids.long(), attention_mask=attention_mask.float().to(input_ids.device))
            out = pooler[0]
            if self.dr_rate:
                out = self.dropout(out)
            logits = self.classifier(out[:, 0])
        elif bert_model == "skt/kobert-base-v1":
            # the KoBERT path classifies from the pooled output instead
            _, pooler = self.bert(input_ids=input_ids, token_type_ids=token_type_ids.long(), attention_mask=attention_mask.float().to(input_ids.device))
            out = pooler
            if self.dr_rate:
                out = self.dropout(out)
            logits = self.classifier(out)
        else:
            raise ValueError(f"Unsupported bert_model: {bert_model}")
        return logits
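
# A minimal end-to-end sketch of how the pieces above fit together, using the
# multilingual BERT path; the toy DataFrame is an illustrative stand-in, and
# running this will download pretrained weights.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    toy = pd.DataFrame({'document': ['영화가 정말 좋았다', '최악의 영화였다'] * 8,
                        'label': [1, 0] * 8})
    toy = preprocessing(toy)
    init = ModelInit("bert-base-multilingual-cased")
    train_set = BERTDataset(toy, 'document', 'label', init.tokenizer)
    train_loader = DataLoader(train_set, batch_size=init.train_batch_size, shuffle=True)
    clf = BERTClassifier(init.model, dr_rate=0.5)
    init.define_hyperparameters(clf, train_loader, epochs=1)
    batch = next(iter(train_loader))
    logits = clf(batch['input_ids'], batch['token_type_ids'],
                 batch['attention_mask'], "bert-base-multilingual-cased")
    print(logits.shape)  # torch.Size([16, 2])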