In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import numpy as np


In [None]:
# --- Initial run configuration ----------------------------------------------
# NOTE: a later settings cell in this notebook assigns most of these names
# again (with epochs = 10), so the values here only apply if that cell is
# not executed.
out = 1          # verbosity flag: train()/test() only print when out > 0
n_classes = 10   # defined here; not referenced by the cells visible below

# Optimisation hyper-parameters
lr, gamma = 0.04, 0.8                   # initial learning rate, StepLR decay factor
epochs = 2
batch_size, test_batch_size = 100, 1000

# Device probing and RNG seeding (a fresh seed in [0, 1000) is drawn each run)
use_cuda = torch.cuda.is_available()
seed = np.random.randint(0, 1000)
np.random.seed(seed)
torch.manual_seed(seed)

In [None]:
class H1(nn.Module):
    """Hamiltonian deep neural network (H_1-DNN and H_2-DNN), as presented in [1,2].

    Forward-Euler discretization of the ODE

        dy/dt = J K(t) tanh( K(t)^T y(t) + b(t) )

    where J is a constant skew-symmetric matrix selected by ``select_j``:

    * ``'J1'``: the block matrix ``[[0, -I], [I, 0]]`` with ``nf/2``-sized
      blocks. NOTE(review): this is the transpose of the ``[0 I ; -I 0]``
      form this class was previously documented with; the constructed
      matrix is intentionally left unchanged so trained behaviour is kept.
    * any other value: ``J_2 = [ 0 1 .. 1 ; -1 0 .. 1 ; .. ; -1 -1 .. 0 ]``.

    Args:
        n_layers: Number of layers; ``forward`` applies ``n_layers - 1``
            Euler steps by default.
        t_end: Integration horizon; the step size is ``t_end / n_layers``.
        nf: Number of features (size of axis 1 of the input to ``forward``).
            Must be even when ``select_j == 'J1'``.
        random: If True, weights are drawn with ``torch.randn``; otherwise
            ``K``/``final_K`` are all ones and ``b``/``final_b`` all zeros.
        select_j: ``'J1'`` or anything else for ``J_2`` (see above).
        n_classes: Sizes ``final_K`` / ``final_b``. These two parameters are
            created but are not used by ``forward``.

    Raises:
        ValueError: If ``select_j == 'J1'`` and ``nf`` is odd (the two
            ``nf // 2`` blocks could not form an ``nf x nf`` matrix).
    """

    def __init__(self, n_layers, t_end, nf, random=True, select_j='J1',n_classes=5):
        super().__init__()

        self.n_layers = n_layers  # nt: number of layers
        self.h = t_end / self.n_layers  # Euler step size
        self.act = nn.Tanh()    # activation function
        self.nf = nf            # number of features
        self.n_classes = n_classes # number of classes

        # Creation order (K, b, final_K, final_b) is kept so that a given RNG
        # seed yields the same initial weights as before.
        if random:
            K = torch.randn(self.nf, self.nf, self.n_layers-1)
            b = torch.randn(self.nf, 1, self.n_layers-1)
            final_K = torch.randn(self.nf, self.n_classes,1)
            final_b = torch.randn(self.n_classes, 1, 1)
        else:
            K = torch.ones(self.nf, self.nf, self.n_layers-1)
            b = torch.zeros(self.nf, 1, self.n_layers-1)
            final_K = torch.ones(self.nf, self.n_classes, 1)
            final_b = torch.zeros(self.n_classes, 1, 1)

        self.K = nn.Parameter(K, True)
        self.b = nn.Parameter(b, True)
        self.final_K = nn.Parameter(final_K, True)
        self.final_b = nn.Parameter(final_b, True)

        if select_j == 'J1':
            if self.nf % 2 != 0:
                raise ValueError(
                    "select_j='J1' requires an even number of features, got nf=%d" % self.nf)
            half = self.nf // 2
            j_identity = torch.eye(half)
            j_zeros = torch.zeros(half, half)
            # Column blocks [0; I] and [-I; 0] side by side -> [[0, -I], [I, 0]].
            J = torch.cat((torch.cat((j_zeros, j_identity), 0),
                           torch.cat((-j_identity, j_zeros), 0)), 1)
        else:
            # +1 strictly above the diagonal, -1 strictly below, 0 on it.
            ones = torch.ones(self.nf, self.nf, dtype=torch.float32)
            J = torch.triu(ones, diagonal=1) - torch.tril(ones, diagonal=-1)

        # Register J as a buffer so that .to(device) / .cuda() / .double()
        # move it together with K; as a plain attribute it stayed on the CPU
        # in float32 and the matmul in forward() failed on a mismatch.
        # persistent=False keeps state_dict() keys identical to before.
        self.register_buffer('J', J, persistent=False)

    def getK(self):
        """Return the stacked layer weights ``K`` of shape (nf, nf, n_layers-1)."""
        return self.K

    def getb(self):
        """Return the stacked layer biases ``b`` of shape (nf, 1, n_layers-1)."""
        return self.b

    def getJ(self):
        """Return the constant interconnection matrix ``J`` of shape (nf, nf)."""
        return self.J

    def forward(self, Y0, ini=0, end=None):
        """Propagate ``Y0`` through Euler steps ``ini .. end-2`` (inclusive).

        Args:
            Y0: Input tensor whose axis 1 has size ``nf``. Axis 1 is swapped
                with the last axis so that ``F.linear`` acts on the features,
                and swapped back before returning.
            ini: Index of the first step to apply.
            end: Steps are taken for ``j in range(ini, end - 1)``; defaults
                to ``n_layers``.

        Returns:
            Tensor with the same shape as ``Y0``.
        """
        last_axis = Y0.dim() - 1
        Y = Y0.transpose(1, last_axis)

        if end is None:
            end = self.n_layers

        for j in range(ini, end-1):
            K_j = self.K[:, :, j]
            # Row-vector form of  y <- y + h * J K_j tanh(K_j^T y + b_j)
            hidden = self.act(F.linear(Y, K_j.transpose(0, 1), self.b[:, 0, j]))
            Y = Y + self.h * F.linear(hidden, torch.matmul(self.J, K_j))

        NNoutput = Y.transpose(1, last_axis)

        return NNoutput

In [None]:
class Net_HDNN(nn.Module):
    """Classifier: 3x3 conv stem -> Hamiltonian block -> linear read-out.

    The read-out layer is sized for ``nf`` feature maps of 28x28 and
    produces 10 raw scores per sample (no softmax is applied here).

    Args:
        nf: Number of feature maps produced by the convolution.
        n_layers: Number of layers of the Hamiltonian block.
        h: Step size; the block is built with ``t_end = h * n_layers``.
        net_type: ``'H1_J1'`` or ``'H1_J2'``.

    Raises:
        ValueError: For any other ``net_type``.
    """

    def __init__(self, nf=8, n_layers=4, h=0.5, net_type='H1_J1'):
        super().__init__()
        self.conv1 = nn.Conv2d(1, nf, kernel_size=3, stride=1, padding=1)

        # Translate the network type into the J-matrix selector of H1.
        if net_type == 'H1_J1':
            j_choice = 'J1'
        elif net_type == 'H1_J2':
            j_choice = 'J2'
        else:
            raise ValueError("%s model is not yet implemented for MNIST" % net_type)
        self.hamiltonian = H1(n_layers=n_layers, t_end=h * n_layers, nf=nf, select_j=j_choice)

        self.fc_end = nn.Linear(nf*28*28, 10)
        self.nf = nf

    def forward(self, x):
        """Return the raw class scores, one row of 10 values per sample."""
        features = self.hamiltonian(self.conv1(x))
        flattened = features.reshape(-1, self.nf*28*28)
        return self.fc_end(flattened)

In [None]:
def train(model, device, train_loader, optimizer, epoch, alpha, out):
    """Run one training epoch with cross-entropy plus smoothness regularization.

    Args:
        model: Network exposing ``model.hamiltonian`` with attributes
            ``n_layers`` and ``h`` and methods ``getK()`` / ``getb()``.
        device: Device the batches are moved to.
        train_loader: Iterable of ``(data, target)`` batches. Its ``.dataset``
            is only accessed for the progress line (``out > 0``).
        optimizer: Optimizer stepping the model parameters.
        epoch: Epoch number; used only in the progress line.
        alpha: Regularization weight passed to ``regularization``.
        out: Progress is printed every 100 batches when ``out > 0``.
    """
    model.train()
    hamiltonian = model.hamiltonian
    # The step size comes from the model itself rather than from a notebook
    # global named `h`, so the function no longer depends on cell order or on
    # a global that may differ from the value the model was built with.
    step = hamiltonian.h
    # NOTE(review): the regularizer already sums over all layer transitions,
    # yet it has always been added (n_layers - 1) times. That effective weight
    # is preserved here (computed once and scaled) so training dynamics do not
    # change; confirm whether the repetition is intended.
    reg_repeats = int(hamiltonian.n_layers) - 1
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output, target)
        if reg_repeats > 0:
            loss = loss + reg_repeats * regularization(
                alpha, step, hamiltonian.getK(), hamiltonian.getb())
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0 and out>0:
            # Accuracy on the current batch with the just-updated weights;
            # no graph is needed for this reporting-only forward pass.
            with torch.no_grad():
                pred = model(data).argmax(dim=1, keepdim=True)
            correct = pred.eq(target.view_as(pred)).sum().item()
            print('\tTrain Epoch: {:2d} [{:5d}/{} ({:2.0f}%)]\tLoss: {:.6f}\tAccuracy: {}/{}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item(), correct, len(data)))

In [None]:
def test(model, device, test_loader, out):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.cross_entropy(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    if out > 0:
        print('Test set:\tAverage loss: {:.4f}, Accuracy: {:5d}/{} ({:.2f}%)'.format(
            test_loss, correct, len(test_loader.dataset),
            100. * correct / len(test_loader.dataset)))
    return correct

In [None]:
# --- Effective run configuration --------------------------------------------
# This cell re-assigns the settings declared near the top of the notebook
# (there with epochs = 2); the values below are the ones the training cells
# further down actually see.
out = 1  # verbosity flag: train()/test() only print when out > 0

use_cuda = torch.cuda.is_available()
batch_size = 100
test_batch_size = 1000
lr = 0.04     # initial learning rate
gamma = 0.8   # StepLR decay factor
epochs = 10

# A fresh seed in [0, 1000) is drawn on every run. It is reported so that a
# particular run can be reproduced by hard-coding the printed value here;
# previously the seed was never recorded anywhere.
seed = int(np.random.randint(0, 1000))
torch.manual_seed(seed)
np.random.seed(seed)
if out > 0:
    print("Random seed: %d" % seed)

In [None]:
# --- Model definition ---------------------------------------------------------
net_type = 'H1_J1'   # which Hamiltonian variant Net_HDNN should build
n_layers = 4

h = 0.5        # step size handed to Net_HDNN
alpha = 8e-3   # weight of the smoothness regularization used in train()
wd = 4e-3      # weight decay handed to the optimizer

# Pick the device; DataLoader worker/pinning options are only set on CUDA.
if use_cuda:
    device = torch.device("cuda")
    kwargs = {'num_workers': 20, 'pin_memory': True}
else:
    device = torch.device("cpu")
    kwargs = {}

model = Net_HDNN(nf=8, n_layers=n_layers, h=h, net_type=net_type)
model = model.to(device)

# Banner summarising the run settings
print("\n------------------------------------------------------------------")
print("MNIST dataset - %s-DNN - %i layers" % (net_type, n_layers))
run_settings = (lr, wd, gamma, epochs, alpha, batch_size)
print("== sgd with Adam (lr=%.1e, weight_decay=%.1e, gamma=%.1f, max_epochs=%i, alpha=%.1e, minibatch=%i)" %
      run_settings)

# Best test hit count so far, and the train hit count of that same epoch
best_acc = 0
best_acc_train = 0



------------------------------------------------------------------
MNIST dataset - H1_J1-DNN - 4 layers
== sgd with Adam (lr=4.0e-02, weight_decay=4.0e-03, gamma=0.8, max_epochs=10, alpha=8.0e-03, minibatch=100)


In [None]:
# Pre-processing shared by both splits: convert to tensor, then normalise
# with mean 0.1307 and std 0.3081.
mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Training split (download=True fetches it into ./data if missing)
train_set = datasets.MNIST('data', train=True, download=True, transform=mnist_transform)
train_loader = torch.utils.data.DataLoader(
    train_set, batch_size=batch_size, shuffle=True, **kwargs)

# Test split, read from the same ./data directory
test_set = datasets.MNIST('data', train=False, transform=mnist_transform)
test_loader = torch.utils.data.DataLoader(
    test_set, batch_size=test_batch_size, shuffle=True, **kwargs)

In [None]:
# Adam over all model parameters, with weight decay `wd`
optimizer = optim.Adam(params=model.parameters(), lr=lr, weight_decay=wd)

# Learning-rate schedule: scale the rate by `gamma` at every scheduler step
scheduler = StepLR(optimizer=optimizer, step_size=1, gamma=gamma)

In [None]:
def regularization(alpha, h, K, b):
    """Smoothness penalty on consecutive layer weights, as introduced in [1].

    For every pair of neighbouring slices along the last axis of ``K`` and
    ``b`` this adds ``alpha * h * (||dK||^2 / 2 + ||db||^2 / 2)``.

    Returns:
        The accumulated penalty, or the integer 0 when ``K`` has fewer than
        two slices along its last axis.
    """
    def half_squared_norm(delta):
        return 1 / 2 * torch.norm(delta) ** 2

    n_slices = K.shape[-1]
    return sum(
        alpha * h * (half_squared_norm(K[:, :, nxt] - K[:, :, nxt - 1]) +
                     half_squared_norm(b[:, :, nxt] - b[:, :, nxt - 1]))
        for nxt in range(1, n_slices)
    )

In [None]:
for epoch in range(1, epochs + 1):
    train(model, device, train_loader, optimizer, epoch, alpha, out)
    test_acc = test(model, device, test_loader, out)

    # Score the whole training set with the weights obtained in this epoch.
    # test() has just put the model in eval mode, and nothing switches it
    # back before the next call to train().
    train_loss = 0
    correct = 0
    n_train = len(train_loader.dataset)
    with torch.no_grad():
        for data, target in train_loader:
            data = data.to(device)
            target = target.to(device)
            output = model(data)
            # un-averaged batch loss, averaged over the dataset afterwards
            train_loss += F.cross_entropy(output, target, reduction='sum').item()
            # predicted class = index of the largest score
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    train_loss /= n_train
    if out > 0:
        print('Train set:\tAverage loss: {:.4f}, Accuracy: {:5d}/{} ({:.2f}%)'.format(
            train_loss, correct, n_train,
            100. * correct / n_train))

    scheduler.step()

    # Remember the best test score and the train score of that same epoch.
    if test_acc > best_acc:
        best_acc, best_acc_train = test_acc, correct

Test set:	Average loss: 1.2511, Accuracy:  9278/10000 (92.78%)
Train set:	Average loss: 1.1380, Accuracy: 55880/60000 (93.13%)
Test set:	Average loss: 0.4276, Accuracy:  9490/10000 (94.90%)
Train set:	Average loss: 0.3816, Accuracy: 57004/60000 (95.01%)
Test set:	Average loss: 0.2659, Accuracy:  9380/10000 (93.80%)
Train set:	Average loss: 0.2412, Accuracy: 56615/60000 (94.36%)
Test set:	Average loss: 0.1802, Accuracy:  9461/10000 (94.61%)
Train set:	Average loss: 0.1706, Accuracy: 57068/60000 (95.11%)
Test set:	Average loss: 0.1637, Accuracy:  9513/10000 (95.13%)
Train set:	Average loss: 0.1555, Accuracy: 57200/60000 (95.33%)
Test set:	Average loss: 0.1785, Accuracy:  9461/10000 (94.61%)
Train set:	Average loss: 0.1697, Accuracy: 56899/60000 (94.83%)
Test set:	Average loss: 0.1473, Accuracy:  9545/10000 (95.45%)
Train set:	Average loss: 0.1375, Accuracy: 57435/60000 (95.72%)
Test set:	Average loss: 0.1493, Accuracy:  9517/10000 (95.17%)
Train set:	Average loss: 0.1453, Accuracy: 57295

In [None]:
# Final summary: best test hit count and the train hit count recorded in the
# same epoch, both expressed as percentages of their dataset sizes.
print("\nNetwork trained!")
best_test_pct = 100. * best_acc / len(test_loader.dataset)
best_train_pct = 100. * best_acc_train / len(train_loader.dataset)
print('Test accuracy: {:.2f}%  - Train accuracy: {:.3f}% '.format(best_test_pct, best_train_pct))
print("------------------------------------------------------------------\n")


Network trained!
Test accuracy: 95.47%  - Train accuracy: 95.532% 
------------------------------------------------------------------



In [None]:
# Serialise the whole trained module object (not just its state_dict).
# NOTE(review): a fully pickled module can only be reloaded where its class
# definitions are available; consider saving model.state_dict() instead if
# the file has to be portable.
torch.save(obj=model, f='Ham_Net.pkl')