In [1]:
pip install nir

Note: you may need to restart the kernel to use updated packages.


In [6]:
pip install torch torchvision

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install snntorch

Note: you may need to restart the kernel to use updated packages.


In [9]:
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126

Looking in indexes: https://download.pytorch.org/whl/cu126
Collecting torchaudio
  Downloading https://download.pytorch.org/whl/cu126/torchaudio-2.6.0%2Bcu126-cp312-cp312-win_amd64.whl.metadata (6.8 kB)
Collecting torch
  Downloading https://download.pytorch.org/whl/cu126/torch-2.6.0%2Bcu126-cp312-cp312-win_amd64.whl.metadata (28 kB)
Downloading https://download.pytorch.org/whl/cu126/torchaudio-2.6.0%2Bcu126-cp312-cp312-win_amd64.whl (4.2 MB)
   ---------------------------------------- 0.0/4.2 MB ? eta -:--:--
   -- ------------------------------------- 0.3/4.2 MB ? eta -:--:--
   --------------- ------------------------ 1.6/4.2 MB 5.2 MB/s eta 0:00:01
   -------------------------------- ------- 3.4/4.2 MB 6.9 MB/s eta 0:00:01
   ---------------------------------------- 4.2/4.2 MB 7.6 MB/s eta 0:00:00
Downloading https://download.pytorch.org/whl/cu126/torch-2.6.0%2Bcu126-cp312-cp312-win_amd64.whl (2496.1 MB)
   ---------------------------------------- 0.0/2.5 GB ? eta -:--:--
   ------

  You can safely remove it manually.


In [1]:
import torch

def check_cuda():
    print(torch.version.cuda)
    cuda_is_ok = torch.cuda.is_available()
    print(f"CUDA Enabled: {cuda_is_ok}")

In [None]:
print(check_cuda())

12.6
CUDA Enabled: True
None


In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import snntorch as snn
from snntorch import surrogate
from snntorch import functional as SF
import torchvision
import numpy as np
from tqdm import tqdm

class SCNN(nn.Module):
    def __init__(self, num_inputs=1, num_hidden=16, num_outputs=10, beta=0.95):
        super().__init__()
        
        # Input convolutional layer
        self.conv1 = nn.Conv2d(num_inputs, num_hidden, kernel_size=5, stride=2, padding=1)
        self.conv2 = nn.Conv2d(num_hidden, num_hidden, kernel_size=3, stride=1, padding=1)
        self.pool1 = nn.AvgPool2d(kernel_size=2)
        
        # Second convolutional layer
        self.conv3 = nn.Conv2d(num_hidden, 8, kernel_size=3, stride=1, padding=1)
        self.pool2 = nn.AvgPool2d(kernel_size=2)
        
        # Calculate the size of flattened features
        self.to_linear = None
        self._get_conv_output_size((1, 28, 28))
        
        # Flattening and output layer
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(self.to_linear, num_outputs)
        
        # Spiking neuron dynamics
        self.lif1 = snn.Leaky(beta=beta, spike_grad=surrogate.fast_sigmoid())
        self.lif2 = snn.Leaky(beta=beta, spike_grad=surrogate.fast_sigmoid())
        self.lif3 = snn.Leaky(beta=beta, spike_grad=surrogate.fast_sigmoid())
        self.lif_out = snn.Leaky(beta=beta, spike_grad=surrogate.fast_sigmoid())
        
        # Initialize states
        self.mem1 = self.mem2 = self.mem3 = self.mem_out = None
    
    def _get_conv_output_size(self, shape):
        """
        Calculate the output size of the convolutional layers
        """
        with torch.no_grad():
            # Create a dummy input tensor
            batch_size = 1
            input = torch.zeros(batch_size, *shape)
            
            # Pass it through the convolutional layers
            x = self.conv1(input)
            x = self.conv2(x)
            x = self.pool1(x)
            x = self.conv3(x)
            x = self.pool2(x)
            
            # Get the flattened size
            self.to_linear = x.numel()
            print(f"Flattened size for fc1: {self.to_linear}")
    
    def forward(self, x):
        # Initialize or reset membrane potential at first timestep
        batch_size = x.size(0)
        
        # Calculate the output shape of each layer
        conv1_shape = self._conv_output_shape(x.shape, kernel_size=5, stride=2, padding=1)
        conv2_shape = self._conv_output_shape(conv1_shape, kernel_size=3, stride=1, padding=1)
        pool1_shape = self._pool_output_shape(conv2_shape, kernel_size=2, stride=2, padding=0)
        conv3_shape = self._conv_output_shape(pool1_shape, kernel_size=3, stride=1, padding=1)
        pool2_shape = self._pool_output_shape(conv3_shape, kernel_size=2, stride=2, padding=0)
        
        if self.mem1 is None:
            self.mem1 = torch.zeros(batch_size, 16, conv1_shape[0], conv1_shape[1], device=x.device)
            self.mem2 = torch.zeros(batch_size, 16, conv2_shape[0], conv2_shape[1], device=x.device)
            self.mem3 = torch.zeros(batch_size, 8, pool2_shape[0], pool2_shape[1], device=x.device)
            self.mem_out = torch.zeros(batch_size, 10, device=x.device)
        
        # First convolutional layer with LIF neuron
        cur1 = self.conv1(x)
        spk1, self.mem1 = self.lif1(cur1, self.mem1)
        
        # Second convolutional layer with LIF neuron
        cur2 = self.conv2(spk1)
        spk2, self.mem2 = self.lif2(cur2, self.mem2)
        spk2 = self.pool1(spk2)
        
        # Third convolutional layer with LIF neuron
        cur3 = self.conv3(spk2)
        spk3, self.mem3 = self.lif3(cur3, self.mem3)
        spk3 = self.pool2(spk3)
        
        # Flatten and output layer
        flat = self.flatten(spk3)
        cur_out = self.fc1(flat)
        spk_out, self.mem_out = self.lif_out(cur_out, self.mem_out)
        
        return spk_out
    
    def _conv_output_shape(self, input_shape, kernel_size, stride, padding):
        """
        Calculate the output shape of a convolutional layer
        """
        if isinstance(input_shape, torch.Size):
            height, width = input_shape[-2], input_shape[-1]
        else:
            height, width = input_shape
        
        # Convert to tuples if not already
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)
        if isinstance(stride, int):
            stride = (stride, stride)
        if isinstance(padding, int):
            padding = (padding, padding)
        
        h_out = (height + 2 * padding[0] - kernel_size[0]) // stride[0] + 1
        w_out = (width + 2 * padding[1] - kernel_size[1]) // stride[1] + 1
        
        return (h_out, w_out)
    
    def _pool_output_shape(self, input_shape, kernel_size, stride, padding):
        """
        Calculate the output shape of a pooling layer
        """
        return self._conv_output_shape(input_shape, kernel_size, stride, padding)
    
    def reset_states(self):
        self.mem1 = self.mem2 = self.mem3 = self.mem_out = None

# Load N-MNIST dataset
def load_nmnist():
    # Placeholder - in a real implementation, you would load the actual N-MNIST dataset
    # This is just a mock implementation using MNIST for demonstration
    transform = torchvision.transforms.Compose([
        torchvision.transforms.ToTensor(),
    ])
    
    train_dataset = torchvision.datasets.MNIST(
        root="./data", train=True, download=True, transform=transform
    )
    test_dataset = torchvision.datasets.MNIST(
        root="./data", train=False, download=True, transform=transform
    )
    
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=128, shuffle=True, num_workers=0
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset, batch_size=128, shuffle=False, num_workers=0
    )
    
    return train_loader, test_loader

# Training function
def train(model, train_loader, optimizer, num_steps=100):
    model.train()
    device = next(model.parameters()).device
    
    loss_fn = SF.ce_rate_loss()
    
    for epoch in range(5):  # 5 epochs
        running_loss = 0
        correct = 0
        total = 0
        
        for data, targets in tqdm(train_loader):
            data = data.to(device)
            targets = targets.to(device)
            
            # Zero gradients
            optimizer.zero_grad()
            
            # Initialize spike recording
            spike_record = []
            
            # Forward pass through time
            for step in range(num_steps):
                spk_out = model(data)
                spike_record.append(spk_out)
            
            # Calculate loss
            spike_record = torch.stack(spike_record, dim=0)
            loss = loss_fn(spike_record, targets)
            
            # Calculate accuracy
            pred = torch.sum(spike_record, dim=0).argmax(dim=1)
            correct += (pred == targets).sum().item()
            total += targets.size(0)
            
            # Backward pass
            loss.backward()
            optimizer.step()
            
            # Reset model states for next iteration
            model.reset_states()
            
            running_loss += loss.item()
            
        print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}, Accuracy: {correct/total}")

# Test function
def test(model, test_loader, num_steps=100):
    model.eval()
    device = next(model.parameters()).device
    
    correct = 0
    total = 0
    
    with torch.no_grad():
        for data, targets in tqdm(test_loader):
            data = data.to(device)
            targets = targets.to(device)
            
            # Initialize spike recording
            spike_record = []
            
            # Forward pass through time
            for step in range(num_steps):
                spk_out = model(data)
                spike_record.append(spk_out)
            
            # Calculate accuracy
            spike_record = torch.stack(spike_record, dim=0)
            pred = torch.sum(spike_record, dim=0).argmax(dim=1)
            correct += (pred == targets).sum().item()
            total += targets.size(0)
            
            # Reset model states for next iteration
            model.reset_states()
    
    print(f"Test Accuracy: {correct/total}")
    return correct/total

# Main training routine
def main():
    device = torch.device("cuda")
    
    # Create model and optimizer
    model = SCNN().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    
    # Load data
    train_loader, test_loader = load_nmnist()
    
    # Train the model
    train(model, train_loader, optimizer)
    
    # Test the model
    accuracy = test(model, test_loader)
    
    # Save the model
    torch.save(model.state_dict(), "scnn_model.pth")
    return model

# Run the training
model = main()

Flattened size for fc1: 72


  2%|▏         | 8/469 [00:04<03:51,  1.99it/s]


KeyboardInterrupt: 

## Исправить

In [29]:
import sys
import os

# Добавляем директорию, содержащую модуль 'nir', в sys.path
sys.path.append("F:/vs_file/NIR")

import nir



In [56]:
import torch
import numpy as np
import nir  # Ensure this module is correctly imported
from nir.serialization import write  # Import the write function

def convert_to_nir(model):
    # Словарь для хранения узлов, ключи - имена узлов
    node_dict = {}
    # Список для хранения ребер
    edges = []
    
    # Создаем узлы
    
    # Входной узел
    input_node_name = "input"
    node_dict[input_node_name] = nir.Input(input_type=np.array([1, 28, 28]))
    
    # Первый сверточный слой
    conv1_node_name = "conv1"
    node_dict[conv1_node_name] = nir.Conv2d(
        input_shape=(28, 28),
        weight=model.conv1.weight.data.cpu().numpy(),
        stride=(2, 2),
        padding=(1, 1),
        dilation=(1, 1),
        groups=1,
        bias=model.conv1.bias.data.cpu().numpy() if model.conv1.bias is not None else None
    )
    
    # Первый LIF слой
    lif1_node_name = "lif1"
    node_dict[lif1_node_name] = nir.LIF(
        tau=np.array([1/(1-model.lif1.beta)]),
        v_leak=np.array([0.0]),
        r=np.array([1.0]),
        v_threshold=np.array([1.0])
    )
    
    # Второй сверточный слой
    conv2_node_name = "conv2"
    node_dict[conv2_node_name] = nir.Conv2d(
        input_shape=(14, 14),  # Adjust input shape based on previous layer's output
        weight=model.conv2.weight.data.cpu().numpy(),
        stride=(1, 1),
        padding=(1, 1),
        dilation=(1, 1),
        groups=1,
        bias=model.conv2.bias.data.cpu().numpy() if model.conv2.bias is not None else None
    )
    
    # Второй LIF слой
    lif2_node_name = "lif2"
    node_dict[lif2_node_name] = nir.LIF(
        tau=np.array([1/(1-model.lif2.beta)]),
        v_leak=np.array([0.0]),
        r=np.array([1.0]),
        v_threshold=np.array([1.0])
    )
    
    # Первый пулинговый слой
    pool1_node_name = "pool1"
    node_dict[pool1_node_name] = nir.SumPool2d(
        kernel_size=(2, 2),
        stride=(2, 2),
        padding=(0, 0)
    )
    
    # Третий сверточный слой
    conv3_node_name = "conv3"
    node_dict[conv3_node_name] = nir.Conv2d(
        input_shape=(7, 7),  # Adjust input shape based on previous layer's output
        weight=model.conv3.weight.data.cpu().numpy(),
        stride=(1, 1),
        padding=(1, 1),
        dilation=(1, 1),
        groups=1,
        bias=model.conv3.bias.data.cpu().numpy() if model.conv3.bias is not None else None
    )
    
    # Третий LIF слой
    lif3_node_name = "lif3"
    node_dict[lif3_node_name] = nir.LIF(
        tau=np.array([1/(1-model.lif3.beta)]),
        v_leak=np.array([0.0]),
        r=np.array([1.0]),
        v_threshold=np.array([1.0])
    )
    
    # Второй пулинговый слой
    pool2_node_name = "pool2"
    node_dict[pool2_node_name] = nir.SumPool2d(
        kernel_size=(2, 2),
        stride=(2, 2),
        padding=(0, 0)
    )
    
    # Слой Flatten
    flatten_node_name = "flatten"
    node_dict[flatten_node_name] = nir.Flatten(
        start_dim=1,
        end_dim=3
    )
    
    # Полносвязный слой
    fc1_node_name = "fc1"
    node_dict[fc1_node_name] = nir.Affine(
        weight=model.fc1.weight.data.cpu().numpy(),
        bias=model.fc1.bias.data.cpu().numpy() if model.fc1.bias is not None else None
    )
    
    # Выходной LIF слой
    lif_out_node_name = "lif_out"
    node_dict[lif_out_node_name] = nir.LIF(
        tau=np.array([1/(1-model.lif_out.beta)]),
        v_leak=np.array([0.0]),
        r=np.array([1.0]),
        v_threshold=np.array([1.0])
    )
    
    # Выходной узел
    output_node_name = "output"
    node_dict[output_node_name] = nir.Output(output_type=np.array([10]))
    
    # Добавляем ребра
    edges = [
        (input_node_name, conv1_node_name),
        (conv1_node_name, lif1_node_name),
        (lif1_node_name, conv2_node_name),
        (conv2_node_name, lif2_node_name),
        (lif2_node_name, pool1_node_name),
        (pool1_node_name, conv3_node_name),
        (conv3_node_name, lif3_node_name),
        (lif3_node_name, pool2_node_name),
        (pool2_node_name, flatten_node_name),
        (flatten_node_name, fc1_node_name),
        (fc1_node_name, lif_out_node_name),
        (lif_out_node_name, output_node_name)
    ]
    
    # Создаем граф NIR
    nir_graph = nir.NIRGraph(
        nodes=node_dict,
        edges=edges
    )
    
    # Debug print to inspect the dictionary
    nir_graph_dict = nir_graph.to_dict()
    for key, value in nir_graph_dict.items():
        print(f"{key}: {value}")
    
    # Сохраняем граф
    write("scnn_model.nir", nir_graph)
    
    return nir_graph

model = SCNN()

# Load the saved state dictionary
model.load_state_dict(torch.load("scnn_model.pth"))

# Set the model to evaluation mode
model.eval()
# Convert the trained model to NIR
nir_graph = convert_to_nir(model)

Flattened size for fc1: 72
Создаем узлы...
Ошибка при конвертации модели: Conv2d.__init__() got an unexpected keyword argument 'weights'


Traceback (most recent call last):
  File "C:\Users\user\AppData\Local\Temp\ipykernel_16488\3428602302.py", line 22, in convert_to_nir
    node_dict[conv1_node_name] = nir.Conv2d(
                                 ^^^^^^^^^^^
TypeError: Conv2d.__init__() got an unexpected keyword argument 'weights'
