In [1]:
import torch

print("PyTorch version:", torch.__version__)
if torch.cuda.is_available():
    print(f"Number of GPUs available: {torch.cuda.device_count()}")
    for i in range(torch.cuda.device_count()):
        print(torch.cuda.get_device_name(i))
else:
    print("No GPUs found, using CPU instead.")

PyTorch version: 2.1.1+cu121
Number of GPUs available: 2
NVIDIA GeForce RTX 3080
NVIDIA GeForce RTX 3080


In [2]:
import random
from sklearn.metrics import f1_score
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from sklearn.model_selection import train_test_split

torch.cuda.set_device(1)

population_size = 10  # You can adjust this number based on your requirement
num_generations = 20  # Number of generations for the genetic algorithm

class TwoLayerNet(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(TwoLayerNet, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x, activation_func=F.relu):
        out = activation_func(self.fc1(x))
        out = self.fc2(out)
        return out

# Load the data
X_train = np.load('./04_hw_data/train_X.npy')
y_train = np.load('./04_hw_data/train_y.npy')
X_test = np.load('./04_hw_data/test_X.npy')
y_test = np.load('./04_hw_data/test_y.npy')

# Split the training data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2)

# Assuming each image in the dataset is 28x28
X_train = X_train.reshape(X_train.shape[0], -1)
X_val = X_val.reshape(X_val.shape[0], -1)
X_test = X_test.reshape(X_test.shape[0], -1)

def train_model(model, X_train, y_train, batch_size, activation_func, epochs=5):
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # Reshape and convert data to PyTorch tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.int64)

    # Training loop
    for epoch in range(epochs):
        # Create mini-batches
        for i in range(0, len(X_train_tensor), batch_size):
            inputs = X_train_tensor[i:i+batch_size]
            labels = y_train_tensor[i:i+batch_size]

            # Forward pass
            outputs = model(inputs, activation_func)
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

def calculate_fitness(X_train, y_train, X_val, y_val, batch_size, activation_func):
    model = TwoLayerNet(X_train.shape[1], 128, 10)  # Assuming 128 hidden units and 10 classes
    train_model(model, X_train, y_train, batch_size, activation_func)
    # Evaluate the model
    X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.int64)
    outputs = model(X_val_tensor, activation_func)
    predicted = torch.argmax(outputs, 1)
    f1 = f1_score(y_val_tensor.numpy(), predicted.numpy(), average='macro')
    return f1

def select_parents(population, fitness_scores):
    if not fitness_scores:
        raise ValueError("Fitness scores list is empty.")

    total_fitness = sum(fitness_scores)

    # Handle case where total fitness is 0
    if total_fitness == 0:
        # If all fitness scores are 0, treat each individual as equally likely to be chosen
        selection_probs = [1 / len(fitness_scores)] * len(fitness_scores)
    else:
        # Calculate selection probabilities
        selection_probs = [f / total_fitness for f in fitness_scores]

    selected_indices = np.random.choice(range(len(population)), size=len(population)//2, p=selection_probs)
    parents = [population[i] for i in selected_indices]
    return parents

def crossover(parent1, parent2):
    crossover_point = random.randint(1, len(parent1) - 1)
    child1 = parent1[:crossover_point] + parent2[crossover_point:]
    child2 = parent2[:crossover_point] + parent1[crossover_point:]
    return child1, child2

def mutate(child, activation_funcs, batch_sizes, mutation_rate=0.1):
    if random.random() < mutation_rate:
        gene_to_mutate = random.choice(['batch_size', 'activation_func'])
        if gene_to_mutate == 'batch_size':
            child[0] = random.choice(batch_sizes)
        else:
            child[1] = random.choice(activation_funcs)
    return child

activation_funcs = [F.relu, torch.sigmoid, torch.tanh]
batch_sizes = [16, 32, 64, 128, 256, 512, 1024]

if population_size <= 0:
    raise ValueError("Population size must be a positive integer")

population = [[random.choice(batch_sizes), random.choice(activation_funcs)] for _ in range(population_size)]

# Ensure the population list is not empty
if not population:
    raise ValueError("Population is empty. Check population initialization.")

# Calculate initial fitness scores
fitness_scores = [calculate_fitness(X_train, y_train, X_val, y_val, individual[0], individual[1]) for individual in population]

# Check if fitness_scores list is empty after initialization
if not fitness_scores:
    raise ValueError("Initial fitness scores list is empty. Check the calculate_fitness function.")

for generation in range(num_generations):
    # Recalculate fitness scores for the current population
    fitness_scores = [calculate_fitness(X_train, y_train, X_val, y_val, individual[0], individual[1]) for individual in population]
    
    # Check if fitness_scores list is empty in the loop
    if not fitness_scores:
        raise ValueError(f"Fitness scores list is empty in generation {generation}. Check the calculate_fitness function.")

    # Selection
    parents = select_parents(population, fitness_scores)
    
    # Crossover
    children = []
    if len(parents) > 1:
        for i in range(0, len(parents) - 1, 2):
            child1, child2 = crossover(parents[i], parents[i+1])
            children.extend([child1, child2])

        # Handling the case where the number of parents is odd
        if len(parents) % 2 != 0:
            last_parent = parents[-1]
            second_last_parent = parents[-2]
            child1, child2 = crossover(last_parent, second_last_parent)
            children.extend([child1, child2])
    else:
        # Handle the scenario when there aren't enough parents
        # This could include reusing existing parents or creating new random individuals
        # For example, reusing existing parents:
        if parents:
            children = [list(parent) for parent in parents]  # Duplicate each parent
    
    # Mutation
    mutated_children = [mutate(child, activation_funcs, batch_sizes) for child in children]
    
    # Update population (age-based selection can be applied here)
    population = mutated_children  # For simplicity, replacing the old population

    # Plotting or printing the average and highest fitness score of the population

best_individual = max(zip(population, fitness_scores), key=lambda x: x[1])[0]

# Assuming best_individual is a list [best_batch_size, best_activation_func]
best_batch_size, best_activation_func = best_individual

# Create a new model instance
final_model = TwoLayerNet(X_train.shape[1], 128, 10)

# Train the final model
train_model(final_model, np.concatenate((X_train, X_val)), np.concatenate((y_train, y_val)), best_batch_size, best_activation_func)

# Evaluate the model on the test set
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.int64)
outputs = final_model(X_test_tensor, best_activation_func)
predicted = torch.argmax(outputs, 1)
test_f1 = f1_score(y_test_tensor.numpy(), predicted.numpy(), average='macro')

print(f"Test F1 Score: {test_f1}")

ValueError: Fitness scores list is empty in generation 5. Check the calculate_fitness function.

In [1]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
import logging

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Neural Network Class
class TwoLayerNet(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(TwoLayerNet, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x, activation_func=F.relu):
        out = activation_func(self.fc1(x))
        out = self.fc2(out)
        return out
    
class GeneticAlgorithm:
    def __init__(self, X_train, y_train, X_val, y_val, population_size, num_generations):
        self.X_train, self.y_train = X_train, y_train
        self.X_val, self.y_val = X_val, y_val
        self.population_size = population_size
        self.num_generations = num_generations
        self.population = self._initialize_population()
        self.activation_funcs = [F.relu, torch.sigmoid, torch.tanh]
        self.batch_sizes = [16, 32, 64, 128, 256, 512, 1024]

    def _initialize_population(self):
        return [[random.choice(self.batch_sizes), random.choice(self.activation_funcs)] for _ in range(self.population_size)]

    def calculate_fitness(self, individual):
        model = TwoLayerNet(self.X_train.shape[1], 128, 10)
        train_model(model, self.X_train, self.y_train, individual[0], individual[1])
        X_val_tensor = torch.tensor(self.X_val, dtype=torch.float32)
        y_val_tensor = torch.tensor(self.y_val, dtype=torch.int64)
        outputs = model(X_val_tensor, individual[1])
        predicted = torch.argmax(outputs, 1)
        f1 = f1_score(y_val_tensor.numpy(), predicted.numpy(), average='macro')
        return f1

    def select_parents(self, fitness_scores):
        total_fitness = sum(fitness_scores)
        selection_probs = [f / total_fitness for f in fitness_scores]
        selected_indices = np.random.choice(range(len(self.population)), size=len(self.population)//2, p=selection_probs)
        return [self.population[i] for i in selected_indices]

    def crossover(self, parent1, parent2):
        crossover_point = random.randint(1, len(parent1) - 1)
        return parent1[:crossover_point] + parent2[crossover_point:], parent2[:crossover_point] + parent1[crossover_point:]

    def mutate(self, child):
        mutation_rate = 0.1
        if random.random() < mutation_rate:
            gene_to_mutate = random.choice(['batch_size', 'activation_func'])
            if gene_to_mutate == 'batch_size':
                child[0] = random.choice(self.batch_sizes)
            else:
                child[1] = random.choice(self.activation_funcs)
        return child

    def run(self):
        for generation in range(self.num_generations):
            logging.info(f"Generation {generation+1}/{self.num_generations}")
            fitness_scores = [self.calculate_fitness(individual) for individual in self.population]
            parents = self.select_parents(fitness_scores)
            children = []
            for i in range(0, len(parents), 2):
                child1, child2 = self.crossover(parents[i], parents[min(i+1, len(parents)-1)])
                children.extend([child1, child2])
            mutated_children = [self.mutate(child) for child in children]
            self.population = mutated_children

            # Optionally, log or print the best fitness score in this generation
            best_fitness = max(fitness_scores)
            logging.info(f"Best Fitness in Generation {generation+1}: {best_fitness}")

class DataHandler:
    def __init__(self, train_X_path, train_y_path, test_X_path, test_y_path):
        self.train_X_path = train_X_path
        self.train_y_path = train_y_path
        self.test_X_path = test_X_path
        self.test_y_path = test_y_path

    def load_data(self):
        X_train = np.load(self.train_X_path)
        y_train = np.load(self.train_y_path)
        X_test = np.load(self.test_X_path)
        y_test = np.load(self.test_y_path)

        X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2)
        X_train = X_train.reshape(X_train.shape[0], -1)
        X_val = X_val.reshape(X_val.shape[0], -1)
        X_test = X_test.reshape(X_test.shape[0], -1)

        return X_train, y_train, X_val, y_val, X_test, y_test
    
# Main execution
def main():
    try:
        data_handler = DataHandler('./04_hw_data/train_X.npy', './04_hw_data/train_y.npy',
                                   './04_hw_data/test_X.npy', './04_hw_data/test_y.npy')
        X_train, y_train, X_val, y_val, X_test, y_test = data_handler.load_data()

        ga = GeneticAlgorithm(X_train, y_train, X_val, y_val, 10, 20)
        ga.run()
    except Exception as e:
        logging.error(f"An error occurred: {e}")

if __name__ == "__main__":
    main()

2023-12-06 14:51:11,062 - ERROR - An error occurred: 'GeneticAlgorithm' object has no attribute 'batch_sizes'
