In [36]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

In [37]:
# Model / data parameters
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Download test data from open datasets.
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)


In [38]:
batch_size = 64

# Create data loaders.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

Shape of X [N, C, H, W]: torch.Size([64, 1, 28, 28])
Shape of y: torch.Size([64]) torch.int64


In [51]:
# Get cpu, gpu or mps device for training.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

# Define model for feature encoding (feature extraction)


#In order to process the data

#28x28 is the image size and the 480 or neurons in the layer is the output size (arbirtrarily chosen)

class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 480),
            nn.ReLU(),
            nn.Linear(480, 480),
            nn.ReLU(),
            nn.Linear(480, 10)
        )

    def forward(self, t, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

model = NeuralNetwork().to(device)
print(model)

Using cpu device
NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=784, out_features=480, bias=True)
    (1): ReLU()
    (2): Linear(in_features=480, out_features=480, bias=True)
    (3): ReLU()
    (4): Linear(in_features=480, out_features=10, bias=True)
  )
)


In [52]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

In [54]:
#Define the ODE tool that will be used
#Here I chose Runge Kutta the classic

def runge_kutta(forward, t, state, dt):
  k1 = forward(t,state)
  k2 = forward(t+ 0.5 * dt, state + 0.5 *dt *k1)
  k3 = forward(t+ 0.5 * dt, state + 0.5 *dt *k2)
  k4 = forward(t + dt, state + dt *k3)
  solution = (k1 + 2 * k2 + 2 * k3 + k4)* (dt/6)
  return solution


In [55]:
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        #Define integration timesteps with my Runge-Kutta to make evolve over time

        time_s = np.linspace(0,1,10)

        #Initial state (in my case it is X)
        state = X

        #Performing ODE solver using my defined Runge Kutta Above

        dt = time_s[1] - time_s[0]
        for t in time_s:
          state = state + runge_kutta(model.forward, t, state, dt)
        # Compute prediction error
        pred = model(None, state)

        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [56]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)

            time_s = np.linespace(0,1,10)

            state = X

            dt = time_s[1] - time_s[0]
            for t in time_s:
              state = state + runge_kutta(model.forward, t, state, dt)
            pred = model(None, state)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [57]:
epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------


RuntimeError: The size of tensor a (28) must match the size of tensor b (10) at non-singleton dimension 3

In [None]:
class ODEBLOCK(nn.Module):

  def __init__(self, odefunc):
    super(ODEBlock, self).__init__()
    self.odefunc = odefunc
    self.integration_time = torch.tensor([0,1]).float()

  def forward(self,x):
    #this ensures the integration time is the same input as the x
    self.integration_time = self.integration_time.type_as(x)
    out = runge_kutta()