In [11]:
import snntorch as snn
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms 
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import TensorDataset, DataLoader
from snntorch import spikegen


In [5]:
data = fetch_california_housing()
# Torch Variables
dtype = torch.float       #para consumir menos memoria y asegurar compatibilidad con ciertas librerías que 
                          # esperan tensores de este tipo en luagr de 64 por ejemplo
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0,), (1,))])

In [6]:
X = data.data
y = data.target

# Normalizar los datos
scaler_X = StandardScaler()
scaler_y = StandardScaler()
X_scaled = scaler_X.fit_transform(X)
y_scaled = scaler_y.fit_transform(y.reshape(-1, 1))

# Dividir en conjuntos de entrenamiento y prueba
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.2, random_state=42)

# Convertir a tensores
X_train_tensor = torch.Tensor(X_train).unsqueeze(1)  # Agregar dimensión para el batch
y_train_tensor = torch.Tensor(y_train).unsqueeze(1)
X_test_tensor = torch.Tensor(X_test).unsqueeze(1)
y_test_tensor = torch.Tensor(y_test).unsqueeze(1)

# Crear DataLoaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [8]:
# Definir la red neuronal
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(X_train.shape[1], 128)
        self.lif1 = snn.Leaky(beta=0.9)
        self.fc2 = nn.Linear(128, 1)
        self.lif2 = snn.Leaky(beta=0.9)

    def forward(self, x):
        mem1 = self.lif1.init_leaky()
        mem2 = self.lif2.init_leaky()
        spk_rec = []
        mem_rec = []
        for step in range(num_steps):
            cur1 = self.fc1(x[step])
            spk1, mem1 = self.lif1(cur1, mem1)
            cur2 = self.fc2(spk1)
            spk2, mem2 = self.lif2(cur2, mem2)
            spk_rec.append(spk2)
            mem_rec.append(mem2)
        return torch.stack(mem_rec, dim=0)  # Devolver el potencial de membrana

In [9]:
# Inicializar el modelo, la función de pérdida y el optimizador
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net = Net().to(device)
criterion = nn.MSELoss()  # Para regresión, usamos MSE
optimizer = optim.Adam(net.parameters(), lr=1e-3)
num_steps = 10

In [12]:
# Entrenamiento
num_epochs = 5
for epoch in range(num_epochs):
    net.train()
    for data, targets in train_loader:
        spike_data = spikegen.rate(data, num_steps=num_steps).to(device)
        targets = targets.to(device)
        
        optimizer.zero_grad()
        outputs = net(spike_data.view(num_steps, data.size(0), -1))
        
        loss = criterion(outputs[-1], targets)  # Usar solo el último paso de tiempo para la predicción final
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

  return F.mse_loss(input, target, reduction=self.reduction)


Epoch [1/5], Loss: 1.2968
Epoch [2/5], Loss: 0.6640
Epoch [3/5], Loss: 0.9242
Epoch [4/5], Loss: 1.4719
Epoch [5/5], Loss: 1.2098
