Tracking Object in Video with a Particle Filter
===============================================

以下是Particle Filter的Class。

In [1]:
import numpy as np
import cv2
class Particle:
    """Colour-based particle filter that tracks one target through video frames.

    ``self.particles`` is a ``(NUM_PARTICLES, 4)`` float array whose rows are
    ``[x, y, vx, vy]``: a pixel position and a per-frame velocity. Frames are
    indexed as ``frame[y, x, channel]`` and channels 0, 1, 2 are treated as
    blue, green, red.
    """

    # Repeatability: seed NumPy's global RNG once, when the class is defined.
    np.random.seed(0)

    def __init__(self, HEIGHT=100, WIDTH=100, NUM_PARTICLES=1000, VEL_RANGE=0.5):
        """Scatter particles uniformly over a WIDTH x HEIGHT area.

        Args:
            HEIGHT: Height of the tracked area in pixels.
            WIDTH: Width of the tracked area in pixels.
            NUM_PARTICLES: Number of hypotheses to maintain.
            VEL_RANGE: Width of the interval, centred on zero, from which the
                initial velocities are drawn.
        """
        self.HEIGHT = HEIGHT
        self.WIDTH = WIDTH
        self.NUM_PARTICLES = NUM_PARTICLES
        self.VEL_RANGE = VEL_RANGE

        # Uniform samples in [0, 1) scaled to x, y, vx, vy ranges.
        self.particles = np.random.rand(NUM_PARTICLES, 4)
        self.particles = self.particles * np.array((WIDTH, HEIGHT, VEL_RANGE, VEL_RANGE))
        self.particles[:, 2:4] -= VEL_RANGE / 2.0  # Center velocities around 0

    def get_frames(self, filename):
        """Yield the frames of the video at ``filename``, then a final ``None``.

        The trailing ``None`` is an end-of-stream sentinel kept for existing
        callers. The capture is released even when the consumer stops
        iterating early (for example by breaking out of its loop).
        """
        video = cv2.VideoCapture(filename)
        try:
            while video.isOpened():
                ret, frame = video.read()
                if not ret:
                    break
                yield frame
        finally:
            video.release()
        yield None

    def apply_velocity(self):
        """Advance every particle's position by its velocity (x += vx, y += vy)."""
        self.particles[:, 0:2] += self.particles[:, 2:4]

    def enforce_edges(self):
        """Clamp positions to ``[0, WIDTH-1]`` x ``[0, HEIGHT-1]``."""
        self.particles[:, 0] = np.clip(self.particles[:, 0], 0, self.WIDTH - 1)
        self.particles[:, 1] = np.clip(self.particles[:, 1], 0, self.HEIGHT - 1)

    def compute_errors(self, target_color, frame):
        """Return each particle's squared colour distance to ``target_color``.

        Args:
            target_color: Sequence of channel values in the frame's channel order.
            frame: Image array indexed as ``frame[y, x, channel]``.

        Returns:
            Float array of length ``NUM_PARTICLES`` holding the sum of squared
            per-channel differences at each particle's (truncated) position.
        """
        xs = self.particles[:, 0].astype(int)
        ys = self.particles[:, 1].astype(int)
        # Convert to float before subtracting so unsigned pixel types cannot wrap.
        pixels = frame[ys, xs, :].astype(np.float64)
        diff = np.asarray(target_color, dtype=np.float64) - pixels
        return np.sum(diff ** 2, axis=1)

    def particle_color_mean(self, frame):
        """Return the mean ``(blue, green, red)`` of the pixels under the particles."""
        xs = self.particles[:, 0].astype(int)
        ys = self.particles[:, 1].astype(int)
        samples = frame[ys, xs].astype(np.float64)
        blue = np.average(samples[:, 0])
        green = np.average(samples[:, 1])
        red = np.average(samples[:, 2])
        return (blue, green, red)

    def compute_weights(self, errors):
        """Turn colour errors into unnormalised resampling weights.

        The particle with the largest error gets weight 0, and particles
        sitting exactly on a frame edge are also given weight 0.
        """
        weights = np.max(errors) - errors

        on_edge = (
            (self.particles[:, 0] == 0)
            | (self.particles[:, 0] == self.WIDTH - 1)
            | (self.particles[:, 1] == 0)
            | (self.particles[:, 1] == self.HEIGHT - 1)
        )
        weights[on_edge] = 0.0

        # Make weights more sensitive to colour difference: raising to the
        # fourth power squashes weights well below the maximum toward zero
        # relative to the best ones.
        weights = weights ** 4

        return weights

    def resample(self, weights):
        """Resample particles in proportion to ``weights`` and estimate location.

        If the weights do not sum to a positive finite number (for example,
        every particle was zeroed out), particles are resampled uniformly
        instead of failing on an invalid probability vector.

        Returns:
            ``(x, y)`` integer tuple: the mean position of the resampled particles.
        """
        total = np.sum(weights)
        if total > 0 and np.isfinite(total):
            # Normalize to get a valid PDF
            probabilities = weights / total
        else:
            # p=None makes np.random.choice sample uniformly.
            probabilities = None

        indices = np.random.choice(
            self.NUM_PARTICLES,
            size=self.NUM_PARTICLES,
            p=probabilities,
        )
        self.particles = self.particles[indices, :]

        # Take average over all particles, best-guess for location
        x = np.mean(self.particles[:, 0])
        y = np.mean(self.particles[:, 1])
        return (int(x), int(y))

    def apply_noise(self, pos_sigma=1.0, vel_sigma=0.5):
        """Add zero-mean Gaussian noise to every particle.

        Noise expresses uncertainty in the target's position and velocity:
        the resampled hypotheses are perturbed so that some of them can match
        a change in the target's motion since the last frame.

        Args:
            pos_sigma: Standard deviation of the noise added to x and y.
            vel_sigma: Standard deviation of the noise added to vx and vy.
        """
        shape = (self.NUM_PARTICLES, 1)
        # One draw per column, in x, y, vx, vy order.
        noise = np.hstack(
            [
                np.random.normal(0.0, sigma, shape)
                for sigma in (pos_sigma, pos_sigma, vel_sigma, vel_sigma)
            ]
        )
        self.particles += noise

    def display(self, frame, location):
        """Draw the particles and the location estimate on ``frame`` and show it.

        Returns:
            True when the user asks to quit: Esc pauses playback, and a second
            Esc while paused exits. Any other key while paused resumes.
        """
        if len(self.particles) > 0:
            for px, py in self.particles[:, :2]:
                cv2.circle(frame, (int(px), int(py)), 1, (0, 255, 0), 1)
        if len(location) > 0:
            cv2.circle(frame, location, 15, (0, 0, 255), 5)
        cv2.imshow('frame', frame)
        if cv2.waitKey(4) == 27:  # wait n msec for user to hit Esc key
            if cv2.waitKey(0) == 27:  # second Esc key exits program
                return True
        return False

    def batch(self, frame, target_color=(189, 105, 82)):
        """Run one full filter iteration on ``frame`` and display the result.

        Steps: move particles, clamp to the frame, score against
        ``target_color``, resample, add noise, draw.

        Args:
            frame: Image array indexed as ``frame[y, x, channel]``.
            target_color: Colour to track, in the frame's channel order.

        Returns:
            True when the user requested termination from the display window.
        """
        self.apply_velocity()
        self.enforce_edges()

        errors = self.compute_errors(target_color, frame)
        weights = self.compute_weights(errors)
        location = self.resample(weights)

        self.apply_noise()

        terminate = self.display(frame, location)

        return terminate

In [2]:
VFILENAME = "walking.mp4"

# Particles index frame pixels up to (HEIGHT-1, WIDTH-1), so the video's
# frames must be at least this large.
P = Particle(HEIGHT = 406, WIDTH = 722)

try:
    for frame in P.get_frames(VFILENAME):
        # get_frames yields None as its end-of-stream sentinel.
        if frame is None:
            break

        terminate = P.batch(frame)

        if terminate:
            break
finally:
    # Close the display window even if processing a frame raised.
    cv2.destroyAllWindows()