In [4]:
import torch
import pickle
import random
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch_geometric.nn as gnn

from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.model_selection import train_test_split
from torch_geometric.data import Data

In [5]:
from net import PSOGNN
from pso import PSO 
from base_function import *

In [6]:
splited_path = r'/Users/admin/Code/deepso/splited_data.pkl'
with open(splited_path, 'rb') as f:
    dataset = pickle.load(f)


In [7]:
train_set = dataset['train']
print(train_set[0])

{'dim': 1, 'func_type': 'ackley', 'params': [16.072515478646675, 0.2580391494351727, 4.971520742724884]}


In [8]:
def create_batches(dataset, batch_size): 
    random.shuffle(dataset)
    for i in range(0, len(dataset), batch_size): 
        batch = dataset[i:i + batch_size]
        yield batch

In [9]:
batch_size = 16
mini_batch_size = 4
num_epochs = 10

UB = 1
LB = 0
num_particles = 100

def train(dataloader, accumulate_step):
    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        function_loader = create_batches(dataloader, batch_size)

        for batch in function_loader:
            # print(batch)
            for i in range(0, len(batch), mini_batch_size):
                mini_batch = batch[i : i + mini_batch_size]
                # print('mini_batch', mini_batch)
                for func in mini_batch: 
                    # print('func',func)
                    # break
                    dim = func['dim']
                    func_type = func['func_type']
                    params = func['params']

                    def function(x):
                        function_instance = Function.get_function(func_type, x, params)
                        return function_instance.evaluate_function()
                    
                    X = torch.rand(num_particles, dim)
                    model = PSOGNN(node_input_dim=dim)
                    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

                    Pso = PSO(X, function, model, LB, UB)
                    position_best, best, mean_fitness = Pso.run()

                    loss = mean_fitness
                    loss = loss/accumulate_step
                    loss.backward()
                    
                    if (i + 1) % accumulate_step == 0:
                        optimizer.step()
                        optimizer.zero_grad()




In [10]:

train(train_set, accumulate_step=4)

Epoch 1/10


In [None]:
def evaluate(dataloader):
    total_loss = 0
    total_functions = 0
    function_loader = create_batches(dataloader, batch_size)
    
    # Evaluation mode (disable gradients)
    with torch.no_grad():
        for batch in function_loader:
            for i in range(0, len(batch), mini_batch_size):
                mini_batch = batch[i : i + mini_batch_size]
                
                for func in mini_batch: 
                    print('func', func)
                    
                    dim = func['dim']
                    func_type = func['func_type']
                    params = func['params']
                    
                    def function(x):
                        function_instance = Function.get_function(func_type, x, params)
                        return function_instance.evaluate_function()

                    X = torch.rand(num_particles, dim)
                    
                    Pso = PSO(X, function, LB, UB)
                    position_best, best, mean_fitness = Pso.run()

                    total_loss += mean_fitness
                    total_functions += 1

    average_loss = total_loss / total_functions if total_functions > 0 else 0
    return average_loss