Train a model that predicts, from a sequence, whether methylation is completely or only partially lost

In [None]:
# Mount Google Drive inside the Colab runtime at /gdrive and switch the
# working directory to the mount point.
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive 

Mounted at /gdrive
/gdrive


In [None]:
cd /gdrive/My\ Drive/nn 

/gdrive/My Drive/nn


In [None]:
%tensorflow_version 1.x
import argparse
import os
import pickle
import sys
import glob

import numpy as np

np.random.seed(7)  # for reproducibility

import tensorflow as tf
tf.random.set_random_seed(5005)

from sklearn.model_selection import train_test_split, KFold
from sklearn.utils import class_weight


from tensorflow.python.keras.models import Model, load_model
from tensorflow.python.keras.layers import Input
from tensorflow.python.keras.layers import Dense, Flatten, Dropout
from tensorflow.python.keras.layers.convolutional import Conv1D
from tensorflow.python.keras.layers.pooling import MaxPooling1D
from tensorflow.python.keras.layers.pooling import AveragePooling1D
from tensorflow.keras.optimizers import Adam
from tensorflow.python.keras.callbacks import ModelCheckpoint, EarlyStopping
import tensorflow.python.keras.backend as K
from keras import regularizers
from tensorflow.python.keras.utils import plot_model 

sys.path.append(".")
import utils
from utils import *

# Regularisation strengths used when the network is built:
# l2_lam is passed to regularizers.l2 on the Conv1D kernels,
# l1_lam is passed to regularizers.l1 on the hidden Dense kernels.
l2_lam = 5e-07 
l1_lam = 1e-08 

TensorFlow 1.x selected.


Using TensorFlow backend.


In [None]:
def train_model_on_fold(x_train, y_train, x_test,y_test, input_len,
                        num_epoch, batchsize, func,model_path, class_weights, output_bias=None):
  """
  Build, compile and fit one model, then return it with its best weights loaded
  :param x_train: The train dataset
  :param y_train: The train labels
  :param x_test: The held-out dataset, used as validation data during fitting
  :param y_test: The held-out labels, used as validation data during fitting
  :param input_len: The length of the input
  :param num_epoch: Maximum number of epochs (early stopping may end sooner)
  :param batchsize: The batch size
  :param func: The model function to use; called as func(input_len, output_bias=...)
  :param model_path: File the best checkpoint (lowest val_loss) is written to
  :param class_weights: Dict mapping class label to loss weight, forwarded to fit()
  :param output_bias: Optional initial bias forwarded to the model function
  :return: The fitted model, carrying the weights of the best checkpoint
  """
  model = func(input_len, output_bias=output_bias)
  adam = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=1e-6)
  model.compile(loss='binary_crossentropy', optimizer=adam, metrics=['accuracy', recall_TP,recall_TN])

  # The checkpoint callback cannot write into a folder that does not exist yet.
  checkpoint_dir = os.path.dirname(model_path)
  if checkpoint_dir:
    os.makedirs(checkpoint_dir, exist_ok=True)

  checkpointer = ModelCheckpoint(filepath=model_path, monitor="val_loss", verbose=1, save_best_only=True)
  earlystopper = EarlyStopping(monitor="val_loss", patience=5, verbose=1)

  print('fitting the model')
  history = model.fit(x_train, y_train, epochs=num_epoch, batch_size=batchsize,
                      validation_data=(x_test, y_test), verbose=1,
                      callbacks=[checkpointer, earlystopper], class_weight=class_weights)

  # After fit() the model holds the weights of the LAST epoch, which with
  # patience=5 can be several epochs past the best one. Reload the checkpoint
  # so callers evaluate and save the best model. The val_loss guard avoids
  # picking up a stale file from an earlier run when no epoch was executed.
  if history.history.get("val_loss") and os.path.isfile(model_path):
    model.load_weights(model_path)

  return model

In [None]:
def sequence_model(input_len, output_bias=None):
  """
  Build a convolutional model that maps a sequence to a single sigmoid output
  :param input_len: The length of the input; the network expects (input_len, 4) per sample
  :param output_bias: Optional initial bias of the output unit (scalar or
                      one-element array). None keeps the Keras default of zeros
  :return: The uncompiled Keras model
  """
  # Start from a clean backend session and a fixed seed on every call.
  K.clear_session()
  tf.random.set_random_seed(5005)

  # Test against None rather than truthiness: a bias of exactly 0.0 is a valid
  # value, and a numpy array with more than one element has no truth value.
  if output_bias is None:
    bias_initializer = "zeros"
  else:
    # .item() raises ValueError unless exactly one value was supplied, which
    # matches the single output unit below.
    bias_initializer = tf.keras.initializers.Constant(float(np.asarray(output_bias).item()))

  input_node = Input(shape=(input_len, 4), name="input")

  # First convolutional stage
  conv1 = Conv1D(filters=90, kernel_size=3, padding='valid', activation="relu", name="conv1",
                 kernel_regularizer=regularizers.l2(l2_lam))(input_node)
  pool1 = MaxPooling1D(pool_size=2, strides=1, name="pool1")(conv1)
  drop1 = Dropout(0.25, name="drop1")(pool1)

  # Second convolutional stage
  conv2 = Conv1D(filters=100, kernel_size=5, padding='valid', activation="relu", name="conv2",
                 kernel_regularizer=regularizers.l2(l2_lam))(drop1)
  pool2 = MaxPooling1D(pool_size=2, strides=1)(conv2)
  drop2 = Dropout(0.25)(pool2)
  flat = Flatten()(drop2)

  # Fully connected head
  hidden1 = Dense(500, activation='relu', name="hidden1", kernel_regularizer=regularizers.l1(l1_lam))(flat)
  drop3 = Dropout(0.5)(hidden1)
  hidden2 = Dense(250, activation='relu', name="hidden2", kernel_regularizer=regularizers.l1(l1_lam))(drop3)

  output = Dense(1, activation='sigmoid', name="output", bias_initializer=bias_initializer)(hidden2)
  model = Model(inputs=[input_node], outputs=output)

  return model

In [None]:
def _balanced_class_stats(labels):
  """
  Derive per-class loss weights and an initial output bias from binary labels
  :param labels: Array of 0/1 labels; it is flattened before use
  :return: (class_weights, initial_bias). class_weights maps each label to its
           'balanced' weight divided by the smallest weight. initial_bias is
           np.log([pos/neg]), or None when one of the two classes is absent
  """
  flat_labels = np.ravel(labels)
  classes = np.unique(flat_labels)
  # Keyword arguments: recent scikit-learn releases reject the positional form.
  weights = class_weight.compute_class_weight(class_weight='balanced', classes=classes, y=flat_labels)
  smallest = weights.min()
  # Key by the actual label value instead of assuming labels are 0..k-1.
  class_weights = {int(label): float(weight / smallest) for label, weight in zip(classes, weights)}

  neg, pos = np.sum(flat_labels == 0), np.sum(flat_labels == 1)
  initial_bias = np.log([pos / neg]) if neg > 0 and pos > 0 else None
  return class_weights, initial_bias


def train_model(data_path, model_folder="./models/folds_models", temp_model_folder="./models/temp/weight.h5", input_len=150, number_of_folds=3, num_epoch=20, batchsize=128):
  """
  Train one model per cross-validation fold, evaluate each on the test set and save it
  :param data_path: The path for the dataset
  :param model_folder: The final folder to save the models (one "fold<N>.h5" per fold)
  :param temp_model_folder: Checkpoint file written while a fold is being fitted
  :param input_len: The length of the input
  :param number_of_folds: Number of folds to use for the model
  :param num_epoch: Maximum number of epochs per fold
  :param batchsize: Batch size used for fitting
  :return: The model if we used 1 model (1 fold) or None if more
  """

  print('loading data')
  x_train_list, y_train_list, x_valid_list, y_valid_list, x_test_seq, y_test, x_train, y_train = load_train_validate_test_data(data_path, input_len, kfold=number_of_folds)

  # Make sure both output locations exist before anything tries to write to them.
  os.makedirs(model_folder, exist_ok=True)
  temp_dir = os.path.dirname(temp_model_folder)
  if temp_dir:
    os.makedirs(temp_dir, exist_ok=True)

  acc_per_fold = []
  loss_per_fold = []
  model = None

  # With several folds the *_list entries are used as indices into
  # x_train / y_train; with a single fold they are used as the data directly.
  use_indices = number_of_folds != 1

  for fold_num in range(len(x_train_list)):
    print("Using fold %s/%s" %(fold_num+1, number_of_folds))
    x_train_fold = x_train[x_train_list[fold_num]] if use_indices else x_train_list[fold_num]
    y_train_fold = y_train[y_train_list[fold_num]] if use_indices else y_train_list[fold_num]
    x_valid_fold = x_train[x_valid_list[fold_num]] if use_indices else x_valid_list[fold_num]
    y_valid_fold = y_train[y_valid_list[fold_num]] if use_indices else y_valid_list[fold_num]

    # Class weights and the initial bias come from this fold's TRAINING labels,
    # so nothing about the test set influences training.
    class_weights, initial_bias = _balanced_class_stats(y_train_fold)

    model_path = os.path.join(model_folder, "fold%s.h5" %fold_num)

    model = train_model_on_fold(x_train_fold, y_train_fold, x_valid_fold, y_valid_fold, model_path=temp_model_folder,
                                input_len=input_len, num_epoch=num_epoch, batchsize=batchsize, func=sequence_model,
                                class_weights=class_weights, output_bias=initial_bias)

    if fold_num == 0:
      # summary() prints by itself and returns None, so it is not wrapped in print().
      model.summary()
      plot_model(model, show_shapes=True, show_layer_names=True,rankdir="TB")

    print("Finish training fold %d" % (fold_num+1))
    print('testing the model')
    score = model.evaluate(x_test_seq, y_test)

    for metric_name, metric_value in zip(model.metrics_names, score):
        print(str(metric_name) + ": " + str(metric_value))

    # score[0] is the loss and score[1] the first compiled metric (accuracy).
    acc_per_fold.append(score[1] * 100)
    loss_per_fold.append(score[0])

    model.save(model_path)

  print('Average scores for all folds:')
  print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
  print(f'> Loss: {np.mean(loss_per_fold)}')

  if number_of_folds == 1:
    return model

  return None

# Train the two models

In [None]:
# Train the NN on the scWGBS data using 5 folds 
# Dataset files (.pkl) for the two data sources, relative to the working directory.
bian_data = r"dataset/bian_crc01_train_test.pkl"
zhou_data = r"dataset/zhou_train_test.pkl"

# Folders the per-fold models of each dataset are saved to and loaded from.
models_folder_zhou ="./models/zhou"
models_folder_bian = "./models/bian"

In [None]:
# Fit the Zhou-dataset models with 5-fold cross-validation.
model = train_model(
  data_path=zhou_data,
  model_folder=models_folder_zhou,
  temp_model_folder="./models/temp/weight.h5",
  input_len=150,
  number_of_folds=5,
)

In [None]:
# Fit the Bian-dataset models with 5-fold cross-validation.
model = train_model(
  data_path=bian_data,
  model_folder=models_folder_bian,
  temp_model_folder="./models/temp/weight.h5",
  input_len=150,
  number_of_folds=5,
)

In [None]:
# Score the saved Zhou models on the Zhou test split.
# Only the fifth and sixth returned values (test inputs and labels) are kept.
(_, _, _, _, x_test_seq, y_test, _, _) = load_train_validate_test_data(
  path_to_data=zhou_data,
  input_len=150,
  kfold=1,
  only_test=True,
)

models = load_models(models_folder_zhou)
get_scores(models, x_test_seq, y_test)


In [None]:
# Score the saved Bian models on the Bian test split.
# Only the fifth and sixth returned values (test inputs and labels) are kept.
(_, _, _, _, x_test_seq, y_test, _, _) = load_train_validate_test_data(
  path_to_data=bian_data,
  input_len=150,
  kfold=1,
  only_test=True,
)

models = load_models(models_folder_bian)
get_scores(models, x_test_seq, y_test)