In [2]:
%matplotlib qt
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits import mplot3d
import sys
sys.path.insert(0, "../scripts")
import util
# import pfilter_multi as pfilter
from scipy.stats import norm

np.set_printoptions(precision=4)
!pwd

/home/zpyang/git/visnet/src/visnet/notebooks


In [24]:
class Camera:
    def __init__(self, cam_param, cam_pos, cam_att):
        self.param = cam_param
        self.pos = cam_pos
        self.att = cam_att
        self.R_cam = util.so3_exp(self.att)
        self.K = util.get_cam_in(self.param)
        self.R = util.R_model2cam @ self.R_cam # this rotation is kinda goofy but works, TODO: maybe
        self.P = util.get_cam_mat_lie(self.param, self.pos, self.att)
        self.range = 20 # meters
        self.fov = np.deg2rad(90) #degrees, both directions
        
        
    def __repr__(self):
        return "param: {}, pos: {}, att: {}".format(self.param, self.pos, self.att)
        
    def _get_pixel_pos_hom(self, target_pos):
        return self.P @ util.cart2hom(target_pos)
    
    def _get_pixel_pos(self, target_pos):
        return util.hom2cart(self._get_pixel_pos_hom(target_pos))
    
    def _get_distance(self, target_pos):
        """
        !!!Should not be used directly in estimation!!!
        """
        dist =  np.linalg.norm(target_pos-self.pos)
        return dist
    
    def get_bearing(self, target_pos):
        """
        considers the FOV and range of the camera
        Outside FOV - no measurement
        Outside range - worse measurement, more noise?
        """
        # check FOV on x & y axis of the camera frame
        dist = self._get_distance(target_pos)
        vec = self.R_cam @ (target_pos - self.pos) / dist
        ang1 = np.arctan2(vec[2], vec[0])
        ang2 = np.arctan2(vec[1], vec[0])
        if not(ang1 < self.fov/2 and ang1 > -self.fov/2) or not(ang2 < self.fov/2 and ang2 > -self.fov/2) :
            return np.array([[-1, -1]])
        
        bearing = self._get_pixel_pos(target_pos)
        # if dist > self.range:
        #     return self.add_pixel_noise(bearing, sigma=20).reshape((1,2))
        return self.add_pixel_noise(bearing, sigma=10).reshape((1,2))
        
    def add_pixel_noise(self, pixel_coord, sigma=5):
        return pixel_coord + np.random.normal(0, sigma)
    
class CamNode(Camera):
    def __init__(self, cam_param, cam_pos, cam_att):
        super(CamNode, self).__init__(cam_param, cam_pos, cam_att)
        self.max_false_alert = 4
        
    def _get_vec(self, pixel_pos):
        """
        Back projected ray from the camera to the target (approximate)
        """
        pixel_coord = self.add_pixel_noise(pixel_pos)
        K = self.K
        R = self.R
        Bp = np.linalg.inv(K @ R)
        vec = Bp @ util.cart2hom(pixel_pos).astype('int32') # cast to integer because actual pixels are discrete
        vec = vec/np.linalg.norm(vec)
        return vec
    
    def get_measurement(self, targets):
        """
        Bearing measurements in pixel coordinate (and target identity) 
        bearing.shape = (n,2)
        """
        
        dim = targets.shape
        bearings = np.empty([0,2])
        for target_pos in targets:
            bearing = self.get_bearing(target_pos).reshape((1,2))
            bearings = np.vstack([bearings,bearing])

        # add false alerts
        # rand = np.random.uniform()
        # if rand < -0.1:
        #     n_false_alert = np.random.randint(0, self.max_false_alert)
        #     bearing = np.vstack([bearing, np.random.uniform(0, 2000, size=(n_false_alert,2))])

        return bearings

class CamGroup():
    def __init__(self, cam_param, cam_poses):
        self.n_cams = cam_poses.shape[0]
        self.cam_param = cam_param
        self.cam_poses = cam_poses
        self.cam_nodes = [CamNode(cam_param, poses[0:3], poses[3:6]) for poses in cam_poses]
        
    def get_group_measurement(self, targets, hypo_mode=False):
        z = []
        if hypo_mode: # each cam only gives one measurement of a particle
            for node in self.cam_nodes:
                z.append(node.get_bearing(targets))
        else: # simulation mode
            for node in self.cam_nodes:
                z.append(node.get_measurement(targets))
        return z

cam_param = [642.0926, 642.0926, 1000.5, 1000.5,0]
# [x y z roll pitch yaw]
cam_poses = np.array([
    [20, 20, 12, 0, 0, -2.2],
    [20, -20, 12, 0, 0, 2.2],
    [-20, -20, 12, 0, 0, 0.7],
    [-20, 20, 12, 0, 0, -0.7],
])
cam_poses.shape

(4, 6)

In [25]:
def observe_fn(particles, cam_group):
    """
    Parameters:
        particles : all particles at a step
    Returns:
        hypothesis from x using the camera projection matrix
    """
    res = 2000 # camera's resolution
    hypothesis = []
    for p in particles:
        pos = p[0:3]
        h = cam_group.get_group_measurement(pos, hypo_mode=True)
        h = np.array(h)
        hypothesis.append(h)
    hypothesis = np.array(hypothesis)
    return hypothesis

def prior_fn(n, spawn_region=None):
    if spawn_region is not None:
        pos = np.random.normal(spawn_region, 2*np.ones(3), (n,3))
        vel = np.random.normal(np.zeros(3), np.ones(1), (n,3))
        label = np.random.randint(0, 2, (n,1))
        particles = np.hstack([pos, vel, label])

        print("particles", particles.shape)
        return particles
    else:
        pos_x = np.random.uniform(-19, 19, (n,1))
        pos_y = np.random.uniform(-19, 19, (n,1))
        pos_z = np.random.uniform(10, 25, (n,1))
        vel_x = np.random.normal(0, 1, (n,1))
        vel_y = np.random.normal(0, 1, (n,1))

        vel_z = np.random.normal(0, 1, (n,1))
        label = np.random.randint(0, 2, (n,1))
        particles = np.hstack([pos_x, pos_y, pos_z, vel_x, vel_y, vel_z, label])
        print("particles", particles.shape)
        return particles

In [137]:
start = np.array([-19, 0, 15])
end = np.array([20, 0, 15])

num_targets = 1
pfs = []

particles = []
state = []
state_est = []
n_eff = []
t_final = 1

verbose = False
spawn_region = start

cam_group = CamGroup(cam_param, cam_poses[:, :])
n_particles = 1000


target_1 = np.array([5,10,20])
target_2 = np.array([-10,0,10])
targets = np.vstack([target_1, target_2])

    
#each camera can give multiple measurements
z_set = cam_group.get_group_measurement(targets)
particles = prior_fn(n_particles)

if verbose:
    print("==============================")
    print("z",z_set)

hypothesis = observe_fn(particles, cam_group).reshape(n_particles, cam_group.n_cams * 2)
print("hypothesis: ", hypothesis.shape)

def weight_fn(z_set, hypo, sigma=100,verbose=False):
    """
    Iterate through each camera, find the measurement most likely to be associated with the target
    @ z_set: [{yj}_0, {yj}_1, ...{yj}_n]    n cameras
    @ hypo: [bearing_0, bearing_1, ....bearing_n]   n_cameras
    """
    y = []
    weights = []
    weight = np.ones(n_particles)
    for i, zi in enumerate(z_set):
        if verbose:
            print("i, zi", (i,zi.shape))
        
        wi = np.ones(n_particles)/n_particles
        # wi = np.zeros(n_particles)
        hi = hypo[:, i*2:i*2+2]
        for yj in zi:
            
            w_tmp = util.squared_error(hi, yj, sigma=sigma)
            S_tmp = sum(w_tmp)
            if verbose:
                print("w_tmp", w_tmp.shape)
                print("S_tmp", S_tmp)

            wi += w_tmp
        
        wi = wi/sum(wi) 

        if verbose:
            print("weight: ", weight.shape)
        weights.append(wi)
    return np.array(weights)

weights = weight_fn(z_set, hypothesis, sigma=150, verbose=verbose)

particles (1000, 7)
hypothesis:  (1000, 8)


In [138]:
# def weight_visualize(particles, weights, cam_group):
from matplotlib import cm
print("particles: ", particles.shape)
print("weights: ", weights.shape)
n_cams = cam_group.n_cams
print("Number of camera: ", n_cams)

fig = plt.figure("weight visualization", figsize=(20,10))

for i, cam_node in enumerate(cam_group.cam_nodes):
    ax = fig.add_subplot(1, n_cams+1, i+1, projection="3d")
    ax.scatter3D(particles[:,0], particles[:,1], weights[i],
            cmap=cm.turbo,c=weights[i])
    ax.scatter(cam_node.pos[0], cam_node.pos[1], 0, marker="s", s=200,)
    ax.set_xlabel('x [m]')
    ax.set_ylabel('y [m]')
    ax.set_zlabel('weight')

w_joint = np.product(weights, axis=0)
w_joint = w_joint/np.sum(w_joint)
print(w_joint.shape)
ax = fig.add_subplot(1,n_cams+1, n_cams+1, projection="3d")

ax.scatter3D(particles[:,0], particles[:,1], w_joint,
        cmap=cm.turbo,c=w_joint)
ax.set_xlabel('x [m]')
ax.set_ylabel('y [m]')
ax.set_zlabel('weight')

indices = util.residual_resample(w_joint)
new_particles = particles[indices, :]
fig = plt.figure("resampled particles", figsize=(10,10))
ax = plt.axes(projection="3d")
ax.scatter(particles[:,0], particles[:,1], particles[:,2])
ax.scatter(targets[:,0], targets[:,1], targets[:,2], marker="P", s=100)
plt.show()

particles:  (1000, 7)
weights:  (4, 1000)
Number of camera:  4
(1000,)
