In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np

In [221]:
# Returns data tensors for images and labels (binary values 0 and 1)
def load_data(filepath='train.csv'):
    data = pd.read_csv(filepath)
    labels = data['label'].values
    pixels = data.drop('label', axis=1).values
    
    # Convert to binary (0 or 1)
    pixels = (pixels > 127).astype(np.float32)
    
    # Convert to PyTorch tensors
    pixels_tensor = torch.FloatTensor(pixels)
    labels_tensor = torch.LongTensor(labels)
    
    return pixels_tensor, labels_tensor

In [222]:
def split_data(X, y, train_ratio=0.8):
    """
    Split the data into training and testing sets.

    Parameters:
        X (torch.Tensor): The feature tensor (pixels).
        y (torch.Tensor): The label tensor.
        train_ratio (float): The proportion of data to use for training.

    Returns:
        X_train, X_test, y_train, y_test (torch.Tensor): Split datasets.
    """
    # Calculate the split index
    total_samples = X.shape[0]
    train_size = int(total_samples * train_ratio)
    test_size = total_samples - train_size

    # Randomly split the dataset
    train_indices = torch.randperm(total_samples)[:train_size]
    test_indices = torch.randperm(total_samples)[train_size:]

    X_train = X[train_indices]
    y_train = y[train_indices]
    X_test = X[test_indices]
    y_test = y[test_indices]

    return X_train, X_test, y_train, y_test


In [3]:
# Define the ScalableLinear layer without bias
class ScalableLinear(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(out_features, in_features, dtype=torch.float32))  # Initialize weights

    def forward(self, x):
        return torch.mm(x, self.weight.t())

    def scale_weights(self, target_min, target_max):
        """Scale the weights of the layer to a desired integer range."""
        with torch.no_grad():
            # Get the min and max values of the layer's weights
            weight_min = self.weight.min()
            weight_max = self.weight.max()

            # Compute scaling factor
            scale = (target_max - target_min) / (weight_max - weight_min)
            zero_point = target_min - weight_min * scale

            # Apply scaling to weights
            quantized_weights = torch.round(self.weight * scale + zero_point)

            # Clip to the target range (make sure no value goes outside the desired range)
            quantized_weights = torch.clamp(quantized_weights, target_min, target_max)

            # Update weights with quantized values
            self.weight.data = quantized_weights


 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
# Define the neural network with scalable layers
class ScalableNet(nn.Module):
    def __init__(self, input_size=784):
        super().__init__()
        self.layer1 = ScalableLinear(input_size, 64)
        self.layer2 = ScalableLinear(64, 64)
        self.layer3 = ScalableLinear(64, 32)
        self.layer4 = ScalableLinear(32, 10)

    def forward(self, x):
        x = self.layer1(x)
        x = torch.relu(x)
        x = self.layer2(x)
        x = torch.relu(x)
        x = self.layer3(x)
        x = torch.relu(x)
        x = self.layer4(x)
        return x

    # Helper function that scales weights directly
    def scale_weights(self, target_min, target_max):
        """Scale weights for all layers."""
        self.layer1.scale_weights(target_min, target_max)
        self.layer2.scale_weights(target_min, target_max)
        self.layer3.scale_weights(target_min, target_max)
        self.layer4.scale_weights(target_min, target_max)

In [4]:
# Entry point in training loop that scales our weights
def gradual_scale_weights(model, initial_target_min, initial_target_max, final_target_min, final_target_max, step_size, epoch, max_epochs):
    """
    Gradually scale the weights of each layer after each epoch.
    """
    # Compute the scaling range for this epoch based on the progress in training
    scale_min = initial_target_min + (final_target_min - initial_target_min) * (epoch / max_epochs)
    scale_max = initial_target_max + (final_target_max - initial_target_max) * (epoch / max_epochs)

    # Apply gradual scaling to each layer
    model.scale_weights(target_min=int(scale_min), target_max=int(scale_max))

In [395]:
# Load the data
X, y = load_data('train.csv')

# Split into training and testing datasets
X_train, X_test, y_train, y_test = split_data(X, y, train_ratio=0.8)

print(f"Training set size: {X_train.size(0)} samples")
print(f"Testing set size: {X_test.size(0)} samples")

# Train the model
model = ScalableNet()

Training set size: 33600 samples
Testing set size: 8400 samples


In [396]:
epochs=10
batch_size = 4096 * 2 * 2
initial_target_min= -64
initial_target_max= 63
final_target_min= -32
final_target_max= 31
step_size= 0.1
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.1)
n_samples = X_train.shape[0]
n_batches = n_samples // batch_size

In [397]:
# Training loop
for epoch in range(epochs):
    total_loss = 0
    correct = 0
    
    for i in range(n_batches):
        start_idx = i * batch_size
        end_idx = start_idx + batch_size
        batch_X = X_train[start_idx:end_idx]
        batch_y = y_train[start_idx:end_idx]
        
        # Forward pass
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        
        # Backward pas
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        correct += (predicted == batch_y).sum().item()
    
    avg_loss = total_loss / n_samples
    accuracy = correct / n_samples
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}')

    # Scale the weights using 
    # Gradual weight scaling after each epoch
    gradual_scale_weights(model, initial_target_min, initial_target_max, final_target_min, final_target_max, step_size, epoch, epochs)

Epoch [1/10], Loss: 0.1421, Accuracy: 0.0944
Epoch [2/10], Loss: 5380.9279, Accuracy: 0.1633
Epoch [3/10], Loss: 3282.7863, Accuracy: 0.1609
Epoch [4/10], Loss: 2145.0448, Accuracy: 0.1681
Epoch [5/10], Loss: 1372.6731, Accuracy: 0.1910
Epoch [6/10], Loss: 905.9057, Accuracy: 0.2171
Epoch [7/10], Loss: 596.5300, Accuracy: 0.2249
Epoch [8/10], Loss: 337.1610, Accuracy: 0.2298
Epoch [9/10], Loss: 213.9863, Accuracy: 0.2350
Epoch [10/10], Loss: 140.0001, Accuracy: 0.2428


In [391]:
# Check min and max values before and after scaling
print("Before scaling:")
for layer in model.children():
    if isinstance(layer, ScalableLinear):
        print(f"Layer weights min: {layer.weight.min().item()}, max: {layer.weight.max().item()}")

# # Apply scaling
# model.scale_weights(target_min=-128, target_max=127)

print("\nAfter scaling:")
for layer in model.children():
    if isinstance(layer, ScalableLinear):
        print(f"Layer weights min: {layer.weight.min().item()}, max: {layer.weight.max().item()}")


Before scaling:
Layer weights min: -35.0, max: 34.0
Layer weights min: -35.0, max: 34.0
Layer weights min: -35.0, max: 34.0
Layer weights min: -35.0, max: 34.0

After scaling:
Layer weights min: -35.0, max: 34.0
Layer weights min: -35.0, max: 34.0
Layer weights min: -35.0, max: 34.0
Layer weights min: -35.0, max: 34.0


In [332]:
# Test the model after scaling the weights
def test_model(model, X_test, y_test):
    model.eval()  # Set the model to evaluation mode
    criterion = nn.CrossEntropyLoss()
    print(X_test.shape)
    # Evaluate on the test set
    with torch.no_grad():
        outputs = model(X_test)
        loss = criterion(outputs, y_test)
        
        _, predicted = torch.max(outputs.data, 1)
        correct = (predicted == y_test).sum().item()
        accuracy = correct / y_test.size(0)
        
    print(f"Test Loss: {loss.item():.4f}, Test Accuracy: {accuracy:.4f}")
    
# Assuming you have the test set X_test and y_test available
# Run the evaluation after scaling the weights
model.scale_weights(target_min=-32, target_max=31)

# Test the model after scaling
test_model(model, X_test, y_test)


torch.Size([8400, 784])
Test Loss: 7401.6953, Test Accuracy: 0.0995


In [200]:
for i, layer in enumerate(model.children()):
        if isinstance(layer, ScalableLinear):  # Ensure that the layer is of type ScalableLinear
            print(f"Layer {i+1} Weights:\n", layer.weight.data)

Layer 1 Weights:
 tensor([[ -4.,   4.,  -4.,  ...,   8.,  -4.,   0.],
        [  4.,   0.,   8.,  ...,   0.,  -8.,  15.],
        [ 17.,   4., -13.,  ...,   8.,  11.,   8.],
        ...,
        [  8.,   4.,  -4.,  ...,  11., -11.,  -8.],
        [  4., -11.,   0.,  ...,   8.,   0.,  17.],
        [ -4.,  15.,  21.,  ...,   0.,  -4.,   4.]])
Layer 2 Weights:
 tensor([[ 19.,  23.,  12.,  ...,  -4.,  17., -13.],
        [  0.,  27.,   8.,  ...,   8.,   8.,   8.],
        [ -4.,   8.,   0.,  ...,   8.,   8.,  -8.],
        ...,
        [ 12.,   8.,  -4.,  ...,  -8.,  -4.,  19.],
        [ -4.,  -4.,   0.,  ...,  12., -23.,  23.],
        [  4.,   4.,   0.,  ...,   0.,  -4.,   4.]])
Layer 3 Weights:
 tensor([[ -4.,  -1.,  -8.,  ...,   4.,   8., -17.],
        [ -8.,   4.,  -8.,  ...,  12.,  -4.,   8.],
        [  0., -12.,  -8.,  ...,   8.,  12.,   4.],
        ...,
        [-21.,   8.,   4.,  ...,  -1.,  -8.,   4.],
        [ -5.,  -8.,   0.,  ...,  -8., -27., -36.],
        [ -4.,   4., 

In [27]:
import csv

In [44]:
def save_weights_as_hex(model):
    weight_matrices = [model.layer1.weight.data, model.layer2.weight.data, model.layer3.weight.data, model.layer4.weight.data]

    for idx, weight_matrix in enumerate(weight_matrices, start=1):
        # Flatten weight matrix
        flattened_weights = weight_matrix.flatten().cpu().numpy()
        
        # Open corresponding file for saving weights
        with open(f'matrix{idx}.mif', 'w') as file:
            for weight in flattened_weights:
                # Convert directly to integer
                int_weight = int(weight.item())
                # Format as 8-digit unsigned hexadecimal
                hex_weight = f"{int_weight & 0xFFFFFFFF:08X}"
                # Write only the value
                file.write(f"{hex_weight}\n")

def save_random_image(X_train, y_train):
    # Randomly select an image index
    idx = random.randint(0, X_train.size(0) - 1)
    print(y_train[idx])
    
    # Get the corresponding image data
    image_data = X_train[idx].numpy()  # Convert to numpy array
    
    # Open file to save the image data
    with open(f'random_image.txt', 'w') as file:
        for pixel in image_data:
            # Map binary pixel directly to integer values (0 or 1)
            int_pixel = int(pixel)
            # Format as 8-digit unsigned hexadecimal
            hex_pixel = f"{int_pixel & 0xFFFFFFFF:08X}"
            # Write only the value
            file.write(f"{hex_pixel}\n")


In [42]:
import random
# Save the weights as signed 8 hexadecimal digits in index: value pairs
save_weights_as_hex(model)

In [45]:
save_random_image(X_train, y_train)

tensor(4)
