In [2]:
import numpy as np
from scipy import linalg as la
from matplotlib import pyplot as plt
%matplotlib inline

def get_flips(n,theta=None):
    """
    Generate data for the three-coin problem. 
    Note: heads is 1 and tails is 0
    
    Inputs
    ------
    n: number of flips of coin 0 to generate
    theta: probability vector l, p1, p2.
         If theta is None, randomly generate l, p1, p2.
    
    Outputs
    -------
    out: 3xn NumPy array with the desired data.
    """
    
    if not theta:
        theta = np.random.rand(3)
       
    out = np.zeros((3,n))
    coin_zero = np.random.binomial(1,theta[0],n)
    
    for i in xrange(n):
        # We could do this in one line but this is more clear
        if coin_zero[i] == 1:
            # Toss coin 1 three times
            out[:,i] = np.random.binomial(1,theta[1],3)
        else:
            # Toss coin 2 three times
            out[:,i] = np.random.binomial(1,theta[2],3)
            
    return out

def getP(X,theta):
    """Calculate P(Y=h,X) using current parameters in theta."""
    
    # Get number of heads in each draw
    h = np.sum(X,axis=0)
    t = 3 - h
    l, p1, p2 = theta[0], theta[1], theta[2]
    
    P_heads = l * p1**h * (1-p1)**t
    P_tails = (1-l) * p2**h * (1-p2)**t
    
    return P_heads / (P_heads + P_tails)
    

def EM(X,theta0):
    """
    Calculate the maximum likelihood estimate for data given
    by the three coin problem. 
    
    Inputs
    ------
    flip_data: a 3xn NumPy array containing the triple flips
        of coin 1 and coin 2, as given by get_flips()
    theta0: initial guess for theta.
        
    Outputs
    -------
    out: full (n+1)x(n+3) table containing guesses for theta 
        at all stages as well as posterior probabilities at 
        each stage. Final theta results found in out[-1,:3]
    """
    n = X.shape[1]
    h = np.sum(X,axis=0)
    out = np.zeros((n+1,n+3))
    out[0,:3] = theta0
    
    # Fill in hidden variables P(Y=h,X) using initial guess
    out[0,3:] = getP(X,theta0)
    
    for i in xrange(1,n+1):
        # Re-estimate parameters using hidden variables from i-1
        prev = out[i-1,3:]
        l = np.sum(prev)/n
        p1 = np.sum(h*prev) / (3*np.sum(prev))
        p2 = np.sum(h*(1-prev)) / (3*np.sum(1-prev))
        out[i,:3] = np.array([l,p1,p2])
        
        # Fill in hidden variables
        out[i,3:] = getP(X,np.array([l,p1,p2]))
        
    return out

We run our first test on Y = {(HHH),(TTT),(HHH),(TTT)}. Note that the
first three columns of the output array give us our estimate for $\theta$
at each step, while the rest of the columns give us the posterior
probabilities (e.g. how sure we are of which coin generated each flip).

In [6]:
X = np.array([[1,0,1,0],[1,0,1,0],[1,0,1,0]],dtype=np.float)
results = EM(X,np.array([0.3,0.3,0.6]))
np.set_printoptions(precision=4,suppress=True)
print results

[[ 0.3     0.3     0.6     0.0508  0.6967  0.0508  0.6967]
 [ 0.3738  0.068   0.7578  0.0004  0.9714  0.0004  0.9714]
 [ 0.4859  0.0004  0.9722  0.      1.      0.      1.    ]
 [ 0.5     0.      1.      0.      1.      0.      1.    ]
 [ 0.5     0.      1.      0.      1.      0.      1.    ]]


In [None]:
As we can see, this corresponds to a coin which always shows heads and
a coin which always shows tails, which appear with equal probability.
As a result, we are certain that the tail-biased coin 1 generated the 
second and fourth draws, while the head-biased coin 2 generated the others. 
We now run the test on Y = {(HHT),(TTT),(HHH),(TTT)}.

In [5]:
X = np.array([[1,0,1,0],[1,0,1,0],[0,0,1,0]],dtype=np.float)
results = EM(X,np.array([0.3,0.3,0.6]))
np.set_printoptions(precision=4,suppress=True)
print results

[[ 0.3     0.3     0.6     0.1579  0.6967  0.0508  0.6967]
 [ 0.4005  0.0974  0.63    0.0375  0.9065  0.0025  0.9065]
 [ 0.4632  0.0148  0.7635  0.0014  0.9842  0.      0.9842]
 [ 0.4924  0.0005  0.8205  0.      0.9941  0.      0.9941]
 [ 0.497   0.      0.8284  0.      0.9949  0.      0.9949]]


We see that we have a tails-only coin and a coin which is heavily 
biased towards heads. We are no longer completely sure that the 
second and fourth draws are generated by the tails-only coin (since
it is possible, though unlikely, for the heads-biased coin to generate
a tails-only draw), but it is highly unlikely.

In [None]:
import numpy as np
from scipy import linalg as la
from matplotlib import pyplot as plt
%matplotlib inline

class KalmanFilter(object):
    def __init__(self,F,Q,H,R,u):
        """
        Initialize the dynamical system models.
        Parameters
        ----------
        F : ndarray of shape (n,n)
            The state transition model.
        Q : ndarray of shape (n,n)
            The covariance matrix for the state noise.
        H : ndarray of shape (m,n)
            The observation model.
        R : ndarray of shape (m,m)
            The covariance matric for observation noise.
        u : ndarray of shape (n,)
            The control vector.
        """
        self.F = F
        self.Q = Q
        self.H = H
        self.R = R
        self.u = u
        
    def evolve(self,x0,N):
        """
        Compute the first N states and observations generated by the Kalman ←-
        system.
        Parameters
        ----------
        x0 : ndarray of shape (n,)
            The initial state.
        N : integer
            The number of time steps to evolve.
            
        Returns
        -------
        states : ndarray of shape (n,N)
        States 0 through N-1, given by each column.
        obs : ndarray of shape (m,N)
        Observations 0 through N-1, given by each column.
        """
        cholQ = la.cholesky(self.Q).T
        cholR = la.cholesky(self.R).T

        # Generate data and noisy measurements
        X = np.zeros((N,len(x0)))
        X[0,:] = x0
        Y = np.zeros((N,2))
        for i in xrange(1,N):
            wk = np.dot(cholQ,np.random.randn(len(x0)))
            X[i,:] = np.dot(F,X[i-1,:]) + u + wk
            Y[i,:] = np.dot(H,X[i,:].T) + np.dot(cholR,np.random.randn(2))
        return X.T, Y.T
    
    def estimate(self,x,P,z):
        """
        Compute the state estimates using the Kalman filter.
        If x and P correspond to time step k, then z is a sequence of
        observations starting at time step k+1.
        
        Parameters
        ----------
        x : ndarray of shape (n,)
            The initial state estimate.
        P : ndarray of shape (n,n)
            The initial error covariance matrix.
        z : ndarray of shape(m,N)
            Sequence of N observations (each column is an observation).
        
        Returns
        -------
        out : ndarray of shape (n,N)
            Sequence of state estimates (each column is an estimate).
        """
        # Independent of  k: u, F, Q, R, H (self.whatever)
        # Non-returning updatables: S, K
        # Returning updatables: x, P
        n = len(x)
        m, N = z.shape
        out = np.zeros((n,N))
        
        for k in xrange(N):
            xhat = np.dot(self.F,x) + u
            Phat = np.dot(self.F,np.dot(P,self.F.T)) + self.Q
            y = z[:,k] - np.dot(self.H,xhat)
            S = np.dot(self.H,np.dot(Phat,self.H.T)) + self.R
            K = np.dot(Phat,np.dot(H.T,la.inv(S)))
            x = xhat + np.dot(K,y)
            P = np.dot(np.eye(K.shape[0]) - np.dot(K,self.H),Phat)
            out[:,k] = x
        
        return out
    
    def predict(self,x,k):
        """
        Predict the next k states in the absence of observations.
        
        Parameters
        ----------
        x : ndarray of shape (n,)
            The current state estimate.
        k : integer
            The number of states to predict.
        
        Returns
        -------
        out : ndarray of shape (n,k)
        The next k predicted states.
        """
        n = len(x)
        out = np.zeros((n,k))
        for i in xrange(k):
            x = np.dot(self.F,x) + self.u
            out[:,i] = x
        return out
    
# Initialization steps
b = 1e-4
g = 9.8
dt = 0.1

F = np.diag(np.array([1,1,1-b,1-b]))+np.diag(dt*np.ones(2),2)
Q = 0.1*np.eye(4)
H = np.array([[1,0,0,0],[0,1,0,0.]])
R = 500.0*np.eye(2)
u = np.array([0,0,0,-g*dt])

myfilter = KalmanFilter(F,Q,H,R,u)
X, Y = myfilter.evolve(np.array([0,0,300,600]),1250)

# Get initial state guess
x = np.zeros(4)
x[:2] = Y[:,400]
velocities = Y[:,400:410]
x[2:] = np.mean(velocities,axis=1)
filtered = myfilter.estimate(x,1e6*Q,Y)
predicted = myfilter.predict(filtered[:,600],650)

print "Problem 1:"
plt.scatter(X[0,:],X[1,:],s=0.1)
plt.show()

print "Problem 2:"
plt.scatter(X[0,400:600],X[1,400:600],s=1,marker='.')
plt.scatter(Y[0,400:600],Y[1,400:600],s=2.5,c='r')
plt.show()

print "Problem 3:"
plt.plot(X[0,400:600],X[1,400:600])
plt.scatter(Y[0,400:600],Y[1,400:600],s=2.5,c='r')
plt.plot(filtered[0,400:600],filtered[1,400:600],c='g')
plt.show()

true_y = np.where(np.diff(np.sign(X[1,:]))==-2)[0]
predicted_y = np.where(np.diff(np.sign(predicted[1,:]))==-2)[0]
diff = np.abs(X[0,true_y] - predicted[0,predicted_y])

print "Problem 4:"
plt.plot(X[0,600:],X[1,600:])
plt.plot(predicted[0,:],predicted[1,:],c='g')
plt.show()

print "The prediction and the actual projectile hit the ground {} meters apart.".format(diff[0])

In [None]:
import numpy as np
from scipy import linalg as la

# Code up a simple RLS problem for 4-5 pts. Compare w/ full OLS soln

# A is a 5x5 matrix with rank 5

# K_k = K_k-1 - K_k-1*A_k^T*(R_k + A_k*K_k-1*A_k^T)^-1*A_k*K_k-1
# x_k = x_k-1 - K_k*A_k^T*R_k^-1*(A_k*x_k-1 - b_k)

def RLS(m,A,b,eps,stop):
    """m is len(b_i), x, etc.
    A is dimension km by n. b and eps are len(km).
    stop is an optional timestep to stop at."""
    
    n = A.shape[1]
    k = A.shape[0]/m
    
    # Get initial guess from solving w/ A1
    x = la.lstsq(A[:k,:],b[:k] - eps[:k])
    
    # Initialize R
    R = np.zeros(k)
    for i in xrange(k):
        R[i] = np.dot(eps[i*k:i*k+k],eps[i*k:i*k+k])
    
     # Initialize K 
    K = np.dot(np.inv(np.dot(A[:k,:],A[:k,:])),np.transpose(A[:k,:]))
    
    for i in xrange(1,np.min(k,stop)):
        # Update K
        d = R[i] + np.dot(A,np.dot(K,np.transpose(A)))
        Ak = A[i*k:i*k+k]
        K -= np.dot(np.dot(K,np.transpose(Ak)),np.dot(Ak,K))/d
        
        # Update x
        x -= np.dot(np.dot(K,np.transpose(Ak)),np.dot(Ak,x)-b[i*k:i*k+k])/R[i]
    
    return x, K

def OLS(m,A,b,eps):
    return np.lstsq(A,b-eps)