In [3]:
# 导入必需的库
import torch
from torch import nn
from torch.utils.data import DataLoader, Subset
from torch.optim import SGD
from torch.nn import CrossEntropyLoss
from tqdm import tqdm
import samna
from tonic.datasets import NMNIST
from tonic.transforms import ToFrame
from sinabs.from_torch import from_model
from sinabs.backend.dynapcnn import DynapcnnNetwork
from collections import Counter

# 数据准备
root_dir = "./NMNIST"
try:
    from tonic.datasets.nmnist import NMNIST
except ImportError:
    !pip install tonic
    from tonic.datasets.nmnist import NMNIST

# 下载 N-MNIST 数据集
_ = NMNIST(save_to=root_dir, train=True)
_ = NMNIST(save_to=root_dir, train=False)

# 使用 Tonic 转换事件数据为帧
to_frame = ToFrame(sensor_size=NMNIST.sensor_size, n_time_bins=1)
cnn_train_dataset = NMNIST(save_to=root_dir, train=True, transform=to_frame)
cnn_test_dataset = NMNIST(save_to=root_dir, train=False, transform=to_frame)

# 打印一下数据的形状
sample_data, label = cnn_train_dataset[0]
print(f"The transformed array is in shape [Time-Step, Channel, Height, Width] --> {sample_data.shape}")

# 定义 CNN 模型
cnn = nn.Sequential(
    nn.Conv2d(in_channels=2, out_channels=8, kernel_size=(3, 3), padding=(1, 1), bias=False),
    nn.ReLU(),
    nn.AvgPool2d(2, 2),
    nn.Conv2d(in_channels=8, out_channels=16, kernel_size=(3, 3), padding=(1, 1), bias=False),
    nn.ReLU(),
    nn.AvgPool2d(2, 2),
    nn.Conv2d(in_channels=16, out_channels=16, kernel_size=(3, 3), padding=(1, 1), stride=(2, 2), bias=False),
    nn.ReLU(),
    nn.Flatten(),
    nn.Linear(16 * 4 * 4, 10, bias=False),
    nn.ReLU(),
)

# 初始化模型权重
for layer in cnn.modules():
    if isinstance(layer, (nn.Conv2d, nn.Linear)):
        nn.init.xavier_normal_(layer.weight.data)

# 训练和验证 CNN
epochs = 3
lr = 1e-3
batch_size = 4
num_workers = 4
device = "cuda" if torch.cuda.is_available() else "cpu"
shuffle = True

cnn = cnn.to(device=device)

cnn_train_dataloader = DataLoader(cnn_train_dataset, batch_size=batch_size, num_workers=num_workers, drop_last=True, shuffle=shuffle)
cnn_test_dataloader = DataLoader(cnn_test_dataset, batch_size=batch_size, num_workers=num_workers, drop_last=True, shuffle=shuffle)

optimizer = SGD(params=cnn.parameters(), lr=lr)
criterion = CrossEntropyLoss()

for e in range(epochs):
    # Train the model
    train_p_bar = tqdm(cnn_train_dataloader)
    for data, label in train_p_bar:
        data = data.squeeze(dim=1).to(dtype=torch.float, device=device)
        label = label.to(dtype=torch.long, device=device)
        optimizer.zero_grad()
        output = cnn(data)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()
        train_p_bar.set_description(f"Epoch {e} - Training Loss: {round(loss.item(), 4)}")

    # Validate the model
    correct_predictions = []
    with torch.no_grad():
        test_p_bar = tqdm(cnn_test_dataloader)
        for data, label in test_p_bar:
            data = data.squeeze(dim=1).to(dtype=torch.float, device=device)
            label = label.to(dtype=torch.long, device=device)
            output = cnn(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct_predictions.append(pred.eq(label.view_as(pred)))
            test_p_bar.set_description(f"Epoch {e} - Testing Model...")
    
        correct_predictions = torch.cat(correct_predictions)
        print(f"Epoch {e} - accuracy: {correct_predictions.sum().item()/(len(correct_predictions))*100}%")

# CNN 转 SNN
snn_convert = from_model(model=cnn, input_shape=(2, 34, 34), batch_size=batch_size).spiking_model

# 转换后的 SNN 测试
n_time_steps = 100
to_raster = ToFrame(sensor_size=NMNIST.sensor_size, n_time_bins=n_time_steps)
snn_test_dataset = NMNIST(save_to=root_dir, train=False, transform=to_raster)
snn_test_dataloader = DataLoader(snn_test_dataset, batch_size=batch_size, num_workers=num_workers, drop_last=True, shuffle=False)

device = "cuda" if torch.cuda.is_available() else "cpu"
snn_convert = snn_convert.to(device)

correct_predictions = []
with torch.no_grad():
    test_p_bar = tqdm(snn_test_dataloader)
    for data, label in test_p_bar:
        data = data.reshape(-1, 2, 34, 34).to(dtype=torch.float, device=device)
        label = label.to(dtype=torch.long, device=device)
        output = snn_convert(data)
        output = output.reshape(batch_size, n_time_steps, -1)
        output = output.sum(dim=1)
        pred = output.argmax(dim=1, keepdim=True)
        correct_predictions.append(pred.eq(label.view_as(pred)))
        test_p_bar.set_description(f"Testing SNN Model...")

    correct_predictions = torch.cat(correct_predictions)
    print(f"accuracy of converted SNN: {correct_predictions.sum().item()/(len(correct_predictions))*100}%")

# 部署 SNN 到 Devkit
cpu_snn = snn_convert.to(device="cpu")
devices = samna.device.get_unopened_devices()
if len(devices) == 0:
    raise Exception("No devices found")
my_board = samna.device.open_device(devices[0])

# 使用设备信息部署到 DevKit
devkit_name = my_board.devkit_name
dynapcnn = DynapcnnNetwork(snn=cpu_snn, input_shape=(2, 34, 34), discretize=True, dvs_input=False)
dynapcnn.to(device=devkit_name, chip_layers_ordering="auto")

print(f"The SNN is deployed on the core: {dynapcnn.chip_layers_ordering}")

# 推理
snn_test_dataset = NMNIST(save_to=root_dir, train=False)
subset_indices = list(range(0, len(snn_test_dataset), 100))
snn_test_dataset = Subset(snn_test_dataset, subset_indices)

correct_samples = 0
test_samples = 0

for events, label in tqdm(snn_test_dataset):
    samna_event_stream = []
    for ev in events:
        spk = samna.speck2f.event.Spike()
        spk.x = ev['x']
        spk.y = ev['y']
        spk.timestamp = ev['t'] - events['t'][0]
        spk.feature = ev['p']
        spk.layer = 0
        samna_event_stream.append(spk)

    output_events = dynapcnn(samna_event_stream)
    neuron_index = [each.feature for each in output_events]
    if len(neuron_index) != 0:
        frequent_counter = Counter(neuron_index)
        prediction = frequent_counter.most_common(1)[0][0]
    else:
        prediction = -1

    if prediction == label:
        correct_samples += 1
    test_samples += 1

print(f"On chip inference accuracy: {correct_samples / test_samples}")

# 可视化
from sinabs.backend.dynapcnn.dynapcnn_visualizer import DynapcnnVisualizer
visualizer = DynapcnnVisualizer(window_scale=(4, 8), dvs_shape=(34, 34), spike_collection_interval=50)
visualizer.connect(dynapcnn)

print("Visualizer setup complete!")


The transformed array is in shape [Time-Step, Channel, Height, Width] --> (1, 2, 34, 34)


Epoch 0 - Training Loss: 1.3631: 100%|██████████| 15000/15000 [02:47<00:00, 89.34it/s] 
Epoch 0 - Testing Model...: 100%|██████████| 2500/2500 [00:43<00:00, 56.85it/s] 


Epoch 0 - accuracy: 48.29%


Epoch 1 - Training Loss: 1.2505: 100%|██████████| 15000/15000 [02:47<00:00, 89.34it/s] 
Epoch 1 - Testing Model...: 100%|██████████| 2500/2500 [00:43<00:00, 57.62it/s] 


Epoch 1 - accuracy: 48.949999999999996%


Epoch 2 - Training Loss: 0.5779: 100%|██████████| 15000/15000 [02:39<00:00, 93.80it/s] 
Epoch 2 - Testing Model...: 100%|██████████| 2500/2500 [00:43<00:00, 57.81it/s] 


Epoch 2 - accuracy: 57.410000000000004%


Testing SNN Model...: 100%|██████████| 2500/2500 [06:46<00:00,  6.15it/s]


accuracy of converted SNN: 55.900000000000006%


AttributeError: 'samna.speck2fBoards.Speck2fDevKit' object has no attribute 'devkit_name'