# Xception Model for Senescence Score

This jupyter notebook will take into account training of the Xception model and architecture. Training structure (i.e. splits on preprocessing,training functions)  was derived from DeepSesmo (https://doi.org/10.1038/s41467-020-20213-0). Xception was used as the base architecture with appended global averaging, dropout, and softmax dense layer. Note this platform is trained on 3-dimensional images (256,256,3). For this work, we used a nuclei channel, actin channel, and composite channel respectively. This can be modulated

In [None]:
# import packages

import scipy.misc
from scipy import ndimage
import sys
import os
import glob
import json
from collections import defaultdict
import numpy as np
from numpy import random
import matplotlib.pyplot as plt
import matplotlib as mpl
import pandas as pd
import sklearn.model_selection as crv
from sklearn.metrics import f1_score, accuracy_score, recall_score, precision_score
from sklearn.metrics import confusion_matrix,  roc_curve, auc
from keras.preprocessing.image import ImageDataGenerator
from keras.optimizers import gradient_descent_v2
from keras import backend as K
from keras.utils import np_utils
from keras.applications.inception_v3 import InceptionV3 
from tensorflow.keras.applications.resnet50 import ResNet50 
from keras.applications.xception import Xception
from keras.applications.vgg19 import VGG19
from keras.models import Model, model_from_json, Sequential
from keras.layers import Conv2D, MaxPooling2D, Dense, GlobalAveragePooling2D, GlobalMaxPooling2D, Dropout, Flatten,Activation
from keras.callbacks import ModelCheckpoint
from keras.initializers import glorot_normal, glorot_uniform, he_normal, he_uniform, lecun_normal, lecun_uniform
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50
import keras
%matplotlib inline

Preprocessing function below. Since our images are 8 bit (maximum pixel value of 255), this function will standardize alll pixels between -1 and 1 for the machine learning inputs.

In [1]:
def preprocess_input(x0):
    return ((x0/255.)-0.5)*2.

The following fdunction takes the two conditions (i.e. senescent and non senescent) and creates train-test splits 

In [2]:
def data_setting(condition_1, condition_2):
    
    global test_size
    
    X_all = np.concatenate((condition_1, condition_2),axis=0)
    y_all = np.concatenate((np.tile(np.array([[0]]),(condition_1.shape[0],1)),
                             np.tile(np.array([[1]]),(condition_2.shape[0],1))
                            ),axis=0)
    train_X, test_X, train_y, test_y = crv.train_test_split(X_all,y_all,test_size=test_size,random_state=42)
    
    X_train = preprocess_input(np.copy(train_X))
    X_test = preprocess_input(np.copy(test_X))
    y_train = np.copy(train_y)
    y_test = np.copy(test_y)

    X_train = X_train.reshape(X_train.shape[0], scale_v*2, scale_h*2, n_chan)
    Y_train = np_utils.to_categorical(y_train, 2)
    X_test = X_test.reshape(X_test.shape[0], scale_v*2, scale_h*2, n_chan)
    Y_test = np_utils.to_categorical(y_test, 2)

    return X_train, X_test, Y_train, Y_test

The following is the model architecture with xception with imagenet initialized weights. Addition of Global pooling, dropout, and Dense softmax layer at the end. Categorical crossentropy loss with adam optimizer (e-3) and accuracy metrics used.

In [3]:
def model_CNN_2():
   
    global nb_epoch, opt, my_init,base_model
    
    
    base_model = Xception(
    weights='imagenet',  # Load weights pre-trained on ImageNet.
    input_shape=(256, 256, 3),
    include_top=False)  
    
    base_model.trainable = False
    
    
    
    inputs = keras.Input(shape=(256, 256, 3))
    x = base_model(inputs, training=False)
    # Convert features of shape `base_model.output_shape[1:]` to vectors
    x = keras.layers.GlobalAveragePooling2D()(x)
    x = keras.layers.Dropout(0.2)(x) 
    outputs = keras.layers.Dense(2, activation='softmax')(x)
    model = keras.Model(inputs, outputs)
    
    

     
    model.compile(loss='categorical_crossentropy',
             optimizer='adam',
             metrics=['accuracy'])
    
    return model

Following plots model metrics

In [4]:
def plot_history(history):
    
    plt.plot(history.history['accuracy'],"o-",label="accuracy")
    plt.plot(history.history['val_accuracy'],"o-",label="val_acc")
    plt.title('model accuracy')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.legend(loc="lower right")
    plt.ylim([0,1])
    plt.show()

    plt.plot(history.history['loss'],"o-",label="loss",)
    plt.plot(history.history['val_loss'],"o-",label="val_loss")
    plt.title('model loss')
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(loc='upper right')
    plt.show()
    

The following is a nested function in the 'Training CNN' master function. The datagen function keeps non-transfomative orientation shifts to the dataset (not we did not want to create shape-based transformations on the dataset). The function then trains the model on the resulting dataset for the originally outlined epochs. After this initial training, the base model is unfrozen and trained for additional epochs at a lower adams learning rate (e-4) to allow for finer convergance without overfitting

In [5]:
def do_CNN(X_train, X_test, Y_train, Y_test):
    
    global batch_size, nb_epoch, model, history_x, shift_range, std_normalization, whitening,history_y
    
    datagen = ImageDataGenerator(
        rotation_range=360,
        horizontal_flip=True,
        vertical_flip=True,
        )

    history_x = model.fit_generator(datagen.flow(X_train, Y_train, batch_size=batch_size, shuffle=True),
    steps_per_epoch=X_train.shape[0]//batch_size,
    epochs=nb_epoch,
    validation_data=datagen.flow(X_test, Y_test, batch_size=batch_size),
    validation_steps=X_test.shape[0]//batch_size)
    
    
    #unfreeze weights for small amount of change 
    base_model.trainable = True
    model.compile(loss='categorical_crossentropy',
             optimizer=keras.optimizers.adam_v2.Adam(learning_rate=1e-4),
             metrics=['accuracy'])
    history_y=model.fit(datagen.flow(X_train, Y_train, batch_size=batch_size, shuffle=True),
    steps_per_epoch=X_train.shape[0]//batch_size,
    epochs=50,
    validation_data=datagen.flow(X_test, Y_test, batch_size=batch_size),
    validation_steps=X_test.shape[0]//batch_size)
    

The following is the master function that leverages the previous functions to train the xception model

In [6]:
def Training_CNN(condition_1, condition_2):
    
    global nb_classes, batch_size, model, history_x, Y_test, y_pred, X_test,history_y
    
    X_train, X_test, Y_train, Y_test = data_setting(condition_1, condition_2)
    
    model = model_CNN_2()
    
    do_CNN(X_train, X_test, Y_train, Y_test)
    Y_pred = model.predict(X_test, batch_size=1,verbose=1)
    y_pred = np.argmax(Y_pred, axis=1)
    print('final accuracy:',f1_score(np.argmax(Y_test,1), y_pred, average='macro'))
    
    #plot_history(history_x)


Below are  functions to save the model and epoch history, and parameters of the model

In [7]:
def save_model(network_name):
    
    global model
    
    model_json_str = model.to_json()
    
    json_name = os.path.join(network_name,'model_20.json')
    h5_name = os.path.join(network_name,'weights.h5')
    open(json_name, 'w').write(model_json_str)
    model.save_weights(h5_name)

def save_history(network_name):
    
    global history_x, nb_epoch
    
    epochs = range(1, nb_epoch+1)
    data_history = pd.DataFrame(history_x.history, index = epochs)

    print(data_history)
    data_history.to_csv(os.path.join(network_name, "_history" + ".csv"))
    
    
def save_parameters(network_name):
    
    global F1_score, Accuracy, Precision, Recall, confusion_m
    
    #Final parameters
    F1_score = f1_score(np.argmax(Y_test,1), y_pred, average='binary')
    Accuracy = accuracy_score(np.argmax(Y_test,1), y_pred)
    Precision = precision_score(np.argmax(Y_test,1), y_pred, average='binary')
    Recall = recall_score(np.argmax(Y_test,1), y_pred, average='binary')
    
    parameter_dict = {network_name:[F1_score, Accuracy, Precision, Recall]}
    parameter_dataframe = pd.DataFrame(parameter_dict,index=["F1_score", "Accuracy", "Precision", "Recall"])
    
    print(parameter_dataframe)
    parameter_dataframe.to_csv(os.path.join(network_name, "final_parameter" + ".csv"))
    
    #Confusion matrix
    confusion_m = confusion_matrix(np.argmax(Y_test,1), y_pred)
    confusion_dataframe = pd.DataFrame(confusion_m, index=["Answer:0", "Answer:1"])
    confusion_dataframe.columns = ["Prediction:0", "Prediction:1"]
    print(confusion_dataframe)
    confusion_dataframe.to_csv(os.path.join(network_name, "confution_matrix" + ".csv"))
    
    #AUC of ROC curve
    print('hello')

    prob = model.predict(X_test)[:,1]

    fpr, tpr, threshold = roc_curve(np.argmax(Y_test,1), prob)
    roc_auc = auc(fpr, tpr)
    print("AUC: {0}".format(roc_auc))
    roc_data = [fpr, tpr]
    roc_dataframe = pd.DataFrame(roc_data, index=["False positive rate", "True positive rate"])
    print(roc_dataframe)
    roc_dataframe.to_csv(os.path.join(network_name, "ROC_AUC" + ".csv"))
    #print('hello')
    #plt.plot(fpr, tpr, color='red',lw= 2, label='ROC curve (area = %0.2f)' % roc_auc)
    #plt.plot([0, 1], [0, 1], color='black', lw= 2, linestyle='--')
    #plt.xlim([0.0, 1.0])
    #plt.ylim([0.0, 1.05])
    #plt.xlabel('False Positive Rate')
    #plt.ylabel('True Positive Rate')
    #plt.title('Receiver operating characteristic')
    #plt.legend(loc="best")
    #print('hello')
    #plt.savefig(os.path.join(network_name, "roc_Auc" + ".jpg"))
    #plt.show()    



In [None]:
# Parmaeters for training

n_chan=3            # Number of channels of images

scale_v = 128        # Image width/2 (px)
scale_h = 128       # Image height/2 (px)

test_size = 0.2 #test size
nb_epoch = 200 # number of base epochs, not inclusive of additional training sets
nb_classes = 2 # number of classes, binary 
batch_size = 8 # batch size for training

In [None]:
control_npy = np.load('\\path\\')    #Load numpy file of healthy condition and senescent tagged condition
stress_npy = np.load('\\path\\') 

In [None]:
Training_CNN(control_npy, stress_npy) # train model


In [None]:
network_name = '\\directory to save\\'    #Directory path and name to save results
save_model(network_name)
save_history(network_name)
save_parameters(network_name)


# Model Application

Here we will apply the trained model to determine senescent scores of single cells (data instances created from data instance creator)

In [1]:
def preprocess_input(x0):
    return ((x0/255.)-0.5)*2.

In [2]:
def data_setting(condition):
    
    global X_test, Y_test, y_pred
    
    test_X = condition
    test_y = np.concatenate((np.tile(np.array([[0]]),(condition.shape[0],1)),
                            ),axis=0)
    X_test = preprocess_input(np.copy(test_X))
    y_test = np.copy(test_y)

    X_test = X_test.reshape(X_test.shape[0], scale_v*2, scale_h*2, n_chan)
    Y_test = np_utils.to_categorical(y_test, 2)
    
    Y_pred = model.predict(X_test, batch_size=batch_size)
    y_pred = np.argmax(Y_pred, axis=1)

    return X_test, Y_test, Y_pred, y_pred

In [3]:
def save_parameters(network_name, condition):
   
    global F1_score, Accuracy, Precision, Recall, confusion_m
    
    #Final parameters
    data_setting(condition)
    F1_score = f1_score(np.argmax(Y_test,1), y_pred, average='binary')
    Accuracy = accuracy_score(np.argmax(Y_test,1), y_pred)
    Precision = precision_score(np.argmax(Y_test,1), y_pred, average='binary')
    Recall = recall_score(np.argmax(Y_test,1), y_pred, average='binary')
    
    parameter_dict = {network_name:[F1_score, Accuracy, Precision, Recall]}
    parameter_dataframe = pd.DataFrame(parameter_dict,index=["F1_score", "Accuracy", "Precision", "Recall"])
    
    print(parameter_dataframe)
    parameter_dataframe.to_csv(os.path.join(network_name, "final_parameter" + ".csv"))
    
    #Confusion matrix
    confusion_m = confusion_matrix(np.argmax(Y_test,1), y_pred)
    confusion_dataframe = pd.DataFrame(confusion_m, index=["Answer:0", "Answer:1",])
    confusion_dataframe.columns = ["Prediction:0", "Prediction:1"]
    print(confusion_dataframe)
    confusion_dataframe.to_csv(os.path.join(network_name, "confution_matrix" + ".csv"))


In [4]:
# Prameters for validation

n_chan=3            # Number of channels of images

scale_v = 128        # Image width/2 (px)
scale_h = 128       # Image height/2 (px)

batch_size = 16

In [5]:
# Load trained CNN data
json_file_path = '\\path\\'
h5_file_path = '\\path\\'
model = model_from_json(open(json_file_path).read())
model.load_weights(h5_file_path)

In [None]:
# load dataset for prediction
data_predict = np.load( '\\path\\')   #Load numpy file of condition


In [None]:
network_name =  '\\path\\' #Directory path and name to save results
save_parameters(network_name,data_predict)

In [None]:
X_test, Y_test, Y_pred, y_pred=data_setting(data_predict) # predict on the dataset
np.save('\\path\\',Y_pred) # save the scores (note y_pred is the rounded version of this score)
