# Pigpen PyTorch Example
by Joe Norton

In [237]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [238]:
import random
import os
from PIL import Image
from torch.utils.data import Dataset

class PigpenDataset(Dataset):
    """Grayscale image dataset laid out as ``<directory>/<class name>/<file>``.

    Every regular file found directly inside a class sub-directory is treated
    as one sample of that class. Sub-directories for all 27 class names listed
    in ``self.classes`` must exist under ``directory``.

    Args:
        directory: Root folder that contains one sub-directory per class.
        transform: Optional callable applied to the PIL image before it is
            returned (for example a torchvision transform pipeline).

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, directory, transform=None):
        self.directory = directory
        self.transform = transform
        self.classes = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'AA']
        # Class name -> integer label, built once so __getitem__ does not have
        # to scan ``self.classes`` for every sample.
        self.class_to_idx = {name: label for label, name in enumerate(self.classes)}
        # List of (file path, class name) pairs, grouped by class.
        self.images = []
        for cls in self.classes:
            cls_dir = os.path.join(directory, cls)
            # os.listdir returns entries in arbitrary order; sorting makes the
            # sample order reproducible across runs and machines.
            for file_name in sorted(os.listdir(cls_dir)):
                file_path = os.path.join(cls_dir, file_name)
                if os.path.isfile(file_path):
                    self.images.append((file_path, cls))

    def __len__(self):
        """Return the number of samples discovered under ``directory``."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded in single-channel mode ``'L'`` and passed through
        ``self.transform`` when one was given; the label is the integer index
        of the sample's class name within ``self.classes``.
        """
        img_path, cls = self.images[idx]
        # Image.open keeps the file open until the pixel data is read; the
        # context manager closes the handle right after convert() has produced
        # an independent in-memory copy.
        with Image.open(img_path) as source_image:
            image = source_image.convert('L')
        if self.transform:
            image = self.transform(image)
        return image, self.class_to_idx[cls]


In [239]:
# Preprocessing applied to every sample: convert the PIL image to a tensor.
# (Normalisation is intentionally not applied at the moment.)
transform = transforms.Compose([transforms.ToTensor()])

# One dataset per split; each split lives in its own top-level folder.
train_dataset = PigpenDataset(directory='train', transform=transform)
test_dataset = PigpenDataset(directory='test', transform=transform)
valid_dataset = PigpenDataset(directory='validation', transform=transform)

# Only the training split is shuffled; evaluation splits keep a fixed order.
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=28,
    shuffle=True,
)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=28,
    shuffle=False,
)
valid_loader = torch.utils.data.DataLoader(
    valid_dataset,
    batch_size=28,
    shuffle=False,
)


In [240]:
# Number of samples in the train / test / validation splits, in that order.
tuple(len(split) for split in (train_dataset, test_dataset, valid_dataset))

(40473, 13473, 13473)

In [241]:
import torch.nn as nn
import torch.nn.functional as F

# Choose the compute device: prefer CUDA, then MPS, and fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

class EnhancedCNN(nn.Module):
    """Small two-stage convolutional classifier for 1x28x28 inputs.

    Two ``Conv2d -> ReLU -> MaxPool2d(2)`` stages (32 then 64 channels) are
    followed by channel dropout and a two-layer fully connected head. The
    3x3 convolutions use ``padding=1`` so only the pooling layers shrink the
    feature map: 28x28 -> 14x14 -> 7x7, which is why the head expects
    ``64 * 7 * 7 = 3136`` flattened features. Inputs of any other spatial
    size will not match the first linear layer.

    Args:
        num_classes: Width of the output layer (number of logits per sample).
            Defaults to 28, the value this notebook was trained with.
            NOTE(review): ``PigpenDataset.classes`` lists 27 names, so one
            output unit never receives a target — confirm whether 27 or 28
            is intended.
    """

    def __init__(self, num_classes=28):
        super().__init__()
        self.flatten = nn.Flatten()
        self.conv_norm_drop_stack = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),  # single-channel input
            nn.ReLU(),
            nn.MaxPool2d(2),  # halves height and width
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),  # halves height and width again
            nn.Dropout2d(0.2)
        )

        # 64 channels on a 7x7 map once a 28x28 input has been pooled twice.
        flattened_features = 64 * 7 * 7
        self.linear_layers = nn.Sequential(
            nn.Linear(flattened_features, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)  # raw logits, one per class
        )


    def forward(self, x):
        """Return class logits of shape ``(N, num_classes)`` for a batch ``x``.

        ``x`` is expected to be shaped ``(N, 1, 28, 28)``.
        """
        x = self.conv_norm_drop_stack(x)
        x = self.flatten(x)
        x = self.linear_layers(x)
        return x


# Build the network, move its parameters to the selected device, and show
# the layer summary.
model = EnhancedCNN()
model = model.to(device)
print(model)


Using cpu device
EnhancedCNN(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (conv_norm_drop_stack): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Dropout2d(p=0.2, inplace=False)
  )
  (linear_layers): Sequential(
    (0): Linear(in_features=3136, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=28, bias=True)
  )
)


In [242]:

# Loss function and optimizer for the model built above:
# cross-entropy over the logits, optimised by SGD with momentum.
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,
)


In [243]:
# Sanity check: peek at the first batch only and report tensor shapes/dtype.
first_batch = next(iter(test_loader), None)
if first_batch is not None:
    X, y = first_batch
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")

Shape of X [N, C, H, W]: torch.Size([28, 1, 28, 28])
Shape of y: torch.Size([28]) torch.int64


In [244]:
def train(dataloader, model, loss_fn, optimizer, device=None):
    """Run one training epoch over ``dataloader``.

    Args:
        dataloader: Iterable of ``(X, y)`` batches exposing a ``dataset``
            attribute with a length (used only for the progress message).
        model: Module to optimise; it is switched to training mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar loss tensor.
        optimizer: Optimizer bound to ``model``'s parameters.
        device: Where each batch is moved before the forward pass. When
            omitted, the device of the model's first parameter is used, so
            the function no longer depends on a notebook-level ``device``
            variable being defined and in sync with the model.

    Returns:
        None. Progress is printed every 100 batches.
    """
    if device is None:
        # Follow the model instead of relying on hidden global state.
        first_param = next(model.parameters(), None)
        device = None if first_param is None else first_param.device

    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        if device is not None:
            X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation; gradients are cleared after the step so the next
        # batch starts from a clean slate.
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            # ``current`` approximates the number of samples seen so far
            # (exact as long as every batch so far had the same size).
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [245]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [246]:
# Train for a fixed number of epochs, scoring the validation split after
# each pass over the training data.
epochs = 5
for t in range(0, epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(dataloader=train_loader, model=model, loss_fn=loss_fn, optimizer=optimizer)
    test(dataloader=valid_loader, model=model, loss_fn=loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 3.340752  [   28/40473]


loss: 3.021597  [ 2828/40473]
loss: 0.256468  [ 5628/40473]
loss: 0.099034  [ 8428/40473]
loss: 0.083159  [11228/40473]
loss: 0.327828  [14028/40473]
loss: 0.004263  [16828/40473]
loss: 0.008322  [19628/40473]
loss: 0.001982  [22428/40473]
loss: 0.092377  [25228/40473]
loss: 0.114578  [28028/40473]
loss: 0.002774  [30828/40473]
loss: 0.004458  [33628/40473]
loss: 0.000836  [36428/40473]
loss: 0.001324  [39228/40473]
Test Error: 
 Accuracy: 73.6%, Avg loss: 1.636913 

Epoch 2
-------------------------------
loss: 0.002860  [   28/40473]
loss: 0.000682  [ 2828/40473]
loss: 0.004994  [ 5628/40473]
loss: 0.000092  [ 8428/40473]
loss: 0.000666  [11228/40473]
loss: 0.006753  [14028/40473]
loss: 0.000970  [16828/40473]
loss: 0.002038  [19628/40473]
loss: 0.009531  [22428/40473]
loss: 0.056003  [25228/40473]
loss: 0.004283  [28028/40473]
loss: 0.003203  [30828/40473]
loss: 0.000194  [33628/40473]
loss: 0.003127  [36428/40473]
loss: 0.000100  [39228/40473]
Test Error: 
 Accuracy: 75.2%, Avg los

KeyboardInterrupt: 

In [None]:
# Final evaluation on the held-out test split.
test(dataloader=test_loader, model=model, loss_fn=loss_fn)

Test Error: 
 Accuracy: 77.9%, Avg loss: 0.971381 

