In [1]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import pandas as pd
from sklearn.model_selection import train_test_split
import utils
from sklearn.metrics import accuracy_score, confusion_matrix
import datetime
import json
import utils.NN_building as NN_building








pygame 2.6.1 (SDL 2.28.4, Python 3.11.10)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# Parameters

model_type =  'renato' # 'classifier' or 'regressor' or 'renato'
use_trap_info = True
ntraps = 3
lags = 3
random_split = True
test_size = 0.2
scale = False
learning_rate =1e-3
batch_size = 64
epochs = 5



parameters = {
    'model_type': model_type,
    'use_trap_info': use_trap_info,
    'ntraps': ntraps,
    'lags': lags,
    'random_split': random_split,
    'test_size': test_size,
    'scale': scale,
    'learning_rate': learning_rate,
    'batch_size': batch_size,
    'epochs': epochs   
}



In [3]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cpu device


In [4]:
# data import and preprocessing
data = pd.read_csv(f'./results/final_df_lag{lags}_ntraps{ntraps}.csv')
n = data.shape[0]
nplaca_index = data['nplaca']
data.drop(columns=['nplaca','distance0'], inplace=True) # drop distance0 because it is always zero
ovos_flag = data['novos'].apply(lambda x: 1 if x > 0 else 0)#.rename('ovos_flag', inplace=True)

# divide columns into groups
days_columns = [f'days{i}_lag{j}' for i in range(ntraps) for j in range(1, lags+1)]
distance_columns = [f'distance{i}' for i in range(1,ntraps)]
eggs_columns = [f'trap{i}_lag{j}' for i in range(ntraps) for j in range(1, lags+1)]


In [5]:
# definition of x and y
if model_type == 'classifier':
    y = ovos_flag
elif model_type == 'regressor':
    y = data['novos']
elif model_type == 'renato':
    y = pd.concat([ovos_flag.rename('ovos_flag', inplace=True),data['novos']],axis=1)
    

if use_trap_info:
    x = data.drop(columns=['novos'])
else:
    drop_cols = ['novos'] + days_columns + distance_columns
    x = data.drop(columns=drop_cols)

In [6]:
# train test split
train_size = 1 - test_size

if random_split:
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size, random_state=42, stratify=ovos_flag)
else:
    y_train = y.iloc[:int(n*train_size)]
    y_test = y.iloc[int(n*train_size):]
    x_train = x.iloc[:int(n*train_size)]
    x_test = x.iloc[int(n*train_size):]    

In [7]:
# scaling
if scale:
    x_train, x_test, max_eggs = NN_building.scale_column(x_train, x_test, eggs_columns)
    if use_trap_info:
        x_train, x_test, max_distance = NN_building.scale_column(x_train, x_test, distance_columns)
        x_train, x_test, max_days = NN_building.scale_column(x_train, x_test, days_columns)


    if model_type == 'regressor':
        y_train = y_train/max_eggs
        y_test = y_test/max_eggs
    elif model_type == 'renato':
        y_train['novos'] = y_train['novos']/max_eggs
        y_test['novos'] = y_test['novos']/max_eggs

In [8]:
# transform to tensors
xtrain = torch.tensor(x_train.values, dtype=torch.float32).to(device)
xtest = torch.tensor(x_test.values, dtype=torch.float32).to(device)
if model_type == 'classifier':
    ytrain = torch.tensor(y_train.values, dtype=torch.long).to(device)
    ytest = torch.tensor(y_test.values, dtype=torch.long).to(device)
elif model_type == 'regressor':
    ytrain = torch.tensor(y_train.values, dtype=torch.float32).to(device)
    ytest = torch.tensor(y_test.values, dtype=torch.float32).to(device)
elif model_type == 'renato':
    ytrain = torch.tensor(y_train.values, dtype=torch.float32).to(device)
    ytest = torch.tensor(y_test.values, dtype=torch.float32).to(device)
    
train_dataset = NN_building.CustomDataset(xtrain, ytrain,model_type)
test_dataset = NN_building.CustomDataset(xtest, ytest,model_type)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=random_split)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=random_split)

  self.features = torch.tensor(features, dtype=torch.float32)
  self.targets = torch.tensor(targets, dtype=torch.float32)


In [9]:
# Network structure
if use_trap_info:
    model_input = lags*ntraps + ntraps-1 + ntraps*lags # sum  of eggs, distances minus one and days
else:
    model_input = lags*ntraps
    
if model_type == 'classifier':
    model_output = 2
elif model_type == 'regressor':
    model_output = 1
elif model_type == 'renato':
    model_output = 2 # returns logit of cross entropy
                     # and lambda of exponential distribution



class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
     
        self.layer1 = nn.Linear(model_input, 20)
        self.layer2 = nn.Linear(20, 10)
        self.layer3 = nn.Linear(10, 5)    
        self.theta = nn.Linear(5, model_output)
        self.exp = nn.Linear(5 + model_output, 1) # exponential distribution

        
    def forward(self, x):
        out1 = self.layer1(x)
        out2 = self.layer2(out1)
        out3 = self.layer3(out2)
        logit = self.theta(out3)
        lamb = self.exp(torch.cat((out3, logit), dim=1)) # concat weights and logits. To use class instead, change to torch.cat((out3, logit.argmax(1)), dim=1) and self.exp
        lamb = nn.ReLU()(lamb)+0.00001 # noise added
        return logit, lamb



In [10]:
# Create train and test loops
def train_loop(dataloader, model, loss_class, loss_reg, optimizer,batch_size):
    size = xtrain.shape[0]
    model.train()
    for batch, (xtest, ytest) in enumerate(dataloader):
        # Compute prediction and loss
        print(ytest[:,0])
        optimizer.zero_grad()
        logit, lamb = model(xtest)
        loss1 = loss_class(logit, ytest[:,0].long())
        loss2 = loss_reg(lamb, ytest[:,1])
        totalLoss = loss1 + loss2
        # Backpropagation
        totalLoss.backward()
        optimizer.step()
'''
        if batch % 100 == 0:
            loss, current = totalLoss.item(), batch * batch_size + len(xtest)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
'''


def test_loop(dataloader, model, loss_class, loss_reg,model_type):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    loss1, loss2, acc_class, acc_reg, error_reg  = 0, 0, 0, 0, 0 

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for X, y in dataloader:
            y_long =  y[:,0].long()
            y_float = y[:,1]
            if model_type == 'classifier':
                pred = model(X)
                acc += (pred.argmax(1) == y).type(torch.float).sum().item()
            elif model_type == 'regressor':
                pred = torch.round(model(X))
                acc += ((pred.round() == y).type(torch.float)).sum().item()
            elif model_type == 'renato':
                logit, lamb = model(X)
                yclass = torch.argmax(logit, dim=1)
                xx = torch.linspace(0, 5000, 5001)
                xx = xx.repeat(lamb.shape[0], 1)
                yy = loss_reg.pdf(lamb, xx)
                y_float = yy.argmax(1)
                acc_class += (yclass == y_long).type(torch.float).sum().item()
                acc_reg += (torch.round(lamb) == y_float).type(torch.float).sum().item()
                error_reg += ((lamb - y_float)**2).sum().item()
                loss1 += loss_class(logit, y_long).item()
                loss2 += loss_reg(lamb, y_float).item()

    loss1 /= num_batches
    loss2 /= num_batches
    acc_class /= size
    acc_reg /= size
    error_reg /= size
    return loss1, loss2, acc_class, acc_reg, error_reg

In [11]:
model = NeuralNetwork().to(device)

if model_type == 'classifier':
    loss_fn = nn.CrossEntropyLoss()
elif model_type == 'regressor':
    loss_fn = nn.MSELoss()
elif model_type == 'renato':
    loss_class = nn.CrossEntropyLoss()
    loss_reg = NN_building.CustomLoss()

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)


loss1_hist = []
loss2_hist = []
acc_class_hist = []
acc_reg_hist = []
error_reg_hist = []
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_class, loss_reg, optimizer, batch_size)
    loss1, loss2, acc_class, acc_reg, error_reg = test_loop(test_dataloader, model, loss_class, loss_reg,model_type)


    loss1_hist.append(loss1)
    loss2_hist.append(loss2)
    acc_class_hist.append(acc_class)
    acc_reg_hist.append(acc_reg)
    error_reg_hist.append(error_reg)
    torch.save(model.state_dict(), f'./results/NN/save_parameters/model{model_type}_lags{lags}_ntraps{3}_epoch{t}.pth')
    
    
print("Done!")
utils.play_ending_song()
utils.stop_ending_song(2)

Epoch 1
-------------------------------
tensor([1., 0., 1., 1., 0., 1., 1., 0., 1., 0., 1., 0., 1., 1., 0., 1., 0., 1.,
        1., 0., 0., 0., 0., 0., 1., 1., 0., 1., 0., 0., 0., 0., 0., 0., 1., 0.,
        1., 1., 1., 0., 1., 0., 0., 0., 1., 1., 0., 0., 0., 0., 1., 0., 1., 1.,
        0., 0., 0., 0., 1., 0., 0., 0., 1., 1.])
tensor([1., 1., 0., 0., 0., 1., 1., 0., 1., 0., 0., 0., 1., 0., 1., 1., 0., 0.,
        1., 1., 1., 0., 1., 0., 0., 1., 0., 0., 0., 0., 0., 0., 1., 0., 1., 1.,
        0., 1., 0., 0., 0., 0., 0., 1., 0., 1., 1., 1., 1., 0., 0., 1., 0., 0.,
        0., 1., 0., 0., 0., 1., 0., 0., 0., 1.])
tensor([0., 0., 1., 0., 0., 1., 0., 0., 1., 0., 0., 0., 1., 1., 0., 0., 0., 0.,
        0., 1., 1., 0., 1., 1., 0., 0., 1., 1., 1., 0., 0., 0., 0., 1., 0., 0.,
        0., 1., 1., 1., 1., 0., 1., 1., 1., 0., 0., 0., 1., 1., 0., 0., 0., 1.,
        1., 0., 1., 0., 1., 1., 0., 0., 0., 1.])
tensor([0., 0., 0., 0., 0., 0., 1., 1., 0., 1., 1., 1., 0., 1., 0., 1., 1., 0.,
        1., 0

In [12]:
if model_type == 'classifier':
    yhat = model(xtest).argmax(1).cpu().numpy()
elif model_type == 'regressor':
    yhat = model(xtest).round().cpu().detach().numpy() 
elif model_type == 'renato':
    logit, lamb = model(xtest)
    yhat = torch.round(lamb).cpu().detach().numpy()

    
if model_type == 'classifier':
    print(accuracy_score(y_test, yhat))
    print(confusion_matrix(y_test, yhat, normalize='true', labels=[0,1]))


In [15]:
structure_path = f'./results/NN/save_structure/model{model_type}_lags{lags}_ntraps{3}_structure.pth'

new_results = {
    'model': model_type,
    'net_structure': structure_path,
    'repetition': 1,
    'parameters': parameters,
    'date': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    'accuracy': [acc_class_hist, acc_reg_hist],
    'loss': [loss1_hist, loss2_hist],
    'yhat': yhat.tolist(),
    'ytest': ytest.cpu().numpy().tolist(),
}


In [20]:
structure_path = f'./results/NN/save_structure/model{model_type}_lags{lags}_ntraps{3}_structure.pth'
torch.save(model, structure_path)

new_results = {
    'model': model_type,
    'net_structure': structure_path,
    'repetition': 1,
    'parameters': parameters,
    'date': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    'accuracy': [acc_class_hist, acc_reg_hist],
    'loss': [loss1_hist, loss2_hist],
    'yhat': yhat.tolist(),
    'ytest': ytest.cpu().numpy().tolist(),
}


# Load the existing JSON file
with open('./results/NN/model_accuracies.json', 'r') as f:
    results = json.load(f)
    # Update with new results


for item in results:
    if item['parameters'] == new_results['parameters'] and item['net_structure'] == new_results['net_structure']:
        new_results['repetition'] = item['repetition'] + 1
results.append(new_results)



with open('./results/NN/model_accuracies.json', 'w') as file:
    json.dump(results, file, indent=4)