# 필요한 패키지 설치

In [None]:
!pip install torchmetrics
!pip install imbalanced-learn

# 데이터 불러오기

In [None]:
import numpy as  np
import pandas as pd
import pickle
import torch

# Train / test pickles (the *_origin.pkl files).
train, test = (
    pd.read_pickle('./dataset/KEMDy20_v1_1/sensor/bio_train/train_origin.pkl'),
    pd.read_pickle('./dataset/KEMDy20_v1_1/sensor/bio_train/test_origin.pkl'),
)

# Load the data that has already been padded and SMOTE-augmented.
temp_x, eda_x, t_y = (
    np.load('./dataset/KEMDy20_v1_1/sensor/bio_train/temp_x.npy'),
    np.load('./dataset/KEMDy20_v1_1/sensor/bio_train/eda_x.npy'),
    np.load('./dataset/KEMDy20_v1_1/sensor/bio_train/t_y.npy'),
)

증강한 train 데이터 로더에 필요한 모듈 정의

In [None]:
import torch
import pandas as pd
from torch import nn
from torch import optim
from torch.utils.data import Dataset, DataLoader

class SmoteCustomDataset(Dataset):
    """Dataset over in-memory ``temp`` / ``eda`` arrays and their labels.

    Args:
        temp: Array-like of ``temp`` sequences, one row per sample
            (converted with ``torch.Tensor``).
        eda: Array-like of ``eda`` sequences, one row per sample
            (converted with ``torch.Tensor``).
        y: Array-like of integer class labels, one per sample
            (converted with ``torch.LongTensor``).

    Raises:
        ValueError: If ``temp``, ``eda`` and ``y`` do not hold the same
            number of samples.
    """

    def __init__(self, temp , eda, y):
        self.temp_ten = torch.Tensor(temp)
        self.eda_ten = torch.Tensor(eda)

        self.length = len(y)
        self.t_y = torch.LongTensor(y)

        # __len__ reports len(y); a mismatch would otherwise silently ignore
        # extra rows or fail later with an index error inside a DataLoader.
        n_temp, n_eda = len(self.temp_ten), len(self.eda_ten)
        if not (n_temp == n_eda == self.length):
            raise ValueError(
                "temp, eda and y must contain the same number of samples "
                f"(got {n_temp}, {n_eda} and {self.length})"
            )

    def __getitem__(self, index):
        """Return the ``(temp, eda, label)`` triple stored at ``index``."""
        t = self.temp_ten[index]
        e = self.eda_ten[index]
        y = self.t_y[index]
        return t,e,y

    def __len__(self):
        """Return the number of samples (``len(y)`` at construction time)."""
        return self.length

# BIO Sensor Data 모델 선언

In [None]:
## 모델 선언

import torch
import torch.nn.init

# Use the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

class bio_CNN(torch.nn.Module):
    """1-D CNN classifier over a pair of equal-length sequences (``t``, ``e``).

    Both inputs go through the *same* four Conv1d -> BatchNorm1d -> ReLU
    stages (the convolution weights are shared between the two inputs). The
    two resulting feature maps are concatenated along the channel axis,
    flattened, and classified by a fully connected head.

    Args:
        seq_len: Length of each input sequence. Defaults to 141, the value
            that was previously hard-coded.
        num_classes: Number of output logits. Defaults to 7.

    Raises:
        ValueError: If ``seq_len`` is too short to survive the four
            un-padded kernel-3 convolutions (i.e. ``seq_len <= 8``).
    """

    # Each un-padded, stride-1, kernel-3 convolution shortens the sequence by 2.
    _NUM_CONV_STAGES = 4
    _SHRINK_PER_STAGE = 2
    # conv4 emits 16 channels per input; two inputs are concatenated.
    _CONCAT_CHANNELS = 16 * 2

    def __init__(self, seq_len=141, num_classes=7):
        super().__init__()

        conv_out_len = seq_len - self._NUM_CONV_STAGES * self._SHRINK_PER_STAGE
        if conv_out_len < 1:
            raise ValueError(
                f"seq_len must be greater than 8 to pass four kernel-3 convolutions, got {seq_len}"
            )
        self.seq_len = seq_len
        self.num_classes = num_classes

        # Layer names and creation order are kept as before so state_dict keys
        # and seeded parameter initialisation stay unchanged.
        self.conv1 = self._conv_block(1, 4)
        self.conv2 = self._conv_block(4, 8)
        self.conv3 = self._conv_block(8, 12)
        self.conv4 = self._conv_block(12, 16)

        # With the default seq_len=141 this is 133 * 32 input features.
        self.fc1 = torch.nn.Sequential(
            torch.nn.Linear(in_features=conv_out_len * self._CONCAT_CHANNELS, out_features=1000, bias=True),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=1000, out_features=500, bias=True),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=500, out_features=128, bias=True),
        )

        self.fc2 = torch.nn.Sequential(
            torch.nn.Linear(in_features=128, out_features=num_classes, bias=True),
        )

        self.relu = torch.nn.ReLU()

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one Conv1d(kernel 3, stride 1) -> BatchNorm1d -> ReLU stage."""
        return torch.nn.Sequential(
            torch.nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=1),
            torch.nn.BatchNorm1d(out_channels),
            torch.nn.ReLU(),
        )

    def _encode(self, signal):
        """Reshape ``signal`` to (-1, 1, seq_len) and run the four conv stages."""
        out = torch.reshape(signal, (-1, 1, self.seq_len))
        out = self.conv1(out)
        out = self.conv2(out)
        out = self.conv3(out)
        return self.conv4(out)

    def forward(self, t, e):
        """Return class logits of shape (batch, num_classes).

        Args:
            t: First input; reshaped to (-1, 1, seq_len).
            e: Second input; reshaped to (-1, 1, seq_len).
        """
        # Shape the second input before any convolution runs, as the original
        # forward pass did.
        eda = torch.reshape(e, (-1, 1, self.seq_len))
        out_t = self._encode(t)
        out_e = self._encode(eda)

        # Concatenate along the channel axis, then flatten everything but batch.
        out = torch.cat([out_t, out_e], dim=1)
        out = torch.flatten(out, 1)

        out = self.fc1(out)
        result = self.fc2(self.relu(out))

        return result

모델 선언 및 최적화 함수 설정

In [None]:
# Choose the compute device, then place the network and the loss on it.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

model = bio_CNN()
model = model.to(device)

loss_fn = torch.nn.CrossEntropyLoss()
loss_fn = loss_fn.to(device)

# Adam over all model parameters, with L2 weight decay.
optimizer = torch.optim.Adam(
    params=model.parameters(),
    lr=0.001,
    weight_decay=2e-4,
)

In [None]:
# Wrap the loaded arrays in a Dataset and serve them in shuffled batches of 64.
train_dataset = SmoteCustomDataset(temp=temp_x, eda=eda_x, y=t_y)
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
    drop_last=False,
)

# 모델 학습

In [None]:
from torchmetrics import F1Score, Accuracy

# Put the model in training mode before optimising.
model.train()

for epoch in range(100):
    cost = 0.0
    labels_list = []
    predictions_list = []

    for t, e, y in train_dataloader:
        t = t.to(device)
        e = e.to(device)
        y = y.to(device)

        output = model(t, e)
        loss = loss_fn(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate a plain float: adding the loss tensor itself would keep
        # autograd graph nodes referenced through `cost` for the whole epoch.
        cost += loss.item()

        # Collect whole batches on the CPU instead of extending a Python list
        # with one 0-d device tensor per sample.
        labels_list.append(y.detach().cpu())
        predictions_list.append(output.detach().argmax(dim=1).cpu())

    cost = cost / len(train_dataloader)
    predictions_list = torch.cat(predictions_list)
    labels_list = torch.cat(labels_list)

    # NOTE(review): with no `average` argument, torchmetrics' F1Score wrapper
    # appears to default to micro averaging, which for single-label multiclass
    # data equals accuracy - confirm whether macro F1 was intended.
    f1 = F1Score(task='multiclass', num_classes=7)
    f1_score = f1(predictions_list, labels_list)
    # `auc` / `auc_score` hold Accuracy (not AUC); the names are kept as-is
    # and only the printed label is corrected.
    auc = Accuracy(task='multiclass', num_classes=7)
    auc_score = auc(predictions_list, labels_list)

    if (epoch + 1) % 10 == 0:
        print(f"Epoch : {epoch+1:4d}, Cost : {cost:.3f}, F1 : {f1_score:.3f}, Acc: {auc_score:.3f}")

모델 F1 score 및 Accuracy 확인

In [None]:
from torchmetrics import F1Score, Accuracy
import torch
from tqdm import tqdm

labels_list = []
predictions_list = []

# The training cell leaves the model in train mode; switch to eval so that
# BatchNorm uses its running statistics and does not update them while
# the model is only being measured.
model.eval()

with torch.no_grad():
    for t, e, y in tqdm(train_dataloader):
        t = t.to(device)
        e = e.to(device)
        y = y.to(device)

        output = model(t, e)

        # Collect whole batches on the CPU rather than one 0-d tensor per sample.
        labels_list.append(y.cpu())
        predictions_list.append(output.argmax(dim=1).cpu())

predictions_list = torch.cat(predictions_list)
labels_list = torch.cat(labels_list)

f1 = F1Score(task='multiclass', num_classes=7)
f1_score = f1(predictions_list, labels_list)
# `auc` / `auc_score` hold Accuracy (not AUC); the names are kept as-is and
# only the printed label is corrected.
auc = Accuracy(task='multiclass', num_classes=7)
auc_score = auc(predictions_list, labels_list)

print('f1:', f1_score, 'acc:', auc_score)

Class별 예측 값 개수 확인

In [None]:
from collections import Counter

# Class index -> emotion name, used to make the counts readable.
label_dic = dict(enumerate(['angry', 'disqust', 'fear', 'happy', 'neutral', 'sad', 'surprise']))

# Translate the integer labels / predictions into their names.
label_origin = list(map(label_dic.__getitem__, labels_list.tolist()))
pred_origin = list(map(label_dic.__getitem__, predictions_list.tolist()))

# Count how often each class occurs among the labels and the predictions.
label_counter = Counter(label_origin)
pred_counter = Counter(pred_origin)

print("Label:\n", label_counter.most_common(), end='\n\n')
print("Pred:\n", pred_counter.most_common())

혼동행렬 확인

In [None]:
from torchmetrics.classification import MulticlassConfusionMatrix

# as_tensor reuses an existing tensor instead of copy-constructing it
# (torch.tensor(<tensor>) emits a warning).
preds = torch.as_tensor(predictions_list)
target = torch.as_tensor(labels_list)
# MulticlassConfusionMatrix is already task-specific and takes no `task`
# keyword; that argument belongs to the generic ConfusionMatrix wrapper.
confmat = MulticlassConfusionMatrix(num_classes=7)
confmat(preds, target)

Class별 F1 score 확인

In [None]:
from sklearn  import metrics
# classification_report expects (y_true, y_pred); passing them swapped
# exchanges precision with recall and reports support for the predictions.
print(metrics.classification_report(labels_list, predictions_list))

# 모델 평가

Test 데이터로더에 필요한 모듈 정의 및 데이터 불러오기

In [None]:
import torch
import pandas as pd
from torch import nn
from torch import optim
from torch.utils.data import Dataset, DataLoader

def get_numpy_from_nonfixed_2d_array(data, fixed_length=141):
    """Turn variable-length sequences into one 2-D array of fixed width.

    Each sequence is right-padded with zeros and then cut to ``fixed_length``,
    so shorter sequences are zero-filled and longer ones are truncated.

    Args:
        data: Iterable of 1-D sequences (arrays or lists) of varying length.
        fixed_length: Number of columns in the result.

    Returns:
        ``numpy.ndarray`` of shape ``(len(data), fixed_length)``. An empty
        ``data`` yields an array of shape ``(0, fixed_length)``.
    """
    rows = []
    for a in data:
        # Pad by the full target width, then slice: the slice both trims the
        # surplus padding and truncates sequences longer than fixed_length.
        rows.append(np.pad(a, (0, fixed_length), 'constant', constant_values=0)[:fixed_length])
    if not rows:
        # np.concatenate raises on an empty list; return an empty table instead.
        return np.zeros((0, fixed_length))
    return np.concatenate(rows, axis=0).reshape(-1, fixed_length)

class CustomDataset(Dataset):
    """Dataset built from a pickled DataFrame with ``temp``, ``eda`` and ``emotion_id`` columns.

    The ``temp`` and ``eda`` sequences are padded/truncated to 141 values and
    stored as tensors; ``emotion_id`` becomes a ``LongTensor`` of labels.

    Args:
        file_path: Path of the pickle file passed to ``pd.read_pickle``.
    """

    def __init__(self, file_path):
        frame = pd.read_pickle(file_path)

        # Fixed-width (141) numpy versions of the two sensor columns.
        self.temp_array = get_numpy_from_nonfixed_2d_array(frame["temp"].values, fixed_length=141)
        self.eda_array = get_numpy_from_nonfixed_2d_array(frame["eda"].values, fixed_length=141)

        # Tensor versions served by __getitem__.
        self.temp_ten = torch.Tensor(self.temp_array)
        self.eda_ten = torch.Tensor(self.eda_array)

        self.length = len(frame)
        self.train_y = torch.LongTensor(frame["emotion_id"].values)

    def __getitem__(self, index):
        """Return the ``(temp, eda, label)`` triple stored at ``index``."""
        return self.temp_ten[index], self.eda_ten[index], self.train_y[index]

    def __len__(self):
        """Return the number of rows in the loaded DataFrame."""
        return self.length

In [None]:
# Build the test Dataset from the pickle and iterate it in file order
# (no shuffling), 64 samples per batch.
test_dataset = CustomDataset(file_path="./dataset/KEMDy20_v1_1/sensor/bio_train/test_origin.pkl")
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=64,
    shuffle=False,
    drop_last=False,
)

Test F1 score 및 Accuracy 확인

In [None]:
# Accuracy is used below, so import it here instead of relying on an
# earlier cell having been run.
from torchmetrics import Accuracy, F1Score

labels_list = []
predictions_list = []
extraction = []
# Raw model outputs per batch; concatenated later when saving results.
result = []

model.eval()

with torch.no_grad():
    for t, e, y in test_dataloader:
        t = t.to(device)
        e = e.to(device)
        y = y.to(device)

        output = model(t, e)
        result.append(output)

        # Collect whole batches on the CPU rather than one 0-d tensor per sample.
        labels_list.append(y.cpu())
        predictions_list.append(output.argmax(dim=1).cpu())

predictions_list = torch.cat(predictions_list)
labels_list = torch.cat(labels_list)

f1 = F1Score(task='multiclass', num_classes=7)
f1_score = f1(predictions_list, labels_list)
# `auc` / `auc_score` hold Accuracy (not AUC); the names are kept as-is and
# only the printed label is corrected.
auc = Accuracy(task='multiclass', num_classes=7)
auc_score = auc(predictions_list, labels_list)

print('f1:', f1_score, 'acc:', auc_score)

Class별로 예측 값 개수 확인

In [None]:
from collections import Counter

# Class index -> emotion name.
label_dic = {
    0: 'angry',
    1: 'disqust',
    2: 'fear',
    3: 'happy',
    4: 'neutral',
    5: 'sad',
    6: 'surprise',
}

# Map the integer labels, then the integer predictions, to their names.
label_origin, pred_origin = (
    [label_dic[class_id] for class_id in ids.tolist()]
    for ids in (labels_list, predictions_list)
)

# Per-class occurrence counts for labels and predictions.
label_counter, pred_counter = Counter(label_origin), Counter(pred_origin)

print("Label:\n", label_counter.most_common(), end='\n\n')
print("Pred:\n", pred_counter.most_common())

혼동행렬 확인

In [None]:
from torchmetrics.classification import MulticlassConfusionMatrix

# as_tensor reuses an existing tensor instead of copy-constructing it
# (torch.tensor(<tensor>) emits a warning).
preds = torch.as_tensor(predictions_list)
target = torch.as_tensor(labels_list)
# MulticlassConfusionMatrix is already task-specific and takes no `task`
# keyword; that argument belongs to the generic ConfusionMatrix wrapper.
confmat = MulticlassConfusionMatrix(num_classes=7)
confmat(preds, target)

Class별 F1 score 확인

In [None]:
from sklearn  import metrics
# classification_report expects (y_true, y_pred); passing them swapped
# exchanges precision with recall and reports support for the predictions.
print(metrics.classification_report(labels_list, predictions_list))

# 앙상블을 위한 결과값 저장

In [None]:
# Turn the prediction results into a DataFrame:
# join the per-batch outputs along the batch axis, move them to NumPy,
# and label one column per emotion class.
pred_output = torch.cat(result, dim=0)
pred_np = pred_output.detach().cpu().numpy()
pred_df = pd.DataFrame(
    data=pred_np,
    columns=['angry', 'disqust', 'fear', 'happy', 'neutral', 'sad', 'surprise'],
)
pred_df

In [None]:
# Combine with segment_id.
# pd.concat(axis=1) aligns rows by index label, and pred_df carries a fresh
# 0..n-1 index. Dropping test's own index pairs the rows by position, which
# matches the un-shuffled order the predictions were produced in.
test_result = pd.concat([test['segment_id'].reset_index(drop=True), pred_df], axis=1)
test_result

In [None]:
# Save the final result DataFrame.
import os

result_path = "./dataset/KEMDy20_v1_1/sensor/result/bio_result.pkl"
# to_pickle does not create missing directories, so make sure the target
# folder exists before writing.
os.makedirs(os.path.dirname(result_path), exist_ok=True)
test_result.to_pickle(result_path)