In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from itertools import product
import matplotlib.pyplot as plt
%matplotlib inline
from os.path import join
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.python.keras.models import load_model

In [2]:

def sequence_to_onehot(_datax):
  """Encode every sequence as a stack of one-hot vectors over sliding 3-mers.

  The vocabulary is every 3-letter word over the alphabet A/T/G/C/D
  (5**3 = 125 entries). One-hot columns follow the lexicographically sorted
  vocabulary, which is the column order scikit-learn's OneHotEncoder assigns
  by default, so the encoding is unchanged from the encoder-based version.

  Args:
    _datax: indexable collection of sequence strings (list, 1-D array, ...).
      Items are read as `_datax[i]` for i in range(len(_datax)).

  Returns:
    np.ndarray of ints. For equal-length sequences of length L the shape is
    (len(_datax), L - 3, 125).

  Raises:
    KeyError: a window contains a character outside the alphabet.
  """
  k = 3
  vocabulary = sorted(''.join(letters) for letters in product(['A', 'T', 'G', 'C', 'D'], repeat=k))
  column_of = {k_mer: column for column, k_mer in enumerate(vocabulary)}
  # Row r of the identity matrix is the one-hot vector for vocabulary[r].
  one_hot_rows = np.eye(len(vocabulary), dtype=int)

  res_list = []
  for i in range(len(_datax)):
    sequence = _datax[i]
    columns = []
    # NOTE(review): only len - k windows are taken, so the final k-mer
    # (starting at len - k) is never encoded. Kept as-is because other code in
    # this file reshapes to 5597 rows; confirm before changing the window count.
    for j in range(len(sequence) - k):
      window = str(sequence[j:j + k])
      try:
        columns.append(column_of[window])
      except KeyError:
        raise KeyError(f"sequence {i}: unknown k-mer {window!r} at position {j}") from None
    # Fancy-indexing the identity matrix builds the whole (windows, 125) block at once.
    res_list.append(one_hot_rows[np.asarray(columns, dtype=np.intp)])
  return np.array(res_list)

def load_data(_data_file_name:str, random_state=111):
  """Read a labelled sequence CSV, one-hot encode it and split it 80/20.

  The CSV must hold the sequence strings in its first column and a `label`
  column whose dummy encoding yields `label_0` and `label_1`.

  Args:
    _data_file_name: path of the CSV file to read.
    random_state: seed used for BOTH the row shuffle and the train/test split.
      The shuffle used to be unseeded, which made the seeded split
      non-reproducible between runs. Pass None for an unseeded run.

  Returns:
    (_datax, _datay, _trnx, _tstx, _trny, _tsty): the full encoded inputs and
    one-hot labels (in shuffled row order), followed by the train/test
    partitions of each.

  Raises:
    KeyError: the `label` column does not produce both dummy columns.
  """
  _df_shuffled = (
    pd.read_csv(_data_file_name)
    .sample(frac=1, random_state=random_state)
    .reset_index(drop=True)
  )
  # First column -> 1-D array of sequence strings.
  _datax = sequence_to_onehot(_df_shuffled.iloc[:,0].to_numpy())
  _datay = pd.get_dummies(_df_shuffled, columns=['label'])[['label_0','label_1']]
  # Train, Test Split
  _trnx, _tstx, _trny, _tsty = train_test_split(_datax, _datay, test_size = 0.2, random_state = random_state)
  return _datax, _datay, _trnx, _tstx, _trny, _tsty

def make_cnn_model(input_shape:tuple):
  """Build the (uncompiled) two-stage convolutional classifier and print its summary.

  Args:
    input_shape: shape of one input sample, handed to the first Conv2D layer.

  Returns:
    A Sequential model ending in a 2-unit sigmoid layer.
  """
  layer_stack = [
    # Stage 1: conv -> batch norm -> ReLU -> 2x2 max pool
    layers.Conv2D(16,(2,2), padding='same', input_shape=input_shape),
    layers.BatchNormalization(),
    layers.Activation("relu"),
    layers.MaxPooling2D((2,2)),
    # Stage 2: same as stage 1, with dropout ahead of the pooling
    layers.Conv2D(16,(2,2), padding='same'),
    layers.BatchNormalization(),
    layers.Activation("relu"),
    layers.Dropout(0.2),
    layers.MaxPooling2D((2,2)),
    # Classifier head
    layers.Flatten(),
    layers.Dense(units = 100, activation = "relu"),
    layers.Dense(units = 2, activation = "sigmoid"),
  ]

  _model = models.Sequential()
  for layer in layer_stack:
    _model.add(layer)
  _model.summary()
  return _model

def fit(_model, _train_test_data:tuple, batch_size=20, epochs=50, input_shape=(5597,125,1)):
  """Compile `_model` with Adam / binary cross-entropy and train it.

  Args:
    _model: object exposing Keras-style `compile` and `fit` methods.
    _train_test_data: (trnx, tstx, trny, tsty); the x arrays must support
      `.reshape`.
    batch_size: mini-batch size forwarded to `_model.fit`.
    epochs: number of epochs forwarded to `_model.fit`.
    input_shape: per-sample shape the x arrays are reshaped to before
      training. Defaults to the shape that used to be hard-coded here.

  Returns:
    Whatever `_model.fit` returns (the training history).
  """
  trnx, tstx, trny, tsty = _train_test_data
  batch_shape = (-1,) + tuple(input_shape)
  # An SGD(learning_rate=0.1) instance used to be created here but was never
  # handed to compile(); Adam has always been the effective optimizer.
  # NOTE(review): compile() runs on every call, also for a model that was
  # already trained -- presumably this resets optimizer state; confirm that is
  # intended when training continues across data files.
  _model.compile(optimizer = 'Adam', loss = 'binary_crossentropy', metrics=['accuracy'])
  _history = _model.fit(
    trnx.reshape(batch_shape),
    trny,
    validation_data = (tstx.reshape(batch_shape), tsty),
    batch_size=batch_size,
    epochs=epochs,
  )
  return _history

def draw_history_graph(_history):
  """Plot train vs. validation accuracy per epoch and print the last validation value.

  Args:
    _history: object whose `.history` mapping has 'accuracy' and
      'val_accuracy' sequences.
  """
  curves = _history.history
  for metric_key in ('accuracy', 'val_accuracy'):
    plt.plot(curves[metric_key])
  plt.title('model accuracy')
  plt.xlabel('epoch')
  plt.ylabel('accuracy')
  # Legend entries follow plotting order: training curve first, validation second.
  plt.legend(['train','test'], loc = 'lower right')
  plt.show()
  final_val_accuracy = curves['val_accuracy'][-1]
  print(f"Final Validation Accuracy: {final_val_accuracy}")

def start_train(_data_file_name, model_to_train = None, batch_size=20, epochs=50):
  """Load one data file, train a model on it and plot the accuracy curves.

  Args:
    _data_file_name: CSV path handed to `load_data`.
    model_to_train: existing model to continue training; when None a fresh
      CNN is built with `make_cnn_model`.
    batch_size: mini-batch size forwarded to `fit`.
    epochs: number of epochs forwarded to `fit`.

  Returns:
    (model, history): the trained model and the history returned by `fit`.
  """
  # Load Data -- only the four train/test partitions are needed here, so the
  # full (unsplit) arrays at positions 0 and 1 are dropped.
  train_test_data = tuple(load_data(_data_file_name)[2:])

  # Make Model (identity check: `== None` would dispatch to the model's __eq__)
  if model_to_train is None:
    model = make_cnn_model(input_shape = (5597,125,1))
  else:
    model = model_to_train

  # Fitting Data to Model
  history = fit(model, train_test_data, batch_size=batch_size, epochs=epochs)

  # Show Performance
  draw_history_graph(history)

  # return Trained Model & History
  return model, history

In [3]:
#first_trained_model,first_trained_history = start_train('data800_1.csv',model_to_train = None, batch_size=50, epochs=20)

In [4]:
#import time as t 
#t.sleep(10)
#first_trained_model.save('first_trained_model.h5')



In [5]:
# Resume from the model stored in 'first_trained_model.h5'. The cells that
# train and save that model are commented out above, so the file must already
# exist on disk for this cell to run.
model_load = models.load_model('first_trained_model.h5')

In [6]:
# Encode the second data file; load_data returns the full encoded inputs and
# labels followed by their train/test partitions.
datax, datay, trnx, tstx, trny, tsty = load_data('data800_2.csv')

In [12]:
# Continue training the reloaded model on the second data file. This cell used
# to repeat fit()'s body verbatim, including an SGD optimizer that was created
# but never used; calling fit() compiles with Adam / binary cross-entropy and
# trains with the same arguments.
history = fit(model_load, (trnx, tstx, trny, tsty), batch_size=20, epochs=20)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [7]:
#second_trained_model,second_trained_history = start_train('data800_2.csv',model_to_train=model_load, batch_size=50, epochs=20)

In [8]:
#t.sleep(10)

In [9]:
#third_trained_model,third_trained_history = start_train('data800_3.csv',model_to_train = second_trained_model, batch_size=50, epochs=20)