In [1]:
import random
import pandas as pd
import numpy as np
import os

from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn import preprocessing
from sklearn.metrics import f1_score

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from tqdm.auto import tqdm

import warnings
warnings.filterwarnings(action='ignore') 

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [3]:
CFG = {
    'EPOCHS':10,
    'LEARNING_RATE':1e-4,
    'BATCH_SIZE':256,
    'SEED':41
}

In [4]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

# 데이터 불러오기 및 정리

In [None]:
!unzip open.zip

In [37]:
df=pd.read_csv('train.csv')
df_copy=df.copy()

In [38]:
test=pd.read_csv('test.csv')
test_copy=test.copy()

## 벡터화 및 레이블링

In [39]:
df_type=df[['문장', '유형']]
df_polar=df[['문장', '극성']]
df_tense=df[['문장', '시제']]
df_certainty=df[['문장', '확실성']]

In [30]:
from sklearn.preprocessing import LabelEncoder

In [31]:
def encoding(df, label):
  encoder=LabelEncoder()
  df[label]=encoder.fit_transform(df[label])
  print({category: label for category, label in zip(encoder.classes_, encoder.transform(encoder.classes_))})


In [40]:
encoding(df_type, '유형')
encoding(df_polar, '극성')
encoding(df_tense, '시제')
encoding(df_certainty, '확실성')

{'대화형': 0, '사실형': 1, '예측형': 2, '추론형': 3}
{'긍정': 0, '미정': 1, '부정': 2}
{'과거': 0, '미래': 1, '현재': 2}
{'불확실': 0, '확실': 1}


In [52]:
train_type, val_type, train_type_label, val_type_label = train_test_split(df_type['문장'], df_type['유형'], test_size=0.2, random_state=CFG['SEED'], stratify=df_type['유형'])
train_polar, val_polar, train_polar_label, val_polar_label = train_test_split(df_polar['문장'], df_polar['극성'], test_size=0.2, random_state=CFG['SEED'], stratify=df_polar['극성'])
train_tense, val_tense, train_tense_label, val_tense_label = train_test_split(df_tense['문장'], df_tense['시제'], test_size=0.2, random_state=CFG['SEED'], stratify=df_tense['시제'])
train_certainty, val_certainty, train_certainty_label, val_certainty_label = train_test_split(df_certainty['문장'], df_certainty['확실성'], test_size=0.2, random_state=CFG['SEED'], stratify=df_certainty['확실성'])


In [64]:
train_type_label.reset_index(drop=True, inplace=True)
train_type.reset_index(drop=True, inplace=True)
val_type_label.reset_index(drop=True, inplace=True)
val_type.reset_index(drop=True, inplace=True)

train_polar_label.reset_index(drop=True, inplace=True)
train_polar.reset_index(drop=True, inplace=True)
val_polar_label.reset_index(drop=True, inplace=True)
val_polar.reset_index(drop=True, inplace=True)

train_tense_label.reset_index(drop=True, inplace=True)
train_tense.reset_index(drop=True, inplace=True)
val_tense_label.reset_index(drop=True, inplace=True)
val_tense.reset_index(drop=True, inplace=True)

train_certainty_label.reset_index(drop=True, inplace=True)
train_certainty.reset_index(drop=True, inplace=True)
val_certainty_label.reset_index(drop=True, inplace=True)
val_certainty.reset_index(drop=True, inplace=True)

In [None]:
vectorizer = TfidfVectorizer(min_df = 4, analyzer = 'word', ngram_range=(1, 2))
vectorizer.fit(np.array(df["문장"]))


In [65]:

train_type_vec = vectorizer.transform(train_type)
val_type_vec = vectorizer.transform(val_type)

train_polar_vec = vectorizer.transform(train_polar)
val_polar_vec = vectorizer.transform(val_polar)

train_tense_vec = vectorizer.transform(train_tense)
val_tense_vec = vectorizer.transform(val_tense)

train_certainty_vec = vectorizer.transform(train_certainty)
val_certainty_vec = vectorizer.transform(val_certainty)


In [50]:
test_vec = vectorizer.transform(test["문장"])

print(train_type_vec.shape, val_type_vec.shape, test_vec.shape)

(13232, 12028) (3309, 12028) (7090, 12028)


In [71]:
class CustomDataset(Dataset):
    def __init__(self, st_vec, st_labels):
        self.st_vec = st_vec
        self.st_labels = st_labels

    def __getitem__(self, index):
        st_vector = torch.FloatTensor(self.st_vec[index].toarray()).squeeze(0)
        if self.st_labels is not None:
            st_label = self.st_labels[index]
            return st_vector, st_label
        else:
            return st_vector

    def __len__(self):
        return len(self.st_vec.toarray())

In [90]:
next(iter(train_type_loader))[0]

tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])

In [76]:
train_type_dataset = CustomDataset(train_type_vec, train_type_label)
train_type_loader = DataLoader(train_type_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

val_type_dataset = CustomDataset(val_type_vec, val_type_label)
val_type_loader = DataLoader(val_type_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

### 베이스모델

In [104]:
class BaseModel(nn.Module):
    def __init__(self, input_dim=9351):
        super(BaseModel, self).__init__()
        self.feature_extract = nn.Sequential(
            nn.Linear(in_features=input_dim, out_features=1024),
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(),
            nn.Linear(in_features=1024, out_features=1024),
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(),
            nn.Linear(in_features=1024, out_features=512),
            nn.BatchNorm1d(512),
            nn.LeakyReLU(),
        )
        self.type_classifier = nn.Sequential(
            nn.Dropout(p=0.3),
            nn.Linear(in_features=512, out_features=4),
        )
        self.polarity_classifier = nn.Sequential(
            nn.Dropout(p=0.3),
            nn.Linear(in_features=512, out_features=3),
        )
        self.tense_classifier = nn.Sequential(
            nn.Dropout(p=0.3),
            nn.Linear(in_features=512, out_features=3),
        )
        self.certainty_classifier = nn.Sequential(
            nn.Dropout(p=0.3),
            nn.Linear(in_features=512, out_features=2),
        )
            
    def forward(self, x):
        x = self.feature_extract(x)
        # 문장 유형, 극성, 시제, 확실성을 각각 분류
        type_output = self.type_classifier(x)

        return type_output

In [108]:
BaseModel(next(iter(train_type_loader))[0].to(device))

TypeError: empty(): argument 'size' must be tuple of ints, but found element of type Tensor at pos 2

In [107]:
def train(model, optimizer, train_loader, val_loader, scheduler, device, loader):
    model.to(device)
    
    criterion = {
        'type' : nn.CrossEntropyLoss().to(device),
        'polarity' : nn.CrossEntropyLoss().to(device),
        'tense' : nn.CrossEntropyLoss().to(device),
        'certainty' : nn.CrossEntropyLoss().to(device)
    }
    
    best_loss = 999999
    best_model = None
    
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for sentence, label in tqdm(iter(loader)):
            sentence = sentence.to(device)
            label = label.to(device)
            
            optimizer.zero_grad()
            
            logit = model(sentence)
            
            loss = criterion['type'](type_logit, type_label)
            
            loss.backward()
            optimizer.step()
            
            train_loss.append(loss.item())
        
        val_type_f1 = validation(model, val_loader, criterion, device)
        print(f'Epoch : [{epoch}] Train Loss : [{np.mean(train_loss):.5f}] Val Loss : [{val_loss:.5f}] 유형 F1 : [{val_type_f1:.5f}]')
        
        if scheduler is not None:
            scheduler.step(val_loss)
            
        if best_loss > val_loss:
            best_loss = val_loss
            best_model = model
            
    return best_model

In [102]:
def validation(model, val_loader, criterion, device):
    model.eval()
    val_loss = []
    
    type_preds, polarity_preds, tense_preds, certainty_preds = [], [], [], []
    type_labels, polarity_labels, tense_labels, certainty_labels = [], [], [], []
    
    
    with torch.no_grad():
        for sentence, label in tqdm(iter(val_loader)):
            sentence = sentence.to(device)
            label = type_label.to(device)

            
            logit = model(sentence)
            
            loss = criterion['type'](type_logit, type_label)

            
            type_preds += type_logit.argmax(1).detach().cpu().numpy().tolist()
            type_labels += type_label.detach().cpu().numpy().tolist()
            
    type_f1 = f1_score(type_labels, type_preds, average='weighted')

    
    return type_f1

In [103]:
model = BaseModel()
model.eval()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2,threshold_mode='abs',min_lr=1e-8, verbose=True)

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

TypeError: train() missing 1 required positional argument: 'loader'

In [None]:
test_dataset = CustomDataset(test_vec, None)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [None]:
def inference(model, test_loader, device):
    model.to(device)
    model.eval()
    
    type_preds, polarity_preds, tense_preds, certainty_preds = [], [], [], []
    
    with torch.no_grad():
        for sentence in tqdm(test_loader):
            sentence = sentence.to(device)
            
            type_logit, polarity_logit, tense_logit, certainty_logit = model(sentence)
            
            type_preds += type_logit.argmax(1).detach().cpu().numpy().tolist()
            polarity_preds += polarity_logit.argmax(1).detach().cpu().numpy().tolist()
            tense_preds += tense_logit.argmax(1).detach().cpu().numpy().tolist()
            certainty_preds += certainty_logit.argmax(1).detach().cpu().numpy().tolist()
            
    return type_preds, polarity_preds, tense_preds, certainty_preds

In [None]:
type_preds, polarity_preds, tense_preds, certainty_preds = inference(model, test_loader, device)

In [None]:
type_preds = type_le.inverse_transform(type_preds)
polarity_preds = polarity_le.inverse_transform(polarity_preds)
tense_preds = tense_le.inverse_transform(tense_preds)
certainty_preds = certainty_le.inverse_transform(certainty_preds)

In [None]:
predictions = []
for type_pred, polarity_pred, tense_pred, certainty_pred in zip(type_preds, polarity_preds, tense_preds, certainty_preds):
    predictions.append(type_pred+'-'+polarity_pred+'-'+tense_pred+'-'+certainty_pred)