In [None]:
!pip install transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import torch
import random
import numpy as np

# Single seed shared by every random number generator used in this notebook.
SEED = 42

# Seed Python's `random`, NumPy's legacy global RNG and PyTorch in one pass.
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed):
    _seed_rng(SEED)

# Ask cuDNN to pick deterministic kernels.
torch.backends.cudnn.deterministic = True

In [None]:
from transformers import set_seed, AutoTokenizer, AutoConfig, AutoModelForSequenceClassification

# Emotion names indexed by the integer id used for training. This order must
# stay in sync with the `Target` encoding applied in CustomDataset.
# NOTE(review): the checkpoint ships its own id2label; the inference output
# further down suggests it is ordered differently from the training encoding,
# which would make the fine-tuned model report wrong label names - confirm
# against the checkpoint's config.json.
EMOTION_LABELS = ['neutral', 'surprise', 'fear', 'sadness', 'joy', 'disgust', 'anger']

model_config = AutoConfig.from_pretrained('tae898/emoberta-large', num_labels=len(EMOTION_LABELS))
# Pin the id <-> name mapping so the saved model (and any pipeline built from
# it) names classes according to the encoding the classifier was trained on.
model_config.id2label = dict(enumerate(EMOTION_LABELS))
model_config.label2id = {label: idx for idx, label in model_config.id2label.items()}
model = AutoModelForSequenceClassification.from_pretrained('tae898/emoberta-large', config=model_config)

tokenizer = AutoTokenizer.from_pretrained('tae898/emoberta-large')

model.resize_token_embeddings(len(tokenizer))
# Keep the config's padding id equal to the id the tokenizer actually pads with.
# Pointing it at the EOS id is a workaround for tokenizers without a pad token;
# here the value is written to config.json by save_pretrained, so a reloaded
# model would be built with a different padding index than it was trained with.
model.config.pad_token_id = tokenizer.pad_token_id

In [None]:
import pandas as pd

# Preview the column descriptions; the file is read with the cp949 (Korean) codec.
pd.read_csv("./data_info.csv", encoding="cp949")

Unnamed: 0,feature,information,type
0,ID,고유번호,object
1,Utterance,발화문,object
2,Speaker,발화자,object
3,Dialogue_ID,Dialogue 구분 번호,int
4,Target,감정,object


In [None]:
import os
import pandas as pd
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    """Utterance dataset backed by ``./train.csv`` or ``./test.csv``.

    Each item is a dict with the raw ``Utterance`` text under ``'text'`` and a
    class id under ``'label'``. For the training file the ``Target`` emotion
    names are encoded as integers with ``LABEL_TO_ID``; for the test file, which
    is read without using a ``Target`` column, every item gets the placeholder
    label ``'0'``.

    Args:
        train: Read ``train.csv`` when True, ``test.csv`` otherwise.

    Raises:
        ValueError: If the training file contains a ``Target`` value that is
            not a key of ``LABEL_TO_ID``.
    """

    # Integer id assigned to each emotion name found in the `Target` column.
    # NOTE(review): these ids must agree with the id2label mapping of the model
    # being fine-tuned, otherwise predicted label names are wrong - confirm.
    LABEL_TO_ID = {'neutral': 0, 'surprise': 1, 'fear': 2, 'sadness': 3,
                   'joy': 4, 'disgust': 5, 'anger': 6}

    def __init__(self, train=True):
        super().__init__()
        self.train = train
        self.data = pd.read_csv(os.path.join('./', 'train.csv' if train else 'test.csv'))
        if self.train:
            # `.map` turns unknown names into NaN, so they can be reported here
            # instead of failing later inside the collator.
            encoded = self.data['Target'].map(self.LABEL_TO_ID)
            unknown_mask = encoded.isna()
            if unknown_mask.any():
                unknown = sorted(set(self.data.loc[unknown_mask, 'Target'].astype(str)))
                raise ValueError(f'Unknown Target labels in train.csv: {unknown}')
            self.data['Target'] = encoded.astype('int64')

    def __len__(self):
        """Return the number of rows in the loaded CSV."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``{'text': ..., 'label': ...}`` for the row at position ``index``."""
        record = self.data.iloc[index]
        text = record['Utterance']
        if self.train:
            return {'text': text, 'label': int(record['Target'])}
        # No ground truth for the test file: placeholder label, kept as the
        # string '0' for compatibility with existing callers.
        return {'text': text, 'label': '0'}

# Full labelled dataset read from train.csv (split into train/validation later).
train_dataset = CustomDataset(train=True)

In [None]:
class ClassificationCollator(object):
    """Collate function that tokenizes a list of records into one batch.

    Args:
        tokenizer: Callable invoked with ``text=``, ``return_tensors='pt'``,
            ``padding=True``, ``truncation=True`` and ``max_length=``; its
            result must support ``update``-style item assignment.
        max_seq_len: Value forwarded to the tokenizer as ``max_length``.
    """

    def __init__(self, tokenizer, max_seq_len=None):
        self.max_seq_len = max_seq_len
        self.tokenizer = tokenizer

    def __call__(self, sequences):
        """Tokenize the ``'text'`` of every record and attach integer ``'labels'``.

        Args:
            sequences: Iterable of dicts with ``'text'`` and ``'label'`` keys;
                each label is converted with ``int``.

        Returns:
            The tokenizer output with an added ``'labels'`` tensor.
        """
        batch_texts = [record['text'] for record in sequences]
        batch_label_ids = [int(record['label']) for record in sequences]

        encoded = self.tokenizer(
            text=batch_texts,
            return_tensors='pt',
            padding=True,
            truncation=True,
            max_length=self.max_seq_len,
        )
        encoded['labels'] = torch.tensor(batch_label_ids)
        return encoded

# Build the collator used by the DataLoaders (inputs truncated to 512 tokens).
# NOTE: this rebinds the class name to the instance, so from here on
# `ClassificationCollator` is the callable collator object, not the class.
ClassificationCollator = ClassificationCollator(tokenizer=tokenizer, max_seq_len=512)

In [None]:
from torch.utils.data import DataLoader, Subset, random_split

# `train_dataset` is rebound to the training split below. Unwrap it first so
# that re-running this cell splits the full dataset again instead of taking
# 80% of the previous 80% subset.
full_dataset = train_dataset.dataset if isinstance(train_dataset, Subset) else train_dataset

# 80/20 train/validation split.
train_size = int(len(full_dataset) * 0.8)
val_size = len(full_dataset) - train_size
# A dedicated, seeded generator makes the split depend only on SEED and not on
# how much of the global RNG state earlier cells have already consumed.
train_dataset, val_dataset = random_split(
    full_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SEED),
)

train_dataloader = DataLoader(dataset=train_dataset,
                              batch_size=16,
                              shuffle=True,
                              collate_fn=ClassificationCollator)

# Validation batches keep their order; only training batches are shuffled.
val_dataloader = DataLoader(dataset=val_dataset,
                            batch_size=16,
                            shuffle=False,
                            collate_fn=ClassificationCollator)

In [None]:
# `transformers.AdamW` is deprecated in favour of the PyTorch implementation
# (and the install at the top of this notebook is unpinned), so use
# torch.optim.AdamW directly.
from torch.optim import AdamW
from transformers import get_cosine_schedule_with_warmup

total_epochs = 1

# Two parameter groups: weight decay for everything except biases and
# LayerNorm parameters.
param_optimizer = list(model.named_parameters())
no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
optimizer_grouped_parameters = [
    {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
    {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
]
optimizer = AdamW(optimizer_grouped_parameters,
                  lr=1e-5,
                  eps=1e-8)

# One scheduler step per batch; the first 10% of the steps are warm-up.
num_train_steps = len(train_dataloader) * total_epochs
num_warmup_steps = int(num_train_steps * 0.1)

lr_scheduler = get_cosine_schedule_with_warmup(optimizer,
                                               num_warmup_steps=num_warmup_steps,
                                               num_training_steps=num_train_steps)



In [None]:
import torch

def train(dataloader, optimizer, scheduler, device_, model_=None):
    """Run one training pass over ``dataloader`` and update the model.

    Args:
        dataloader: Iterable of batches; each batch is a dict of CPU tensors
            that includes a ``'labels'`` entry and is passed to the model as
            keyword arguments.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per batch.
        device_: Device every batch tensor is moved to.
        model_: Model to train. Defaults to the notebook-level ``model`` so
            existing four-argument calls keep working.

    Returns:
        Tuple ``(true_labels, prediction_labels, total_loss)``: flat lists of
        the reference labels, the arg-max predictions, and one loss value per
        batch.
    """
    net = model if model_ is None else model_
    net.train()

    prediction_labels = []
    true_labels = []
    total_loss = []

    for batch in dataloader:
        # Collect the reference labels before the batch leaves the CPU.
        true_labels += batch['labels'].numpy().flatten().tolist()
        # Every tensor in the batch is cast to int64 and moved to the device.
        batch = {k: v.type(torch.long).to(device_) for k, v in batch.items()}

        outputs = net(**batch)
        # The first two outputs are the loss and the logits.
        loss, logits = outputs[:2]
        logits = logits.detach().cpu().numpy()
        total_loss.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0)  # prevent exploding gradient

        optimizer.step()
        scheduler.step()

        prediction_labels += logits.argmax(axis=-1).flatten().tolist()

    return true_labels, prediction_labels, total_loss

def validation(dataloader, device_, model_=None):
    """Evaluate the model on ``dataloader`` without updating its weights.

    Args:
        dataloader: Iterable of batches; each batch is a dict of CPU tensors
            that includes a ``'labels'`` entry and is passed to the model as
            keyword arguments.
        device_: Device every batch tensor is moved to.
        model_: Model to evaluate. Defaults to the notebook-level ``model`` so
            existing two-argument calls keep working.

    Returns:
        Tuple ``(true_labels, prediction_labels, total_loss)``: flat lists of
        the reference labels, the arg-max predictions, and one loss value per
        batch.
    """
    net = model if model_ is None else model_
    net.eval()

    prediction_labels = []
    true_labels = []
    total_loss = []

    for batch in dataloader:
        # Collect the reference labels before the batch leaves the CPU.
        true_labels += batch['labels'].numpy().flatten().tolist()
        # Every tensor in the batch is cast to int64 and moved to the device.
        batch = {k: v.type(torch.long).to(device_) for k, v in batch.items()}

        # No gradients are needed for evaluation.
        with torch.no_grad():
            outputs = net(**batch)
            # The first two outputs are the loss and the logits.
            loss, logits = outputs[:2]
            logits = logits.detach().cpu().numpy()
            total_loss.append(loss.item())

            prediction_labels += logits.argmax(axis=-1).flatten().tolist()

    return true_labels, prediction_labels, total_loss

In [None]:
from sklearn.metrics import classification_report, accuracy_score

# Train on the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

# Per-batch losses and per-epoch accuracies accumulated over all epochs.
all_loss = {'train_loss': [], 'val_loss': []}
all_acc = {'train_acc': [], 'val_acc': []}

for epoch in range(total_epochs):
    # One optimisation pass over the training split.
    y, y_pred, train_loss = train(train_dataloader, optimizer, lr_scheduler, device)
    train_acc = accuracy_score(y, y_pred)

    # Evaluation pass over the held-out split.
    y, y_pred, val_loss = validation(val_dataloader, device)
    val_acc = accuracy_score(y, y_pred)

    all_loss['train_loss'].extend(train_loss)
    all_loss['val_loss'].extend(val_loss)
    all_acc['train_acc'].append(train_acc)
    all_acc['val_acc'].append(val_acc)

    mean_train_loss = torch.tensor(train_loss).mean()
    mean_val_loss = torch.tensor(val_loss).mean()
    print(f'Epoch: {epoch}, train_loss: {mean_train_loss:.3f}, train_acc: {train_acc:.3f}, val_loss: {mean_val_loss:.3f}, val_acc: {val_acc:.3f}')

In [None]:
# Save the fine-tuned weights/config and the tokenizer files side by side in
# ./best_model so the directory can be loaded later with from_pretrained.
model.save_pretrained('./best_model')
tokenizer.save_pretrained("./best_model/")

('/content/drive/MyDrive/[DACON]sentiment_classification/best_model/tokenizer_config.json',
 '/content/drive/MyDrive/[DACON]sentiment_classification/best_model/special_tokens_map.json',
 '/content/drive/MyDrive/[DACON]sentiment_classification/best_model/vocab.json',
 '/content/drive/MyDrive/[DACON]sentiment_classification/best_model/merges.txt',
 '/content/drive/MyDrive/[DACON]sentiment_classification/best_model/added_tokens.json',
 '/content/drive/MyDrive/[DACON]sentiment_classification/best_model/tokenizer.json')

In [None]:
# Load the test set and keep its utterances as a plain Python list; this list
# is what sentiment_classification iterates over.
test = pd.read_csv('./test.csv')
test_list = test["Utterance"].values.tolist()

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline

def sentiment_classification(modelname, texts=None, verbose=True):
    """Classify utterances with a text-classification pipeline.

    Args:
        modelname: Model id or local directory passed to ``from_pretrained``
            for both the tokenizer and the model.
        texts: Utterances to classify. Defaults to the notebook-level
            ``test_list`` so existing one-argument calls keep working.
        verbose: Print the pipeline output for every utterance when True
            (the original behaviour).

    Returns:
        A list with one ``[label, score]`` pair per prediction returned by the
        pipeline, in input order.
    """
    if texts is None:
        texts = test_list

    tokenizer = AutoTokenizer.from_pretrained(modelname)
    classifier_model = AutoModelForSequenceClassification.from_pretrained(modelname)

    # Use the first GPU when available; -1 selects the CPU for pipelines.
    device_index = 0 if torch.cuda.is_available() else -1
    classification = pipeline("sentiment-analysis",
                              model=classifier_model,
                              tokenizer=tokenizer,
                              device=device_index)

    final_labels = []
    for text in texts:
        # Run inference once per utterance and reuse the result for both the
        # log line and the returned list.
        predictions = classification(text)
        if verbose:
            print(predictions)
        final_labels.extend([pred['label'], pred['score']] for pred in predictions)

    return final_labels

In [None]:
# Predictions of the fine-tuned checkpoint saved in ./best_model.
inference01 = sentiment_classification("./best_model")

[{'label': 'joy', 'score': 0.995347797870636}]
[{'label': 'neutral', 'score': 0.9997523427009583}]
[{'label': 'neutral', 'score': 0.8349255323410034}]
[{'label': 'neutral', 'score': 0.9976422190666199}]
[{'label': 'sadness', 'score': 0.9960768818855286}]




[{'label': 'sadness', 'score': 0.9960797429084778}]
[{'label': 'sadness', 'score': 0.990605890750885}]
[{'label': 'sadness', 'score': 0.9989331364631653}]
[{'label': 'fear', 'score': 0.9866005778312683}]
[{'label': 'sadness', 'score': 0.9960768818855286}]
[{'label': 'sadness', 'score': 0.9960768818855286}]
[{'label': 'neutral', 'score': 0.9978815913200378}]
[{'label': 'sadness', 'score': 0.9891023635864258}]
[{'label': 'neutral', 'score': 0.7437227368354797}]
[{'label': 'sadness', 'score': 0.9989011287689209}]
[{'label': 'sadness', 'score': 0.7908764481544495}]
[{'label': 'neutral', 'score': 0.9992433786392212}]
[{'label': 'neutral', 'score': 0.5921871662139893}]
[{'label': 'sadness', 'score': 0.9991406202316284}]
[{'label': 'sadness', 'score': 0.9991406202316284}]
[{'label': 'neutral', 'score': 0.9978815913200378}]
[{'label': 'neutral', 'score': 0.5623016357421875}]
[{'label': 'neutral', 'score': 0.9984623193740845}]
[{'label': 'fear', 'score': 0.9959851503372192}]
[{'label': 'neutral

In [None]:
# Predictions of the published tae898/emoberta-large checkpoint on the same
# utterances; they are combined with inference01 in the next cell.
inference02 = sentiment_classification("tae898/emoberta-large")

[{'label': 'neutral', 'score': 0.49061161279678345}]
[{'label': 'neutral', 'score': 0.935129702091217}]
[{'label': 'neutral', 'score': 0.9329481720924377}]
[{'label': 'neutral', 'score': 0.8697671294212341}]
[{'label': 'joy', 'score': 0.7094044089317322}]
[{'label': 'joy', 'score': 0.6961237192153931}]
[{'label': 'joy', 'score': 0.7699287533760071}]
[{'label': 'joy', 'score': 0.9007085561752319}]
[{'label': 'sadness', 'score': 0.7103415727615356}]
[{'label': 'joy', 'score': 0.7094044089317322}]
[{'label': 'joy', 'score': 0.7094044089317322}]
[{'label': 'neutral', 'score': 0.96848464012146}]
[{'label': 'surprise', 'score': 0.45761367678642273}]
[{'label': 'neutral', 'score': 0.7680047154426575}]
[{'label': 'joy', 'score': 0.46369296312332153}]
[{'label': 'joy', 'score': 0.3338625729084015}]
[{'label': 'neutral', 'score': 0.5473726391792297}]
[{'label': 'joy', 'score': 0.390373557806015}]
[{'label': 'joy', 'score': 0.9153652787208557}]
[{'label': 'joy', 'score': 0.9153652787208557}]
[{'l

In [None]:
# Both prediction lists must describe the same utterances, position by position.
if len(inference01) != len(inference02):
    raise ValueError(
        f'Prediction lists differ in length: {len(inference01)} vs {len(inference02)}'
    )

# For every utterance keep the label of whichever model is more confident;
# on a tie the first model (inference01) wins. The former third branch could
# only be reached when neither score equalled the maximum, and would then have
# failed trying to convert a label name to float, so it has been dropped.
final_concatenation = [
    label_first if float(score_first) >= float(score_second) else label_second
    for (label_first, score_first), (label_second, score_second) in zip(inference01, inference02)
]
In [None]:
# Fill the sample submission's Target column with the combined labels and
# write it out without the index column.
# NOTE(review): assumes sample_submission.csv has one row per test utterance,
# in the same order as test.csv - confirm.
submit = pd.read_csv('./sample_submission.csv')
submit['Target'] = final_concatenation
submit.to_csv('./concatenation_submission.csv', index=False)