In [None]:
!pip install pandas
!pip install numpy
!pip install tqdm
!pip install torch
!pip install transformers
!pip install sklearn

In [29]:
import pandas as pd
import numpy as np
from tqdm.auto import tqdm as tqdm_auto
import copy

import torch
import torch.nn as nn

from transformers import AutoModel, BertTokenizerFast
from sklearn.metrics import accuracy_score, f1_score



In [30]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'


In [31]:
#무작위성 제한
torch.manual_seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [33]:
train_data= pd.read_pickle("train_data.pickle")
test_data= pd.read_pickle("test_data.pickle")

<h1>Batch CNN 사용</h1>

In [None]:
#fine-tuned 모델은 첨부되어 있음
num_label = 7
path="JKKANG/ALBERT-kor-emotion"
albert = AutoModel.from_pretrained(path, num_labels= num_label).to(device)
tokenizer = BertTokenizerFast.from_pretrained(path)

In [34]:
#batch 생성을 위한 
def dataset(data):
    save=[]
    k=data['Segment ID'][0][0:15]
    for i,seg in enumerate(data['Segment ID']):

        if seg[7:13]=='script':
            seg=seg[0:15]
            if  k==seg:
                k=seg
                continue
            else: 
                k=seg
                save.append(i)
        else:
            seg=seg[0:14]
            if  k==seg: 
                k=seg
                continue
            else: 
                k=seg
                save.append(i)
            
    return save


#train data
train_load=dataset(train_data)

train_dataset=[]
start=0
for i in train_load:
    
    train_dataset.append(train_data.loc[start:i])
    start=i
train_dataset.append(train_data.loc[start:])


#test data
test_load=dataset(test_data)

test_dataset=[]
start=0
for i in test_load:
    
    test_dataset.append(test_data.loc[start:i])
    start=i
test_dataset.append(test_data.loc[start:])

In [35]:
#batch CNN 사용


#Model
class MY_Model(nn.Module):
    def __init__(self):
        super(MY_Model, self).__init__()
 
        
        self.feature= nn.Sequential(
            nn.Conv1d(768, 768,kernel_size=3, padding=1))
        
        self.audio= nn.Sequential(
            nn.Linear(88,44),
            nn.Linear(44, 22),
            nn.Linear(22,7))
        
        self.feature_final= nn.Sequential(
            nn.Linear(775, 775),
            nn.ReLU(),
            nn.Linear(775, 500),
            nn.ReLU(),
            nn.Linear(500, 200),
            nn.ReLU(),
            nn.Linear(200, 100),
            nn.ReLU(),
            nn.Linear(100, 7))
        

        
    def forward(self, audio_data, text_data, albert, tokenizer):
        
        text_data= tokenizer(text_data,truncation=True,padding=True,return_token_type_ids=False, return_tensors="pt")
        
        with torch.no_grad(): #albert는 fine-tuning 한 모델을 가져다 씀 
            text_data = {k:v.to(device) for k,v in text_data.items()}
            out_text= albert(**text_data)    
           
        out_text=out_text.last_hidden_state

            
        out=out_text[:,0]     
        out1=torch.transpose(out,0,1)
        out1=self.feature(out1)
        out1= torch.transpose(out1,0,1)
        
        audio= self.audio(audio_data)
        out2= torch.concat([out1,audio],dim=1)
        
        final= self.feature_final(out2)   
        
            
        return final
        





In [36]:
model = MY_Model().to(device)

In [37]:
# Training

optimizer= torch.optim.AdamW(model.parameters(), lr=0.001, amsgrad=True )
loss = nn.CrossEntropyLoss().to(device)

epochs = 10
loss_history = []
accuracy_history=[]
best = {"acc": 0}

for epoch in tqdm_auto(range(epochs)):
    epoch_loss=0
    for i in train_dataset:
        text_data=list(i['text_data'])
        audio_data=torch.tensor(i['audio_data'].reset_index(drop=True),dtype=torch.float32).to(device)
        label= torch.tensor(i['label'].reset_index(drop=True),dtype=torch.long).to(device)
        
        # model 계산
        preds = model(audio_data, text_data,  albert, tokenizer)
        
        # cost 계산
        
        
        cost= loss(preds, label)
        optimizer.zero_grad()
        cost.backward()
        optimizer.step()

        epoch_loss+=cost.item()
    
    
    epoch_loss=epoch_loss/len(train_load)
    loss_history.append(epoch_loss)
    
    
    print(f'---------{epoch+1}---------')
    print(f'loss : {epoch_loss}')
    
    
    
    corrects=torch.tensor([]).to(device)
    preds=torch.tensor([]).to(device)
    target=torch.tensor([]).to(device)
    
    model.eval()
    with torch.no_grad():
        
        for i in test_dataset:
            text_v=list(i['text_data'])
            audio_v=torch.tensor(i['audio_data'].reset_index(drop=True),dtype=torch.float32).to(device)
            label_v= torch.tensor(i['label'].reset_index(drop=True),dtype=torch.long).to(device)
            
            out_v= model(audio_v, text_v, albert, tokenizer)
            result_v= torch.argmax(out_v, dim=1)
            correct_v= label_v==result_v
            
            corrects= torch.cat([corrects,correct_v],dim=0)
            preds=torch.cat([preds,result_v],dim=0)
            target=torch.cat([target, label_v],dim=0)
    accuracy = corrects.sum().item() / len(corrects)
    accuracy_history.append(accuracy)
    if accuracy > best["acc"]:
        
        best["state"] = copy.deepcopy(model)
        best["acc"] = accuracy
        best["epoch"] = epoch + 1
    
    model.train()
    

    
    
    print(f"val acc:{accuracy}")
        
        

  0%|          | 0/10 [00:00<?, ?it/s]

---------1---------
loss : 0.47856827569705185
val acc:0.7427341227125942
---------2---------
loss : 0.2476697007150116
val acc:0.7481162540365985
---------3---------
loss : 0.1908617793326357
val acc:0.7502691065662002


KeyboardInterrupt: 

In [None]:
#모델 저장
torch.save(best['state'],'model_BatchCNN.pt')
model2= torch.load('model_BatchCNN.pt')

In [None]:
##성능 평가
preds=torch.tensor([]).to(device)
target=torch.tensor([]).to(device)
    
model2.eval()
with torch.no_grad():
        
    for i in test_dataset:
        text_t=list(i['text_data'])
        audio_t=torch.tensor(i['audio_data'].reset_index(drop=True),dtype=torch.float32).to(device)
        label_t= torch.tensor(i['label'].reset_index(drop=True),dtype=torch.long).to(device)
            
        out_t= model2(audio_t, text_t, albert, tokenizer)
        result_t= torch.argmax(out_t, dim=1)
        
        preds=torch.cat([preds,result_t],dim=0)
        target=torch.cat([target, label_t],dim=0)
    
preds.int()
target.int()



def compute_metrics(preds, target):
    labels= target.tolist()
    preds = preds.tolist()
    macro_f1 = f1_score(labels, preds, average='macro')
    weighted_f1 = f1_score(labels, preds, average='weighted')
    acc = accuracy_score(labels, preds)
    return {"accuracy": acc , "weighted f1": weighted_f1, "macro f1": macro_f1}

compute_metrics(preds, target)

_____________________________________

<h1>Batch CNN 사용 안함</h1>

In [None]:

#batch CNN 사용


#Model
class MY_Model(nn.Module):
    def __init__(self):
        super(MY_Model, self).__init__()

        
        self.audio= nn.Sequential(
            nn.Linear(88,44),
            nn.Linear(44, 22),
            nn.Linear(22,7))
        
        self.feature_final= nn.Sequential(
            nn.Linear(775, 775),
            nn.ReLU(),
            nn.Linear(775, 500),
            nn.ReLU(),
            nn.Linear(500, 200),
            nn.ReLU(),
            nn.Linear(200, 100),
            nn.ReLU(),
            nn.Linear(100, 7))
        

        
    def forward(self, audio_data, text_data, albert, tokenizer):
        
        text_data= tokenizer(text_data,truncation=True,padding=True,return_token_type_ids=False, return_tensors="pt")
        
        with torch.no_grad(): #albert는 fine-tuning 한 모델을 가져다 씀 
            text_data = {k:v.to(device) for k,v in text_data.items()}
            out_text= albert(**text_data)    
           
        out_text=out_text.last_hidden_state

            
        out=out_text[:,0]     
        
        
        audio= self.audio(audio_data)
        out2= torch.concat([out,audio],dim=1)
        
        final= self.feature_final(out2)   
        
            
        return final

In [None]:
model = MY_Model().to(device)

In [None]:
# Training

optimizer= torch.optim.AdamW(model.parameters(), lr=0.001, amsgrad=True )
loss = nn.CrossEntropyLoss().to(device)

epochs = 10
loss_history = []
accuracy_history=[]
best = {"acc": 0}

for epoch in tqdm_auto(range(epochs)):
    epoch_loss=0
    for i in train_dataset:
        text_data=list(i['text_data'])
        audio_data=torch.tensor(i['audio_data'].reset_index(drop=True),dtype=torch.float32).to(device)
        label= torch.tensor(i['label'].reset_index(drop=True),dtype=torch.long).to(device)
        
        # model 계산
        preds = model(audio_data, text_data,  albert, tokenizer)
        
        # cost 계산
        
        
        cost= loss(preds, label)
        optimizer.zero_grad()
        cost.backward()
        optimizer.step()

        epoch_loss+=cost.item()
    
    
    epoch_loss=epoch_loss/len(train_load)
    loss_history.append(epoch_loss)
    
    
    print(f'---------{epoch+1}---------')
    print(f'loss : {epoch_loss}')
    
    
    
    corrects=torch.tensor([]).to(device)
    preds=torch.tensor([]).to(device)
    target=torch.tensor([]).to(device)
    
    model.eval()
    with torch.no_grad():
        
        for i in test_dataset:
            text_v=list(i['text_data'])
            audio_v=torch.tensor(i['audio_data'].reset_index(drop=True),dtype=torch.float32).to(device)
            label_v= torch.tensor(i['label'].reset_index(drop=True),dtype=torch.long).to(device)
            
            out_v= model(audio_v, text_v, albert, tokenizer)
            result_v= torch.argmax(out_v, dim=1)
            correct_v= label_v==result_v
            
            corrects= torch.cat([corrects,correct_v],dim=0)
            preds=torch.cat([preds,result_v],dim=0)
            target=torch.cat([target, label_v],dim=0)
    accuracy = corrects.sum().item() / len(corrects)
    accuracy_history.append(accuracy)
    if accuracy > best["acc"]:
        
        best["state"] = copy.deepcopy(model)
        best["acc"] = accuracy
        best["epoch"] = epoch + 1
    
    model.train()
    

    
    
    print(f"val acc:{accuracy}")
        

  0%|          | 0/30 [00:00<?, ?it/s]

---------1---------
loss : 0.38870653388322896
val acc:0.7376715810879512
---------2---------
loss : 0.18953997458803518
val acc:0.733604473817997
---------3---------
loss : 0.17370416933753116
val acc:0.735638027452974
---------4---------
loss : 0.1693848330544491
val acc:0.7397051347229283
---------5---------
loss : 0.15834349870060593
val acc:0.7397051347229283
---------6---------
loss : 0.15436347905384668
val acc:0.7391967463141841
---------7---------
loss : 0.14526111164006442
val acc:0.7361464158617184
---------8---------
loss : 0.1451472968286206
val acc:0.730045754956787
---------9---------
loss : 0.14162354969832255
val acc:0.7397051347229283
---------10---------
loss : 0.1328808819827948
val acc:0.7366548042704626
---------11---------
loss : 0.1317584620351076
val acc:0.7346212506354856
---------12---------
loss : 0.1248557152680675
val acc:0.7325876970005084
---------13---------
loss : 0.12648422254003816
val acc:0.7305541433655313
---------14---------
loss : 0.126051114275