In [1]:
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, fbeta_score, classification_report, confusion_matrix
from torch.utils.data import DataLoader, TensorDataset
from imblearn.over_sampling import SMOTE

### Utilities

In [2]:
# Check if CUDA is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

#loading data
data = pd.read_csv('./data/processed_data.csv')

# Features and labels
X = data.drop('y', axis=1)
y = data['y']

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=2024)
# Split training set into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=2024)

In [3]:
def get_data_loader(imb=False, batch_size=64):

    if imb:
        
        print("use smote")
        # use smote to oversampling
        smote = SMOTE(random_state=2024)
        X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

        X_train_tensor = torch.tensor(X_train_resampled.values, dtype=torch.float32).to(device)
        y_train_tensor = torch.tensor(y_train_resampled.values, dtype=torch.float32).view(-1, 1).to(device)
    else:
        
        X_train_tensor = torch.tensor(X_train.values, dtype=torch.float32).to(device)
        y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).view(-1, 1).to(device)
        
    
    X_val_tensor = torch.tensor(X_val.values, dtype=torch.float32).to(device)
    X_test_tensor = torch.tensor(X_test.values, dtype=torch.float32).to(device)
    
    y_val_tensor = torch.tensor(y_val.values, dtype=torch.float32).view(-1, 1).to(device)
    y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32).view(-1, 1).to(device)
    
    
    # Create data loaders
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor

In [4]:
def evaluate_val(epoch, val_f2_list):
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        y_pred_val = []
        
        for X_batch, y_batch in val_loader:

            X_batch, y_batch = X_batch.to(device), y_batch.to(device)            
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
            
            predicted = outputs.round().view(-1)
            y_pred_val.extend(predicted.tolist())
    
    val_loss /= len(val_loader)
    y_pred_val = torch.tensor(y_pred_val)
    
    val_f2 = fbeta_score(y_val_tensor.cpu(), y_pred_val.cpu(), beta=2)
    val_f2_list.append(val_f2)
    
    # print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {loss.item():.4f}, Val Loss: {val_loss:.4f}, Val F2-score: {val_f2:.4f}',end=' ')

def evaluate_test(epoch, test_precision_list, test_recall_list, test_f1_list, test_f2_list):
    model.eval()
    with torch.no_grad():
        
        y_pred_test = []
        
        for X_batch, _ in test_loader:
            
            X_batch = X_batch.to(device)            
            outputs = model(X_batch)
            predicted = outputs.round().view(-1)
            y_pred_test.extend(predicted.tolist())

    y_pred_test = torch.tensor(y_pred_test)

    test_precision = precision_score(y_test_tensor.cpu(), y_pred_test.cpu(), zero_division=1)
    test_precision_list.append(test_precision)
    test_recall = recall_score(y_test_tensor.cpu(), y_pred_test.cpu())
    test_recall_list.append(test_recall)
    test_f1 = f1_score(y_test_tensor.cpu(), y_pred_test.cpu())
    test_f1_list.append(test_f1)
    test_f2 = fbeta_score(y_test_tensor.cpu(), y_pred_test.cpu(), beta=2)
    test_f2_list.append(test_f2)
    
    # print(f', Test F2-score:{test_f2:.4f} ,Test Recall: {test_recall:.4f}')

def initialize_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)

In [5]:
def training_model(model, num_epochs, train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor):
    # Best record list
    val_f2_list = []

    test_recall_list = []
    test_precision_list = []
    test_f1_list = []
    test_f2_list = []
    
    
    for epoch in range(num_epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            # Move data to GPU
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # Gradient clipping
            optimizer.step()
        
        evaluate_val(epoch, val_f2_list)
        evaluate_test(epoch, test_precision_list, test_recall_list, test_f1_list, test_f2_list)
        
    print("Done")
    best_index = val_f2_list.index(max(val_f2_list))
    print("Best Val Score In Epoch:", best_index+1)
    print("TestDataset Best Record")
    print(f'Test Precision:{test_precision_list[best_index]:.4f}, Test Recll:{test_recall_list[best_index]:.4f}')
    print(f'Test F1-score:{test_f1_list[best_index]:.4f}, Test F2-score:{test_f2_list[best_index]:.4f}')

### Model

In [6]:
'''NN'''
class NN(nn.Module):
    def __init__(self, input_layer_dim):
        super(NN, self).__init__()
        self.fc1 = nn.Linear(input_layer_dim, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        self.fc4 = nn.Linear(128, 64)
        self.fc5 = nn.Linear(64, 32)
        self.fc6 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.relu(self.fc3(x))
        x = self.dropout(x)
        x = self.relu(self.fc4(x))
        x = self.dropout(x)
        x = self.relu(self.fc5(x))
        x = self.dropout(x)
        x = self.sigmoid(self.fc6(x))
        return x

In [7]:
'''CNN'''
class CNN(nn.Module):
    def __init__(self, input_layer_dim):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=5)
        self.pool1 = nn.MaxPool1d(kernel_size=5, stride=1)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=5)
        self.pool2 = nn.MaxPool1d(kernel_size=5, stride=1)
        # self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=5)
        # self.pool3 = nn.MaxPool1d(kernel_size=5, stride=1)
        
        self.fc1 = nn.Linear(3968, 64)  # adjust dim78
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = x.unsqueeze(1)  # one-dim
        x = self.relu(self.conv1(x))
        x = self.pool1(x)
        x = self.relu(self.conv2(x))
        x = self.pool2(x)
        # x = self.relu(self.conv3(x))
        # x = self.pool3(x)
        x = x.view(x.size(0), -1)  # faltten
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.sigmoid(self.fc3(x))
        return x

In [8]:
'''DilatedCNN'''
class DilatedCNN(nn.Module):
    def __init__(self, input_layer_dim):
        super(DilatedCNN, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=6, dilation=3)
        self.pool1 = nn.MaxPool1d(kernel_size=5, stride=1)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=6, dilation=3)
        self.pool2 = nn.MaxPool1d(kernel_size=5, stride=1)
        self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=6, dilation=3)
        self.pool3 = nn.MaxPool1d(kernel_size=5, stride=1)
        self.fc1 = nn.Linear(2688, 64)  # adjust dim78
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        x = x.unsqueeze(1)  # one-dim
        x = self.relu(self.conv1(x))
        x = self.pool1(x)
        x = self.relu(self.conv2(x))
        x = self.pool2(x)
        x = self.relu(self.conv3(x))
        x = self.pool3(x)
        x = x.view(x.size(0), -1)  # faltten
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.sigmoid(self.fc3(x))
        return x

### Training

In [24]:
'''NN without SMOTE'''
# Define model, loss function, optimizer
model = NN(X_train.values.shape[1]).to(device) # X_train.values.shape[1] is input_layer_dim
model.apply(initialize_weights) # Initial weight

criterion = nn.BCELoss()

optimizer = optim.Adam(model.parameters(), lr=0.001)
# Get data_loader
train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor = get_data_loader(imb=False, batch_size=512)
# Train the model
num_epochs = 200
training_model(model, num_epochs, train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor)

Done
Best Val Score In Epoch: 28
TestDataset Best Record
Test Precision:0.5357, Test Recll:0.7290
Test F1-score:0.6176, Test F2-score:0.6799


In [32]:
'''NN with SMOTE'''
# Define model,| loss function, optimizer
model = NN(X_train.values.shape[1]).to(device) # X_train.values.shape[1] is input_layer_dim
model.apply(initialize_weights) # Initial weight

criterion = nn.BCELoss()

optimizer = optim.Adam(model.parameters(), lr=0.001)
# Get data_loader
train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor = get_data_loader(imb=True, batch_size=512)
# Train the model
num_epochs = 200
training_model(model, num_epochs, train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor)

use smote
Done
Best Val Score In Epoch: 4
TestDataset Best Record
Test Precision:0.4247, Test Recll:0.8542
Test F1-score:0.5673, Test F2-score:0.7105


In [27]:
'''CNN without SMOTE'''
# Define model, loss function, optimizer
model = CNN(X_train.values.shape[1]).to(device) # X_train.values.shape[1] is input_layer_dim
model.apply(initialize_weights) # Initial weight

criterion = nn.BCELoss()

optimizer = optim.Adam(model.parameters(), lr=0.001)
# Get data_loader
train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor = get_data_loader(imb=False, batch_size=512)
# Train the model
num_epochs = 200
training_model(model, num_epochs, train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor)

Done
Best Val Score In Epoch: 152
TestDataset Best Record
Test Precision:0.5444, Test Recll:0.6411
Test F1-score:0.5888, Test F2-score:0.6191


In [10]:
'''CNN with SMOTE'''
# Define model, loss function, optimizer
model = CNN(X_train.values.shape[1]).to(device) # X_train.values.shape[1] is input_layer_dim
model.apply(initialize_weights) # Initial weight

criterion = nn.BCELoss()

optimizer = optim.Adam(model.parameters(), lr=0.001)
# Get data_loader
train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor = get_data_loader(imb=True, batch_size=512)
# Train the model
num_epochs = 200
training_model(model, num_epochs, train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor)

use smote
Done
Best Val Score In Epoch: 55
TestDataset Best Record
Test Precision:0.3940, Test Recll:0.8897
Test F1-score:0.5462, Test F2-score:0.7109


In [26]:
'''DilatedCNNCNN without SMOTE'''
# Define model, loss function, optimizer
model = DilatedCNN(X_train.values.shape[1]).to(device) # X_train.values.shape[1] is input_layer_dim
model.apply(initialize_weights) # Initial weight

criterion = nn.BCELoss()

optimizer = optim.Adam(model.parameters(), lr=0.001)
# Get data_loader
train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor = get_data_loader(imb=False, batch_size=512)
# Train the model
num_epochs = 200
training_model(model, num_epochs, train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor)

Done
Best Val Score In Epoch: 38
TestDataset Best Record
Test Precision:0.5134, Test Recll:0.7495
Test F1-score:0.6094, Test F2-score:0.6864


In [20]:
'''DilatedCNNCNN with SMOTE'''
# Define model, loss function, optimizer
model = DilatedCNN(X_train.values.shape[1]).to(device) # X_train.values.shape[1] is input_layer_dim
model.apply(initialize_weights) # Initial weight

criterion = nn.BCELoss()

optimizer = optim.Adam(model.parameters(), lr=0.001)
# Get data_loader
train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor = get_data_loader(imb=True, batch_size=512)
# Train the model
num_epochs = 200
training_model(model, num_epochs, train_loader, val_loader, test_loader, y_val_tensor, y_test_tensor)

use smote
Done
Best Val Score In Epoch: 44
TestDataset Best Record
Test Precision:0.4319, Test Recll:0.8542
Test F1-score:0.5738, Test F2-score:0.7145
