In [123]:
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report, precision_score, recall_score, f1_score, accuracy_score
import torch.nn.functional as F
import os

In [124]:
# set device to GPU if available
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'{device} selected')

cpu selected


In [125]:
emotions_dict = {'ang': 0,
                'hap': 1,
                'sad': 2,
                'fea': 3,
                'sur': 4,
                'neu': 5
}

In [126]:
x_test_text = pd.read_csv('../data/text_test.csv')
y_test_text = x_test_text['label']

x_test_audio = pd.read_csv('../data/audio_test_v0.csv')
y_test_audio = x_test_audio['label']

y_test = y_test_audio  # since y_train_audio == y_train_text

In [127]:
# Concatenate the 'transcription' columns directly
transcription_test = x_test_text['transcription']

tfidf = TfidfVectorizer(sublinear_tf=True, min_df=5, norm='l2', encoding='latin-1', ngram_range=(1, 2), stop_words='english')

# Apply TfidfVectorizer
x_test_text = tfidf.fit_transform(transcription_test).toarray()

print(x_test_text.shape)

(1960, 422)


In [128]:
combined_x_test = np.concatenate((np.array(x_test_audio[x_test_audio.columns[2:]]), x_test_text), axis=1)

print(combined_x_test.shape)

(1960, 698)


In [129]:
# need to make dummy input channel for CNN input feature tensor
X_test = np.expand_dims(combined_x_test,1)

# convert emotion labels from list back to numpy arrays for PyTorch to work with
y_test = np.array(y_test)

In [135]:
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv1d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm1d(planes)
        self.conv2 = nn.Conv1d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm1d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv1d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet1D(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet1D, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv1d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm1d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=2)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.layer5 = self._make_layer(block, 1024, num_blocks[3], stride=2)
        self.linear = nn.Linear(1024*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        #layers.append(nn.Dropout(dropout))  # Add dropout layer after the block
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        out = F.avg_pool1d(out, out.size(2))
        out = out.view(out.size(0), -1)
        out_logits = self.linear(out)
        out_softmax = F.softmax(out_logits, dim=1)
        return out_logits, out_softmax

In [138]:
# define loss function: CrossEntropyLoss()
def criterion(predictions, targets):
    return nn.CrossEntropyLoss()(input=predictions, target=targets)

In [151]:
def make_validate_fnc(model,criterion):
    def validate(X,Y):

        # don't want to update any network parameters on validation passes: don't need gradient
        # wrap in torch.no_grad to save memory and compute in validation phase:
        with torch.no_grad():

            # set model to validation phase i.e. turn off dropout and batchnorm layers
            model.eval()

            # get predictions
            output_logits, output_softmax = model(X)
            predictions = torch.argmax(output_softmax,dim=1)

            # compute the accuracy and loss
            accuracy = torch.sum(Y==predictions)/float(len(Y))
            loss = criterion(output_logits,Y)
            
            # Convert predictions and true labels to numpy arrays
            predictions_np = predictions.cpu().numpy()
            y_true_np = Y.cpu().numpy()

            # Compute classification report
            report = classification_report(y_true_np, predictions_np)

            # Calculate precision, recall, and F1 score
            precision = precision_score(y_true_np, predictions_np, average='weighted')
            recall = recall_score(y_true_np, predictions_np, average='weighted')
            f1 = f1_score(y_true_np, predictions_np, average='weighted')
            acc = accuracy_score(y_true_np, predictions_np)

        return loss.item(), accuracy * 100, predictions, precision * 100, recall * 100, f1 * 100, acc * 100, report
    return validate

In [152]:
optimizer = None

def load_checkpoint(optimizer, model, filename):
    checkpoint_dict = torch.load(filename)
    epoch = checkpoint_dict['epoch']
    model.load_state_dict(checkpoint_dict['model'])
    if optimizer is not None:
        optimizer.load_state_dict(checkpoint_dict['optimizer'])
    return epoch

In [153]:
# pick load folder
load_folder = '../data/model_checkpoints/'
#load_folder = '../savedModel'

# pick the epoch to load
epoch = '000'

model_name = f'CNN1D_v2-{epoch}.pkl'

# make full load path
load_path = os.path.join(load_folder, model_name)

## instantiate empty model and populate with params from binary
#model = CNN1D(len(emotions_dict))
model = ResNet1D(BasicBlock, [2, 2, 2, 2], num_classes=len(emotions_dict))
#optimizer = torch.optim.SGD(model.parameters(),lr=0.001, weight_decay=1e-4, momentum=0.4)
load_checkpoint(optimizer, model, load_path)

print(f'Loaded model from {load_path}')

Loaded model from ../data/model_checkpoints/CNN1D_v2-000.pkl


In [154]:
# Move the model to GPU if available
model.to(device)

# reinitialize validation function with model from chosen checkpoint
validate = make_validate_fnc(model,criterion)

# Convert to tensors
X_test_tensor = torch.tensor(X_test,device=device).float()
y_test_tensor = torch.tensor(y_test,dtype=torch.long,device=device)

test_loss, test_acc, predicted_emotions, precision, recall, f1, acc, report = validate(X_test_tensor,y_test_tensor)

print(f'Test accuracy is {test_acc:.2f}%')

print(f'Test precision is {precision:.2f}')
print(f'Test recall is {recall:.2f}')
print(f'Test F1 is {f1:.2f}')
print(f'Test accuracy is {acc:.2f}')

print('\n',report)

Test accuracy is 62.30%
Test precision is 61.83
Test recall is 62.30
Test F1 is 61.85
Test accuracy is 62.30

               precision    recall  f1-score   support

           0       0.45      0.45      0.45       206
           1       0.44      0.40      0.42       315
           2       0.55      0.63      0.59       618
           3       1.00      1.00      1.00       245
           4       0.99      1.00      1.00       238
           5       0.48      0.38      0.42       338

    accuracy                           0.62      1960
   macro avg       0.65      0.64      0.65      1960
weighted avg       0.62      0.62      0.62      1960

