<a href="https://colab.research.google.com/github/nikalitt1/Clementine-Agriculture/blob/main/Population_based_NN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [59]:
# Modified version of your script to use a small CNN instead of MLP
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.datasets import Omniglot
import torchvision.transforms as transforms
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.metrics import accuracy_score
import random

# ====== Setup ======
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# ====== Load Omniglot Dataset ======
transform = transforms.Compose([
    transforms.Resize((28, 28)),
    transforms.ToTensor(),
])

train_dataset = Omniglot(root='./data', background=True, download=True, transform=transform)
test_dataset = Omniglot(root='./data', background=False, download=True, transform=transform)

full_data = train_dataset + test_dataset

X, y = [], []
for img, label in full_data:
    X.append(img.numpy())
    y.append(label)

X = np.stack(X)
y = np.array(y)

max_classes = 50
mask = y < max_classes
X = X[mask]
y = y[mask]

label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

X_trainval, X_test, y_trainval, y_test = train_test_split(
    X, y_encoded, test_size=0.15, random_state=42, stratify=y_encoded)
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval, y_trainval, test_size=0.1765, random_state=42, stratify=y_trainval)

X_train_t = torch.tensor(X_train, dtype=torch.float32)
y_train_t = torch.tensor(y_train, dtype=torch.long)
X_val_t = torch.tensor(X_val, dtype=torch.float32)
y_val_t = torch.tensor(y_val, dtype=torch.long)
X_test_t = torch.tensor(X_test, dtype=torch.float32)
y_test_t = torch.tensor(y_test, dtype=torch.long)

batch_size = 64
train_loader = DataLoader(torch.utils.data.TensorDataset(X_train_t, y_train_t), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(torch.utils.data.TensorDataset(X_val_t, y_val_t), batch_size=batch_size)
test_loader = DataLoader(torch.utils.data.TensorDataset(X_test_t, y_test_t), batch_size=batch_size)

# ====== Define CNN Model ======
class SmallCNN(nn.Module):
    def __init__(self, num_classes=max_classes):
        super(SmallCNN, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        return self.net(x)

# ====== Evaluation function ======
def evaluate_model(model, dataloader):
    model.eval()
    all_preds, all_targets = [], []
    with torch.no_grad():
        for xb, yb in dataloader:
            xb, yb = xb.to(device), yb.to(device)
            xb = xb.view(-1, 1, 28, 28)
            preds = torch.argmax(model(xb), dim=1)
            all_preds.append(preds.cpu().numpy())
            all_targets.append(yb.cpu().numpy())
    return accuracy_score(np.concatenate(all_targets), np.concatenate(all_preds))

# ====== Training function ======
def train_one_epoch(model, optimizer, criterion, dataloader, model_id=None, epoch_num=None, log_every=100):
    model.train()
    running_loss = 0.0
    for batch_idx, (xb, yb) in enumerate(dataloader, 1):
        xb, yb = xb.to(device), yb.to(device)
        xb = xb.view(-1, 1, 28, 28)
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if batch_idx % log_every == 0:
            avg_loss = running_loss / log_every
            running_loss = 0.0
            print(f"Model {model_id} - Epoch {epoch_num} - Batch {batch_idx}/{len(dataloader)} - Loss: {avg_loss:.4f}")

# ====== Evolutionary Training ======
def evolution_training_until_one(population_size=10, survival_rate=0.6, epochs_per_gen=10, log_every_batches=100):
    population, optimizers = [], []
    criterion = nn.CrossEntropyLoss()
    for _ in range(population_size):
        model = SmallCNN().to(device)
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        population.append(model)
        optimizers.append(optimizer)

    best_model, best_optimizer, best_score = None, None, -float('inf')
    generation = 0

    while len(population) > 1:
        generation += 1
        print(f"\nðŸŒ± Generation {generation} | Population size: {len(population)}")

        for i, (model, optimizer) in enumerate(zip(population, optimizers)):
            for epoch in range(1, epochs_per_gen + 1):
                train_one_epoch(model, optimizer, criterion, train_loader, model_id=i+1, epoch_num=epoch, log_every=log_every_batches)

        scores = [evaluate_model(m, val_loader) for m in population]
        sorted_indices = np.argsort(scores)[::-1]

        if scores[sorted_indices[0]] > best_score:
            best_score = scores[sorted_indices[0]]
            best_model = population[sorted_indices[0]]
            best_optimizer = optimizers[sorted_indices[0]]
            print(f"  ðŸŽ‰ New best model with accuracy: {best_score:.4f}")

        survivors = max(1, int(len(population) * survival_rate))
        population = [population[i] for i in sorted_indices[:survivors]]
        optimizers = [optimizers[i] for i in sorted_indices[:survivors]]

        if best_model not in population:
            population.append(best_model)
            optimizers.append(best_optimizer)
            print("  ðŸ”„ Best model preserved with elitism")

        print(f"  âœ… Best Acc This Gen: {scores[sorted_indices[0]]:.4f} | Worst: {scores[sorted_indices[-1]]:.4f}")

    torch.save(best_model.state_dict(), "best_omniglot_cnn_model.pth")
    return best_model

# ====== Run Training ======
best_model = evolution_training_until_one(
    population_size=1000,
    survival_rate=0.6,
    epochs_per_gen=10,
    log_every_batches=100
)

# ====== Final Evaluation ======
test_accuracy = evaluate_model(best_model, test_loader)
print(f"\nðŸŽ¯ Final test accuracy on Omniglot: {test_accuracy * 100:.2f}%")



ðŸŒ± Generation 1 | Population size: 10000
  - Training model 1/10000
  - Training model 2/10000
  - Training model 3/10000
  - Training model 4/10000
  - Training model 5/10000
  - Training model 6/10000
  - Training model 7/10000
  - Training model 8/10000
  - Training model 9/10000
  - Training model 10/10000
  - Training model 11/10000
  - Training model 12/10000
  - Training model 13/10000
  - Training model 14/10000
  - Training model 15/10000
  - Training model 16/10000
  - Training model 17/10000
  - Training model 18/10000
  - Training model 19/10000
  - Training model 20/10000
  - Training model 21/10000
  - Training model 22/10000
  - Training model 23/10000
  - Training model 24/10000
  - Training model 25/10000
  - Training model 26/10000
  - Training model 27/10000
  - Training model 28/10000
  - Training model 29/10000
  - Training model 30/10000
  - Training model 31/10000
  - Training model 32/10000
  - Training model 33/10000
  - Training model 34/10000
  - Training 

OutOfMemoryError: CUDA out of memory. Tried to allocate 2.00 MiB. GPU 0 has a total capacity of 14.74 GiB of which 2.12 MiB is free. Process 6393 has 14.74 GiB memory in use. Of the allocated memory 13.39 GiB is allocated by PyTorch, and 1.22 GiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.datasets import Omniglot
import torchvision.transforms as transforms
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.metrics import accuracy_score

# Define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Resize transform: resize to 28x28 + ToTensor()
transform = transforms.Compose([
    transforms.Resize((28, 28)),  # Resize images to 28x28
    transforms.ToTensor(),        # Convert PIL image to tensor and scale to [0,1]
])

# Load Omniglot dataset (background=True for training, False for testing)
train_dataset = Omniglot(root='./data', background=True, download=True, transform=transform)
test_dataset = Omniglot(root='./data', background=False, download=True, transform=transform)

# Combine train and test for one dataset
full_data = train_dataset + test_dataset

# Extract data and labels into numpy arrays
X = []
y = []
for img, label in full_data:
    X.append(img.numpy())  # Keep as image tensor (1, 28, 28)
    y.append(label)

X = np.stack(X)
y = np.array(y)

# Limit to first 50 classes to keep problem manageable
max_classes = 50
mask = y < max_classes
X = X[mask]
y = y[mask]

# Encode labels 0..49
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

# Train/val/test split: 70% train, 15% val, 15% test
X_trainval, X_test, y_trainval, y_test = train_test_split(
    X, y_encoded, test_size=0.15, random_state=42, stratify=y_encoded
)
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval, y_trainval, test_size=0.1765, random_state=42, stratify=y_trainval
)

# Convert to tensors
X_train_t = torch.tensor(X_train, dtype=torch.float32)
y_train_t = torch.tensor(y_train, dtype=torch.long)
X_val_t = torch.tensor(X_val, dtype=torch.float32)
y_val_t = torch.tensor(y_val, dtype=torch.long)
X_test_t = torch.tensor(X_test, dtype=torch.float32)
y_test_t = torch.tensor(y_test, dtype=torch.long)

# Create dataloaders
batch_size = 64
train_loader = DataLoader(torch.utils.data.TensorDataset(X_train_t, y_train_t), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(torch.utils.data.TensorDataset(X_val_t, y_val_t), batch_size=batch_size)
test_loader = DataLoader(torch.utils.data.TensorDataset(X_test_t, y_test_t), batch_size=batch_size)

# ====== Define CNN Model ======
class OmniglotCNN(nn.Module):
    def __init__(self, num_classes=max_classes):
        super(OmniglotCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),  # input channel=1 (grayscale)
            nn.ReLU(),
            nn.MaxPool2d(2),  # 28x28 -> 14x14
            nn.Dropout(0.25),

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 14x14 -> 7x7
            nn.Dropout(0.25)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

# Evaluation function
def evaluate_model(model, dataloader):
    model.eval()
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for xb, yb in dataloader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)  # no flatten before CNN
            preds = torch.argmax(logits, dim=1)
            all_preds.append(preds.cpu().numpy())
            all_targets.append(yb.cpu().numpy())
    all_preds = np.concatenate(all_preds)
    all_targets = np.concatenate(all_targets)
    acc = accuracy_score(all_targets, all_preds)
    return acc

# Training function for one epoch
def train_one_epoch(model, optimizer, criterion, dataloader):
    model.train()
    running_loss = 0
    for xb, yb in dataloader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * xb.size(0)
    return running_loss / len(dataloader.dataset)

# Instantiate model, loss, optimizer
model = OmniglotCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Optional: Enable cudnn benchmark for performance if CUDA
if device.type == 'cuda':
    torch.backends.cudnn.benchmark = True

# Training loop
num_epochs = 150
for epoch in range(num_epochs):
    train_loss = train_one_epoch(model, optimizer, criterion, train_loader)
    if (epoch + 1) % 5 == 0 or epoch == 0:
        val_acc = evaluate_model(model, val_loader)
        print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f} - Val Accuracy: {val_acc:.4f}")

# Final evaluation on test set
test_acc = evaluate_model(model, test_loader) * 100
print(f"\nFinal Test Accuracy: {test_acc:.4f} %")


100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 9.46M/9.46M [00:00<00:00, 311MB/s]
100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 6.46M/6.46M [00:00<00:00, 366MB/s]


Epoch 1/150 - Train Loss: 3.9315 - Val Accuracy: 0.0199
Epoch 5/150 - Train Loss: 3.9133 - Val Accuracy: 0.0199
Epoch 10/150 - Train Loss: 3.8744 - Val Accuracy: 0.0565
Epoch 15/150 - Train Loss: 3.4445 - Val Accuracy: 0.1329
Epoch 20/150 - Train Loss: 2.9937 - Val Accuracy: 0.2027
Epoch 25/150 - Train Loss: 2.6437 - Val Accuracy: 0.3223
Epoch 30/150 - Train Loss: 2.3432 - Val Accuracy: 0.4086
Epoch 35/150 - Train Loss: 2.1276 - Val Accuracy: 0.4618
Epoch 40/150 - Train Loss: 1.9724 - Val Accuracy: 0.5050
Epoch 45/150 - Train Loss: 1.8483 - Val Accuracy: 0.5216
Epoch 50/150 - Train Loss: 1.7862 - Val Accuracy: 0.5415
Epoch 55/150 - Train Loss: 1.7716 - Val Accuracy: 0.5482
Epoch 60/150 - Train Loss: 1.6440 - Val Accuracy: 0.5382
Epoch 65/150 - Train Loss: 1.6386 - Val Accuracy: 0.5714
Epoch 70/150 - Train Loss: 1.5438 - Val Accuracy: 0.5880
Epoch 75/150 - Train Loss: 1.4940 - Val Accuracy: 0.5880
Epoch 80/150 - Train Loss: 1.4957 - Val Accuracy: 0.6146
Epoch 85/150 - Train Loss: 1.3926

In [60]:
import math

def generations_to_one(pop_size, survival_rate):
    # Calculate how many generations until population shrinks to 1 or less
    return 1 + math.ceil(math.log(1 / pop_size) / math.log(survival_rate))

def total_epochs_final_model(pop_size, survival_rate, epochs_per_gen):
    G = generations_to_one(pop_size, survival_rate)
    total = G * epochs_per_gen
    return total, G

# Example usage:
pop_size = 1000
survival_rate = 0.6
epochs_per_gen = 10  # for example


total, generations = total_epochs_final_model(pop_size, survival_rate, epochs_per_gen)
print(f"Generations: {generations}, Total epochs for final model: {total}")


Generations: 15, Total epochs for final model: 150


In [62]:
import torch
torch.cuda.empty_cache()