# 데이터 불러오기 및 학습 데이터와 테스트 데이터 분류 

In [1]:
import pandas as pd
import numpy as np
import torch
from tqdm.auto import tqdm
import random
import os

def reset_seeds(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

In [2]:
# 데이터 불러오기 

train_ft = pd.read_excel('zigzag_clothes.xlsx')

In [3]:
# 간단한 전처리 

train_ft['리뷰'] = train_ft['리뷰'].str.replace('\n','. ')

In [4]:
# 당장 분석에 필요없는 컬럼 제거 

drop_columns = ['중분류', '브랜드', '상품명', '가격', '리뷰수', '상품평점', '닉네임', '별점', '날짜', '키',
       '몸무게', '상의사이즈', '선택옵션', '사이즈평가', '퀄리티평가', '색감평가']

train_ft.drop(drop_columns,axis = 1,inplace = True)

train_ft.shape

(7193, 8)

In [5]:
# ex) 기본 키워드의 경우에는 키워드별 존재 여부에 대한 모델에 적용하기 위해 -1을 1로 바꿔야 합니다. 
# 그렇게 되면 원본 키워드가 훼손되기 때문에 복사해서 따로 컬럼을 만들어줍니다. 
train_ft['색감_긍부정'] = train_ft['색감'].copy()
train_ft['핏_긍부정'] = train_ft['핏'].copy()
train_ft['재질_긍부정'] = train_ft['재질'].copy()
train_ft['퀄리티_긍부정'] = train_ft['퀄리티'].copy()
train_ft['제품상태_긍부정'] = train_ft['제품상태'].copy()
train_ft['가격.1_긍부정'] = train_ft['가격.1'].copy()
train_ft['두께_긍부정'] = train_ft['두께'].copy()

In [6]:
# 학습 데이터와 테스트 데이터 분류 작업 
# y정답데이터를 기준으로 키워드별로 층화추출이 행해집니다. 

x,y = train_ft[['리뷰','색감_긍부정','핏_긍부정','재질_긍부정','퀄리티_긍부정','제품상태_긍부정','가격.1_긍부정','두께_긍부정']].to_numpy(), train_ft[['색감', '핏', '재질', '퀄리티', '제품상태', '가격.1', '두께']].replace(-1,1).to_numpy()

from iterstrat.ml_stratifiers import MultilabelStratifiedKFold

msss = MultilabelStratifiedKFold(n_splits=5, shuffle=True, random_state=42)

for train_index, test_index in msss.split(x, y):
    print("TRAIN:", train_index, "TEST:", test_index)
    x_train, x_test = x[train_index], x[test_index]
    y_train, y_test = y[train_index], y[test_index]
    break

TRAIN: [   0    1    2 ... 7189 7191 7192] TEST: [   6    7   19 ... 7183 7188 7190]


In [7]:
# 최종적인 학습 데이터와 테스트 데이터가 만들어집니다. 

tmp_1 = pd.DataFrame(x_train,columns = ['리뷰','색감_긍부정','핏_긍부정','재질_긍부정','퀄리티_긍부정','제품상태_긍부정','가격.1_긍부정','두께_긍부정'])
tmp_2 = pd.DataFrame(y_train,columns = ['색감', '핏', '재질', '퀄리티', '제품상태', '가격.1', '두께'])

train_ft = pd.concat([tmp_1,tmp_2],axis = 1)

tmp_1 = pd.DataFrame(x_test,columns = ['리뷰','색감_긍부정','핏_긍부정','재질_긍부정','퀄리티_긍부정','제품상태_긍부정','가격.1_긍부정','두께_긍부정'])
tmp_2 = pd.DataFrame(y_test,columns = ['색감', '핏', '재질', '퀄리티', '제품상태', '가격.1', '두께'])

test_ft = pd.concat([tmp_1,tmp_2],axis = 1)

In [8]:
train_ft.shape,test_ft.shape

((5754, 15), (1439, 15))

In [9]:
# 키워드별 긍부정을 예측할 때 사용하는 데이터 셋

train_tmp = train_ft.iloc[:,:8].copy()
test_tmp = test_ft.iloc[:,:8].copy()

train_tmp.shape,test_tmp.shape

((5754, 8), (1439, 8))

In [10]:
# 키워드 존재 여부를 판단하는 모델을 위한 데이터 셋

train_ft = train_ft.drop(train_ft.columns[1:8], axis=1).copy()
test_ft = test_ft.drop(test_ft.columns[1:8], axis=1).copy()

train_ft.shape,test_ft.shape

((5754, 8), (1439, 8))

In [11]:
# 사전학습 모델에서 임베딩 작업이 행해지기 때문에 numpy의 형태로 전달해야 합니다. 

train_arr = train_ft['리뷰'].to_numpy()
test_arr = test_ft['리뷰'].to_numpy()

train_arr.shape,test_arr.shape

((5754,), (1439,))

In [12]:
# 이항분류 문제의 경우에는 이차원으로 전달해야 하는데 이미 2차원의 형태이기 때문에 numpy의 형태로만 전달을 해줍니다. 

target = train_ft.iloc[:,1:].to_numpy().copy()
target_test = test_ft.iloc[:,1:].to_numpy().copy()

target.shape,target_test.shape

((5754, 7), (1439, 7))

# 사전학습 모델 불러오기 

In [13]:
# model_name = 'BM-K/KoDiffCSE-RoBERTa'
# TF-IDF로 단어문서 행렬을 정리한 후 decomp를 통한 주제분석 피처를 추가했을 경우 점수 75 (한번의 Fold에서)
# 오직 사전학습만 적용했을 경우 점수 73

In [14]:
# model_name = 'amberoad/bert-multilingual-passage-reranking-msmarco'

In [15]:
# model_name = 'team-lucid/deberta-v3-base-korean'
# 순수하게 이것만 이용했을 때는 72

In [16]:
# model_name = 'smilegate-ai/kor_unsmile'
# 순수하게 이것만 이용했을 때 74
# 단어문서 행렬을 붙이면 73

In [17]:
# model_name = 'BM-K/KoSimCSE-roberta'
# 순수하게 이것만 이용했을 때 75가 나온다. 

In [18]:
# model_name = 'minalang/KoDSE-bert'

In [19]:
model_name = 'kykim/bert-kor-base'
# 기본으로 적용해도 76
# TF-IDF로 단어문서 행렬을 정리한 후 decomp를 통한 주제분석 피처를 추가했을 경우 점수 76 (한번의 Fold에서)
# cat decomp, RNN, CNN의 경우에는 모델 자체가 무거워지는 현상이 발생함, 점수 75

In [20]:
from transformers import AutoTokenizer, AutoModel

In [21]:
model = AutoModel.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# 만일 padding의 길이를 지정해주고 싶다면?
# tokenizer = AutoTokenizer.from_pretrained(model_name, model_max_length = 512)
# tokenizer.model_max_length로 padding의 길이도 한번 확인보면 좋을 듯 

  return self.fget.__get__(instance, owner)()


# 사전학습만을 이용하여 학습 및 검증

In [22]:
class ReviewDataset(torch.utils.data.Dataset):
    def __init__(self, tokenizer, x, y=None):
        self.tokenizer = tokenizer
        self.x = x
        self.y = y
    def __len__(self):
        return len(self.x)
    def __getitem__(self, idx):
        item = {}
        item["x"] = self.get_tokenizer(self.x[idx])
        if self.y is not None:
            item["y"] = torch.Tensor(self.y[idx])
        return item
    def get_tokenizer(self, text):
        x = self.tokenizer(text, padding="max_length", truncation=True)
        for k, v in x.items():
            x[k] = torch.tensor(v)
        return x

In [23]:
dt = ReviewDataset(tokenizer, train_arr, target)
dl = torch.utils.data.DataLoader(dt, batch_size=2, shuffle=False)
batch = next(iter(dl))
batch

{'x': {'input_ids': tensor([[    2,  2059, 18764,  ...,     0,     0,     0],
         [    2,  7386, 18392,  ...,     0,     0,     0]]), 'token_type_ids': tensor([[0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
         [1, 1, 1,  ..., 0, 0, 0]])},
 'y': tensor([[0., 0., 0., 1., 0., 0., 0.],
         [0., 1., 0., 0., 0., 0., 1.]])}

In [24]:
class Net(torch.nn.Module):
    def __init__(self, model_name):
        super().__init__()
        self.pre_model = AutoModel.from_pretrained(model_name)
        self.fc_out = torch.nn.Linear( self.pre_model.config.hidden_size, 7)

    def forward(self, x):
        x = self.pre_model(**x)
        # x[0]: 모든 시점의 히든출력 batch, seq, features
        # x[1]: CLS 토큰의 히든출력 batch, features
        return self.fc_out(x[1])

In [25]:
model = Net(model_name)
model(batch["x"])

tensor([[ 0.0426, -0.3442,  0.0453, -0.1306, -0.0273, -0.0229,  0.3785],
        [ 0.5101,  0.4435,  0.0894,  0.0829, -0.0481, -0.1151,  0.4884]],
       grad_fn=<AddmmBackward0>)

In [26]:
def train_loop(dataloader, model, loss_fn, optimizer, device):
    epoch_loss = 0
    model.train() # 학습 모드
    for batch in tqdm(dataloader):
        pred = model( batch["x"].to(device) )
        loss = loss_fn( pred, batch["y"].to(device) )

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    epoch_loss /= len(dataloader)
    return epoch_loss

In [27]:
@torch.no_grad()
def test_loop(dataloader, model, loss_fn, device):
    epoch_loss = 0
    pred_list = []
    act_func = torch.nn.Sigmoid()
    model.eval() # 평가 모드
    for batch in tqdm(dataloader):
        pred = model( batch["x"].to(device) )
        if batch.get("y") is not None:
            loss = loss_fn( pred, batch["y"].to(device) )
            epoch_loss += loss.item()

        pred = act_func(pred) # logit 값을 확률로 변환
        pred = pred.to("cpu").numpy() # cpu 이동후 ndarray 로변환
        pred_list.append(pred)

    epoch_loss /= len(dataloader)
    pred = np.concatenate(pred_list)
    return epoch_loss, pred

In [28]:
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

n_splits = 5
cv = KFold(n_splits, shuffle=True, random_state=42)

batch_size = 8 # 배치 사이즈
loss_fn = torch.nn.BCEWithLogitsLoss() # 손실 객체
epochs = 100 # 최대 가능한 에폭수

In [None]:
is_holdout = False
reset_seeds(42) # 재현을 위해 시드고정
best_score_list = []
for i, (tri, vai) in enumerate( cv.split(train_arr) ):
    # 학습용 데이터로더 객체
    train_dt = ReviewDataset(tokenizer, train_arr[tri], target[tri])
    train_dl = torch.utils.data.DataLoader(train_dt, batch_size=batch_size, shuffle=True)

    # 검증용 데이터로더 객체
    valid_dt = ReviewDataset(tokenizer, train_arr[vai], target[vai])
    valid_dl = torch.utils.data.DataLoader(valid_dt, batch_size=batch_size, shuffle=False)

    # 모델 객체와 옵티마이저 객체 생성
    model = Net(model_name).to(device)
    optimizer = torch.optim.Adam( model.parameters(), lr=2e-5 )

    best_score = 0 # 현재 최고 점수
    patience = 0 # 조기 종료 조건을 주기 위한 변수
    for epoch in range(epochs):
        train_loss = train_loop(train_dl, model, loss_fn, optimizer, device)
        valid_loss, pred = test_loop(valid_dl, model, loss_fn, device)

        pred = (pred > 0.5).astype(int) # 이진분류 문제에서 클래스 번호 결정
        score = f1_score(target[vai], pred,average = None).mean()

        print(train_loss, valid_loss, score)
        if score > best_score:
            best_score = score # 최고 점수 업데이트
            patience = 0
            torch.save(model.state_dict(), f"model_{i}.pth") # 최고 점수 모델 가중치 저장

        patience += 1
        if patience == 5:
            break

    print(f"{i}번째 폴드 최고 정확도: {best_score}")
    best_score_list.append(best_score)

    if is_holdout:
        break

# 테스트 데이터 예측 및 검증

In [None]:
test_dt = ReviewDataset(tokenizer,test_arr)
test_dl = torch.utils.data.DataLoader(test_dt, batch_size=2, shuffle=False)

In [None]:
pred_list = []
for i in range(n_splits):
    model = Net(model_name).to(device)
    state_dict = torch.load(f"model_{i}.pth")
    model.load_state_dict(state_dict)

    _, pred = test_loop(test_dl, model, loss_fn, device)

    pred_list.append(pred)
    if is_holdout:
        break

In [None]:
pred = np.mean(pred_list, axis=0)
pred = (pred > 0.5).astype(int)

In [None]:
score = f1_score(target_test, pred,average = 'macro')
score

# 만일 사전 학습 모델의 히든 출력에 cls토큰이 없다면 ? last_hidden_cell만 존재하는 경우에는?

In [None]:
class ReviewDataset(torch.utils.data.Dataset):
    def __init__(self, tokenizer, x, y=None):
        self.tokenizer = tokenizer
        self.x = x
        self.y = y
    def __len__(self):
        return len(self.x)
    def __getitem__(self, idx):
        item = {}
        item["x"] = self.get_tokenizer(self.x[idx])
        if self.y is not None:
            item["y"] = torch.Tensor(self.y[idx])
        return item
    def get_tokenizer(self, text):
        x = self.tokenizer(text, padding="max_length", truncation=True)
        for k, v in x.items():
            x[k] = torch.tensor(v)
        return x

In [None]:
dt = ReviewDataset(tokenizer, train_arr, target)
dl = torch.utils.data.DataLoader(dt, batch_size=2, shuffle=False)
batch = next(iter(dl))
batch

In [None]:
class Net(torch.nn.Module):
    def __init__(self, model_name):
        super().__init__()
        self.pre_model = AutoModel.from_pretrained(model_name)
        self.fc_out = torch.nn.Linear( self.pre_model.config.hidden_size, 7)

    def forward(self, x):
        x = self.pre_model(**x)
        # x['last_hidden_state']: 모든 시점의 히든출력 batch, seq, feature
        x = x['last_hidden_state']
        return self.fc_out(x[:,0])
        # 첫 번째 sequence의 batch,feature를 뽑아줍니다. 
        # 그러면 cls토큰과 똑같이 batch,feature차원으로 변경됩니다. 

In [None]:
model = Net(model_name)
model(batch["x"])

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, device):
    epoch_loss = 0
    model.train() # 학습 모드
    for batch in tqdm(dataloader):
        pred = model( batch["x"].to(device) )
        loss = loss_fn( pred, batch["y"].to(device) )

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    epoch_loss /= len(dataloader)
    return epoch_loss

In [None]:
@torch.no_grad()
def test_loop(dataloader, model, loss_fn, device):
    epoch_loss = 0
    pred_list = []
    act_func = torch.nn.Sigmoid()
    model.eval() # 평가 모드
    for batch in tqdm(dataloader):
        pred = model( batch["x"].to(device) )
        if batch.get("y") is not None:
            loss = loss_fn( pred, batch["y"].to(device) )
            epoch_loss += loss.item()

        pred = act_func(pred) # logit 값을 확률로 변환
        pred = pred.to("cpu").numpy() # cpu 이동후 ndarray 로변환
        pred_list.append(pred)

    epoch_loss /= len(dataloader)
    pred = np.concatenate(pred_list)
    return epoch_loss, pred

In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score
n_splits = 5
cv = KFold(n_splits, shuffle=True, random_state=SEED)

batch_size = 8 # 배치 사이즈
loss_fn = torch.nn.BCEWithLogitsLoss() # 손실 객체
epochs = 100 # 최대 가능한 에폭수

In [None]:
is_holdout = False
reset_seeds(42) # 재현을 위해 시드고정
best_score_list = []
for i, (tri, vai) in enumerate( cv.split(train_arr) ):
    # 학습용 데이터로더 객체
    train_dt = ReviewDataset(tokenizer, train_arr[tri], target[tri])
    train_dl = torch.utils.data.DataLoader(train_dt, batch_size=batch_size, shuffle=True)

    # 검증용 데이터로더 객체
    valid_dt = ReviewDataset(tokenizer, train_arr[vai], target[vai])
    valid_dl = torch.utils.data.DataLoader(valid_dt, batch_size=batch_size, shuffle=False)

    # 모델 객체와 옵티마이저 객체 생성
    model = Net(model_name).to(device)
    optimizer = torch.optim.Adam( model.parameters(), lr=2e-5 )

    best_score = 0 # 현재 최고 점수
    patience = 0 # 조기 종료 조건을 주기 위한 변수
    for epoch in range(epochs):
        train_loss = train_loop(train_dl, model, loss_fn, optimizer, device)
        valid_loss, pred = test_loop(valid_dl, model, loss_fn, device)

        pred = (pred > 0.5).astype(int) # 이진분류 문제에서 클래스 번호 결정
        score = f1_score(target[vai], pred,average = None).mean()

        print(train_loss, valid_loss, score)
        if score > best_score:
            best_score = score # 최고 점수 업데이트
            patience = 0
            torch.save(model.state_dict(), f"model_{i}.pth") # 최고 점수 모델 가중치 저장

        patience += 1
        if patience == 5:
            break

    print(f"{i}번째 폴드 최고 정확도: {best_score}")
    best_score_list.append(best_score)

    if is_holdout:
        break

# 테스트 데이터에 대한 예측은 위와 동일하게 실행시킵니다.

# 단어 문서 행렬 

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

In [None]:
vectorizer = TfidfVectorizer(max_features=8192)

In [None]:
vectorizer.fit(train_tmp["리뷰"])

In [None]:
train_tdm = vectorizer.transform(train_tmp["리뷰"])
test_tdm = vectorizer.transform(test_tmp["리뷰"])
train_tdm.shape, test_tdm.shape

# 차원 축소 

In [None]:
SEED  = 42

from sklearn.decomposition import TruncatedSVD
decomp = TruncatedSVD(1024, random_state=SEED)
decomp.fit(train_tdm)

train_tdm = decomp.transform(train_tdm)
test_tdm = decomp.transform(test_tdm)

train_tdm.shape, test_tdm.shape

# 사전학습 + 단어 문서 행렬

In [None]:
class ReviewDataset(torch.utils.data.Dataset):
    def __init__(self, tokenizer,x_tdm,x, y=None):
        self.tokenizer = tokenizer
        self.x_tdm = x_tdm # 단어문서행렬 
        self.x = x
        self.y = y
    def __len__(self):
        return len(self.x_tdm)
    def __getitem__(self, idx):
        item = {}
        item['x_tdm'] = torch.Tensor(self.x_tdm[idx])
        item["x"] = self.get_tokenizer(self.x[idx])
        if self.y is not None:
            item["y"] = torch.Tensor(self.y[idx])
        return item
    def get_tokenizer(self, text):
        x = self.tokenizer(text, truncation = True, padding = 'max_length')
        for k, v in x.items():
            x[k] = torch.tensor(v)
        return x

In [None]:
dt = ReviewDataset(tokenizer, train_tdm, train_arr, target)
dl = torch.utils.data.DataLoader(dt, batch_size=2, shuffle=False)
batch = next(iter(dl))
batch

In [None]:
class Net(torch.nn.Module):
    def __init__(self, model_name):
        super().__init__()
        self.pre_model = AutoModel.from_pretrained(model_name)
        self.fc_out = torch.nn.Linear( self.pre_model.config.hidden_size  + 1024, 7)

    def forward(self, x_tdm, x):
        x = self.pre_model(**x) 
        return self.fc_out(torch.cat([x_tdm,x[1]],dim = 1))

In [None]:
model = Net(model_name)
model(batch["x_tdm"],batch['x'])

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, device):
    epoch_loss = 0
    model.train() # 학습 모드
    for batch in tqdm(dataloader):
        pred = model( batch["x_tdm"].to(device),batch["x"].to(device) )
        loss = loss_fn( pred, batch["y"].to(device) )

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    epoch_loss /= len(dataloader)
    return epoch_loss

In [None]:
@torch.no_grad()
def test_loop(dataloader, model, loss_fn, device):
    epoch_loss = 0
    pred_list = []
    act_func = torch.nn.Sigmoid()
    model.eval() # 평가 모드
    for batch in tqdm(dataloader):
        pred = model( batch["x_tdm"].to(device),batch["x"].to(device) )
        if batch.get("y") is not None:
            loss = loss_fn( pred, batch["y"].to(device) )
            epoch_loss += loss.item()

        pred = act_func(pred) # logit 값을 확률로 변환
        pred = pred.to("cpu").numpy() # cpu 이동후 ndarray 로변환
        pred_list.append(pred)

    epoch_loss /= len(dataloader)
    pred = np.concatenate(pred_list)
    return epoch_loss, pred

In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

n_splits = 5
cv = KFold(n_splits, shuffle=True, random_state=42)

batch_size = 16 # 배치 사이즈
loss_fn = torch.nn.BCEWithLogitsLoss() # 손실 객체
epochs = 100 # 최대 가능한 에폭수

In [None]:
is_holdout = True
reset_seeds(42) # 재현을 위해 시드고정
best_score_list = []
for i, (tri, vai) in enumerate( cv.split(train_arr) ):
    # 학습용 데이터로더 객체
    train_dt = ReviewDataset(tokenizer, train_tdm[tri],train_arr[tri], target[tri])
    train_dl = torch.utils.data.DataLoader(train_dt, batch_size=batch_size, shuffle=True)

    # 검증용 데이터로더 객체
    valid_dt = ReviewDataset(tokenizer, train_tdm[vai],train_arr[vai], target[vai])
    valid_dl = torch.utils.data.DataLoader(valid_dt, batch_size=batch_size, shuffle=False)

    # 모델 객체와 옵티마이저 객체 생성
    model = Net(model_name).to(device)
    optimizer = torch.optim.Adam( model.parameters(),lr=2e-5)

    best_score = 0 # 현재 최고 점수
    patience = 0 # 조기 종료 조건을 주기 위한 변수
    for epoch in range(epochs):
        train_loss = train_loop(train_dl, model, loss_fn, optimizer, device)
        valid_loss, pred = test_loop(valid_dl, model, loss_fn, device)

        pred = (pred > 0.5).astype(int) # 이진분류 문제에서 클래스 번호 결정
        score = f1_score(target[vai], pred,average = None).mean()

        print(train_loss, valid_loss, score)
        if score > best_score:
            best_score = score # 최고 점수 업데이트
            patience = 0
            torch.save(model.state_dict(), f"model_{i}.pth") # 최고 점수 모델 가중치 저장

        patience += 1
        if patience == 5:
            break

    print(f"{i}번째 폴드 최고 정확도: {best_score}")
    best_score_list.append(best_score)

    if is_holdout:
        break

# 사전학습 모델 + CNN + RNN

In [None]:
# 모델이 너무 무거워 짐
 class Net(torch.nn.Module):
     def __init__(self, model_name):
         super().__init__()
         self.pre_# model = AutoModel.from_pretrained(model_name)
         self.conv1d_block = torch.nn.Sequential(
             torch.nn.Conv1d(self.pre_model.config.hidden_size,self.pre_model.config.hidden_size*2,3),
             torch.nn.BatchNorm1d(self.pre_model.config.hidden_size * 2),
             torch.nn.ReLU(),
             torch.nn.AvgPool1d(2),
 
             torch.nn.Conv1d(self.pre_model.config.hidden_size * 2, self.pre_model.config.hidden_size * 4,3),
             torch.nn.BatchNorm1d(self.pre_model.config.hidden_size * 4),
             torch.nn.ReLU(),
             torch.nn.AvgPool1d(2),
 
             # torch.nn.AdaptiveMaxPool1d(1),
             # torch.nn.Flatten(),
             torch.nn.Dropout(0.2)
         )
         self.rnn_layer = torch.nn.GRU(self.pre_model.config.hidden_size * 4,
                                        self.pre_model.config.hidden_size * 8,
                                        batch_first = True,
                                        bidirectional = True)
         self.fc_out = torch.nn.Linear( self.pre_model.config.hidden_size * 16 + 1024, 7)
 
     def forward(self, x_tdm, x):
         x = self.pre_model(**x)
         x = x[0].permute(0,2,1) # batch,feature,sequence
         x = self.conv1d_block(x) # batch,feature,sequence
         x = x.permute(0,2,1) # batch,sequence,feature
         _, hn= self.rnn_layer(x) # nlayer, batch, features
         x = hn.permute(1,0,2) # batch,nlayer,features
         x = x.flatten(1) # batch,nlayer x features
         # x[0]: 모든 시점의 히든출력 batch, seq, features
         # x[1]: CLS 토큰의 히든출력 batch, features
         return self.fc_out(torch.cat([x_tdm,x],dim = 1))

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, device):
    epoch_loss = 0
    model.train() # 학습 모드
    for batch in tqdm(dataloader):
        pred = model( batch["x_tdm"].to(device),batch["x"].to(device) )
        loss = loss_fn( pred, batch["y"].to(device) )

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    epoch_loss /= len(dataloader)
    return epoch_loss

In [None]:
@torch.no_grad()
def test_loop(dataloader, model, loss_fn, device):
    epoch_loss = 0
    pred_list = []
    act_func = torch.nn.Sigmoid()
    model.eval() # 평가 모드
    for batch in tqdm(dataloader):
        pred = model( batch["x_tdm"].to(device),batch["x"].to(device) )
        if batch.get("y") is not None:
            loss = loss_fn( pred, batch["y"].to(device) )
            epoch_loss += loss.item()

        pred = act_func(pred) # logit 값을 확률로 변환
        pred = pred.to("cpu").numpy() # cpu 이동후 ndarray 로변환
        pred_list.append(pred)

    epoch_loss /= len(dataloader)
    pred = np.concatenate(pred_list)
    return epoch_loss, pred

In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

n_splits = 5
cv = KFold(n_splits, shuffle=True, random_state=42)

batch_size = 16 # 배치 사이즈
loss_fn = torch.nn.BCEWithLogitsLoss() # 손실 객체
epochs = 100 # 최대 가능한 에폭수

In [None]:
is_holdout = True
reset_seeds(42) # 재현을 위해 시드고정
best_score_list = []
for i, (tri, vai) in enumerate( cv.split(train_arr) ):
    # 학습용 데이터로더 객체
    train_dt = ReviewDataset(tokenizer, train_tdm[tri],train_arr[tri], target[tri])
    train_dl = torch.utils.data.DataLoader(train_dt, batch_size=batch_size, shuffle=True)

    # 검증용 데이터로더 객체
    valid_dt = ReviewDataset(tokenizer, train_tdm[vai],train_arr[vai], target[vai])
    valid_dl = torch.utils.data.DataLoader(valid_dt, batch_size=batch_size, shuffle=False)

    # 모델 객체와 옵티마이저 객체 생성
    model = Net(model_name).to(device)
    optimizer = torch.optim.Adam( model.parameters(),lr=2e-5)

    best_score = 0 # 현재 최고 점수
    patience = 0 # 조기 종료 조건을 주기 위한 변수
    for epoch in range(epochs):
        train_loss = train_loop(train_dl, model, loss_fn, optimizer, device)
        valid_loss, pred = test_loop(valid_dl, model, loss_fn, device)

        pred = (pred > 0.5).astype(int) # 이진분류 문제에서 클래스 번호 결정
        score = f1_score(target[vai], pred,average = None).mean()

        print(train_loss, valid_loss, score)
        if score > best_score:
            best_score = score # 최고 점수 업데이트
            patience = 0
            torch.save(model.state_dict(), f"model_{i}.pth") # 최고 점수 모델 가중치 저장

        patience += 1
        if patience == 5:
            break

    print(f"{i}번째 폴드 최고 정확도: {best_score}")
    best_score_list.append(best_score)

    if is_holdout:
        break

# 사전학습 모델 + 단어 문서 행렬 (cls token이 존재하지 않는 경우)

In [None]:
class Net(torch.nn.Module):
    def __init__(self, model_name):
        super().__init__()
        self.pre_model = AutoModel.from_pretrained(model_name)
      #  self.conv1d_block = torch.nn.Sequential(
      #      torch.nn.AdaptiveMaxPool1d(1),
      #      # batch,feature,1
      #      torch.nn.Flatten(),
      #      # batch,feature
      #      torch.nn.Dropout(0.2)
      #  )
        self.fc_out = torch.nn.Linear( self.pre_model.config.hidden_size  + 1024, 7)

    def forward(self, x_tdm, x):
        x = self.pre_model(**x)['last_hidden_state']
        x = x.permute(0,2,1)
        # batch,sequence,feature = > batch,feature,sequence로 바꿔준다. 
        x = self.conv1d_block(x)
        # batch,feature
        return self.fc_out(torch.cat([x_tdm,x],dim = 1))

In [None]:
model = Net(model_name)
model(batch["x_tdm"],batch['x'])

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, device):
    epoch_loss = 0
    model.train() # 학습 모드
    for batch in tqdm(dataloader):
        pred = model( batch["x_tdm"].to(device),batch["x"].to(device) )
        loss = loss_fn( pred, batch["y"].to(device) )

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    epoch_loss /= len(dataloader)
    return epoch_loss

In [None]:
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

n_splits = 5
cv = KFold(n_splits, shuffle=True, random_state=42)

batch_size = 8 # 배치 사이즈
loss_fn = torch.nn.BCEWithLogitsLoss() # 손실 객체
epochs = 100 # 최대 가능한 에폭수

In [None]:
is_holdout = True
reset_seeds(42) # 재현을 위해 시드고정
best_score_list = []
for i, (tri, vai) in enumerate( cv.split(train_arr) ):
    # 학습용 데이터로더 객체
    train_dt = ReviewDataset(tokenizer, train_tdm[tri],train_arr[tri], target[tri])
    train_dl = torch.utils.data.DataLoader(train_dt, batch_size=batch_size, shuffle=True)

    # 검증용 데이터로더 객체
    valid_dt = ReviewDataset(tokenizer, train_tdm[vai],train_arr[vai], target[vai])
    valid_dl = torch.utils.data.DataLoader(valid_dt, batch_size=batch_size, shuffle=False)

    # 모델 객체와 옵티마이저 객체 생성
    model = Net(model_name).to(device)
    optimizer = torch.optim.Adam( model.parameters(),lr=2e-5)

    best_score = 0 # 현재 최고 점수
    patience = 0 # 조기 종료 조건을 주기 위한 변수
    for epoch in range(epochs):
        train_loss = train_loop(train_dl, model, loss_fn, optimizer, device)
        valid_loss, pred = test_loop(valid_dl, model, loss_fn, device)

        pred = (pred > 0.5).astype(int) # 이진분류 문제에서 클래스 번호 결정
        score = f1_score(target[vai], pred,average = None).mean()

        print(train_loss, valid_loss, score)
        if score > best_score:
            best_score = score # 최고 점수 업데이트
            patience = 0
            torch.save(model.state_dict(), f"model_{i}.pth") # 최고 점수 모델 가중치 저장

        patience += 1
        if patience == 5:
            break

    print(f"{i}번째 폴드 최고 정확도: {best_score}")
    best_score_list.append(best_score)

    if is_holdout:
        break