In [1]:
# d1 = [u1 == x1**2 + x2,
#       u2 == u1+x3-0.2*u4]
# d2 = [u3 == x1+x2,
#       u4 == u2**0.5+u3]

d1_res = lambda x, u: [u[0] - x[0] - x[1]**2,  u[1] - u[0] - x[2] + 0.2*u[3]]
d2_res = lambda x, u: [u[2] - x[0] - x[1],  u[3] - u[1]**0.5 - u[2]]
d1_analysis_u1_u2 = lambda x, u: [x[0]+x[1]**2, x[0]+x[1]**2+x[2]-0.2*u[3]]
d2_analysis_u3_u4 = lambda x, u: [x[0]+x[1], u[1]**0.5+x[0]+x[1]]

In [4]:
# analysis: has two functions; evaluate(residual) and solve()
# eliminate: takes two analyses, one to eliminate and one to eliminate from
# analysis(lambda x,y: A(x)*y - b, lambda x: np.linalg.solve(A(x),b)
# analysis.solve(x) -> y

# vision to integrate with pytorch  / without elimination
# def all_analysis.__init__(self, analyses):
#   self.analyses = analyses
# def all_analysis.forward(self, x):
# for analysis in self.analyses:
#   x = analysis.forward() 
# return x

# one analysis forward:
# def forward(z):
#   x,y = z
#   return A(x)*y - b

# eliminate
# def forward(z):
#   z[y1idx] = b1 + x[1]
#   return A(z)

# one analysis forward with solve
# def forward(x):
#   x[output_idx] = np.linalg.solve(A(x),b) #alternatively, use index_put
#   return y

# / with elimination
# setup of A with elimination
# x = Variable()
# A1elim.forward() # calculates one value of y
# Arest.forward() # calculates the rest of the values of y

# x.forward()
# Aelim_solve.forward() # solves A(x)*y-b for y



In [2]:
import torch
import torch.nn as nn

class LambdaFunctionModule(nn.Module):
    def __init__(self, func, grad_func):
        super(LambdaFunctionModule, self).__init__()
        # Store the lambda function and its gradient
        self.func = func
        self.grad_func = grad_func

    def forward(self, x):
        # Ensure x requires gradient
        if not x.requires_grad:
            x.requires_grad_(True)

        # Evaluate the function
        y = self.func(x)

        # Create an initial gradient for non-scalar output
        if y.dim() > 0:  # y is not a scalar
            grad_output = torch.ones_like(y)
        else:  # y is a scalar
            grad_output = None

        # Use autograd to compute the gradient
        y.backward(gradient=grad_output)  # Computes the gradient of y with respect to x
        auto_grad = x.grad.clone()  # Clone to avoid being erased after zeroing gradients

        # Reset gradients in x for manual gradient computation
        x.grad.data.zero_()

        # Compute the manual gradient
        manual_grad = self.grad_func(x)

        # Ensure manual_grad is in the correct form (tensor)
        if not isinstance(manual_grad, torch.Tensor):
            manual_grad = torch.tensor(manual_grad, dtype=x.dtype, device=x.device)

        # Return the function value, automatic gradient, and manual gradient
        return y.detach(), auto_grad, manual_grad


# Example usage
if __name__ == "__main__":
    # Define a simple function and its gradient
    func = lambda x: x ** 2
    grad_func = lambda x: 2 * x

    # Create a vector of parameters x
    x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)

    # Instantiate the module
    module = LambdaFunctionModule(func, grad_func)

    # Evaluate the function and gradients
    y, auto_grad, manual_grad = module(x)

    print("Function value at x:", y)
    print("Automatic gradient:", auto_grad)
    print("Manual gradient:", manual_grad)


Function value at x: tensor([1., 4., 9.])
Automatic gradient: tensor([2., 4., 6.])
Manual gradient: tensor([2., 4., 6.], grad_fn=<MulBackward0>)


In [48]:
def generate_dense_A_and_b_diff_sizes(x, n):
    # Extend x to match the desired dimensions, n
    # Here, we're using a simple repeat and trim strategy, but more complex methods could be used
    extended_x = x.repeat((n + len(x) - 1) // len(x))[:n]
    
    # Generate an initial base matrix, e.g., identity matrix scaled
    A_base = torch.eye(n) * (1 + torch.abs(extended_x[0]))
    
    # Apply variations based on the extended x to make A fully dense
    # Using outer product with a small perturbation to ensure it doesn't make A singular
    A_variation = torch.ger(extended_x, extended_x) / (torch.norm(extended_x)**2 + 1)
    A = A_base + A_variation
    
    # Generate b(x) as a function of the extended x
    # This ensures compatibility with A's dimensions and that there's always a solution
    b = torch.matmul(A, extended_x)
    
    return A, b


# Example usage
n = 2  # Dimension of the square matrix A
x = torch.tensor([1, 2, 3], dtype=torch.float32)  # Example vector x with different size
A, b = generate_dense_A_and_b_diff_sizes(x, n)
print("A(x):")
print(A)
print("\nb(x):")
print(b)
print(np.linalg.solve(A, b))


A(x):
tensor([[2.1667, 0.3333],
        [0.3333, 2.6667]])

b(x):
tensor([2.8333, 5.6667])
[1. 2.]


In [49]:
def subdivide_tensor(tensor, indices):
    # Create a mask for all elements
    mask = torch.ones(tensor.size(0), dtype=torch.bool)
    # Set the selected indices to False
    mask[indices] = False
    # Subdivide the tensor
    tensor1 = tensor[indices]
    tensor2 = tensor[mask]
    return tensor1, tensor2

In [57]:
def generate_residual(solveindices): # should return a function with forward? or not necessary
    def residual(x):
        y, z = subdivide_tensor(x, solveindices) 
        n = len(y)
        A, b = generate_dense_A_and_b_diff_sizes(z, n)
        #print(np.linalg.solve(A, b))
        return A @ y-b
    return residual

In [60]:
residual = generate_residual([0, 1, 2])
residual(torch.tensor([8, 5, 8, 8.0, 5.0]))

tensor([0., 0., 0.])

In [61]:
from scipy.optimize import fsolve

In [76]:
class solverModule(nn.Module):
    def __init__(self, residual, solveindices):
        super(solverModule, self).__init__()
        self.residual = residual
        self.solveindices = solveindices
    
    def forward(self, x):
        y0 = x[self.solveindices].clone()
        def fsolvefunc(y):
            x[self.solveindices] = torch.tensor(y, dtype=torch.float32)
            return self.residual(x)
        out = fsolve(fsolvefunc, y0) # x gets modified in place anyways
        return x

In [77]:
sm = solverModule(residual, [0, 1, 2])

In [82]:
sm(torch.tensor([1, 2, 3, 4.0, 5.0]))

tensor([4., 5., 4., 4., 5.])

In [83]:
from modeling.arghandling import Encoding

In [88]:
E = Encoding(('a','b'), (1,), (2,2))
E.encode({'a':1, 'b':torch.rand(2,2)}, flatten=True)

DeviceArray([1.        , 0.03401518, 0.97972786, 0.8262554 , 0.35029405],            dtype=float32)

# Small example
![PyTorch MDO (1).png](<residual_example.png>)


In [1]:
import torch

In [11]:
e1 = (('x2', 'x3'), ('x1','x4'), lambda a, b: (sum(a)+b, a[:2]+b))
e2 = (('x1', 'x3'), ('x5',), lambda a, b: (a@b,))
r = (('x4', 'x5'), lambda a, b: torch.cat((a, b)))
indices = {
    'x1': [0,1],
    'x2': [2,3,4],
    'x3': [5,6],
    'x4': [7,8],
    'x5': [9]
}
# update indices to have tensor entries instead of lists
for k,v in indices.items():
    indices[k] = torch.tensor(v, dtype=torch.long)

In [12]:
def AnalysisFunction(triplet):
    inputs, outputs, function = triplet
    def forward(x):
        all_inputs = [x[indices[xin]] for xin in inputs]
        out = function(*all_inputs)
        for i, xout in enumerate(outputs):
            x = x.index_put((indices[xout],), out[i])
        return x
    return forward

def Function(tupl):
    inputs, function = tupl
    def forward(x):
        all_inputs = [x[indices[xin]] for xin in inputs]
        return function(*all_inputs)
    return forward

def EliminateAnalysis(analyses, functions):
    def forward(x):
        for a in analyses:
            x = a(x)
        return torch.cat([f(x) for f in functions])
    return forward


In [13]:
x0 = torch.rand(10)
# Fix parameters (x3)
x0[indices['x3']] = torch.tensor([1., 2])
# Set initial values for solve vars (x2)
x0[indices['x2']] = torch.tensor([-1, -1, 4/3], dtype=torch.float32)
xvar = x0.clone()
xvar.requires_grad_()
A1 = AnalysisFunction(e1)
A2 = AnalysisFunction(e2)
R = Function(r)
S = EliminateAnalysis([A1, A2], [R])
S(xvar)

tensor([0., 1., 3.], grad_fn=<CatBackward0>)

In [14]:
torch.autograd.functional.jacobian(S, xvar)

tensor([[0.0000, 0.0000, 1.0000, 0.0000, 0.0000, 1.0000, 0.0000, 0.0000, 0.0000,
         0.0000],
        [0.0000, 0.0000, 0.0000, 1.0000, 0.0000, 0.0000, 1.0000, 0.0000, 0.0000,
         0.0000],
        [0.0000, 0.0000, 3.0000, 3.0000, 3.0000, 1.3333, 3.3333, 0.0000, 0.0000,
         0.0000]])

In [4]:
torch.manual_seed(1)
indices = ([0,1], [2,3,4], [5,6], [7,8], [9])
indices = [torch.tensor(i, dtype=torch.long) for i in indices]

def elim_and_residual(x):
    x2, x3 = x[indices[1]], x[indices[2]]
    # elimin 1
    x = x.index_put((indices[0],), sum(x2)+x3)
    x = x.index_put((indices[3],), x2[:2]+x3)
    # elimin 2
    x1 = x[indices[0]]
    x = x.index_put((indices[4],), x1@x3)
    # residual
    r = torch.cat((x[indices[4]], x[indices[3]]))
    return r

In [34]:
x0 = torch.rand(10)
# Fix parameters (x3)
x0[indices[2]] = torch.tensor([1., 2])
# Set initial values for solve vars (x2)
x0[indices[1]] = torch.tensor([-1, -1, 4/3], dtype=torch.float32)
xvar = x0.clone()
xvar.requires_grad_()
#torch.autograd.functional.jacobian(elim_and_residual, xvar)
elim_and_residual(xvar)

tensor([3., 0., 1.], grad_fn=<CatBackward0>)

In [77]:
from scipy.optimize import fsolve
import numpy as np

# some intermediary calculations in sequence

# then need to have a module with a forward and backward (that will be used for calculating the jacobian of the final functions later)
class ElimResidualFunc(torch.autograd.Function):
    @staticmethod
    def forward(ctx, function, solvefor, inputs, x):
        def eval_and_gradient():
            jacobian = None
            def eval_function(y):
                nonlocal jacobian
                x.data[solvefor] = torch.tensor(y, dtype=torch.float32)
                r = function(x)
                jacobian = torch.autograd.functional.jacobian(function, xvar)
                result = r.detach().numpy()
                return result

            def recover_gradient(y=None):
                J = jacobian[:, solvefor]
                return J
            
            def recover_jacobian(y=None):
                return jacobian
            
            return eval_function, recover_gradient, recover_jacobian
        
        eval_function, recover_gradient, recover_jacobian = eval_and_gradient()
        xguess = x.data[solvefor] 
        fsolve(eval_function, xguess, fprime=recover_gradient) #sets x in place
        J = recover_jacobian()
        ctx.save_for_backward(J, solvefor, inputs)
        return x
    
    @staticmethod
    def backward(ctx, grad_output):
        J, solvefor, inputs = ctx.saved_tensors
        J_u = J[:, solvefor]
        J_x = J[:, inputs]
        dudx = np.linalg.solve(-J_u, J_x)
        result = torch.zeros_like(grad_output)
        result[inputs] = grad_output[solvefor] @ dudx
        return None, None, None, result
    
class ElimResidual(torch.nn.Module):
    def __init__(self, function, solvefor, inputs):
        super(ElimResidual, self).__init__()
        self.function = function
        self.solvefor = solvefor
        self.inputs = inputs
    
    def forward(self, x):
        return ElimResidualFunc.apply(self.function, self.solvefor, self.inputs, x)

In [79]:
er = ElimResidual(elim_and_residual, indices[1], indices[2])

In [100]:
x0 = torch.rand(10)
# Fix parameters (x3)
x0[indices[2]] = torch.tensor([1., 2])
# Set initial values for solve vars (x2)
x0[indices[1]] = torch.tensor([-1, -1, 4/3], dtype=torch.float32)
xvar = x0.clone()
xvar.requires_grad_()
xout = er(xvar)
backpass = torch.zeros_like(xvar)
backpass[4] = 1
xout.backward(backpass)

In [102]:
xvar.grad

tensor([0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.8903, 0.2236, 0.0000, 0.0000,
        0.0000])