## This notebook implements two constrained training scheme based on Newton's method.

### Newton's method

---

### Given $g: \mathbb{R}^N\rightarrow \mathbb{R}^k$, $x\in \mathbb{R}^N$, $v \in \mathbb{R}^{N\times k}$ find $\lambda\in \mathbb{R}^k$, such that  
$$
\begin{aligned}
   g(x + \tau v \lambda) = 0 
   \end{aligned}
$$

Netwon's method:

$$
\lambda_{n+1} = \lambda_n - \tau^{-1}(\nabla g(x_n + \tau v\lambda_n) v)^{-1} g(x_n + \tau v \lambda_n)
$$



### First, load neceesary packages

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import torch
import math 
import torch.nn as nn
import random
import itertools 
from tqdm import tqdm
import os
import time

### Optimizer

In [2]:
def newton_projection(g, model, Y, v_direction, tau, res_tol=1e-3, max_newton_steps=100):       
    iter_step = 0
    converged = False
    projection_iter_steps = 0 
            
    while iter_step < max_newton_steps:
        g_now = g(model, Y)
        
        if iter_step == 0:
            k = len(g_now)
            lam = np.zeros((k))
       # if self.verbose:
        #print (f'projection step={iter_step}, g={g_now.detach().item():.3e}')
            
        if torch.linalg.norm(g_now) < res_tol:
            converged = True
            break

        grad_g = [torch.autograd.grad(g_now[idx], model.parameters(), 
                                      create_graph=True, retain_graph=True, allow_unused=True) for idx in range(k)]
               
        with torch.no_grad():
            mat = np.zeros((k, k))
            for i in range(k):
                for j in range(k):
                    for z1, z2 in zip(grad_g[i], v_direction[j]):
                        if z1 is not None: # and z2 is not None:
                            mat[i,j] += (z1 * z2).sum()
                            
            dlam = -1.0 * np.linalg.solve(mat, g_now.numpy()) / tau
            lam += dlam
            for i in range(k):
                for param, z2 in zip(model.parameters(), v_direction[i]):
                    if z2 is not None:
                        param.add_(z2 * dlam[i] * tau)
                    
        iter_step += 1
    
    return converged, iter_step, lam 
        
class ProjectedSGD():
    
    def __init__(self, model, f, g, beta=1.0, tau=1e-1, res_tol=1e-3, max_newton_steps=100, verbose=True):
        self.f = f
        self.g = g
        self.tau = tau
        self.res_tol = res_tol
        self.max_newton_steps = max_newton_steps
        self.verbose = verbose
        self.model = model
        self.beta = beta
        self.X = None
        self.Y = None
        
    def reset_data(self, X, Y):
        self.X = X
        self.Y = Y
        
    def unconstrained_update(self):
        self.loss = self.f(self.model, self.X)
        self._grad_f = torch.autograd.grad(self.loss, self.model.parameters(), allow_unused=True)

        with torch.no_grad():
            for param, grad in zip(self.model.parameters(), self._grad_f):
                rn = torch.normal(torch.zeros(param.size()), torch.ones(param.size()))
                if grad is not None:
                    param.add_(-1.0 * grad * self.tau + math.sqrt(2.0 * self.tau / self.beta) * rn)       
                else :
                    param.add_(math.sqrt(2.0 * self.tau / self.beta) * rn)       
                    
    def projection_update(self):
        g_val = self.g(self.model, self.Y)
        grad_g_prev = [torch.autograd.grad(g_val[idx], self.model.parameters(), 
                                      create_graph=True, retain_graph=True, allow_unused=True) for idx in range(len(g_val))] 
        
        self.converged, self.projection_iter_steps, lam = newton_projection(self.g, self.model, self.Y,
                                                                            grad_g_prev, self.tau, 
                                                                            self.res_tol, self.max_newton_steps)
        
    def is_succeed(self):
        return self.converged
    
    def projection_steps(self):
        return self.projection_iter_steps

    def step(self, X, Y):
        self.reset_data(X, Y)
        self.unconstrained_update()
        self.projection_update()
        
class ProjectedLangevin():
    def __init__(self, model, f, g, tau=0.1, beta=1.0, alpha=0.5, res_tol=1e-5, max_newton_steps=100):
        self.model = model
        self.tau = tau
        self.beta = beta
        self.alpha = alpha
        self.res_tol = res_tol
        self.max_newton_steps = max_newton_steps
        self.p_list = []
        self.g = g
        self.f = f
        self.X = None
        self.Y = None
        
        for param in model.parameters():
            self.p_list.append(torch.zeros(param.size()))
    
    def reset_data(self, X, Y):
        self.X = X
        self.Y = Y
               
    def unconstrained_update(self):
        self.loss = self.f(self.model, self.X)
        # momnentum update
        grad_f = torch.autograd.grad(self.loss, self.model.parameters(), allow_unused=True)
        
        with torch.no_grad():
            for param, grad in zip (self.p_list, grad_f):
                param.add_(-0.5 * self.tau * grad)

            # position update
            for param, param_p in zip (self.model.parameters(), self.p_list):
                #print (param_p)
                param.add_(self.tau * param_p)
    
    def projection_momentum(self):
        g_val = self.g(self.model, self.Y)
        k = len(g_val)
        grad_g = [torch.autograd.grad(g_val[idx], self.model.parameters(), 
                                      create_graph=True, retain_graph=True, allow_unused=True) for idx in range(k)]
        with torch.no_grad():
            mat = np.zeros((k, k))
            for i in range(k):
                for j in range(k):
                    for z1, z2 in zip(grad_g[i], grad_g[j]):
                        mat[i,j] += (z1 * z2).sum()            
                        
            lam = np.zeros((k))
            for idx in range(k):
                for param, grad in zip (self.p_list, grad_g[idx]):        
                    lam[idx] += (param * grad).sum()
                    
            coeff = np.linalg.solve(mat, lam)    
                    
            for idx in range(k):
                for param, grad in zip (self.p_list, grad_g[idx]):        
                    param.add_(-1.0 * coeff[idx] * grad)
                                                    
    def momentum_refresh(self):
        for param in self.p_list:
            rn = torch.normal(torch.zeros(param.size()), torch.ones(param.size()))
            param.add_((self.alpha - 1.0) * param + math.sqrt((1-self.alpha**2)/self.beta) * rn)
        self.projection_momentum()
            
    def projection(self):
        g_val = self.g(self.model, self.Y)
        k = len(g_val)
        grad_g_prev = [torch.autograd.grad(g_val[idx], self.model.parameters(), 
                                      create_graph=True, retain_graph=True, allow_unused=True) for idx in range(k)] 
        
        #project position, x -> x_{1}
        converged, iter_steps, lam = newton_projection(self.g, self.model, self.Y, grad_g_prev, self.tau, 
                                                       self.res_tol, self.max_newton_steps)
        
        self.loss = self.f(self.model, self.X)
        # momnentum update: p_{1/2}->p_1
        grad_f = torch.autograd.grad(self.loss, self.model.parameters(), allow_unused=True)               
                
        with torch.no_grad():
            # modify momentum to get p_{1/2}
            for idx in range(k):
                for param, grad in zip (self.p_list, grad_g_prev[idx]):
                    param.add_(grad * lam[idx])

            # momnentum update: p_{1/2}->p_1
            for param, grad in zip (self.p_list, grad_f):
                param.add_(-0.5 * self.tau * grad)
        
        self.projection_momentum()
        
        # p_1 -> p_{1,-}
        for param in self.p_list:
            param *= -1.0

    def step(self, X, Y):
        self.reset_data(X, Y)
        self.momentum_refresh()
        self.unconstrained_update()
        self.projection()
#        self.momentum_refresh()

In [3]:
from TestProblem import SimpleTest

problem = SimpleTest(1.0)
model = problem.create_model([1, 2.0])

g_val = problem.g(model, None)
grad_g_prev = [torch.autograd.grad(g_val[idx], model.parameters(), 
                                      create_graph=True, retain_graph=True, allow_unused=True) for idx in range(len(g_val))] 
        
newton_projection(problem.g, model, None, grad_g_prev, tau=0.1, res_tol=1e-10)

print (model.x)
print (problem.g(model))

Parameter containing:
tensor([-0.1493,  1.1381], requires_grad=True)
tensor([0.], grad_fn=<MulBackward0>)


### Test 1

In [4]:
from TestProblem import SimpleTest

problem = SimpleTest(1.0)
model = problem.create_model([1, 2.0])

opt = ProjectedSGD(model, problem.f, problem.g, tau=0.01, beta=20.0, verbose=True)
#opt = ProjectedLangevin(model, simple_f, simple_g, tau=0.3, beta=8.0, alpha=0.5)

n_steps = 1000
for idx in range(n_steps):
    opt.step(None, None)
    if idx % 100 == 0:
        print (f'step={idx}, loss={opt.loss.detach().numpy()}')
    #print (model.x.detach().numpy())
print (model.x.detach().numpy())


step=0, loss=3.0
step=100, loss=0.5481491684913635
step=200, loss=-0.45217540860176086
step=300, loss=-0.6795360445976257
step=400, loss=-0.9348873496055603
step=500, loss=-0.823089599609375
step=600, loss=-0.780804455280304
step=700, loss=-0.8840094804763794
step=800, loss=-0.9682284593582153
step=900, loss=-0.9883614182472229
[-0.2841745  -0.67461526]
