# import

In [1]:
import torch
from transformers import AutoTokenizer, AutoModel
import torch.nn as nn
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import TensorDataset
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import kss
from torch.optim.lr_scheduler import LambdaLR
import torch.optim as optim
import torch.nn.functional as F
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
import math
from sklearn.model_selection import train_test_split
from torch.optim import Adam

In [2]:
device = torch.device('mps:0' if torch.backends.mps.is_available() else 'cpu')

In [3]:
df = pd.read_json('./data/전세사기_라벨링.json')
df

Unnamed: 0,docs,vecs,label
0,"[이르면 연말부터 전세사기 등 부동산 의심거래 AI로 잡는다, 이르면 연말쯤 전세사...",,0
1,"[[영상] 경찰, 경기 동탄 전세사기 관련 임대인 등 압수수색, 경기 화성 동탄신도...",,0
2,"['전세사기 방지' 세입자 대응 강화…집주인 체납 등 정보 공개, 급격히 늘고 있는...",,0
3,"[올해 1분기 전국 전월세 갱신 4건 중 1건 '감액계약', 기존 계약보다 더 낮은...",,1
4,"[""보증금 돌려달라"" 구제신청 역대 최대…전세사기 '경고등', [앵커] 최근 40대...",,1
...,...,...,...
1082,"[""집주인 밀린 세금 있나""... 임차인 '납세증명' 요구권, 법으로 보장, 체납·...",,0
1083,"[경찰, 화성 동탄 오피스텔 전세사기 임대인 등 압수수색, 오전 10시 30분부터 ...",,0
1084,"[""전세금 못 받아 새 집 계약금 날릴 판""... 아파트 60% '비상', 작년 2...",,1
1085,"[尹, 마이크 잡고 회의 주도...생방송 예정보다 1시간 넘겨, ""윤석열이라는 사람...",,0


# DataSet & Data Loader

In [42]:
X_train, X_test, y_train, y_test = train_test_split(df.iloc[:,0].tolist(), df.iloc[:,1].tolist(), test_size=0.2, random_state=7, stratify=df.iloc[:,1])

In [43]:
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=7, stratify=y_train)

## Old Ver.

In [None]:
class MyDataset(Dataset):
  def __init__(self, data, label):
    super().__init__()
    self.docs = data
    self.label = label

  def __len__(self):
      return len(self.docs)

  def __getitem__(self, index):
    news = self.docs[index]
    target = self.label[index]
    return news, target

train_data = MyDataset(data=X_train, label=y_train)
val_data = MyDataset(data=X_val, label=y_val)
test_data = MyDataset(data=X_test, label=y_test)

batch_size = 32
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True, collate_fn=lambda x: tuple(zip(*x)))
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=True, drop_last=True, collate_fn=lambda x: tuple(zip(*x)))
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True, drop_last=True, collate_fn=lambda x: tuple(zip(*x)))

## New Ver.

In [4]:
class MyDataset(Dataset):
  def __init__(self, file_path):
    super().__init__()
    self.tokenizer = AutoTokenizer.from_pretrained("snunlp/KR-Medium", do_lower_case=False)
    self.bert = AutoModel.from_pretrained("snunlp/KR-Medium")
    for param in self.bert.parameters():
        param.requires_grad = False
        
    df = pd.read_json(file_path)
    for idx in range(len(df)) :
      tokenized = self.tokenizer(df.iloc[idx,0], padding='longest', return_tensors='pt')
      contextualized_sentences = self.bert(**tokenized)
      sentence_embeddings = contextualized_sentences.pooler_output
      df.iat[idx,1] = sentence_embeddings
    self.vecs = df['vecs']
    self.label = df['label']

  def __len__(self):
      return len(self.vecs)

  def __getitem__(self, index):
    docs = self.vecs[index]
    label = self.label[index]
    return docs, label

In [5]:
train_data = MyDataset('data/전세사기_라벨링.json')
train_loader = DataLoader(train_data, batch_size=32, shuffle=True, drop_last=True, collate_fn=lambda x: tuple(zip(*x)))

In [254]:
# TEST
# sentence_embeddings = j[0]
# max_len = max([e.size(0) for e in sentence_embeddings])
# padded_embeddings = torch.zeros(len(sentence_embeddings), max_len, sentence_embeddings[0].size(1))
# for i, emb in enumerate(sentence_embeddings):
#     seq_len = emb.size(0)
#     padded_embeddings[i, :seq_len, :] = emb
    
# random_tensor = torch.randn(padded_embeddings.size(0), 1, padded_embeddings.size(2))
# batch_tensor = torch.cat((random_tensor, padded_embeddings), dim=1)
# batch_t = batch_tensor.permute(1 ,0 ,2).float()

# padding_mask = batch_t.sum(dim=-1).permute(1 ,0) == 0
# title_level = nn.TransformerEncoder(nn.TransformerEncoderLayer(d_model=768, nhead=1),num_layers=1)
# output_batch = title_level(batch_t.float(), src_key_padding_mask=padding_mask)
# output_batch = output_batch.permute(1 ,0 ,2)

# Model Structure

## Old Ver.

In [45]:
# for문 Ver.
class Model(nn.Module):
  def __init__(self, num_classes, input_dim, num_heads, num_layers):
    super().__init__()
    self.fc = nn.Linear(input_dim, num_classes)
    self.tokenizer = AutoTokenizer.from_pretrained("snunlp/KR-Medium", do_lower_case=False)
    self.bert = AutoModel.from_pretrained("snunlp/KR-Medium")
    for param in self.bert.parameters():
        param.requires_grad = False
    self.title_level = nn.TransformerEncoder(
      nn.TransformerEncoderLayer(d_model=input_dim, nhead=num_heads),
      num_layers=num_layers)
    self.sentecne_level = nn.TransformerEncoder(
      nn.TransformerEncoderLayer(d_model=input_dim, nhead=num_heads),
      num_layers=num_layers)

  def forward(self, data):
    doc_ems = torch.empty((len(data), 768)).to(device)
    for num, doc in enumerate(data):
      tokenized = self.tokenizer(doc, padding='longest', return_tensors='pt').to(device)
      with torch.no_grad():
        contextualized_sentences = self.bert(**tokenized)
      sentence_embeddings = contextualized_sentences.pooler_output
      title = sentence_embeddings[0, :]
      transformer_output = self.title_level(sentence_embeddings)[1:, :]
      combined_input = torch.cat((title.unsqueeze(1).view(1, 768), transformer_output), dim=0)
      doc_ems[num] = self.sentecne_level(combined_input)[0]
    out = self.fc(doc_ems)
    predicted_label = torch.argmax(out).to(device)
    return out, predicted_label

## New Batch Ver.

In [6]:
class Doc_Encoder(nn.Module):
    def __init__(self, num_classes, num_heads, num_layers):
        super().__init__()
        self.fc = nn.Linear(768, num_classes)
        self.sentecne_level = nn.TransformerEncoder(
          nn.TransformerEncoderLayer(d_model=768, nhead=num_heads),
          num_layers=num_layers)

    def forward(self, batch):
        max_len = max([e.size(0) for e in batch])
        padded_embeddings = torch.zeros(len(batch), max_len, batch[0].size(1)).to(device)
        for i, emb in enumerate(batch):
            seq_len = emb.size(0)
            padded_embeddings[i, :seq_len, :] = emb
        random_tensor = torch.randn(padded_embeddings.size(0), 1, padded_embeddings.size(2)).to(device)
        batch_tensor = torch.cat((random_tensor, padded_embeddings), dim=1)
        batch_tensor = batch_tensor.permute(1 ,0 ,2).float()
        padding_mask = batch_tensor.sum(dim=-1).permute(1 ,0) == 0
        output_batch = self.sentecne_level(batch_tensor.float(), src_key_padding_mask=padding_mask)
        output_batch = output_batch.permute(1 ,0 ,2)
        doc_vecs = output_batch[:,0,:]
        return doc_vecs

In [7]:
class Model(nn.Module) :
    def __init__(self, num_classes, num_heads, num_layers) :
        super().__init__()
        self.fc = nn.Linear(768, num_classes)
        self.encoder = Doc_Encoder(num_classes, num_heads, num_layers)
    
    def forward(self, batch) :
        doc_vecs = self.encoder(batch)
        out = self.fc(doc_vecs)
        return out

# Parameter & Schedular

In [8]:
num_epochs = 1
num_classes = 2
input_dim = 768
num_heads = 2
num_layers = 2
model = Model(num_classes, num_heads, num_layers).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)

# Training

In [248]:
def train(model, dataloader):
    model.train()
    total_loss = 0.0

    for batch_data, target in dataloader:
        batch_data, target = batch_data.to(device), target.to(device)
        optimizer.zero_grad()
        outputs = model(batch_data)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print()

In [9]:
model.train()  # 모델을 훈련 모드로 설정

for batch_idx, (data, target) in enumerate(train_loader):
    data = tuple(d.to(device) for d in data)
    target = torch.tensor(target).long().to(device)
    optimizer.zero_grad()
    output = model(data) 
    loss = criterion(output, target)
    print(f'Batch {batch_idx+1}, Loss: {loss.item()}')
    loss.backward()
    optimizer.step()

Batch 1, Loss: 0.7501271367073059
Batch 2, Loss: 1.423123836517334
Batch 3, Loss: 6.371459007263184
Batch 4, Loss: 2.942178726196289
Batch 5, Loss: 2.4286482334136963
Batch 6, Loss: 2.924318790435791
Batch 7, Loss: 1.4806333780288696
Batch 8, Loss: 0.730457067489624
Batch 9, Loss: 1.3733482360839844
Batch 10, Loss: 0.5560861229896545
Batch 11, Loss: 0.49686524271965027
Batch 12, Loss: 0.6685193181037903
Batch 13, Loss: 0.7135567665100098
Batch 14, Loss: 0.7412486672401428
Batch 15, Loss: 0.764586329460144
Batch 16, Loss: 0.6955113410949707
Batch 17, Loss: 0.6259647607803345
Batch 18, Loss: 0.6809948682785034
Batch 19, Loss: 0.8341611623764038
Batch 20, Loss: 0.6566769480705261
Batch 21, Loss: 0.6850721836090088
Batch 22, Loss: 0.6530672907829285
Batch 23, Loss: 0.6493446826934814
Batch 24, Loss: 0.7069054245948792
Batch 25, Loss: 0.6934828162193298
Batch 26, Loss: 0.718299150466919
Batch 27, Loss: 0.6881782412528992
Batch 28, Loss: 0.7267142534255981
Batch 29, Loss: 0.7038354277610779


# Test

In [12]:
device = 'cpu'
model = model.to(device)
total_correct = 0
total_samples = 0

with torch.no_grad():
    for batch_data, target in train_loader:
        target = torch.tensor(target).long()
        target = target.to(device)

        out = model(batch_data)
        predicted = torch.argmax(out, dim=1)
        total_samples += target.size(0)
        total_correct += (predicted == target).sum().item()

accuracy = total_correct / total_samples

In [13]:
print(accuracy)

0.6117424242424242


# few-shot learning code

In [None]:
class few_show_Model(nn.Module):
    def __init__(self):
        self.encoder = Doc_Encoder(768,1,1)
        self.fc_layer = nn.Linear(768,768)
        self.similarity = F.cosine_similarity()
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, doc_pair):
        doc_vecs = self.encoder(doc_pair)
        doc_vecs = self.fc_layer(doc_pair)
        similarity = self.similarity(doc_vecs)
        out = self.sigmoid(similarity)
        return out

In [None]:
# 같은 class 내 2개씩 pair & 서로 다른 class 1개씩 pair해서 모두 훈련