<a href="https://colab.research.google.com/github/kgdunn/pid-book/blob/master/Model_inversion_demonstration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -U numpy pandas scipy matplotlib sklearn ipysheet ipywidgets plotly notebook 
# The above line only needs to be run the first time. After that you will have all the packages necessary.
import numpy as np
import pandas as pd
import scipy as sp
from pandas.plotting import scatter_matrix
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_is_fitted

#from ipywidgets import interact, interactive, fixed, interact_manual, Output, VBox, Box, Layout, FloatSlider,Checkbox, FloatText
from ipysheet import sheet, column, to_dataframe, row
import ipywidgets as widgets
import plotly.graph_objects as go
from plotly.subplots import make_subplots

## Strategy

1. Load the data from CSV
2. Preprocess the data
3. Fit the PCA model (2 components for this demonstration)
4. Show the scores and loadings
5. Show the model inversion scores sliders
6. Show the values of the x-variables that could be used

In [None]:
# 1. Loading the data file
full_path = ''
foods = pd.read_csv("https://openmv.net/file/food-texture.csv").drop(['Unnamed: 0',], axis=1)
display(foods.head())

In [None]:
# Create our own mean centering and scaling to unit variance (MCUV) class
# The default scaler in sklearn does not handle small datasets accurately, with ddof.
class MCUVScaler(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass
        
    def fit(self, X, y = None):
        self.center_x_ = X.mean()
        self.scale_x_ = X.std(ddof=1) # this is the key difference with "preprocessing.StandardScaler"
        self.scale_x_[self.scale_x_ == 0] = 1.0 # columns with no variance are left as-is.
        return self
 
    def transform(self, X):
        check_is_fitted(self, 'center_x_')
        check_is_fitted(self, 'scale_x_')
 
        X = X.copy()
        return (X - self.center_x_) / self.scale_x_
    
    def inverse_transform(self, X):
        check_is_fitted(self, 'center_x_')
        check_is_fitted(self, 'scale_x_')
 
        X = X.copy()
        return X * self.scale_x_ + self.center_x_    

In [None]:
# 2. Preprocess the data
scaler = MCUVScaler().fit(foods)
foods_mcuv = scaler.fit_transform(foods)

In [None]:
# 3. Fit the PCA model, with "A" principal components
A = 2
pca = PCA(n_components=A).fit(foods_mcuv)
N, K = foods_mcuv.shape
pca.T2_limit_95 = A*(N-1)*(N+1)/(N*(N-A)) * sp.stats.f.isf((1-0.95), A, N-A)
pca.scaling_factor_for_scores = np.sqrt(pca.explained_variance_)
pca.t_scores = pca.fit_transform(foods_mcuv)

# Find the SPE limit
X_hat = pca.t_scores @ pca.components_
X_check_mcuv = foods_mcuv
error_X = X_check_mcuv - X_hat      
SPE = np.sum(error_X ** 2, axis=1)
# Find the SPE limit analytically still
# SPE_limit = ___
# plt.plot(SPE)
# plt.plot([0, len(SPE)], [SPE_limit, SPE_limit], 'r')
# plt.grid()

# Hotelling's T2
HotT2 = np.sum((pca.t_scores/pca.scaling_factor_for_scores)**2, axis=1)
plt.plot(HotT2)
plt.plot([0, len(HotT2)], [pca.T2_limit_95, pca.T2_limit_95], 'r')
plt.grid()
plt.title("Hotelling's $T^2$ plot");

In [None]:
# 4. Plot the model
def ellipse_coordinates(s_h, s_v, T2_limit_alpha, n_points=100):
#     We want the (t_horiz, t_vert) coaoridinate pairs that form the T2 ellipse.
# 
#     Inputs: s_h (std deviation of the score on the horizontal axis)
#             s_v (std deviation of the score on the vertical axis)
#             T2_limit_alpha: the T2_limit at the alpha confidence value
# 
#     Equation of ellipse in canonical form (http://en.wikipedia.org/wiki/Ellipse)
# 
#      (t_horiz/s_h)^2 + (t_vert/s_v)^2  =  T2_limit_alpha
# 
#      s_horiz = stddev(T_horiz)
#      s_vert  = stddev(T_vert)
#      T2_limit_alpha = T2 confidence limit at a given alpha value (e.g. 99#)
# 
#     Equation of ellipse, parametric form (http://en.wikipedia.org/wiki/Ellipse)
# 
#     t_horiz = sqrt(T2_limit_alpha)*s_h*cos(t) 
#     t_vert  = sqrt(T2_limit_alpha)*s_v*sin(t) 
# 
#     where t ranges between 0 and 2*pi
# 
#     Returns `n-points` equi-spaced points on the ellipse.
# Kevin Dunn, 2010

    h_const = np.sqrt(T2_limit_alpha) * s_h
    v_const = np.sqrt(T2_limit_alpha) * s_v
    dt = 2*np.pi/(n_points-1)
    steps = np.linspace(0, n_points-1, n_points)
    x = np.cos(steps*dt)*h_const
    y = np.sin(steps*dt)*v_const
    return x, y

def score_plot(ax=None, hide_annotations=False): # score plot of t1 vs t2
    if ax is None:
        ax = plt.subplot(1, 2, 1)
    else:
        plt.sca(ax)
    plt.plot(pca.t_scores[:,1], pca.t_scores[:, 0], 'k.', )
    if not(hide_annotations):
        plt.title('Score plot: $t_2$ vs $t_1$')
        plt.xlabel('$t_2$ scores')
        plt.ylabel('$t_1$ scores')
    T2_limit_alpha = 2 # TODO: determine from MV software
    ci_x, ci_y = ellipse_coordinates(pca.scaling_factor_for_scores[1], 
                                     pca.scaling_factor_for_scores[0], 
                                     T2_limit_alpha=pca.T2_limit_95, 
                                     n_points=100)
    plt.plot(ci_x, ci_y, '-', color=np.array([219,112,147])/255.0, linewidth=2)
    
    if not(hide_annotations):
        plt.axvline(linewidth=2)
        plt.axhline(linewidth=2)
        ax.set_aspect('equal')
        ax.grid()
    
    return ax

def loadings_plot():# loadings plot of p1 vs p2
    ax = plt.subplot(1, 2, 2)
    for idx, label in enumerate(list(foods)):
        fuzz = 0.02
        plt.plot(pca.components_.T[idx, 1], pca.components_.T[idx, 0], 'k.', )
        plt.text(pca.components_.T[idx, 1]+fuzz, pca.components_.T[idx, 0]-fuzz, label)

    plt.title('Loadings plot: $p_2$ vs $p_1$')
    plt.xlabel('$p_2$ loadings')
    plt.ylabel('$p_1$ loadings')
    plt.axvline(linewidth=2)
    plt.axhline(linewidth=2)
    ax.set_aspect('equal')
    ax.grid()
    
    return ax
    
fig = plt.figure(figsize=(15, 10))
plt.subplots_adjust(top=0.9, bottom=0.1, left=0.0, right=0.9, hspace=0.5, wspace=0.2)
score_plot();
loadings_plot();

In [None]:
# PCA model inversion
def invert_model(t1, t2):
    data = pd.concat([foods, 
                      pd.DataFrame(pca.t_scores[:,0], columns=['t1', ]),
                      pd.DataFrame(pca.t_scores[:,1], columns=['t2', ])], axis=1)
    spm = scatter_matrix(data, figsize=(10, 10));
    x_pred_mcuv = pca.inverse_transform([t1, t2])
    x_pred_orig = scaler.inverse_transform(x_pred_mcuv)
    x_pred_plot = x_pred_orig.tolist()
    print(x_pred_orig)
    x_pred_plot.extend((t1, t2))
    ax_t1 = 5
    ax_t2 = 6
    for i in range(0, 7):
        for j in range(0, 7):
            if i < j and not((ax_t1==i) and (ax_t2==j)):
                plt.sca(spm[i][j])
                plt.plot(x_pred_plot[j], x_pred_plot[i], 'r.', markersize=10)
                
            if i==j:
                plt.sca(spm[i][j])
                plt.axvline(x=x_pred_plot[j],color='r', linewidth=2)
    
    plt.sca(spm[ax_t1, ax_t2])
    score_plot(spm[ax_t1, ax_t2], hide_annotations=True)
    plt.plot(x_pred_plot[ax_t2], x_pred_plot[ax_t1], 'r.', markersize=10)
   

t1 = widgets.FloatSlider(min=-4, max=+4, step=0.2, value=0, continuous_update=False, 
                         orientation='vertical', readout_format='.1f', description='\(t_1\) value')
t2 = widgets.FloatSlider(min=-3, max=+3, step=0.1, value=0, continuous_update=False, 
                         readout_format='.1f', description='\(t_2\) value')

ui = widgets.HBox([t1, t2])
out = widgets.interactive_output(invert_model, {'t1': t1, 't2': t2,});
display(ui, out);

In [None]:
# Set up plots etc
def score_plot_points(a_horiz: int, a_vert: int, T2_limit_alpha: float): # score plot of t1 vs t2
    
    points = go.Scatter(x=pca.t_scores[:,a_horiz], 
                        y=pca.t_scores[:,a_vert], 
                        mode='markers', 
                        marker_color='rgba(0, 0, 0, 1)',
                        name="Scores",
                        hoverinfo='none',
                        showlegend=False)

    ci_x, ci_y = ellipse_coordinates(np.sqrt(pca.explained_variance_[a_horiz]), 
                                     np.sqrt(pca.explained_variance_[a_vert]), 
                                     T2_limit_alpha=T2_limit_alpha, 
                                     n_points=100)
    ellipse = go.Scatter(x=ci_x, 
                         y=ci_y, 
                         mode='lines', 
                         marker_color='rgba(219,112,147, 1)',
                         name='',
                         hoverinfo='none',
                         showlegend=False)
    
    x_range = ([max(max(abs(points['x'])), max(abs(ellipse['x'])))] * 2  * np.array([-1.05, 1.05])).tolist()
    y_range = ([max(max(abs(points['y'])), max(abs(ellipse['y'])))] * 2  * np.array([-1.05, 1.05])).tolist()
    return points, ellipse, x_range, y_range  

a_horiz=0
a_vert=1
  
if True: # plot set up, etc
    points, ellipse,x_range, y_range = score_plot_points(a_horiz, a_vert, T2_limit_alpha=pca.T2_limit_95)

    resolution = 5
    x_map = np.linspace(x_range[0], x_range[1], resolution+1)
    y_map = np.linspace(y_range[0], y_range[1], resolution+1)
    z = np.zeros((resolution+1, resolution+1))+1
    clickmap = go.Heatmap(x=x_map,y=y_map,z=z, 
                          showscale= False,                                 
                          hoverinfo='none',
                          colorscale=[[0, "rgba(255, 255, 255, 0)"], [1, "rgba(255, 255, 255, 0 )"]] 
                                   )

    scores_layout=dict(width=600,
                       height=600,
                       title_text=f'$t_{a_horiz+1} \\text{{vs}}\\, t_{a_vert+1}$',
                       hovermode="closest",                    
                       autosize=True,
                       margin= dict(l=10, r=10, b=5, t=80), # Defaults: l=80, r=80, t=100, b=80
                       spikedistance=0,       
                       xaxis=dict(
                           title=dict(
                               text=f'$t_{a_horiz+1}$ scores',
                               font=dict(size=16),
                           ),
                           mirror=True, # ticks are mirrored at the top of the frame also
                           autorange=False,
                           range=x_range,
                           showspikes=True,
                           visible=True,
                       ),
                       yaxis=dict(
                           title=dict(
                               text=f'$t_{a_vert+1}$ scores',
                               font=dict(size=16),
                           ),
                           type="linear",
                           autorange=False,
                           range=y_range,
                           showspikes=True,
                           visible=True,
                           domain=[0, 1],
                       ),
                      )

    f = go.FigureWidget([clickmap, points, ellipse], layout=scores_layout)
    clickground = f.data[0]
    f.layout.hovermode = 'x'    

    box_layout = widgets.Layout(display='inline-flex', flex_flow='row', align_items='stretch', width='90%')
    box_auto = widgets.Box(children=[f,], layout=box_layout)
    display(widgets.VBox([box_auto, ]) );

    sliders = [widgets.FloatSlider(min=-4, max=+4, step=0.2, value=0,readout_format='.1f') for _ in range(K)]
    true_value = [widgets.FloatText( value=np.nan, readout_format='.1f') for _ in range(K)]
    checkboxes = [widgets.Checkbox(value=False, disabled=False, indent=False) for _ in range(K)]
    properties = list(foods.columns)
    sheet1=sheet(columns=4, 
                 column_headers=["X variable", "Normalized value", "Real value", "Constrained?"], 
                 column_width=[10, 40, 40, 10])
    column0=column(0, properties)
    column1=column(1, sliders)
    column2=column(2, true_value)
    column2=column(3, checkboxes)
    display(sheet1)

    sheet2=sheet(rows=1,
                 columns=2, 
                 column_headers=["Squared prediction error", "Hotelling's T^2"],  
                 column_width=[40, 40])
    SPE_handle = widgets.FloatText(value=np.nan, readout_format='.1f')
    T2_handle = widgets.FloatText(value=np.nan, readout_format='.1f')
    s2_row0=row(0, [SPE_handle, T2_handle])
    display(sheet2)

def update_point(trace, points, selector):
    t1 = points.xs[0]
    t2 = points.ys[0]
    scores = np.array([t1,t2]) # 1 x A vector
    
    # Free and constrained variables
    loadings = pca.components_.T # K x A matrix
    idxF, idxC = [not(c.value) for c in checkboxes], [c.value for c in checkboxes]
    Rf, Rc = loadings[idxF, :], loadings[idxC, :]
    xc = np.array([s.value for s in sliders])[idxC].reshape((-1,1))
    
    # Subtract out (i.e. eliminate from the optimization) the the constrained parts of the multivariate projection
    x_pred_mcuv = ((scores - xc.T @ Rc) @ loadings.T @ loadings @ loadings.T).ravel()
    x_pred_mcuv[idxC] = xc.ravel()
    x_pred_orig = scaler.inverse_transform(x_pred_mcuv.ravel())
    for idx, slider in enumerate(sliders):
        slider.value = x_pred_mcuv[idx]
        true_value[idx].value = f"{x_pred_orig[idx]:.3g}"
  
    # Find the SPE and T2 value for this point:
    X_hat = scores @ loadings.T
    error_X = x_pred_mcuv - X_hat.ravel()   
    
    SPE_handle.value = 0.0 #f"{np.sum(error_X ** 2):.3g}" <-- we are always on the model plane.
    T2_handle.value = f"{np.sum((scores / pca.scaling_factor_for_scores)**2):.3g}"
           
clickground.on_hover(update_point)
widgets.VBox([out]);

# TODO:
# * show loadings points superimposed

**NOTE**

The last cell above will NOT work in Google's Colab. 
You can download the notebook (click on "File", then "Download .ipynb")
and run that notebook on your own hardware. Then you will have a fully interactive score plot 
where the model-inversion happens in real time.