# Crush Rig Predictive Models
* __Classifier for Trauma Score__
* __Regressor for serosal thickness delta__

Written by Matt MacDonald for CIGITI at the Hospital for Sick Children Toronto
***

All tools to manipulate data will be obtained from the crush_plot.py file. The objective of this notebook is to predict the histological targets from the force/position crush data using a classifier, either logistic regression or otherwise.

In [None]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

In [None]:
from pdb import set_trace
from warnings import warn

The crush data must be collected using the crush rig and crush.py and stored in the expected folder structure at the root directory indicated by PATH.

In [None]:
from crush_read import *
from crush_plot import *
PATH

In [None]:
plt.style.use('ggplot')
plt.rcParams['figure.dpi'] = 150

Load all data and modify as needed.

In [None]:
study = study_outline(PATH)
targets = study_targets(PATH)
crushes = study_data(study)
crushes = modify(crushes)
crushes = calculate(crushes)

Prepare data for classification.

In [None]:
X, y, legend = preprocess(crushes, targets)
y = refine(y)
print('Reference for categorical features:')
legend

In [None]:
X.shape

In [None]:
for col in y.columns:
    most_common = y[col].value_counts().idxmax()
    s = (y[col] == most_common).sum()
    c = y[col].count()
    r = s / c
    print(f"{col}\nBaseline Accuracy = {s}/{c} ({r:.2%})")

In [None]:
y.describe()

The major tissue damage target is unbalanced. It may not be enough data for an accurate classifier due to the skewed distribution of positive samples.

Generate matrix of correlations to aid understanding.

In [None]:
plt.figure()
damage_score = y['Tissue Damage'].copy()
damage_score[y['Major Tissue Damage']] = 2
serosal_delta = (X['Post Serosal Thickness (mm)'] - X['Serosal Thickness (mm)']) / X['Serosal Thickness (mm)']
serosal_delta = -serosal_delta
plt.scatter(serosal_delta, damage_score, color='indigo')
plt.ylabel('Tissue Damage Score')
plt.xlabel('Serosal Thickness Change Percentage')

In [None]:
plt.figure()
s = 0.25
m = y.shape[0]
y1 = y['Tissue Damage']
y2 = y['Significant Serosal Change']
rx = np.random.rand(m) * s - (s / 2)
ry = np.random.rand(m) * s - (s / 2)
plt.scatter(x=y1 + rx, y=y2 + ry, color='indigo')
plt.xlabel('Tissue Damage')
plt.ylabel('Significant Serosal Change')
plt.xticks([0, 1])
plt.yticks([0, 1])
plt.xlim([-0.5, 1.5])
plt.ylim([-0.5, 1.5])

cnts = [sum([x != y for x, y in zip(y1, y2) if x == 0]),
        sum([x == y for x, y in zip(y1, y2) if x == 1]),
        sum([x == y for x, y in zip(y1, y2) if x == 0]),
        sum([x != y for x, y in zip(y1, y2) if x == 1])]

plt.text(-0.1, 1.25, f"n = {cnts[0]}", size=10)
plt.text(0.9, 1.25, f"n = {cnts[1]}", size=10)
plt.text(-0.1, 0.25, f"n = {cnts[2]}", size=10)
plt.text(0.9, 0.25, f"n = {cnts[3]}", size=10)

print('Top left N = {} / {}'.format(cnts[0], m))
print('Top right N = {} / {}'.format(cnts[1], m))
print('Bottom left N = {} / {}'.format(cnts[2], m))
print('Bottom right N = {} / {}'.format(cnts[3], m))
print('Agreement = {} / {}'.format(sum(cnts[1:3]), m))

In [None]:
%matplotlib inline
W = pd.concat([X, y], axis=1)
W_corr = W.corr(method='spearman')
sns.heatmap(W_corr, cmap='RdBu')

In [None]:
%matplotlib notebook
X.describe()

Visualize the key variable which is target stress. Below is the corresponding load in grams for reference.

In [None]:
for load in [200, 400, 600, 800, 1000, 1200]:  # test loads in grams
    pressure = (9.81 * load) / (np.pi * (5/2)** 2)
    print(f"{pressure:6.0f} (kPa) = {load:5} (grams)")

In [None]:
x_name = 'Target Stress (MPa)'
for y_name in y.columns:
    plt.figure()
    plt.scatter(x=X[x_name], y=y[y_name])
    plt.xlabel(x_name)
    plt.ylabel(y_name)

Remove any histology related features to focus on real time predictors. Also remove the holding strain since only the STOP protocol is being considered.

In [None]:
X_full = X.copy()
X.columns

In [None]:
X = X.drop('Pathologist (Cathy or Corwyn)', axis=1)
X = X.drop('Serosal Thickness (mm)', axis=1)
X = X.drop('Post Serosal Thickness (mm)', axis=1)
X = X.drop('Holding Strain', axis=1)
X.columns

The goal for the prediction algorithm is to provide a metric for preventing tissue damage intraoperatively. Thus it has the following requirements:

1. Good overall accuracy so it is reliable without being restrictive
2. High recall such that it is conservative, limiting the occurrence of false negatives
3. Simple with limited input so that it can be implemented cheaply in real time

Further to requirement 3 above, no histology features can be used to make the prediction.

In [None]:
# Show correlations for the reduced feature set
%matplotlib inline
X_corr = X.corr(method='spearman')
sns.heatmap(X_corr, cmap='RdBu', vmin=-1, vmax=1)

In [None]:
%matplotlib notebook

In [None]:
def get_freq(crush):
    time = crush.index
    delta = time[1:] - time[:-1]
    return 1 / np.mean(delta.total_seconds())

freqs = crushes['Data'].apply(get_freq)

Sample frequency is 31 Hz

Nyquist frequency is 62 Hz

Cutoff frequency of 3rd order butterworth digital filter is 0.2 * 62 = 12.4 Hz

# Logistic Regression

Build logistic regression models as a relatively simple first step.

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression, LogisticRegressionCV

from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
from sklearn.metrics import classification_report

from sklearn.feature_selection import RFE

In [None]:
def logreg_split(X, y, seed=0, size=0.2):
    # Convert from pandas to numpy
    X_np = X.values.astype(np.float64)
    y_np = y.values
    
    # Split into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X_np, y_np, test_size=size, random_state=seed)
    
    return (X_train, y_train), (X_test, y_test)

In [None]:
def logreg_model(dataset, seed=0, scaled=False):
    
    # Scale input features
    if scaled:
        scaler = StandardScaler()
        X = scaler.fit_transform(dataset[0])
    else:
        X = dataset[0]
    y = dataset[1]
    
    # Fit logistic regression to training set
    model = LogisticRegressionCV(penalty='l2',
                                 solver='lbfgs',
                                 cv=5,  # 5 or 10 recommended
                                 refit=True,
                                 random_state=seed,
                                 max_iter=1000,
                                 n_jobs=4)
    model.fit(X, y)
    
    if scaled:
        return model, scaler
    else:
        return model

In [None]:
def logreg_features(dataset, n_features, seed=0):
    model, scaler = logreg_model(dataset, seed=seed, scaled=True)  # use scaled features to avoid bias in selection
    rfe = RFE(model, n_features)  # pick the best n features
    rfe = rfe.fit(*dataset) 
    return rfe, model, scaler

In [None]:
def logreg_display(model, dataset, scaler=None):
    if scaler is not None:
        scaled_dataset = (scaler.transform(dataset[0]), dataset[1])
    else:
        scaled_dataset = dataset
    logreg_auc(model, scaled_dataset, disp=True)

In [None]:
def logreg_predict(model, dataset, disp=False):
    # Predict and make a confusion matrix, optionally display results
    X = dataset[0]
    y = dataset[1]
    y_pred = model.predict(X)
    y_prob = model.predict_proba(X)
    cm = confusion_matrix(y, y_pred)
    metrics = logreg_metrics(cm, disp=disp)
    return {'predictions': y_pred,
            'probabilities': y_prob,
            'metrics': metrics}

In [None]:
def logreg_metrics(cm, disp=False):
    tp = cm[1][1]
    tn = cm[0][0]
    fp = cm[0][1]
    fn = cm[1][0]
    accuracy = (tp + tn) / cm.sum()
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1score = 2 * precision * recall / (precision + recall)
    
    if disp:
        print(f'F1 Score = {f1score:.2f}')
        print(f'Accuracy = {accuracy:.2f}')
        print(f'Precision = {precision:.2f}')
        print(f'Recall = {recall:.2f}')
    
    return np.round([f1score, accuracy, precision, recall], 3)

In [None]:
def logreg_auc(model, dataset, disp=True):
    X = dataset[0]
    y = dataset[1]
    y_pred = logreg_predict(model, dataset, disp=disp)['predictions']
    logit_roc_auc = roc_auc_score(y, y_pred)
    
    if disp:
        print(classification_report(y, y_pred))
        vals = roc_curve(y, model.predict_proba(X)[:,1])
        plt.figure()
        plt.plot(*vals[:2], label=f"ROC curve (A = {logit_roc_auc:.2f})")
        plt.plot([0, 1], [0, 1], 'k--')
        plt.xlim([-0.05, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.legend(loc='lower right')
        plt.show()
        
    return logit_roc_auc

In [None]:
def logreg_decision_bndr(model, dataset, n1, features=None, labels=['False', 'True']):
    # Plot a series of 2D decision boundaries freezing the other features to mean values
    # dataset=(X, y) ideally of the test set and n1 is the primary feature being compared (default stress)
    
    X_set, y_set = dataset[0], dataset[1]
    X_avg = np.mean(X_set, axis=0).reshape(1, -1)
    n = X_set.shape[1]
    m = y_set.size
    
    if features is None:
        features = []
        for num in range(n):
            features.append(f"Feature {n}")
    
    def model_grid(grid1, grid2, n1, n2):
        # Predict for two 2D grids of features
        s = grid1.shape
        Z = np.zeros(s)
        for i in range(s[0]):
            for j in range(s[1]):
                X = X_avg
                X[0, n1] = grid1[i, j]
                X[0, n2] = grid2[i, j]
                Z[i, j] = model.predict(X)
        return Z

    # Visualize the decision boundary
    from matplotlib.colors import ListedColormap
    colors = ('blue', 'red')
    alt_colors = ('cornflowerblue', 'lightcoral')
    
    for n2 in range(n):
        if (n1 == n2) and (n > 1):
            continue
        maxx = X_set[:, n1].max()
        maxy = X_set[:, n2].max()
        minx = X_set[:, n1].min()
        miny = X_set[:, n2].min()
        rx = maxx - minx
        ry = maxy - miny
        f = 0.2
        X1, X2 = np.meshgrid(np.linspace(minx - f*rx, maxx + f*rx, 100),
                             np.linspace(miny - f*ry, maxy + f*ry, 100))
        Z = model_grid(X1, X2, n1, n2)

        plt.figure()
        plt.contourf(X1, X2, Z, alpha=0.50, cmap=ListedColormap(colors))
        plt.xlim(X1.min(), X1.max())
        plt.ylim(X2.min(), X2.max())
        for res in np.unique(y_set):
            color = colors[0] if (res == False) else colors[1]
            plt.scatter(X_set[y_set == res, n1], X_set[y_set == res, n2],
                        c=color, label=labels[int(res)])
        plt.title(f'Classification (N={m})')
        plt.xlabel(features[n1])
        plt.ylabel(features[n2])
        plt.legend()

In [None]:
def logreg_scale(dataset, scaler):
    return (scaler.transform(dataset[0]), dataset[1])

In [None]:
def logreg_build(X, y, n_features=None, seed=0):
    '''
    Convenient function  to build multiple logistic regression models
    '''
    
    # Select a specific indicator from the targets and split the dataset
    dataset_train, dataset_test = logreg_split(X, y, seed=seed)
    
    # Remove any features deemed to be irrelevent by recursive feature elimination
    if n_features is None:
        n_features = X.shape[1]
    rfe, model, scaler = logreg_features(dataset_train, n_features, seed=seed)
    
    # Rank the features
    rank = pd.DataFrame({'feature': X.columns.values,
                         'support': rfe.support_,
                         'ranking': rfe.ranking_})
    rank = rank.sort_values(by='ranking')
    features = rank.feature[:n_features].tolist()
    
    # Train the model once more on just the selected features
    dataset_train, dataset_test = logreg_split(X[features], y, seed=seed)
    model = logreg_model(dataset_train, seed=seed)  # no scaling
    
    # Check performance on the training set
    pred_train = logreg_predict(model, dataset_train)
    
    # Check performance on the test set for comparison
    pred_test = logreg_predict(model, dataset_test)
    
    return {'model': model,
            'rank': rank,
            'features': features,
            'n_features': n_features,
            'train_metrics': pred_train['metrics'],
            'test_metrics': pred_test['metrics']}

Define the targets and the random seed for the model training.

In [None]:
SEED = 0
indicators = ['Significant Serosal Change',
              'Tissue Damage',
              'Major Tissue Damage']
indicator_labels = {'Significant Serosal Change': ['No Change', 'Significant Change'],
                    'Tissue Damage': ['No Damage', 'Damage'],
                    'Major Tissue Damage': ['No Damage or Minor Damage', 'Major Damage']}
y.columns

### Serosal Thickness Change

In [None]:
ind = indicators[0]
ind

In [None]:
df = pd.DataFrame()
for n in range(X.shape[1]):
    res = logreg_build(X, y[ind], n_features=(n + 1), seed=SEED)
    df = df.append(res, ignore_index=True)
df

In [None]:
n_features = 1
idx = (df.n_features == n_features).idxmax()
model = df.model[idx]
features = df.features[idx]
dataset_train, dataset_test = logreg_split(X[features], y[ind], seed=SEED)

In [None]:
# Check training set performance
logreg_display(model, dataset_train)

In [None]:
# Check test set performance
logreg_display(model, dataset_test)

Review the model parameters.

In [None]:
print('Model coefficients:')
print(model.coef_)
print('Model features:')
print(features)
print('Model intercept:')
print(model.intercept_)
print('Model regularization:')
C = model.C_
print(f"C = {C}, lambda = {1 / C}")

Run an example to confirm the model equation.

In [None]:
y_prob = model.predict_proba(dataset_test[0])
y_prob

In [None]:
example = dataset_test[0][1, :]
example

In [None]:
logit = example @ model.coef_.T + model.intercept_
odds = np.exp(logit)
prob = odds / (1 + odds)
print(logit, odds, prob)

In [None]:
np.exp(example @ model.coef_.T + model.intercept_) / (1 + np.exp(example @ model.coef_.T + model.intercept_))

In [None]:
(np.exp(- (example @ model.coef_.T) - model.intercept_) + 1) ** -1

In [None]:
# UPDATE MANUALLY
# (1 + np.exp(1.326 + 0.25*(0) - 4.316*(0.6059) - 0.25*(0.00835) - 1.046*(0.2368))) ** -1
(1 + np.exp(1.257 - 4.415*(0.6059))) ** -1

Plot decision boundaries for visualization.

In [None]:
n1 = features.index('Target Stress (MPa)')
logreg_decision_bndr(model, dataset_test, n1, features=features, labels=indicator_labels[ind])

### Tissue Damage

Select a specific indicator from the targets and split the dataset.

In [None]:
ind = indicators[1]
ind

In [None]:
# Stress alone
logreg_build(X[['Target Stress (MPa)']], y[ind], seed=SEED)

In [None]:
df = pd.DataFrame()
for n in range(X.shape[1]):
    res = logreg_build(X, y[ind], n_features=(n + 1), seed=SEED)
    df = df.append(res, ignore_index=True)
df

In [None]:
df.features[4]

A high accuracy can be achieved with just stress and strain as input features. Reducing further to just strain dramatically decreases accuracy. Reducing to just stress reduces recall to 80%. A perfect prediction performance on the test set can be achieved if tissue type, gender and relaxation stress are included as well making it the most accurate model although harder to implement in real time.

In [None]:
n_features = 2
idx = (df.n_features == n_features).idxmax()
model = df.model[idx]
features = df.features[idx]
dataset_train, dataset_test = logreg_split(X[features], y[ind], seed=SEED)

In [None]:
# Check training set performance
logreg_display(model, dataset_train)

In [None]:
# Check test set performance
logreg_display(model, dataset_test)

Review the model parameters.

In [None]:
print('Model coefficients:')
print(model.coef_)
print('Model features:')
print(features)
print('Model intercept:')
print(model.intercept_)
print('Model regularization:')
C = model.C_
print(f"C = {C}, lambda = {1 / C}")

Run an example to confirm the model equation.

In [None]:
y_prob = model.predict_proba(dataset_test[0])
y_prob

In [None]:
example = dataset_test[0][1, :]
example

In [None]:
logit = example @ model.coef_.T + model.intercept_
odds = np.exp(logit)
prob = odds / (1 + odds)
print(logit, odds, prob)

In [None]:
np.exp(example @ model.coef_.T + model.intercept_) / (1 + np.exp(example @ model.coef_.T + model.intercept_))

In [None]:
(np.exp(- (example @ model.coef_.T) - model.intercept_) + 1) ** -1

In [None]:
# UPDATE MANUALLY
# (1 + np.exp(2.94*(0) - 17.84*(0.6059) - 4.95*(0.2368) + 15.54*(0.7923) - 6.65)) ** -1
(1 + np.exp(-17.09*(0.6059) + 18.45*(0.7923) - 8.97)) ** -1

In [None]:
(1 + np.exp(-17.1*(0.6059) + 18.5*(0.7923) - 9)) ** -1

In [None]:
dataset_test[0][5, :]

In [None]:
(1 + np.exp(-17.1*(0.10344) + 18.5*(0.637) - 9)) ** -1

Plot decision boundaries for visualization.

In [None]:
ind

In [None]:
n1 = features.index('Target Stress (MPa)')
logreg_decision_bndr(model, dataset_train, n1, features=features, labels=indicator_labels[ind])

### Major Tissue Damage

Select a specific indicator from the targets and split the dataset.

In [None]:
ind = indicators[2]
ind

In [None]:
# Only 3 positive examples for major damage!!
y[ind].sum()

As expected, 3 positive samples for major damage is not enough to form a useful model. So instead an anomaly detection algorithm will be applied to look for deviations from the normal expectation. For simplicity the validation will be excluded.