### Dynamic Programming Code for Optimal CP Detection

Implements Levy-Leduc Code to determine optimal number of changepoints across a candidate set 


In [829]:
import numpy as np
import matplotlib.pyplot as plt

In [830]:
def map_intervals(Y,indices):
    """ Maps the indices of the candidate changepoints to the intervals of the data
    """
    partitioned_indices=np.unique(np.concatenate([[0],indices,[len(Y)]]))
    
    intervals={}
    for i in range(len(partitioned_indices)-1):
        intervals[i]=([partitioned_indices[i],partitioned_indices[i+1]])
   
    return intervals


In [881]:
# TO:DO: Consider constraining continuity of the polynomial instead of mse 

def best_fit_polynomial(Y,interval,k=1):
    """ Fits a polynomial of order k to the data Y across a given interval
    """
   
    
    y=Y[interval[0]:interval[1]]

    x_range=np.arange(interval[0],interval[1],1)

    poly_coef=np.polyfit(x_range,y,k)
    polynomial=poly_coef.dot(np.vstack([x_range**i for i in range(k,-1,-1)]))
    
    # mean or sum of squared errors
    mse=np.mean((y-polynomial)**2)
    
    
    # for i in range(len(intervals)):
    #     x=intervals[i]
        
    #     y=Y[x[0]:x[1]]
    #     observed.append(y)

    #     x_range=np.arange(x[0],x[1],1)
        
    #     poly_coef=np.polyfit(x_range,y,k)
    #     poly_estimation=poly_coef.dot(np.vstack([x_range**i for i in range(k,-1,-1)]))
        
    #     polynomials.append(poly_estimation)

    # polynomials=np.concatenate(polynomials)
    # observed=np.concatenate(observed)


    
    return mse

def compute_J(Y,indices,k):
    """ Evaluate cost matrix between each nested pair of changepoints
    """
    cp_mappings=map_intervals(Y,indices)
    mapping_indices=np.arange(len(cp_mappings))
    cost_matrix=np.zeros((len(cp_mappings),len(cp_mappings)))
    


    for i in range(len(mapping_indices)):
        for j in range(i,len(mapping_indices)):

            ###
                # Look into best polynomial or range 
            ###
            #intervals=[cp_mappings[k] for k in range(i,j+1)]
          
            interval=[cp_mappings[i][0],cp_mappings[j][1]]
            cost_matrix[i,j]=best_fit_polynomial(Y,interval,k=k)
          

    return cost_matrix

def compute_V(cost_matrix,K_max,indices):
    """ Computes the optimal cost and changepoint locations
    """
   
    k=len(indices)+1
    
    optimal_cost=np.zeros((K_max+1,k))
    optimal_loc=np.zeros((K_max+1,k))
    
    
    # no changepoints; best RSE is the sum of squared errors
    for i in range(k):
        optimal_cost[0,i]=cost_matrix[0,i]

    #############
    # Interesting slicing as converted from matlab 

    #############
    for k_i in range(1,K_max+1):
        for j in range(k_i+1,k):
 
            optimal_cost[k_i,j]=np.min(optimal_cost[k_i-1,k_i-1:j-1]+cost_matrix[k_i:j,j])
            ind=np.argmin(optimal_cost[k_i-1,k_i-1:j-1]+cost_matrix[k_i:j,j])
            optimal_loc[k_i-1,j]=ind+k_i-1
    
    return optimal_loc

def optimal_segmentation(optimal_loc,indices,K_max):
    """ Computes the optimal segmentation based on recursive RSE
    """
    
    all_loc=optimal_loc.copy()
    
    k=len(indices)+1
    total_loc={}

    for k_i in range(1,K_max+1):
 
        total_loc[k_i-1]=np.zeros(k_i)
        total_loc[k_i-1][k_i-1]=all_loc[k_i-1,k-1]
        for i in range(k_i-1,0,-1):
            print(total_loc[k_i-1][i])
            total_loc[k_i-1][i-1]=all_loc[i-1,int(total_loc[k_i-1][i])]

    return total_loc
    

def convert_observed_cp(optimal_segment,indices):
    all_segments=optimal_segment.copy()
    
    for i in range(len(all_segments)):
        not_zero=np.where(all_segments[i]!=0)[0]
        all_segments[i]=indices[not_zero]

    return all_segments

def generalized_cross_validation(optimal_segment,optimal_cost):
    """ Determines optimal number of changepoints based on generalized cross validation """

    N=None
    M=None

    mse=None

    return None


def dp_solver(Y,indices,K_max,k=1):
    """ Determines optimal partition of Y into K_max piecewise order polynomials
    """

    if K_max>len(indices):
        print("K_max must be less than or equal to the number of candidate changepoints")
        K_max=len(indices)

    # Initialize cost matrix
    cost_matrix=compute_J(Y,indices,k)

    # Compute optimal cost and changepoint locations
    optimal_loc=compute_V(cost_matrix,K_max,indices)
  
    # Compute optimal segmentation
    optimal_segment=optimal_segmentation(optimal_loc,indices,K_max)

    optimal_indices=convert_observed_cp(optimal_segment,indices)


    return optimal_segment,optimal_indices


    
    
    
    
    

In [882]:
def test_dp():
    """ Prep dataset to test dynamic programming recursion
    """
    n=250
    k=5
    x=np.arange(0,n,1)
    y=np.empty(n)

    sample_variance=0.5
    
    n1,n2,n3,n4,n5=np.arange(0,n,n/k).astype(int)
    
    def generate_noise(samples,sample_variance):
        return np.random.normal(scale=sample_variance,size=n)
    
    y[:n2]=0.1*x[:n2]+2
    y[n2:n3]=-0.05*x[n2:n3]+9.5
    y[n3:n4]=0.05*x[n3:n4]-0.5

    y[n4:n5]=-0.05*x[n4:n5]+14.5
    y[n5:]=0.25*x[n5:]-45.5

    true_change_points=np.array([10,35,n2,75,n3,125,130,135,141,n4,167,n5])
    
    return x,y+generate_noise(n,sample_variance),true_change_points

In [883]:
x,y,true_changepoints=test_dp()
true_changepoints

array([ 10,  35,  50,  75, 100, 125, 130, 135, 141, 150, 167, 200])

In [884]:
index_mappings=map_intervals(y,true_changepoints)
index_mappings

{0: [0, 10],
 1: [10, 35],
 2: [35, 50],
 3: [50, 75],
 4: [75, 100],
 5: [100, 125],
 6: [125, 130],
 7: [130, 135],
 8: [135, 141],
 9: [141, 150],
 10: [150, 167],
 11: [167, 200],
 12: [200, 250]}

In [886]:
cost_matrix=compute_J(y,true_changepoints,k=1)
print(f"Mean Squared Error: {cost_matrix[0,-1]}")

Mean Squared Error: 4.4547250015918465


In [887]:
n=len(true_changepoints)

optimal_segment=dp_solver(y,true_changepoints,K_max=4,k=1)
optimal_segment

10.0
10.0
4.0
10.0
4.0
1.0


({0: array([10.]),
  1: array([ 2., 10.]),
  2: array([ 2.,  4., 10.]),
  3: array([ 0.,  1.,  4., 10.])},
 {0: array([10]),
  1: array([10, 35]),
  2: array([10, 35, 50]),
  3: array([35, 50, 75])})