In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
tf.random.set_seed(123)
from tensorflow import keras
from tensorflow.keras.layers import multiply
from tensorflow.keras import backend as K

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "2"

In [None]:
def cbam_block(cbam_feature, ratio=8,kernel_size = (3,3)):

    cbam_feature = channel_attention(cbam_feature, ratio)
    cbam_feature = spatial_attention(cbam_feature,kernel_size)
    return cbam_feature

def channel_attention(input_feature, ratio=8):
    channel = input_feature.shape[-1]
    filters = max(1, int(channel//ratio))
    shared_layer_one = tf.keras.layers.Dense(filters,
                             activation='relu',
                             kernel_initializer='he_normal',
                             use_bias=True,
                             bias_initializer='zeros')
    shared_layer_two = tf.keras.layers.Dense(channel,
                             kernel_initializer='he_normal',
                             use_bias=True,
                             bias_initializer='zeros')

    avg_pool = tf.keras.layers.GlobalAveragePooling2D()(input_feature)    
    avg_pool = tf.keras.layers.Reshape((1,1,channel))(avg_pool)
    avg_pool = shared_layer_one(avg_pool)
    avg_pool = shared_layer_two(avg_pool)

    max_pool = tf.keras.layers.GlobalMaxPooling2D()(input_feature)
    max_pool = tf.keras.layers.Reshape((1,1,channel))(max_pool)
    max_pool = shared_layer_one(max_pool)
    max_pool = shared_layer_two(max_pool)
   

    cbam_feature = tf.keras.layers.Add()([avg_pool,max_pool])
    cbam_feature = tf.keras.layers.Activation('sigmoid')(cbam_feature)


    return multiply([input_feature, cbam_feature])
def spatial_attention(input_feature,kernel_siz):
    kernel_size = kernel_siz
    channel = input_feature.shape[-1]
    cbam_feature = input_feature
    avg_pool = tf.keras.layers.Lambda(lambda x: K.mean(x, axis=3, keepdims=True))(cbam_feature)
    max_pool = tf.keras.layers.Lambda(lambda x: K.max(x, axis=3, keepdims=True))(cbam_feature)
    concat = tf.keras.layers.Concatenate(axis=3)([avg_pool, max_pool])
    cbam_feature = tf.keras.layers.Conv2D(filters = 1,
                    kernel_size=kernel_size,
                    strides=1,
                    padding='same',
                    activation='sigmoid',
                    kernel_initializer='he_normal',
                    use_bias=False)(concat)	

    return multiply([input_feature, cbam_feature])

In [None]:

def init_cnn():
    inputs = layers.Input(shape=(10, 10, 1))
    x = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(inputs)
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D((2, 2))(x)
    x = cbam_block(x)
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    return tf.keras.Model(inputs, x)

def transformer_encoder(inputs, d_model, num_heads, mlp_dim, dropout_rate):
    x = layers.LayerNormalization(epsilon=1e-6)(inputs)
    x = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model, dropout=dropout_rate)(x, x)
    x = layers.Add()([inputs, x])
    x = layers.LayerNormalization(epsilon=1e-6)(x)
    x = layers.Dense(mlp_dim, activation="gelu")(x)
    x = layers.Dropout(dropout_rate)(x)
    x = layers.Dense(d_model, activation="gelu")(x)
    x = layers.Dropout(dropout_rate)(x)
    x = layers.Add()([inputs, x])
    return x

def init_model():
    cnn = init_cnn()
    cnn_output = cnn.output
    reshaped = layers.Reshape((1, 128))(cnn_output)  


    encoded_sequence = transformer_encoder(reshaped, 128, 4, 512, 0.1)

 
    encoded_sequence = layers.Flatten()(encoded_sequence)
    
    hidden_layer = tf.keras.layers.Dense(units=128, activation="relu")(encoded_sequence)
    hidden_layer = tf.keras.layers.Dropout(0.4)(hidden_layer)
    hidden_layer = tf.keras.layers.Dense(units=64, activation="relu")(hidden_layer)
    hidden_layer = tf.keras.layers.Dropout(0.4)(hidden_layer)
    
    
    outputs = layers.Dense(1, activation='sigmoid')(hidden_layer)

    model = tf.keras.Model(inputs=cnn.input, outputs=outputs)
    model.compile(loss=tf.keras.losses.BinaryCrossentropy(label_smoothing=0.01), 
            optimizer=tf.keras.optimizers.Adam(learning_rate = 3e-4),
            metrics=[keras.metrics.TruePositives(name='tp'),
              keras.metrics.FalsePositives(name='fp'),
              keras.metrics.TrueNegatives(name='tn'),
              keras.metrics.FalseNegatives(name='fn'), 
              keras.metrics.BinaryAccuracy(name='accuracy'),
              keras.metrics.Precision(name='precision'),
              keras.metrics.Recall(name='recall')])
    return model

model = init_model()
model.summary()

In [None]:
from tensorflow.keras.utils import Sequence
def fun(a):
    oshape = a.shape
    a = a.reshape(-1,10).astype('float32')
    mean = np.mean(a,axis = 1).reshape(-1,1)
    a =a - mean
    sqrt = (np.sqrt(a.var(axis =1))+1e-10).reshape(-1,1)
    a = a/sqrt
    return a.reshape(oshape)
    
class load_testdata(Sequence):
    def __init__(self, x_y_set, batch_size):
        self.x_y_set = x_y_set
        self.x = self.x_y_set[:,:]
        self.batch_size = batch_size

    def __len__(self):
        
        return math.floor(len(self.x) / self.batch_size)
        
    def __getitem__(self, idx):
        batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
        
        batch_x = batch_x.reshape(-1,10,10,1)
           
        return np.array(batch_x)

### e.g. predict HIC056, HIC074 data

In [None]:

import math
import random
# Loading model weights
model.load_weights('/home/wangxiaoyan/deepTAD/model/CNN_transformer_weights/all_27.h5')

cells = ["HIC056","HIC074"]
chrs = list(map(str, range(1, 23))) + ["X"]
resolutions = [10000,25000,50000,100000]
for cell in cells:
    for resolution in resolutions:
        display_reso = int(resolution/1000)
        for chr in chrs:
            pre_data=np.load(f'/home/wangxiaoyan/deepTAD/work/deepTAD/{cell}/samples_generate/{cell}_{display_reso}k_chr{chr}-matrix.npy')
            print(pre_data.shape)
            random.seed(0)
            np.random.shuffle(pre_data)
            data1=fun(pre_data[:,:])
            data=pre_data[:,:]

            batch_size=32
            preprocessed_data = load_testdata(data1, batch_size)
            epo=math.floor(len(pre_data) / batch_size)
            predictions = model.predict(preprocessed_data,batch_size=32)
            binary_predictions = (predictions > 0.5).astype(int)
            predictions1 = np.array(predictions.flatten())
            merged_data = np.concatenate((data[:batch_size*epo,:], predictions1.reshape(-1,1)), axis=1)
            merged_data[:, -1] = np.where(merged_data[:, -1].astype(float) >= 0.5, 1, 0)
            print(merged_data.shape)
            np.save('/home/wangxiaoyan/deepTAD/work/deepTAD/%s/predict_result/%dk_chr%s-total.npy'%(cell,display_reso,chr), merged_data)


#### ②Filter out the ones predicted to be 1, find the position in the large matrix, and output the boundaries

In [None]:

import numpy as np

def find_submatrix(big_matrix, small_matrix):
    big_rows, big_cols = big_matrix.shape
    small_rows, small_cols = small_matrix.shape

    for i in range(big_rows - small_rows + 1):
        if not np.allclose(big_matrix[i:i+small_rows, i:i+small_cols], small_matrix):
            continue
        return i+5, i+6
    return None


cells = ["HIC056","HIC074"]
chrs = list(map(str, range(1, 23))) + ["X"]
resolutions = [10000,25000,50000,100000]
for cell in cells:
    for resolution in resolutions:
        display_reso = int(resolution/1000)
        for chr in chrs:

            big_matrix = np.loadtxt("/home/wangxiaoyan/deepTAD/prepare_data/%s/%s_%dk_KR.chr%s"%(cell,cell,display_reso,chr))
        
            sub_matrix_data = np.load('/home/wangxiaoyan/deepTAD/work/deepTAD/%s/predict_result/%dk_chr%s-total.npy'%(cell,display_reso,chr))
            sub_matrix_data = sub_matrix_data.astype(float)
            positive_indices = np.where(sub_matrix_data[:, -1] == 1)[0]
            positive_data = sub_matrix_data[positive_indices, :100]
            num_submatrices = positive_data.shape[0]
            results = []
            for i in range(num_submatrices):
                small_matrix1 = positive_data[i].reshape((10, 10))
                #print(small_matrix1)
                result = find_submatrix(big_matrix, small_matrix1)
                if result is not None:
                    results.append(result)
            if results:
                for start_row, start_col in results:
                    print("The center of the small matrix：(row: {}, column: {})".format(start_row, start_col))
                    print()
            else:
                print("small_matrix is not found in big_matrix.")
            sorted_results = sorted(results)
            with open("/home/wangxiaoyan/deepTAD/work/deepTAD/%s/predict_results_boundary/%dk_chr%s-predbound.txt" %(cell,display_reso,chr), "w") as file:
                for position in sorted_results:
                    file.write("{}\t{}\n".format(position[0], position[1]))

In [None]:
### 过滤边界 运行/home/wangxiaoyan/deepTAD/work/cnn_transformer_tad/Cells_generate.sh
### 生成bin、loci文件   运行"/home/wangxiaoyan/deepTAD/work/all_TADs/extract_diff_cnn_transformer.sh"

In [None]:
###组装TAD   运行deepTAD/work/cnn_transformer_tad/Cells_mergeTAD.ipynb
