Single-Core Scratch Implementation

In [None]:
import numpy as np
import time
import psutil
import os
import csv
from torchvision import datasets, transforms

# Output location for the per-image metrics and a handle to this process,
# used later for CPU/RAM sampling.
output_csv = "scratch_cnn_singlecore.csv"
process = psutil.Process(os.getpid())


# Load the CIFAR-10 training split and stack every image tensor into a single
# float32 NumPy array (one entry per image).
transform = transforms.Compose([transforms.ToTensor()])
cifar10 = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
full_data = np.stack([tensor.numpy().astype(np.float32) for tensor, _label in cifar10])

# CNN Weights and Biases
# Kernels are laid out as (out_channels, in_channels, k, k); values are
# unseeded uniform random numbers, so they differ from run to run.
weights1, bias1 = (
    np.random.rand(8, 3, 3, 3).astype(np.float32),
    np.random.rand(8).astype(np.float32),
)
weights2, bias2 = (
    np.random.rand(16, 8, 3, 3).astype(np.float32),
    np.random.rand(16).astype(np.float32),
)
weights3, bias3 = (
    np.random.rand(32, 16, 3, 3).astype(np.float32),
    np.random.rand(32).astype(np.float32),
)

# CNN Operations
def relu(x):
    """Rectified linear unit: element-wise maximum of 0 and ``x``."""
    floor = 0
    return np.maximum(floor, x)

def conv2d(x, w, b):
    """Stride-1, unpadded 2-D cross-correlation written with explicit loops.

    Args:
        x: input indexed as [channel, row, col].
        w: kernels indexed as [out_channel, in_channel, row, col]; the length
            of the third axis is used as the window size on both spatial axes.
        b: one bias value per output channel.

    Returns:
        A float32 array of shape (out_channels, rows - k + 1, cols - k + 1).
    """
    n_out, n_in, ksize, _ = w.shape
    rows = x.shape[1]
    cols = x.shape[2]
    out_rows = rows - ksize + 1
    out_cols = cols - ksize + 1

    result = np.zeros((n_out, out_rows, out_cols), dtype=np.float32)
    for o in range(n_out):
        for c in range(n_in):
            kernel = w[o, c]
            for r in range(out_rows):
                for s in range(out_cols):
                    window = x[c, r:r + ksize, s:s + ksize]
                    result[o, r, s] += np.sum(window * kernel)
        # The bias is added once per output channel, after all input
        # channels have been accumulated.
        result[o] += b[o]
    return result

def max_pool2d(x, size=2, stride=2):
    """Max-pool each channel of ``x`` with a ``size`` x ``size`` window.

    Args:
        x: input indexed as [channel, row, col].
        size: side length of the square pooling window.
        stride: step between consecutive windows on both axes.

    Returns:
        A float32 array of shape (channels, out_h, out_w) where
        ``out_h = (h - size) // stride + 1`` (and likewise for ``out_w``),
        clamped at 0 when the input is smaller than the window.
    """
    c, h, w = x.shape
    # Count only windows that fit entirely inside the input. Using
    # ``h // stride`` (as before) is only right when size == stride; otherwise
    # trailing windows were silently clipped by slicing or rows were skipped.
    out_h = max(0, (h - size) // stride + 1)
    out_w = max(0, (w - size) // stride + 1)
    pooled = np.zeros((c, out_h, out_w), dtype=np.float32)
    for ch in range(c):
        for i in range(out_h):
            top = i * stride
            for j in range(out_w):
                left = j * stride
                pooled[ch, i, j] = np.max(x[ch, top:top + size, left:left + size])
    return pooled

def forward_pass(x, w1, b1, w2, b2, w3, b3):
    """Run three conv -> ReLU -> max-pool stages and return the last feature map.

    ``(w1, b1)``, ``(w2, b2)`` and ``(w3, b3)`` are the kernel/bias pairs of
    the first, second and third stage respectively.
    """
    features = x
    for kernels, biases in ((w1, b1), (w2, b2), (w3, b3)):
        features = max_pool2d(relu(conv2d(features, kernels, biases)))
    return features

# Logging CSV
# Runs the scratch forward pass over every image, writing one CSV row per
# image. CPU and RAM are sampled only every 500th image; other rows leave
# those two columns empty.
with open(output_csv, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["Image_Index", "CPU_Usage(%)", "RAM_Usage(MB)", "Time_Per_Image(s)"])

    num_images = len(full_data)
    print(f"\n[Scratch CNN - Single Core] Starting forward pass on {num_images:,} images...")

    # Prime the per-process CPU counter. Each later non-blocking call then
    # reports utilisation since the previous call, i.e. while the forward
    # passes were actually running. (A blocking ``interval=0.1`` call would
    # instead measure 0.1 s during which this process is idle.)
    process.cpu_percent(interval=None)

    output = None
    total_start = time.perf_counter()

    for i, image in enumerate(full_data):
        start = time.perf_counter()
        output = forward_pass(image, weights1, bias1, weights2, bias2, weights3, bias3)
        time_per_image = time.perf_counter() - start

        if i % 500 == 0:
            cpu_percent = process.cpu_percent(interval=None)
            ram_usage = process.memory_info().rss / (1024 ** 2)
            print(f"Image {i}: CPU={cpu_percent:.2f}%, RAM={ram_usage:.2f} MB, Time={time_per_image:.4f}s")
            cpu_field = f"{cpu_percent:.2f}"
            ram_field = f"{ram_usage:.2f}"
        else:
            cpu_field = ''
            ram_field = ''

        writer.writerow([i, cpu_field, ram_field, f"{time_per_image:.6f}"])

    total_end = time.perf_counter()
    print(f"\nTotal time for {num_images:,} images: {total_end - total_start:.2f} seconds")
    # ``output`` stays None when the dataset is empty.
    if output is not None:
        print(f"Last output shape: {output.shape}")

print(f"Results saved to: {output_csv}")


Multi-Core Scratch Implementation

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import numpy as np
from numba import njit, prange, set_num_threads
import psutil, time, csv, os

# Configure core counts for multi-core runs
core_counts = [1, 2, 4, 6, 8, 10]

# Set up CIFAR-10 dataset: every training image tensor is converted to a
# float32 NumPy array and stacked into one array.
transform = transforms.Compose([transforms.ToTensor()])
cifar10 = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
images = np.stack([tensor.numpy().astype(np.float32) for tensor, _label in cifar10])  # Shape: [50000, 3, 32, 32]

# Initialize random weights and biases
# Kernels are laid out as (out_channels, in_channels, k, k); values are
# unseeded uniform random numbers, so they differ from run to run.
weights1, bias1 = (
    np.random.rand(8, 3, 3, 3).astype(np.float32),
    np.random.rand(8).astype(np.float32),
)
weights2, bias2 = (
    np.random.rand(16, 8, 3, 3).astype(np.float32),
    np.random.rand(16).astype(np.float32),
)
weights3, bias3 = (
    np.random.rand(32, 16, 3, 3).astype(np.float32),
    np.random.rand(32).astype(np.float32),
)

# Define CNN forward pass from scratch with Numba JIT
@njit(parallel=True)
def relu(x):
    """Return the element-wise maximum of 0 and ``x`` (ReLU), compiled with Numba ``njit``."""
    return np.maximum(0, x)

@njit(parallel=True)
def conv2d(x, w, b):
    """Stride-1, unpadded 2-D cross-correlation; output channels run under ``prange``.

    ``x`` is indexed as [channel, row, col], ``w`` as
    [out_channel, in_channel, row, col] and ``b`` holds one bias per output
    channel. The result has shape (out_channels, rows - k + 1, cols - k + 1).
    """
    n_out, n_in, ksize, _ = w.shape
    rows, cols = x.shape[1], x.shape[2]
    out_rows = rows - ksize + 1
    out_cols = cols - ksize + 1
    result = np.zeros((n_out, out_rows, out_cols), dtype=np.float32)
    # Each prange iteration writes only to its own result[o] slice.
    for o in prange(n_out):
        for c in range(n_in):
            kernel = w[o, c]
            for r in range(out_rows):
                for s in range(out_cols):
                    result[o, r, s] += np.sum(x[c, r:r + ksize, s:s + ksize] * kernel)
        # Bias is added once per output channel, after all input channels.
        result[o] += b[o]
    return result

@njit(parallel=True)
def max_pool2d(x, size=2, stride=2):
    """Max-pool each channel of ``x`` with a ``size`` x ``size`` window; channels run under ``prange``.

    ``x`` is indexed as [channel, row, col]. The result is a float32 array of
    shape (channels, out_h, out_w) with ``out_h = (h - size) // stride + 1``
    (likewise for ``out_w``), clamped at 0 when the input is smaller than
    the window.
    """
    c, h, w = x.shape
    # Count only windows that fit entirely inside the input. ``h // stride``
    # (the previous formula) is only right when size == stride; otherwise
    # trailing windows were silently clipped by slicing or rows were skipped.
    out_h = max(0, (h - size) // stride + 1)
    out_w = max(0, (w - size) // stride + 1)
    pooled = np.zeros((c, out_h, out_w), dtype=np.float32)
    for ch in prange(c):
        for i in range(out_h):
            for j in range(out_w):
                pooled[ch, i, j] = np.max(x[ch, i*stride:i*stride+size, j*stride:j*stride+size])
    return pooled

@njit(parallel=True)
def forward_pass(x, w1, b1, w2, b2, w3, b3):
    """Run three conv -> ReLU -> max-pool stages and return the last feature map.

    ``(w1, b1)``, ``(w2, b2)`` and ``(w3, b3)`` are the kernel/bias pairs of
    the first, second and third stage respectively.
    """
    stage1 = max_pool2d(relu(conv2d(x, w1, b1)))
    stage2 = max_pool2d(relu(conv2d(stage1, w2, b2)))
    stage3 = max_pool2d(relu(conv2d(stage2, w3, b3)))
    return stage3

# Prepare CSV
csv_file = "cnn_fromscratch_multicore_metrics.csv"
process = psutil.Process(os.getpid())

# Warm-up call: the first invocation of a Numba-jitted function triggers
# compilation, which would otherwise be charged to image 0 of the first run.
if len(images) > 0:
    forward_pass(images[0], weights1, bias1, weights2, bias2, weights3, bias3)

# For every configured thread count, run the forward pass over all images and
# write one CSV row per image.
with open(csv_file, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["Core_Count", "Image_Index", "CPU_Usage(%)", "RAM_Usage(MB)", "Time_Per_Image(s)"])

    for core_count in core_counts:
        try:
            set_num_threads(core_count)
        except ValueError:
            # Numba rejects thread counts above its configured maximum;
            # skip those instead of aborting the whole benchmark.
            print(f"\n[Cores={core_count}] Skipped: Numba cannot use this many threads on this machine")
            continue

        print(f"\n======== Running on {core_count} Core(s) ========")

        # Prime the per-process CPU counter so the first reading of this run
        # is not a meaningless 0.0.
        # NOTE(review): per-image sampling intervals are very short, so
        # individual CPU readings may be noisy.
        process.cpu_percent(interval=None)

        output = None
        total_start = time.perf_counter()

        for i, img in enumerate(images):
            start = time.perf_counter()
            output = forward_pass(img, weights1, bias1, weights2, bias2, weights3, bias3)
            end = time.perf_counter()

            cpu_percent = process.cpu_percent(interval=None)
            ram_usage = process.memory_info().rss / (1024 ** 2)
            time_per_image = end - start

            if i % 1000 == 0:
                print(f"[Cores={core_count}] Image {i}: CPU={cpu_percent:.2f}%, RAM={ram_usage:.2f} MB, Time={time_per_image:.4f}s")

            writer.writerow([core_count, i, f"{cpu_percent:.2f}", f"{ram_usage:.2f}", f"{time_per_image:.6f}"])

        total_end = time.perf_counter()
        print(f"[Cores={core_count}] Total Time: {total_end - total_start:.2f} seconds")
        # ``output`` stays None when there are no images.
        if output is not None:
            print(f"[Cores={core_count}] Output shape: {output.shape}")

Single-Core PyTorch Implementation

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import time
import psutil
import csv
import os

# PyTorch 1 thread
torch.set_num_threads(1)
try:
    torch.set_num_interop_threads(1)
except RuntimeError:
    # The inter-op pool size can only be set once per process and only before
    # inter-op work has started, so re-running this cell (or running it after
    # other PyTorch work) would otherwise abort here.
    print("[Single-core mode] Inter-op thread count was already fixed; leaving it unchanged")
print(f"[Single-core mode] PyTorch using {torch.get_num_threads()} thread(s)")

# Define transform: convert to tensor, then normalise each channel with
# mean 0.5 and std 0.5.
transform = transforms.Compose([transforms.ToTensor(),
                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])


# NOTE(review): batch_size=1 here versus 64 in the multi-core cell, so
# per-batch timings are not directly comparable - confirm this is intended.
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=1, shuffle=True)

testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=1, shuffle=False)

# CNN Model
class SimpleCNN(nn.Module):
    """Small CNN: two conv/ReLU/max-pool stages followed by a two-layer classifier.

    The first linear layer takes ``32 * 8 * 8`` features, which matches
    3 x 32 x 32 inputs (each 2x2 pool halves the spatial size). The network
    outputs 10 values per sample.
    """

    def __init__(self):
        super().__init__()
        feature_layers = [
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        classifier_layers = [
            nn.Flatten(),
            nn.Linear(32 * 8 * 8, 100),
            nn.ReLU(),
            nn.Linear(100, 10),
        ]
        self.conv = nn.Sequential(*feature_layers)
        self.fc = nn.Sequential(*classifier_layers)

    def forward(self, x):
        """Return the classifier output for a batch ``x``."""
        return self.fc(self.conv(x))

# Initialize model, loss function and plain SGD optimizer
model = SimpleCNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# CSV for logging: start a fresh file that contains only the header row
csv_file = 'pytorch_cnn_singlecore_with_accuracy.csv'
csv_columns = [
    "Epoch", "Batch_Index", "Loss", "Training_Accuracy(%)",
    "Testing_Accuracy(%)", "CPU_Usage(%)", "RAM_Usage(MB)", "Time_Per_Batch(s)",
]
with open(csv_file, mode='w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(csv_columns)

# Calculate accuracy
def calculate_accuracy(loader):
    """Return the top-1 accuracy (in percent) of the global ``model`` on ``loader``.

    ``loader`` must yield ``(images, labels)`` batches. The model is switched
    to eval mode for the measurement and put back into whichever mode it was
    in before the call, so calling this mid-training does not leave the model
    in eval mode. Returns 0.0 when ``loader`` yields no samples.
    """
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for images, labels in loader:
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        # Restore the caller's train/eval mode even if evaluation fails.
        model.train(was_training)
    if total == 0:
        return 0.0
    return 100 * correct / total

# Training loop
num_epochs = 10

proc = psutil.Process(os.getpid())   # created once instead of once per batch
psutil.cpu_percent(interval=None)    # prime: the first non-blocking reading is meaningless
last_batch_idx = len(trainloader) - 1

# Testing accuracy is measured once per epoch (on its last batch). Rows and
# progress lines in between carry the most recent measurement; it is "nan"
# until the first epoch has finished.
test_accuracy = float('nan')

# Keep the CSV open for the whole run instead of reopening it for every batch.
with open(csv_file, mode='a', newline='') as f:
    writer = csv.writer(f)

    for epoch in range(num_epochs):
        model.train()
        correct_train = 0
        total_train = 0
        for batch_idx, (images, labels) in enumerate(trainloader):
            start = time.perf_counter()

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Calculate training accuracy (running value over the current epoch)
            _, predicted = torch.max(outputs, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()

            end = time.perf_counter()

            # Metrics
            cpu = psutil.cpu_percent(interval=None)
            ram = proc.memory_info().rss / (1024 * 1024)
            elapsed = end - start
            train_accuracy = 100 * correct_train / total_train

            # Calculate testing accuracy at the end of each epoch. Doing this
            # after every batch would run the whole test set once per batch.
            if batch_idx == last_batch_idx:
                test_accuracy = calculate_accuracy(testloader)
                model.train()  # evaluation switches the model to eval mode

            writer.writerow([epoch + 1, batch_idx, f"{loss.item():.4f}", f"{train_accuracy:.2f}", f"{test_accuracy:.2f}", f"{cpu:.2f}", f"{ram:.2f}", f"{elapsed:.6f}"])

            if batch_idx % 100 == 0:
                print(f"[Epoch {epoch+1}] Batch {batch_idx}: Loss={loss.item():.4f}, Train Accuracy={train_accuracy:.2f}%, Test Accuracy={test_accuracy:.2f}%, CPU={cpu:.2f}%, RAM={ram:.2f}MB, Time={elapsed:.6f}s")

        # Make each finished epoch durable on disk.
        f.flush()

print(f"CNN training (single-core) complete. Logged to: {csv_file}")



Multi-Core PyTorch Implementation

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import time
import psutil
import csv
import os

# Use all physical CPU cores
# psutil returns None when the physical core count cannot be determined, so
# fall back to the logical count and finally to a single thread.
physical_cores = psutil.cpu_count(logical=False) or os.cpu_count() or 1
torch.set_num_threads(physical_cores)
try:
    torch.set_num_interop_threads(physical_cores)
except RuntimeError:
    # The inter-op pool size can only be set once per process and only before
    # inter-op work has started (e.g. when the single-core cell already ran
    # in this kernel).
    print("[Multi-core mode] Inter-op thread count was already fixed; leaving it unchanged")
print(f"[Multi-core mode] PyTorch using {torch.get_num_threads()} thread(s)")

# Transforms: convert to tensor, then normalise each channel with mean 0.5
# and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# CIFAR-10 datasets
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64, shuffle=True)

testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=64, shuffle=False)

# CNN model
class SimpleCNN(nn.Module):
    """Small CNN: two conv/ReLU/max-pool stages followed by a two-layer classifier.

    The first linear layer takes ``32 * 8 * 8`` features, which matches
    3 x 32 x 32 inputs (each 2x2 pool halves the spatial size). The network
    outputs 10 values per sample.
    """

    def __init__(self):
        super().__init__()
        feature_layers = [
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        classifier_layers = [
            nn.Flatten(),
            nn.Linear(32 * 8 * 8, 100),
            nn.ReLU(),
            nn.Linear(100, 10),
        ]
        self.conv = nn.Sequential(*feature_layers)
        self.fc = nn.Sequential(*classifier_layers)

    def forward(self, x):
        """Return the classifier output for a batch ``x``."""
        return self.fc(self.conv(x))

# Init: model, loss function and plain SGD optimizer
model = SimpleCNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# CSV log file: start a fresh file that contains only the header row
csv_file = 'pytorch_cnn_multicore_optimized.csv'
csv_columns = [
    "Epoch", "Batch_Index", "Loss", "Training_Accuracy(%)",
    "Testing_Accuracy(%)", "CPU_Usage(%)", "RAM_Usage(MB)", "Time_Per_Batch(s)",
]
with open(csv_file, mode='w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(csv_columns)

# Accuracy function
def calculate_accuracy(loader):
    """Return the top-1 accuracy (in percent) of the global ``model`` on ``loader``.

    ``loader`` must yield ``(images, labels)`` batches. The model is switched
    to eval mode for the measurement and put back into whichever mode it was
    in before the call, so calling this mid-training does not leave the model
    in eval mode. Returns 0.0 when ``loader`` yields no samples.
    """
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for images, labels in loader:
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        # Restore the caller's train/eval mode even if evaluation fails.
        model.train(was_training)
    if total == 0:
        return 0.0
    return 100 * correct / total

# Training
num_epochs = 10

# A row is logged on every ``log_every``-th batch and on the last batch of
# each epoch. Batch 0 is covered by the modulo test.
# NOTE(review): at batch_size=64 an epoch has far fewer than 10000 batches,
# so only the first and last batch of each epoch are logged - confirm that
# this interval is intended.
log_every = 10000
last_batch_idx = len(trainloader) - 1

proc = psutil.Process(os.getpid())   # created once instead of once per log
psutil.cpu_percent(interval=None)    # prime: the first non-blocking reading is meaningless

# Keep the CSV open for the whole run instead of reopening it for every row.
with open(csv_file, mode='a', newline='') as f:
    writer = csv.writer(f)

    for epoch in range(num_epochs):
        model.train()
        correct_train = 0
        total_train = 0
        for batch_idx, (images, labels) in enumerate(trainloader):
            start = time.perf_counter()

            # Forward/backward/update
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Training accuracy (running value over the current epoch)
            _, predicted = torch.max(outputs, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()
            train_accuracy = 100 * correct_train / total_train

            end = time.perf_counter()
            elapsed = end - start

            # Logging condition
            should_log = (batch_idx % log_every == 0) or (batch_idx == last_batch_idx)
            if should_log:
                test_accuracy = calculate_accuracy(testloader)
                # Evaluation switches the model to eval mode; switch back so
                # the remaining batches are trained in train mode.
                model.train()
                cpu = psutil.cpu_percent(interval=None)
                ram = proc.memory_info().rss / (1024 * 1024)

                writer.writerow([
                    epoch + 1, batch_idx, f"{loss.item():.4f}",
                    f"{train_accuracy:.2f}", f"{test_accuracy:.2f}",
                    f"{cpu:.2f}", f"{ram:.2f}", f"{elapsed:.6f}"
                ])
                # Make each logged row durable on disk.
                f.flush()

                print(f"[Epoch {epoch+1}] Batch {batch_idx}: Loss={loss.item():.4f}, Train Acc={train_accuracy:.2f}%, Test Acc={test_accuracy:.2f}%, CPU={cpu:.2f}%, RAM={ram:.2f}MB, Time={elapsed:.6f}s")

print(f"CNN training (multi-core) complete. Logged to: {csv_file}")