#### Dependencies

In [3]:
import math

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import yaml
from torch.utils.data import DataLoader, Subset

import mnist_dataset

#### Dataset

Variables

In [11]:
# Dataset options -- edit these to change how MNIST is prepared.
# NOTE(review): small_mnist advertises 20x20 images, but the hyperparameter cell
# further down sets first_in_dim = 256 (16x16) -- confirm which size is intended.
small_mnist = True         # True: 20x20 MNIST, False: 28x28 MNIST
binarize_images = True     # True: binarized images, False: grayscale images
evenly_partitioned = True  # True: same number of samples per class, False: original MNIST distribution
batch_size = 256           # batch size handed to the DataLoaders

Dataset Transform

In [14]:
def binarize(image, threshold=0.5):
    """Threshold ``image`` into a 0/1 float tensor.

    Args:
        image (torch.Tensor): Tensor compared element-wise against ``threshold``.
        threshold (float): Tunable cut-off; elements strictly greater than it
            become 1.0, all others 0.0.

    Returns:
        torch.Tensor: Float tensor with the same shape as ``image``.
    """
    above_threshold = image > threshold
    return above_threshold.float()

# Build the transform pipeline: always convert to a tensor, then optionally
# binarize depending on the `binarize_images` toggle.
transform_steps = [transforms.ToTensor()]
if binarize_images:
    transform_steps.append(transforms.Lambda(lambda x: binarize(x)))
transform = transforms.Compose(transform_steps)

In [None]:
# Make Dataset Evenly Partitioned
def trim_dataset(dataset, targets, min_samples, num_classes=10):
    """Return a shuffled ``Subset`` of ``dataset`` with at most ``min_samples`` items per class.

    Args:
        dataset: Dataset to subsample; it is wrapped in a ``Subset``, not copied.
        targets (torch.Tensor): Labels compared against each class id
            (assumed 1-D and aligned with ``dataset`` -- TODO confirm).
        min_samples (int): Maximum number of samples kept for each class.
        num_classes (int): Class ids ``0 .. num_classes - 1`` are collected.

    Returns:
        Subset: View over ``dataset`` restricted to the selected indices, in random order.
    """
    per_class = [
        (targets == label).nonzero(as_tuple=True)[0][:min_samples]  # first min_samples indices of this class
        for label in range(num_classes)
    ]
    # Concatenate once instead of extending a Python list with 0-d tensors.
    indices = torch.cat(per_class)
    indices = indices[torch.randperm(indices.size(0))]  # shuffle the collected indices
    # Plain ints, so the wrapped dataset is indexed with ints rather than 0-d tensors.
    return Subset(dataset, indices.tolist())


# `train_loader` / `test_loader` must already exist when this cell runs.
# The isinstance guard makes a re-run a no-op: after the first run the loaders wrap a
# Subset, which has no `.targets` attribute.
if evenly_partitioned and not isinstance(train_loader.dataset, Subset):
    # code below is used so that all classes have the same number of samples
    train_targets = train_loader.dataset.targets
    test_targets = test_loader.dataset.targets

    train_digits_total = [torch.sum(train_targets == digit).item() for digit in range(10)]
    test_digits_total = [torch.sum(test_targets == digit).item() for digit in range(10)]
    # A bare expression inside an `if` body is not displayed, so print explicitly.
    print("samples per class (train):", train_digits_total)
    print("samples per class (test):", test_digits_total)

    # find the minimum number of samples across all classes
    min_samples_train = min(train_digits_total)
    min_samples_test = min(test_digits_total)

    # trim both train and test datasets to ensure all classes have the same number of samples
    trimmed_train_dataset = trim_dataset(train_loader.dataset, train_targets, min_samples_train)
    trimmed_test_dataset = trim_dataset(test_loader.dataset, test_targets, min_samples_test)

    # create DataLoaders for the trimmed datasets
    # NOTE(review): drop_last=True on the test loader discards the final partial batch,
    # so part of the test set is never evaluated -- confirm this is intended.
    trimmed_train_loader = DataLoader(trimmed_train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, drop_last=True)
    trimmed_test_loader = DataLoader(trimmed_test_dataset, batch_size=batch_size, shuffle=False, pin_memory=True, drop_last=True)

    # verify the lengths of the trimmed datasets
    print("trimmed sizes (train, test):", len(trimmed_train_loader.dataset), len(trimmed_test_loader.dataset))

    # From here on the notebook works with the trimmed datasets and loaders.
    train_dataset = trimmed_train_dataset
    test_dataset = trimmed_test_dataset
    train_loader = trimmed_train_loader
    test_loader = trimmed_test_loader

In [9]:
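# TODO: load MNIST (or other datasets we want to use) here and build `train_loader` / `test_loader`;
# this cell must run before the "Make Dataset Evenly Partitioned" cell above, which reads both loaders.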

#### Model Hyperparameters

In [5]:
# Read the hyperparameters of every model we want to test from a CSV file and
# turn them into a config.yaml file used to train multiple models.

# Output file name and the input width of each model's first layer.
filename = "mnist_config_16x16.yaml"
first_in_dim = 256  # 16x16

# Load the CSV; every row becomes one dictionary (column name -> value).
df = pd.read_csv("hyperparameters.csv")
models = df.to_dict(orient="records")

# Skeleton of the YAML document; one entry per model is added under "models".
yaml_structure = dict(models={})

def round_to_nearest_multiple(value, multiple):
    """Round ``value`` to the nearest multiple of ``multiple``, with ties rounding up.

    The built-in ``round`` rounds ties to the nearest even integer, which made
    25 -> 20 but 35 -> 40 (and 5 -> 0). Rounding half up keeps ties consistent.

    Args:
        value (int | float): Number to round.
        multiple (int | float): Step size of the result grid; must be non-zero.

    Returns:
        ``multiple`` times an integer, i.e. an int when ``multiple`` is an int.

    Raises:
        ZeroDivisionError: If ``multiple`` is zero.
    """
    # floor(x + 0.5) is round-half-up: exact .5 quotients always go to the next multiple.
    return multiple * math.floor(value / multiple + 0.5)

# populate the YAML structure with models
OUTPUT_SIZE = 10  # for MNIST classification; layer widths are rounded to a multiple of this


def _to_native(value):
    """Return a plain Python scalar for NumPy scalars; any other value is returned unchanged."""
    return value.item() if hasattr(value, "item") else value


for i, model in enumerate(models, start=1):
    # zero-pad model names to 3 digits
    model_name = f"model_{i:03d}"

    # `H` may arrive as a float when the CSV column is not purely integer; range() needs an int.
    depth = int(model["H"])
    # Every hidden layer shares the same width, so compute it once per model.
    width = _to_native(round_to_nearest_multiple(model["W"], OUTPUT_SIZE))

    layers_config = {}
    for layer in range(1, depth + 1):
        # zero-pad layer names to 3 digits
        layer_name = f"LogicLayer{layer:03d}"
        layers_config[layer_name] = {
            # the first layer consumes the flattened image, later layers the previous layer's output
            "in_dim": first_in_dim if layer == 1 else width,
            "out_dim": width,
            "device": "cuda",
            "implementation": "cuda",
            "connections": "random",
            "grad_factor": 2,  # we can try different grad_factor values as well
        }

    yaml_structure["models"][model_name] = {
        "input_dim": first_in_dim,
        "output_size": OUTPUT_SIZE,
        # _to_native guards against NumPy scalars, which yaml.dump would not write as plain numbers
        "tau": _to_native(model["tau"]),
        "learning_rate": _to_native(model["lr"]),
        "layers_config": layers_config,
    }

# Save to a YAML file
with open(filename, "w") as file:
    yaml.dump(yaml_structure, file, default_flow_style=False)

print(f"YAML file '{filename}' generated successfully.")

YAML file 'mnist_config_16x16.yaml' generated successfully.


#### Model Definition

In [2]:
# defines the class for the DiffLogic architecture, etc

In [None]:
# NOTE(review): LogicLayer and GroupSum are not imported anywhere in the visible notebook --
# presumably they come from the difflogic package; confirm and add the import to the first cell.
class DiffLogic(nn.Module):
    """Stack of logic layers followed by a GroupSum read-out.

    The input is flattened, passed through every configured ``LogicLayer`` in
    order, and the result is reduced by ``GroupSum``.
    """

    def __init__(self, layers_config, output_size, tau=30):
        """
        Initializes the DiffLogic model with the specified layer configurations, output size, and temperature parameter.

        Args:
            layers_config (dict): Maps a layer name to a dict with the keys ``in_dim``, ``out_dim``,
                ``device``, ``implementation``, ``connections`` and ``grad_factor``. Layers are
                created in the dict's iteration order; the names themselves are not used.
            output_size (int): The number of output groups (passed to ``GroupSum`` as ``k``).
            tau (int): Temperature parameter for the GroupSum operation.
        """
        super().__init__()
        self.flatten = nn.Flatten()

        layers = []
        for config in layers_config.values():
            layer = LogicLayer(
                in_dim=config['in_dim'],
                out_dim=config['out_dim'],
                device=config['device'],
                implementation=config['implementation'],
                connections=config['connections'],
                grad_factor=config['grad_factor'],
            )
            layers.append(layer)
            print(layer)  # show each layer as it is created

        self.logic_layers = nn.Sequential(*layers)

        self.group = GroupSum(k=output_size, tau=tau)

    def forward(self, x):
        """
        Forward pass of the DiffLogic model.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            torch.Tensor: Output tensor after processing through the logic layers and grouping operation.
        """
        # Move the input to the device the model lives on, instead of forcing CUDA whenever
        # it is available (which breaks models whose layers were configured for another device).
        # Assumes LogicLayer registers its weights as parameters on its configured device -- TODO confirm.
        first_param = next(self.parameters(), None)
        if first_param is not None:
            x = x.to(first_param.device)
        elif torch.cuda.is_available():
            # No parameters to inspect: keep the previous behaviour.
            x = x.to('cuda')
        x = self.flatten(x)
        logits = self.logic_layers(x)
        group = self.group(logits)
        return group

#### Model Training

In [3]:
# training loop for all models

#### Model Testing

In [4]:
# testing loop to test inferences

#### Logic Gate and Connection Extraction

In [8]:
# extracts the learned gates and connections from the difflogic model

#### Model Optimization (?)

In [7]:
# maybe remove the unused nodes to increase inference speed / decrease energy consumption?

#### Verilog Conversion

In [6]:
# converts the learned gates to Verilog or VHDL