In [7]:
# Import Required Libraries
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from dvs_gesture_dataset import DVS346Sign
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset, Subset
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim

# Initialize Device
# Set device to MPS or CPU
device = torch.device("cpu")
print(f"Using device: {device}")
# Temporal Integration (MESTOR)
def temporal_binning(events, time_window=10000, resolution=(128, 128)):
    """
    Bin events into temporal windows.
    Args:
        events: np.ndarray (N x 4) [x, y, polarity, timestamp]
        time_window: int, time duration for each bin in microseconds.
    Returns:
        np.ndarray: Binned events as temporal frames.
    """
    max_time = events[:, 3].max()
    num_bins = int(np.ceil(max_time / time_window))
    binned_events = np.zeros((num_bins, resolution[0], resolution[1]), dtype=np.float32)

    for x, y, polarity, timestamp in events:
        bin_idx = int(timestamp // time_window)
        binned_events[bin_idx, int(y), int(x)] += polarity

    return binned_events

Using device: cpu


In [8]:

# Paths and Constants
root_dir = "/Users/kayraozturk/Downloads/DvsGesture"
labels_path = "/Users/kayraozturk/Downloads/DvsGesture"
train_trials_path = "/Users/kayraozturk/Downloads/DvsGesture/trials_to_train.txt"
test_trials_path = "/Users/kayraozturk/Downloads/DvsGesture/trials_to_test.txt"
batch_size = 4
epochs = 10
time_bin = 10000


In [9]:

# Load Training and Testing Dataset
train_dataset = DVS346Sign(root_dir, labels_path, time_bin=time_bin)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = DVS346Sign(root_dir, labels_path, time_bin=time_bin)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
from torch.utils.data import Subset

# Reduce dataset for debugging (use a subset of 100 samples)
train_subset = Subset(train_dataset, range(20))  # Adjust range as needed
test_subset = Subset(test_dataset, range(10))     # Adjust range as needed

train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_subset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)

Processing AEDAT file: /Users/kayraozturk/Downloads/DvsGesture/user17_led.aedat
Looking for Label CSV file: /Users/kayraozturk/Downloads/DvsGesture/user17_led_labels.csv
Loaded 12 gestures from /Users/kayraozturk/Downloads/DvsGesture/user17_led_labels.csv
Processing AEDAT file: /Users/kayraozturk/Downloads/DvsGesture/user23_lab.aedat
Looking for Label CSV file: /Users/kayraozturk/Downloads/DvsGesture/user23_lab_labels.csv
Loaded 12 gestures from /Users/kayraozturk/Downloads/DvsGesture/user23_lab_labels.csv
Processing AEDAT file: /Users/kayraozturk/Downloads/DvsGesture/user26_natural.aedat
Looking for Label CSV file: /Users/kayraozturk/Downloads/DvsGesture/user26_natural_labels.csv
Loaded 12 gestures from /Users/kayraozturk/Downloads/DvsGesture/user26_natural_labels.csv
Processing AEDAT file: /Users/kayraozturk/Downloads/DvsGesture/user06_fluorescent.aedat
Looking for Label CSV file: /Users/kayraozturk/Downloads/DvsGesture/user06_fluorescent_labels.csv
Loaded 12 gestures from /Users/kay

In [10]:
# FSN Utilities
class ActFun(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input):
        ctx.save_for_backward(input)
        return torch.where(input >= 0, torch.ones_like(input), torch.zeros_like(input))

    @staticmethod
    def backward(ctx, grad_output):
        input, = ctx.saved_tensors
        lens = 0.5
        temp = (torch.abs(input) < lens) * (0.5 / lens)
        return grad_output * temp.float()

act_fun = ActFun.apply

def fs_mem_update(ops, x, mem, decay):
    mem = mem + ops(x) * decay
    return mem

def fs_coding(mem, decay, threshold):
    spike = act_fun(mem - threshold)
    mem = mem - decay * spike
    return mem, spike

# FSN Model
class FSN(nn.Module):
    def __init__(self):
        super(FSN, self).__init__()
        self.conv1 = nn.Conv3d(1, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv3d(16, 32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv3d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool3d(kernel_size=2, stride=2)
        self.fc1 = None
        self.fc2 = nn.Linear(128, 11)

    def forward(self, x):
        mem = torch.zeros_like(x, device=x.device)
        decay = 0.9
        threshold = 1.0

        mem = fs_mem_update(self.conv1, x, mem, decay)
        mem, spikes = fs_coding(mem, decay, threshold)
        x = self.pool(spikes)

        mem = fs_mem_update(self.conv2, x, mem, decay)
        mem, spikes = fs_coding(mem, decay, threshold)
        x = self.pool(spikes)

        mem = fs_mem_update(self.conv3, x, mem, decay)
        mem, spikes = fs_coding(mem, decay, threshold)
        x = self.pool(spikes)

        if self.fc1 is None:
            flattened_size = x.view(x.size(0), -1).size(1)
            self.fc1 = nn.Linear(flattened_size, 128).to(x.device)

        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

model = FSN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [11]:
# Training and Testing Loops
epochs = 10
train_accuracies, train_losses = [], []
test_accuracies, test_losses = [], []

In [None]:
from tqdm import tqdm

for epoch in range(epochs):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    successful_batches = 0
    print(f"Epoch {epoch + 1}/{epochs}")

    with tqdm(total=len(train_loader), desc=f"Training Epoch {epoch + 1}/{epochs}", unit="batch") as pbar:
        for spike_frames, labels in train_loader:
            try:
                # Temporal reduction (optional)
                spike_frames = spike_frames[:, :, :50, :, :]  # Use first 50 temporal bins

                # Combine gestures and time bins into one depth dimension
                depth = spike_frames.size(1) * spike_frames.size(2)
                spike_frames = spike_frames.reshape(
                    spike_frames.size(0), 1, depth, spike_frames.size(3), spike_frames.size(4)
                ).to(device, dtype=torch.float32)

                # Ensure labels are 1D
                labels = labels[:, 0].to(device, dtype=torch.long)

                # Forward and backward pass
                optimizer.zero_grad()
                outputs = model(spike_frames)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                # Metrics
                running_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

                successful_batches += 1
                pbar.update(1)
                pbar.set_postfix(loss=loss.item())

            except Exception as e:
                print(f"Error during training batch: {e}")
                continue

    train_accuracy = correct / total if total > 0 else 0
    train_accuracies.append(train_accuracy)
    train_losses.append(running_loss / max(1, successful_batches))  # Avoid divide-by-zero

    print(f"Epoch {epoch + 1}: Train Loss: {train_losses[-1]:.4f}, Train Acc: {train_accuracies[-1]:.4f}")

    # Save model checkpoint every epoch
    checkpoint_path = f"fsn_model_epoch_{epoch + 1}.pth"
    torch.save(model.state_dict(), checkpoint_path)
    print(f"Model saved at: {checkpoint_path}")

    # Testing loop
    model.eval()
    test_loss, correct, total = 0.0, 0, 0
    successful_test_batches = 0

    with torch.no_grad():
        with tqdm(total=len(test_loader), desc=f"Testing Epoch {epoch + 1}/{epochs}", unit="batch") as pbar:
            for spike_frames, labels in test_loader:
                try:
                    # Temporal reduction (optional)
                    spike_frames = spike_frames[:, :, :50, :, :]  # Use first 50 temporal bins

                    # Combine gestures and time bins into one depth dimension
                    depth = spike_frames.size(1) * spike_frames.size(2)
                    spike_frames = spike_frames.reshape(
                        spike_frames.size(0), 1, depth, spike_frames.size(3), spike_frames.size(4)
                    ).to(device, dtype=torch.float32)

                    # Ensure labels are 1D
                    labels = labels[:, 0].to(device, dtype=torch.long)

                    # Forward pass
                    outputs = model(spike_frames)
                    loss = criterion(outputs, labels)

                    # Metrics
                    test_loss += loss.item()
                    _, predicted = torch.max(outputs, 1)
                    correct += (predicted == labels).sum().item()
                    total += labels.size(0)

                    successful_test_batches += 1
                    pbar.update(1)

                except Exception as e:
                    print(f"Error during testing batch: {e}")
                    continue

    test_accuracy = correct / total if total > 0 else 0
    test_accuracies.append(test_accuracy)
    test_losses.append(test_loss / max(1, successful_test_batches))  # Avoid divide-by-zero

    print(f"Epoch {epoch + 1}: Test Loss: {test_losses[-1]:.4f}, Test Acc: {test_accuracies[-1]:.4f}")

Epoch 1/10


Training Epoch 1/10:   0%|          | 0/5 [00:00<?, ?batch/s]

File: /Users/kayraozturk/Downloads/DvsGesture/user07_fluorescent_led.aedat, Total Events Parsed: 2727077
No events found for gesture 1 in /Users/kayraozturk/Downloads/DvsGesture/user07_fluorescent_led.aedat.
No events found for gesture 2 in /Users/kayraozturk/Downloads/DvsGesture/user07_fluorescent_led.aedat.
Binned Events Shape: (100, 128, 128)
No events found for gesture 4 in /Users/kayraozturk/Downloads/DvsGesture/user07_fluorescent_led.aedat.
Binned Events Shape: (100, 128, 128)
No events found for gesture 6 in /Users/kayraozturk/Downloads/DvsGesture/user07_fluorescent_led.aedat.
No events found for gesture 7 in /Users/kayraozturk/Downloads/DvsGesture/user07_fluorescent_led.aedat.
Binned Events Shape: (100, 128, 128)
No events found for gesture 8 in /Users/kayraozturk/Downloads/DvsGesture/user07_fluorescent_led.aedat.
Binned Events Shape: (100, 128, 128)
No events found for gesture 10 in /Users/kayraozturk/Downloads/DvsGesture/user07_fluorescent_led.aedat.
No events found for gestu

In [6]:
# Temporal Integration with MESTOR
def temporal_binning(events, time_window):
    """
    Bin events into temporal windows.
    Args:
        events: Input events (N x 4: x, y, polarity, timestamp).
        time_window: Time duration for each bin (microseconds).
    Returns:
        binned_events: Tensor of temporally binned events.
    """
    max_time = events[:, 3].max()
    num_bins = int(np.ceil(max_time / time_window))
    binned_events = torch.zeros((num_bins, 128, 128), dtype=torch.float32)

    for x, y, polarity, timestamp in events:
        bin_idx = int(timestamp // time_window)
        binned_events[bin_idx, int(y), int(x)] += polarity

    return binned_events

In [None]:
import time
# Training and Testing Loop
for epoch in range(epochs):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    print(f"Epoch {epoch + 1}/{epochs}")
    with tqdm(total=len(train_loader), desc="Training", unit="batch") as pbar:
        for spike_frames, labels in train_loader:
            try:
                # Skip batches with no events
                if spike_frames.size(2) == 0:
                    print("Skipping batch with no events")
                    continue

                # Preprocess spike_frames with temporal integration
                spike_frames = spike_frames[:, :, :50, :, :]  # Use first 50 temporal bins
                depth = spike_frames.size(1) * spike_frames.size(2)
                spike_frames = spike_frames.reshape(
                    spike_frames.size(0), 1, depth, spike_frames.size(3), spike_frames.size(4)
                ).to(device, dtype=torch.float32)

                labels = labels[:, 0].to(device, dtype=torch.long)

                # Forward pass
                optimizer.zero_grad()
                outputs = model(spike_frames)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

            except Exception as e:
                print(f"Error during training batch: {e}")
                continue

            pbar.update(1)
            pbar.set_postfix(loss=loss.item())

    train_accuracy = correct / total if total > 0 else 0
    train_accuracies.append(train_accuracy)
    train_losses.append(running_loss / len(train_loader))
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {running_loss:.4f}, Accuracy: {train_accuracy:.4f}")

    # Save Model Checkpoint
    torch.save(model.state_dict(), f"fsn_model_epoch_{epoch + 1}.pth")

    # Testing
    model.eval()
    test_loss, correct, total = 0.0, 0, 0
    with torch.no_grad():
        with tqdm(total=len(test_loader), desc="Testing", unit="batch") as pbar:
            for spike_frames, labels in test_loader:
                try:
                    spike_frames = spike_frames[:, :, :50, :, :]
                    depth = spike_frames.size(1) * spike_frames.size(2)
                    spike_frames = spike_frames.reshape(
                        spike_frames.size(0), 1, depth, spike_frames.size(3), spike_frames.size(4)
                    ).to(device, dtype=torch.float32)

                    labels = labels[:, 0].to(device, dtype=torch.long)

                    outputs = model(spike_frames)
                    loss = criterion(outputs, labels)
                    test_loss += loss.item()
                    _, predicted = torch.max(outputs, 1)
                    correct += (predicted == labels).sum().item()
                    total += labels.size(0)
                except Exception as e:
                    print(f"Error during testing batch: {e}")
                    continue

                pbar.update(1)

    test_accuracy = correct / total if total > 0 else 0
    test_accuracies.append(test_accuracy)
    test_losses.append(test_loss / len(test_loader))
    print(f"Test Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}")

Epoch 1/10


Training:   0%|          | 0/25 [00:00<?, ?batch/s]

File: /Users/kayraozturk/Downloads/DvsGesture/user23_fluorescent_led.aedat, Total Events Parsed: 3647957
Binned Events Shape: (100, 128, 128)
No events found for gesture 2 in /Users/kayraozturk/Downloads/DvsGesture/user23_fluorescent_led.aedat.
Binned Events Shape: (100, 128, 128)
No events found for gesture 4 in /Users/kayraozturk/Downloads/DvsGesture/user23_fluorescent_led.aedat.
Binned Events Shape: (100, 128, 128)
No events found for gesture 6 in /Users/kayraozturk/Downloads/DvsGesture/user23_fluorescent_led.aedat.
Binned Events Shape: (100, 128, 128)
No events found for gesture 8 in /Users/kayraozturk/Downloads/DvsGesture/user23_fluorescent_led.aedat.
No events found for gesture 8 in /Users/kayraozturk/Downloads/DvsGesture/user23_fluorescent_led.aedat.
Binned Events Shape: (100, 128, 128)
No events found for gesture 10 in /Users/kayraozturk/Downloads/DvsGesture/user23_fluorescent_led.aedat.
No events found for gesture 11 in /Users/kayraozturk/Downloads/DvsGesture/user23_fluorescen

In [18]:
# Plot Training and Testing Accuracy
plt.figure(figsize=(10, 5))
plt.plot(train_accuracies, label="Training Accuracy")
plt.plot(test_accuracies, label="Testing Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.title("Model Accuracy")
plt.show()

# Plot Training and Testing Loss
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label="Training Loss")
plt.plot(test_losses, label="Testing Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.title("Model Loss")
plt.show()

# Save Final Model
torch.save(model.state_dict(), "fsn_model_final.pth")


In [None]:

# Evaluate Model with Confusion Matrix
model.eval()
all_labels, all_predictions = [], []
with torch.no_grad():
    for spike_frames, labels in test_loader:
        depth = spike_frames.size(1) * spike_frames.size(2)
        spike_frames = spike_frames.view(
            spike_frames.size(0), 1, depth, spike_frames.size(3), spike_frames.size(4)
        ).to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)

        outputs = model(spike_frames)
        _, predicted = torch.max(outputs, 1)
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

conf_matrix = confusion_matrix(all_labels, all_predictions)
disp = ConfusionMatrixDisplay(conf_matrix, display_labels=range(1, 12))
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()

In [None]:

for epoch in range(epochs):
    print(f"\nEpoch {epoch+1}/{epochs}")
    model.train()
    correct = 0
    total = 0
    running_loss = 0.0

    for batch_idx, (spike_frames, labels) in enumerate(train_loader):
        print(f"\nProcessing Batch {batch_idx+1}/{len(train_loader)}")
        # Adjust input shape
        print(f"Original spike_frames shape: {spike_frames.shape}")
        spike_frames = spike_frames.permute(0, 2, 1, 3, 4)  # [batch_size, 1, 100, 128, 128]
        print(f"Permuted spike_frames shape: {spike_frames.shape}")
        spike_frames = spike_frames.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)
        print(f"Labels: {labels}")

        optimizer.zero_grad()
        outputs = model(spike_frames)
        print(f"Model outputs: {outputs}")

        loss = criterion(outputs, labels)
        print(f"Loss: {loss.item()}")
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    train_accuracy = correct / total
    train_accuracies.append(train_accuracy)
    train_losses.append(running_loss / len(train_loader))
    print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss:.4f}, Accuracy: {train_accuracy:.4f}")

In [None]:

# Evaluation
model.eval()
test_accuracies = []
test_labels = []
test_predictions = []

with torch.no_grad():
    correct = 0
    total = 0

    for spike_frames, labels in test_loader:
        spike_frames = spike_frames.unsqueeze(1).to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)

        outputs = model(spike_frames)
        _, predicted = torch.max(outputs, 1)

        test_labels.extend(labels.cpu().numpy())
        test_predictions.extend(predicted.cpu().numpy())

        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    test_accuracy = correct / total
    print(f"Test Accuracy: {test_accuracy:.4f}")

# Confusion Matrix
cm = confusion_matrix(test_labels, test_predictions)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.show()

# Plot Training Metrics
plt.figure()
plt.plot(train_accuracies, label="Training Accuracy")
plt.plot(train_losses, label="Training Loss")
plt.legend()
plt.title("Training Metrics")
plt.xlabel("Epochs")
plt.ylabel("Value")
plt.show()

# Save Model and Metrics
torch.save(model.state_dict(), "fsn_model.pth")
np.savetxt("training_metrics.csv", np.column_stack((train_accuracies, train_losses)), delimiter=",")