In [3]:
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Function
import numpy as np
import scipy.linalg
import attn_modules as attn


In [5]:
class encoder_attn(nn.Module):
    def __init__(self):
        """
        This will be the single class called in each training pass. Has to handle averything for a single transformer. 
        We have three transformers each taking a different slice of autoencoder latent space
        
        input:
            depth: the depth of each transformer layer all with dimension D
            head: the number of heads in each local+global pair
        """
        super().__init__()

        dim_encoder = 8 # Dimension at AttnBlock 
        d_encoder = 2
        num_heads_encoder = 1
        layer_depth_encoder = 2
        self.max_sequence_length = 15 # how many time steps we will use the same key before restarting

        dim_dynamics = 256
        d_dynamics = 2
        num_heads_dynamics = 1
        layer_depth_dynamics = 2
        self.key = 0
        emb = torch.rand((23, 4, 8))
        self.e = nn.Parameter(data = emb, requires_grad=True)
        emb_q = torch.rand((23, 1, 8))
        self.e_q = nn.Parameter(data = emb_q, requires_grad=True)

        at_l = attn.AttnLocal
        at_g = attn.AttnGlobal
        at_dyn = attn.AttnDyn

        self.lin_map = nn.Linear(64,256)
        self.q_map = nn.Linear(64,256)
        self.dynamic_map = nn.Linear(8,1)

        self.encode = nn.ModuleList([attn.attention_local_global_layer(dim = dim_encoder, depth = d_encoder, heads = num_heads_encoder, AttnType = at_l)
                                for i in range(layer_depth_encoder)
                                ])

        self.dynamics = nn.ModuleList([attn.attention_local_global_layer(dim = dim_dynamics, depth = d_dynamics, heads = num_heads_dynamics, AttnType = at_dyn)
                                for i in range(layer_depth_dynamics)
                                ])

    def key_emb(self,x):
        """
        The forward takes either an image or a flat vector of arbitrary size, patch embeds it
        then runs it through the corresponding attention network, the norms+avgpools+flattens it again
        """
        skip = x
        x = x.permute(2,0,1)
        x = torch.concat((x, self.e), dim = 0).permute(1,0,2)
        for layer in self.encode:
            x = layer(x, x)
        
        x = x.reshape(8,64,-1).flatten(-2,-1)

        return x
       
    def dynamic_update(self, q, k, v):

        q = torch.concat((q.permute(2,0,1), self.e_q), dim = 0).permute(2,1,0) #.flatten()
        q = self.q_map(q).squeeze().unsqueeze(0)
        kv = torch.concat((k.unsqueeze(0),v),dim=1)

        for layer in self.dynamics:
            q = layer(q,kv)
        
        x = self.dynamic_map(q.permute(0,2,1))
            
        return x
        
        

In [20]:
DyAttn = encoder_attn()
x = torch.ones((8, 128, 32))
l = torch.ones((1024, 28, 28))

sig = torch.zeros((1,1,512,256)).squeeze().unsqueeze(0).requires_grad_(True)
v_state = torch.rand((4,1,9))
v_state_current = torch.rand((1,1,9))
red = torch.ones(4,8,32)
lat = torch.ones(1,8,32)
v_state = v_state.repeat(1,8,1)

v_state_current = v_state_current.repeat(1,8,1)
Q = torch.concat((lat, v_state_current), dim = 2).requires_grad_(True) # [1, 8, 41]: 8 windows from 1024, size 32 per window, + 9 states
inp = torch.concat((red.squeeze(), v_state), dim = 2)
print('Key input', inp.shape,', Dynamic update input', Q.shape)

t = DyAttn.key_emb(inp.requires_grad_(True))
p = DyAttn.dynamic_update(Q, t, sig, 0).squeeze()

print('Dynamic attn model output',p.shape)
print(inp.requires_grad, t.requires_grad, Q.requires_grad, sig.requires_grad)
torch.mean(p).backward()

Key input torch.Size([4, 8, 41]) , Dynamic update input torch.Size([1, 8, 41])
Dynamic attn model output torch.Size([256])
True True True True


In [8]:
# sqrtm
class MatrixSquareRoot(Function):
    """Square root of a positive definite matrix.
    NOTE: matrix square root is not differentiable for matrices with
          zero eigenvalues.
          from https://github.com/steveli/pytorch-sqrtm
    """
    @staticmethod
    def forward(ctx, input):
        m = input.detach().cpu().numpy().astype(np.float_)
        sqrtm = torch.from_numpy(scipy.linalg.sqrtm(m).real).to(input)
        ctx.save_for_backward(sqrtm)
        return sqrtm

    @staticmethod
    def backward(ctx, grad_output):
        grad_input = None
        if ctx.needs_input_grad[0]:
            sqrtm, = ctx.saved_tensors
            sqrtm = sqrtm.data.cpu().numpy().astype(np.float_)
            gm = grad_output.data.cpu().numpy().astype(np.float_)

            # Given a positive semi-definite matrix X,
            # since X = X^{1/2}X^{1/2}, we can compute the gradient of the
            # matrix square root dX^{1/2} by solving the Sylvester equation:
            # dX = (d(X^{1/2})X^{1/2} + X^{1/2}(dX^{1/2}).
            grad_sqrtm = scipy.linalg.solve_sylvester(sqrtm, sqrtm, gm)

            grad_input = torch.from_numpy(grad_sqrtm).to(grad_output)
        return grad_input


sqrtm = MatrixSquareRoot.apply

In [114]:
# UKF
import numpy as np
import torch
import torch.nn as nn
# import xnet, whack_nn, depth_net # placeholder for wriitng code

"""
TODO:
1. integrate attn  net to kf
2. Finish squeeze excite block

DONE:
1. encoder for the transformer
2. decoder for the transformer
3. just the whole transformer lol

"""

def RelU_1(x):
    return torch.maximum(torch.maximum(x, -1), 1)


# Returns: [2NxN] sigma points and P_k
def sigma_points(x, P_u, Q_k):
    """
    Inputs: [1xN] vector, the previous covaroiance matrix P_u[+|k-1], and process noise Q_k
    Returns: [2NxN] vector of sigma points, and a [NxN] matrix of covariance P_u[-|k]
    """

    sigma = x.repeat(1,1,1,x.shape[2])
    print('Sigma points for',x.shape[2])

    p = sqrtm(x.shape[2]*torch.abs(P_u.squeeze()))    
    sigma_plus = sigma + p
    sigma_minus = sigma - p

    sigma = torch.concat((sigma_plus, sigma_minus), dim=3)
    scale_factor = 1/(2*x.shape[2])
    x_bar = torch.mean(sigma, dim=3)
    sum = torch.zeros((x.shape[1],x.shape[2],x.shape[2]))
    extra = np.zeros((x.shape[2]*2,x.shape[2],x.shape[2]))

    for i2 in range(sigma.shape[1]):
        for i1 in range(sigma.shape[3]):
            vecp = sigma[0,0,:,i1].unsqueeze(1)
            c = (vecp.squeeze()-x_bar[0,i2].squeeze().T)
            sum[i2] += torch.matmul(c.unsqueeze(-1), c.unsqueeze(0))

    P_k = sum*scale_factor + (Q_k**2)*0.01
    
    return P_k, sigma.permute(0,1,3,2) #, extra # The sigma points for X_hat[k|k-1] and the cross variance matrix for X_hat[k|k-1]
    #return sigma, x_bar

# Returns: Updated X post, P_k post, K_k for record keeping
def UKF_update(x_sigma, y_sigma, R_k, P_u, Y):

    """
    P_xx, p_hat, sigma_Y_p, state_Y, self.R_kp
    Inputs: uhhhhhhh... the [2NxN] sigma points for X, the [2NxN] outputs Y from the equation H(X_sigma_i) = Y_sigma_i,
    the measurment [1xN] noise R_k, the previous [NxN] covariance matrix P_u, and the actual [1xN] state estimate Y

    Returns: updated vector X_posteriori, the updated covariance matrix P_k+, and Kalman gain K_k
    p = sqrtm(x.shape[2]*torch.abs(P_u.squeeze()))    
    sigma_plus = sigma + p
    sigma_minus = sigma - p
    """
    print('UKF for', x_sigma.shape[2])
    
    x_bar = torch.mean(x_sigma, dim=2) # This give a Nx1 vector of the means
    y_bar = torch.mean(y_sigma, dim=2)
    scale_factor = 1/(x_sigma.shape[2])

    P_xy = torch.zeros((x_sigma.shape[1],x_bar.shape[2],x_bar.shape[2]))
    P_yy = P_xy
    
    for i2 in range(P_xy.shape[0]):
        for i1 in range(x_sigma.shape[3]):
            vec_x = x_sigma[0,0,i1,:]#.unsqueeze(1)
            vec_y = y_sigma[0,0,i1,:]#.unsqueeze(1)
            
            c_1 = (vec_x.squeeze()-x_bar[0,i2].squeeze().T)
            c_2 = (vec_y.squeeze()-y_bar[0,i2].squeeze().T)

            P_xy[i2] += torch.matmul(c_1.unsqueeze(-1), c_2.unsqueeze(0)) * scale_factor
            P_yy[i2] += torch.matmul(c_2.unsqueeze(-1), c_2.unsqueeze(0)) * scale_factor # R_k is sigma so convert to sigma^2

    P_yy = P_yy + (R_k**2)

    K_k = torch.matmul(P_xy, torch.inverse(P_yy))
    if (K_k.shape[0] > 1):
        K_k = torch.mean(K_k, dim=0)
   
    res = (Y - y_bar).squeeze().T

    X_k_posterior = x_bar.squeeze() + torch.matmul(K_k, res).unsqueeze(0).permute(0,2,1).squeeze()
   
    P_k_posterior = P_u - torch.matmul(K_k.squeeze(), torch.matmul(P_yy.squeeze(), K_k.squeeze().T)).unsqueeze(0)

    return X_k_posterior, P_k_posterior, K_k # Returns X_k|k, P_k|k, K_k

# Returns: X_hat
class dynamic_model(nn.Module):
    def __init__(self):
        super().__init__()
        """
        Module predicts the current states based on learned and physical dynamic models. Is run between frames
        Inputs: Vehicle state time k-1, latent features time k-1, dt
        Returns: Vehicle state prediciton X_k_hat and latent feature prediciton X_thetaHat_k at time k
        """
        self.model = encoder_attn()# this will be the full transformer
        self.dynamics_v = dynamic_model_vehicle()
        self.sigma_ = sigma_points
        
       
    def forward(self, state_vehicle, state_latent, key, dt, P_x, P_theta,  Q_l, Q_x):
        #print('Vehicle', state_vehicle.shape, 'latent', state_latent.shape)
        P_x, vehicle_sigma = self.sigma_(state_vehicle, P_x, Q_x) # Convert vehicle state into sigma points
        x_p_hat_sigma = self.dynamics_v(vehicle_sigma, dt) # Model of vehicle dynamcis in real world with state space matrix. Input: [X_k-1_sigma, dt] Return: posteriori estimate X_k_hat
        x_p_hat = torch.mean(x_p_hat_sigma, dim = 2) # Mean of sigma points represent "true" posterior state
        
        for i1 in range(state_latent.shape[0]):
            s = state_latent[i1].unsqueeze(0)
            P_theta_, latent_sigma = self.sigma_(s, P_theta, Q_l) # Convert latent space into sigma points and pass as "Values" through transformer

        v_state_current = x_p_hat.repeat(1,8,1)
        Q = torch.concat((state_latent.reshape((1,8,32)), v_state_current), dim = 2) # [1, 8, 41]: 8 windows from 1024, size 32 per window, + 9 states
        K = key
        V = latent_sigma.squeeze().unsqueeze(0)
        x_theta = self.model.dynamic_update(Q, K, V).unsqueeze(0) #inputs: q, k, v #latent_sigma, , dt, P_theta, P_x

        for i1 in range(state_latent.shape[0]):
            s = x_theta[i1].unsqueeze(0)
            P_theta_, x_theta_sigma = self.sigma_(s, P_theta, Q_l) # Convert latent space into sigma points and pass as "Values" through transformer
        return x_theta_sigma, x_p_hat_sigma, P_x, P_theta_ # X_theta is the predicted theta state , X_phat is the predicted vehicle state Dr. arguelles, Dr. Day, Dr. Huanyu, 

    def embed(self, x_theta, x_vehicle):
        # Inputs: latent space and vehicle state for last 4 time steps [k-1,k-4]
        v_state = x_vehicle.repeat(1,8,1)
        inp = torch.concat((x_theta.squeeze(), v_state), dim = 2)

        k = self.model.key_emb(inp)
        return k
        

class dynamic_model_vehicle(nn.Module):
    def __init__(self):
        super().__init__()
        """
        Inputs: vehicle state X_k, dt
        Returns: Vehicle estimated state X_k+dt
        """
        x = np.zeros(3)
        x_ = torch.from_numpy(x)
        parameters = nn.Parameter(data = x_, requires_grad = True) # Define dynamic model as nn parameters to propagate gradients through

    def forward(self, x, dt):
        dynamics_k = x*dt

        return dynamics_k


class UKF(nn.Module):
    def __init__(self, l_size, s_size, l_hist, v_hist):
        super().__init__()

        self.dynamics = dynamic_model()
        self.sigma_ = sigma_points
        self.ukf = UKF_update

        self.s_l = l_size # 1024 // (8*4)
        self.s_v = torch.eye(s_size)

        R_tensor_p = torch.eye(s_size) * torch.abs(torch.randn(s_size)) #self.s_v.shape) #torch.tensor(self.s_v, dtype=torch.double).random_(0,1) # Initialize measurement noise as random tensor [0,1]
        R_tensor_l = torch.eye(l_size) * torch.abs(torch.randn(l_size)) # torch.tensor(torch.eye(49), dtype=torch.double).random_(0,1) # Initialize measurement noise as random tensor [0,1]
        Q_tensor = torch.eye(s_size) * torch.abs(torch.randn(s_size)) # torch.tensor(6, dtype=torch.double).random_(0,1) # Initialize vehicle process noise as random tensor [0,1] for the 6dof
        Q_tensor_l = torch.eye(l_size) * torch.abs(torch.randn(self.s_l)) # torch.tensor(self.s_l, dtype=torch.double).random_(0,1) # Initialize latent process noise as random tensor [0,1]

        self.Q_k_x = nn.Parameter(data = Q_tensor, requires_grad = True) # Learned process noise parameters
        self.Q_k_l = nn.Parameter(data =Q_tensor_l, requires_grad = True) # Learned process noise parameters
        self.R_kp = nn.Parameter(data = R_tensor_p, requires_grad = True) # Learned measurement noise parameters
        self.R_kl = nn.Parameter(data = R_tensor_l, requires_grad = True) # Learned measurement noise parameters

        self.tf_len = 15
        self.embed = True
        self.key = 0

        self.energy = 0
        self.threshold = 10
        self.latent_history = l_hist
        self.vehicle_history = v_hist

    def predictor(self, image_t2, latent, x_vehicle, dt, P_xx_prior, P_xxl_prior): # Inputs: image at time t+1, vehicle state at time t, altent time t/k

         # either embed the new latents and states every tf_len steps/ based on some energy metric above a threshold
        if 1: #self.energy > self.threshold: # All good jsut need to update state history each loop
            self.key = self.dynamics.embed(self.latent_history, self.vehicle_history)

        # Priori update---------------
        p_hat, latent_hat, P_xx, P_xxl = self.F(latent, x_vehicle, P_xx_prior, P_xxl_prior, dt) # F(X:t-1|t-1) -> return state for t|t-1
        p_hat_mean = torch.mean(p_hat, dim=2)
        l_hat_mean = torch.mean(latent_hat, dim=2)

        # Measurement Y prediction----------
        sigma_Y_p, sigma_Y_l = self.H(p_hat_mean, l_hat_mean, P_xx, P_xxl) # Collect measurement prediction by passing the latent sigma points through odom net H(X)
        # Collect measurments Y-----------

        return p_hat, sigma_Y_p, P_xx, latent_hat, sigma_Y_l, P_xxl, l_hat_mean, p_hat_mean

    def update(self, p_hat, sigma_Y_p, P_xx, state_Y, latent_hat, sigma_Y_l, P_xxl, latent_Y):
        # Perform UKF update with X, Yhat, and Y
        # x_sigma, y_sigma, R_k, P_u, Y
        X_kpose_post, P_kp_post, K_kp = self.ukf(p_hat, sigma_Y_p, self.R_kp, P_xx, state_Y) # X_k is new latent vector and vehicle state, P_k is new P_k, K_k is just there for interesting plots lol | X_hat_prior, Y_hat, self.R_k, P_k_prior, Y

        X_klatent_post, P_kl_post, K_kl = self.ukf(latent_hat, sigma_Y_l, self.R_kl, P_xxl.unsqueeze(0), latent_Y) # X_k is new latent vector and vehicle state, P_k is new P_k, K_k is just there for interesting plots lol | X_hat_prior, Y_hat, self.R_k, P_k_prior, Y
      
        return X_kpose_post, X_klatent_post, P_kp_post, P_kl_post, K_kp, K_kl


    def F(self, latent, x_vehicle, P_xx_prior, P_xxl_prior, dt):
        """
        Inputs: Latent features, 6dof Vehicle state , P_x from t-1, P_l from k-1, dt
        Returns: 6dof vehicle time t, latent feature prediction time k, covar x, covar latent 
        """
        latent_hat, xhat_p, P_xhat, P_lhat = self.dynamics(x_vehicle, latent, self.key, dt, P_xx_prior, P_xxl_prior, self.Q_k_l, self.Q_k_x) # takes vehicle state time t, latent space k, current key, dt: returns vehicle state k+1, latent space k+1
        return xhat_p, latent_hat, P_xhat, P_lhat # Pose estimated by latent space forecasting and odom net, Pose estimated by physical dynamics, latent space forecast


    def H(self, p_hat, latent_hat, P_xx, P_xxl):
        # Measurement model H(X_hat) = Y_hat
        P_xx, sigma_p_prior = self.sigma_(p_hat.reshape(1,1,p_hat.shape[2],1), P_xx.squeeze(), self.Q_k_x) # Convert t|t-1 to sigma points
        P_xxl, sigma_l_prior = self.sigma_(latent_hat.unsqueeze(-1), P_xxl, self.Q_k_l) # Convert k|k-1 to sigma points
        
        #yhat_pose_sigma = self.pose(sigma_l_prior) 
        yhat_pose_sigma = sigma_p_prior # self.pose(sigma_l_prior) # takes latent space sigma points at k+1: returns vehicle state sigma K+1
        return yhat_pose_sigma, sigma_l_prior # Yhat at time t+1 based on sigma from latent hat


class SE_mapped(nn.Module):
    def __init__(self, inp, reduction_factor):
        super().__init__()
        i = inp // reduction_factor
        self.rf = reduction_factor
        self.c1 = nn.Conv2d(i, i, 3, 2, 1)
        self.c2 = nn.Conv2d(i, i // 2, 3, 2, 1)
        self.c3 = nn.Conv2d(i//2, i//4, 7)
        self.bn = nn.BatchNorm2d(i)

        self.map_mul = nn.Linear(256, 1024)
        self.map_add = nn.Linear(256, 1024)

        self.act = nn.PReLU()

    def encode(self, inp): # Inp is latent space, im is image
       
        batch_size = 1 #inp.shape[0]
        
        inp_ = inp.reshape(batch_size, self.rf, -1, inp.shape[2], inp.shape[2]).squeeze()
        x = self.c1(inp_)
        x = self.c2(x)
        x_encoded = self.c3(x).flatten().unsqueeze(0).unsqueeze(0).unsqueeze(-1)

        return x_encoded

    def decode(self, x_, skip):
        """
        Will take in a flat vector since this is all done with transformers. Its got to expand back into
        two inp//reduction_factor vectors. One for adding one for multiplying
        """
        x_mul = self.act(self.map_mul(x_))
        x_add = self.act(self.map_add(x_))
        print(x_mul.shape, skip.shape)
        x = (skip.permute(0,2,3,1) * x_mul + x_add).reshape(8, 128, 28, 28) # skip connection gets perturbed by a mult and add method
        print(x.shape)
        x = self.bn(x)
        return x_mul, x_add



In [116]:
se = SE_mapped(1024,8)

if 0:
    latent = depth.latent(depth.encode(image_t1), 0) # Encode new image frame at time k,t to latent space
latent = se.encode(latent_)

# Perform everyhting up to measurement step
p_hat, sigma_Y_p, P_xx, latent_hat, sigma_Y_l, P_xxl, l_hat_mean, p_hat_mean = ukf.predictor(image_t2, latent, x_vehicle, dt, P_xx_prior, P_xxl_prior)

# Collect measurements
if 0:
    latent_Y = depth.latent(depth.encode(image_t2), 0) # Encode new image frame at time k,t to latent space
    state_Y = pose(latent_Y)
    
latent_Y = l_hat_mean # Encode new image frame at time k,t to latent space
state_Y = p_hat_mean

# Perform ukf update step
X_kpose_post, X_klatent_post, P_kp_post, P_kl_post, K_kp, K_kl = ukf.update(p_hat, sigma_Y_p, P_xx, state_Y, latent_hat, sigma_Y_l, P_xxl, latent_Y)
out = se.decode(X_klatent_post, latent_)



Sigma points for 9
Sigma points for 256
Sigma points for 256
Sigma points for 9
Sigma points for 256
UKF for 18
UKF for 512
torch.Size([1024]) torch.Size([1, 1024, 28, 28])
torch.Size([8, 128, 28, 28])


In [None]:

def placeholder():
    x = 0


l_history = torch.ones(4,8,32)
v_history = torch.rand((4,1,9)) #.repeat(1,8,1)
numVstates = 9
numLfeatures = 256

ukf = UKF(numLfeatures, numVstates, l_history, v_history)

depth = placeholder()
pose = odom_net()
image_t1 = 0
image_t2 = 0
latent = torch.randn((1, 1, 256)).unsqueeze(-1)
latent_ = torch.randn((1024,28,28)).unsqueeze(0)
x_vehicle = torch.randn((9,1)).unsqueeze(0).unsqueeze(0)
dt = 0
P_xx_prior = torch.eye(9)*torch.abs(torch.rand(9)).unsqueeze(0)
l_size = 256
P_xxl_prior = torch.eye(l_size)*torch.abs(torch.rand(l_size)).unsqueeze(0)





In [86]:

# KalmanJob12

if 1:
    t = 256
    Q_tensor = torch.abs(torch.eye(256) * torch.randn(256))
    P_x = torch.abs(torch.eye(256)*torch.rand(256).unsqueeze(0))
    x = torch.randn((1,1,256)).unsqueeze(-1)
    y = torch.randn((256,1)).unsqueeze(0).unsqueeze(0)
    Q_x = nn.Parameter(data = Q_tensor)
    Q_tensor = torch.abs(torch.eye(256) * torch.randn(256))
    R = nn.Parameter(data = Q_tensor)
else:
    t = 4
    Q_tensor = torch.abs(torch.eye(t) * torch.randn(t))
    P_x = torch.abs(torch.eye(t)*torch.rand(t).unsqueeze(0))
    x = torch.randn((t,1)).unsqueeze(0).unsqueeze(0)
    y = torch.randn((t,1)).unsqueeze(0).unsqueeze(0)
    y = torch.randn((t,1)).unsqueeze(0).unsqueeze(0)
    Q_x = nn.Parameter(data = Q_tensor)

    m = np.array([0,0,1,1])
    m_ = torch.from_numpy(m)

    R = nn.Parameter(data = Q_tensor)



if 0:
    a, b = sigma_points(x, P_x, Q_x)
    print(a,b)


if 0:
    for i1 in range(50):
        p, x = sigma_points(x, P_x, Q_x)
        x = x.unsqueeze(-1)
        x_ = torch.mean(x, dim=2)
        x, P_x, K_k = UKF_update(x, x, R, p, y)
        
        print(P_x.max(), P_x.min())
        x = x.reshape(1,1,t,1)
        P_x = P_x.squeeze()



Sigma points for 256
UKF for 512
tensor(0.8043, grad_fn=<MaxBackward1>) tensor(-5.0325e-08, grad_fn=<MinBackward1>)
Sigma points for 256
UKF for 512
tensor(0.7070, grad_fn=<MaxBackward1>) tensor(-1.4109e-08, grad_fn=<MinBackward1>)
Sigma points for 256
UKF for 512
tensor(0.6432, grad_fn=<MaxBackward1>) tensor(-1.5625e-08, grad_fn=<MinBackward1>)
Sigma points for 256
UKF for 512
tensor(0.5914, grad_fn=<MaxBackward1>) tensor(-1.3885e-08, grad_fn=<MinBackward1>)
Sigma points for 256
UKF for 512
tensor(0.5498, grad_fn=<MaxBackward1>) tensor(-1.8447e-08, grad_fn=<MinBackward1>)
Sigma points for 256
UKF for 512
tensor(0.5264, grad_fn=<MaxBackward1>) tensor(-1.4063e-08, grad_fn=<MinBackward1>)
Sigma points for 256
UKF for 512
tensor(0.5059, grad_fn=<MaxBackward1>) tensor(-1.6105e-08, grad_fn=<MinBackward1>)
Sigma points for 256
UKF for 512
tensor(0.4877, grad_fn=<MaxBackward1>) tensor(-1.3280e-08, grad_fn=<MinBackward1>)
Sigma points for 256
UKF for 512
tensor(0.4715, grad_fn=<MaxBackward1>) 

KeyboardInterrupt: 

In [None]:


print(a.shape, b.shape)
x_bar = b
i2 = 0
vecp = a[0,0,:,i1].unsqueeze(1)
print(vecp.squeeze().shape,x_bar[0,i2].shape)
c = (vecp.squeeze()-x_bar[0,i2].squeeze().T)
print(c)
print(torch.matmul(c.unsqueeze(-1), c.unsqueeze(0)).detach().numpy())

#torch.matmul((vecp.squeeze()-x_bar[0,i2].squeeze().T), (vecp.squeeze()-x_bar[0,i2].squeeze().T).T)




In [None]:



image_t2 = 0
latent = torch.randn((1, 1, 256)).unsqueeze(-1)
latent_ = torch.randn((1024,28,28)).unsqueeze(0)
x_vehicle = torch.randn((6,1)).unsqueeze(0).unsqueeze(0)
dt = 0
P_xx_prior = torch.eye(6)*torch.abs(torch.rand(6)).unsqueeze(0)
l_size = 256
P_xxl_prior = torch.eye(l_size)*torch.abs(torch.rand(l_size)).unsqueeze(0)


#se = SE_mapped(1024, 8)
ukf = UKF(l_size)

#o = se.encode(latent_).flatten()
#print(o.shape)

X_p, X_l, P_xx_post, P_xxl_post, K_kp, K_kl = ukf.forward(image_t2, latent, x_vehicle, dt, P_xx_prior, P_xxl_prior)

#print('post')



