### Imports & settings

In [1]:
import re
import io
import os
from contextlib import redirect_stdout, redirect_stderr

import pandas as pd
import numpy as np
from Bio.Seq import Seq
from Bio import SeqIO
import rpy2.robjects as robjects
from rpy2.robjects import pandas2ri

from rpy2.robjects.packages import importr
import rpy2.rinterface_lib.callbacks
import logging

In [2]:
import tensorflow as tf
print(tf.__version__)


2.15.0


In [3]:
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)

In [4]:
# utils = importr("utils") 
# utils.install_packages("gbm") # установка пакета R

os.environ['RPY2_CCHAR_ENCODING'] = 'latin1'
pandas2ri.activate()
rpy2.rinterface_lib.callbacks.logger.setLevel(logging.ERROR)

### Функции

In [5]:
def load_sequence(userseq: str) -> str:
    """Load sequence from FASTA, TXT, or direct string"""
    if userseq.endswith(".fasta") or userseq.endswith(".fa"):
        records = list(SeqIO.parse(userseq, "fasta"))
        sequence = str(records[0].seq)
    elif userseq.endswith(".txt"):
        with open(userseq, "r") as f:
            sequence = "".join(line.strip() for line in f.readlines())
    else:
        sequence = userseq.replace(" ", "").upper()
    return sequence

In [6]:
def find_candidate_sgRNAs(sequence: str, pam: str = "NGG") -> list[dict]:
    """Find candidate sgRNAs with PAM motif in both strands"""
    pam_regex = pam.replace("N", ".")
    results = []

    pam_pattern = re.compile(pam_regex)

    for strand, seq in [('+', sequence), ('-', str(Seq(sequence).reverse_complement()))]:
        for i in range(len(seq) - 29):
            window = seq[i:i+30]
            pam_candidate = window[24:24 + len(pam)]
            if pam_pattern.fullmatch(pam_candidate):
                sgRNA = window[4:24]
                results.append({
                    "strand": strand,
                    "start": i+4 if strand == "+" else len(seq) - i - 26,
                    "end": i+24 if strand == "+" else len(seq) - i - 6,
                    "sgRNA": sgRNA,
                    "pam": pam_candidate,
                    "window": window
                })
    return results

In [7]:
DEGENERATE_BASES = re.compile(r"[UWSMKRYBDHVNZ]")  # некорректные символы
HOMOPOLYMER_PATTERN = re.compile(r"(A{4,}|T{4,}|G{4,}|C{4,})")

def is_valid(seq: str) -> bool:
    return not DEGENERATE_BASES.search(seq)

def get_gc_content(seq: str) -> float:
    return (seq.count("G") + seq.count("C")) / len(seq)

def has_homopolymer(seq: str) -> bool:
    return bool(HOMOPOLYMER_PATTERN.search(seq))

BACKBONE = Seq("AGGCTAGTCCGT")
REVCOMP_BACKBONE = BACKBONE.reverse_complement()

def gc_content_4bp(seq4: str) -> float:
    return (seq4.count("G") + seq4.count("C")) / 4

def count_self_complementarity(sgRNA: str) -> int:
    sg_seq = Seq(sgRNA)
    revcomp_sg = sg_seq.reverse_complement()
    score = 0

    for i in range(0, len(sgRNA) - 3):  # 4-меры от 0 до 16
        tetramer = sgRNA[i:i+4]
        if gc_content_4bp(tetramer) >= 0.5:
            # Сравниваем с обратным комплементом самого sgRNA
            search_region = sgRNA[i+7:] if i <= 10 else ""
            if tetramer in Seq(search_region).reverse_complement():
                score += 1
            # Сравниваем с обратным комплементом бекбона
            if tetramer in REVCOMP_BACKBONE:
                score += 1
    return score

def filter_sgRNAs(candidates: list[dict]) -> list[dict]:
    filtered = []
    for c in candidates:
        if not is_valid(c["window"]):
            continue
        sgRNA_seq = c["sgRNA"]
        pam_seq = c["pam"]
        gc_content = get_gc_content(sgRNA_seq)
        homopolymer = has_homopolymer(sgRNA_seq)
        self_comp_score = count_self_complementarity(sgRNA_seq)

        filtered.append({
            **c,
            "sgRNA_seq": sgRNA_seq,
            "pam_seq": pam_seq,
            "gc_content": round(gc_content, 3),
            "homopolymer": homopolymer,
            "self_complementarity": self_comp_score
        })
    return filtered

In [8]:
# def predict_from_r_model(df_features):
#     robjects.r('library(gbm)')
#     r_df = pandas2ri.py2rpy(df_features)
#     # передаём всю команду как строку, включая параметры
#     robjects.globalenv["df_features_r"] = r_df
#     pred = robjects.r('predict(model, df_features_r, n.trees=500)')
#     return list(pred)

def predict_from_r_model(df_features):
    gbm = importr("gbm")  # безопасный импорт пакета, не вызывает строковый вывод
    r_df = pandas2ri.py2rpy(df_features)
    robjects.globalenv["df_features_r"] = r_df

    with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
        pred = robjects.r('predict(model, df_features_r, n.trees=500)')
    
    return list(pred)

In [9]:
def build_dataframe(sgrna_candidates, features_df, efficiency_scores):
    """
    Сборка итогового датафрейма:
    - sgrna_candidates: список словарей с полями sgRNA, pam, strand, start, end, window
    - features_df: датафрейм признаков (выход Doench_2016_processing)
    - efficiency_scores: список float значений от модели
    """

    df = pd.DataFrame(sgrna_candidates)
    df["efficiency_score"] = [round(x, 3) for x in efficiency_scores]

    # Подсчёт GC-содержания 20-mer sgRNA
    df["GC_content"] = df["sgRNA"].apply(lambda x: round((x.count("G") + x.count("C")) / len(x), 3))

    # Детектирование гомополимеров
    df["Homopolymer"] = df["sgRNA"].apply(lambda x: any(rep in x for rep in ["AAAA", "TTTT", "GGGG", "CCCC"]))

    # Простейшая проверка самокомплементарности (временно — длина обратной комплементарной пары > 0)
    df["Self_complementary"] = df["sgRNA"].apply(lambda x: "CG" in x[::-1] or "GC" in x[::-1])  # упрощённо

    # Генерация Notes
    def generate_notes(row):
        notes = []
        if row["GC_content"] < 0.3:
            notes.append("Low GC")
        elif row["GC_content"] > 0.8:
            notes.append("High GC")
        if row["Homopolymer"]:
            notes.append("Homopolymer")
        if row["Self_complementary"]:
            notes.append("Self Complementary")
        if row["efficiency_score"] < 0.5:
            notes.append("Low Efficiency")
        if not notes:
            return "N/A"
        return ", ".join(notes)

    df["Notes"] = df.apply(generate_notes, axis=1)

    df = df.sort_values(by="efficiency_score", ascending=False).reset_index(drop=True)

    # import ace_tools as tools; tools.display_dataframe_to_user(name="sgRNA Candidates", dataframe=df)
    return df

### Run

In [10]:
robjects.r('model <- readRDS("models/Doench_2016/Rule_set_2_Model.rds")')

In [11]:
robjects.r('source("R/Doench_2016_processing.R")')

def generate_features_in_r(sgrna_windows: list[str]):
    sgrna_vector = robjects.StrVector(sgrna_windows)
    with redirect_stdout(io.StringIO()):  # скрываем печать R в stdout
        r_df = robjects.r['Doench_2016_processing'](sgrna_vector)
    return pandas2ri.rpy2py(r_df)

In [12]:
sequence = load_sequence("data/DAK1.fasta")
candidates = find_candidate_sgRNAs(sequence, pam="NGG")
print(f"Найдено {len(candidates)} sgRNA-кандидатов.")
print(candidates[:3])

filtered = filter_sgRNAs(candidates)
print(f"Осталось {len(filtered)} sgRNA после фильтрации.")

windows = [x["window"] for x in filtered]
features_df = generate_features_in_r(windows)

scores = predict_from_r_model(features_df)
df_sg_candidates  = build_dataframe(filtered, features_df, scores)
df_sg_candidates.head()

Найдено 170 sgRNA-кандидатов.
[{'strand': '+', 'start': 30, 'end': 50, 'sgRNA': 'CCAGTCAATTCAAGTCTCAA', 'pam': 'AGG', 'window': 'AGATCCAGTCAATTCAAGTCTCAAAGGGTT'}, {'strand': '+', 'start': 31, 'end': 51, 'sgRNA': 'CAGTCAATTCAAGTCTCAAA', 'pam': 'GGG', 'window': 'GATCCAGTCAATTCAAGTCTCAAAGGGTTT'}, {'strand': '+', 'start': 62, 'end': 82, 'sgRNA': 'TGCTAACCCCTCCATTACGC', 'pam': 'TGG', 'window': 'CCCTTGCTAACCCCTCCATTACGCTGGTCC'}]
Осталось 170 sgRNA после фильтрации.


Unnamed: 0,strand,start,end,sgRNA,pam,window,sgRNA_seq,pam_seq,gc_content,homopolymer,self_complementarity,efficiency_score,GC_content,Homopolymer,Self_complementary,Notes
0,-,378,398,CATCACCTATGACAGCAACG,CGG,ACATCATCACCTATGACAGCAACGCGGCAG,CATCACCTATGACAGCAACG,CGG,0.5,False,0,0.786,0.5,False,True,Self Complementary
1,+,405,425,GCAGTTGGCAGAGAAAAGGG,TGG,TGTTGCAGTTGGCAGAGAAAAGGGTGGTAT,GCAGTTGGCAGAGAAAAGGG,TGG,0.55,True,0,0.741,0.55,True,True,"Homopolymer, Self Complementary"
2,+,156,176,GGACATGAACCTACACACGC,CGG,TAGTGGACATGAACCTACACACGCCGGTTT,GGACATGAACCTACACACGC,CGG,0.55,False,1,0.711,0.55,False,True,Self Complementary
3,+,1310,1330,GAAGGACTCATTATCTCAGG,CGG,TGTCGAAGGACTCATTATCTCAGGCGGTTG,GAAGGACTCATTATCTCAGG,CGG,0.45,False,2,0.7,0.45,False,False,
4,+,875,895,GGAAAATTACAACATAACCC,CGG,TAAAGGAAAATTACAACATAACCCCGGTTC,GGAAAATTACAACATAACCC,CGG,0.35,True,0,0.695,0.35,True,False,Homopolymer


In [13]:
len(df_sg_candidates.sgRNA_seq.iloc[0])

20

### CRIPSR-BERT

In [None]:
import pandas as pd
import numpy as np
from models.CRISPR_BERT.model import build_bert
from models.CRISPR_BERT.Encoder_change import BERT_encode, C_RNN_encode
from models.CRISPR_BERT.Encoder_change import Encoder

In [None]:
def load_data_to_df(path, n_rows=1000):
    """Загружает sgRNA/off_target/label из CSV"""
    df = pd.read_csv(path, names=["sgRNA", "off_target", "label"], nrows=n_rows)
    df["label"] = df["label"].astype(int)
    return df

def prepare_crisprbert_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Подготавливает датафрейм для подачи в CRISPR-BERT:
    - совмещает sgRNA и off_target по позициям (одинаковой длины);
    - приводит пары нуклеотидов к нижнему регистру;
    - заменяет '_' на 'x';
    - соединяет в строку, разделённую запятой.
    """
    paired_sequences = []

    for idx, row in df.iterrows():
        sgrna = row["sgRNA"].replace("_", "X").lower()
        off = row["off_target"].replace("_", "X").lower()

        if len(sgrna) != len(off):
            print(f"⚠️ Строка #{idx}: длины sgRNA и off_target не совпадают → пропущена")
            paired_sequences.append("")  # или можно вставить заглушку 'xx,xx,...'
            continue

        pairs = [sgrna[i] + off[i] for i in range(len(sgrna))]
        sequence_str = ",".join(pairs)
        paired_sequences.append(sequence_str)

    df = df.copy()
    df["sequence"] = paired_sequences
    return df[["sequence", "label", "sgRNA", "off_target"]]


def predict_crisprbert_from_df(df: pd.DataFrame, model_weights_path: str, n_predict=100):
    """Кодирует и предсказывает off-target вероятность"""
    df_subset = df.iloc[:n_predict].copy()
    
    # BERT inputs
    data_list = df_subset[["sequence", "label"]].values.tolist()
    token_ids, segment_ids = BERT_encode(data_list)

    # RNN inputs
    rnn_encoded = np.array(C_RNN_encode(df_subset))

    # Модель
    model = build_bert()
    model.load_weights(model_weights_path)

    # Предсказания
    y_pred = model.predict([rnn_encoded, np.array(token_ids), np.array(segment_ids)])
    return df_subset, y_pred


def encode_sequence_column(df: pd.DataFrame, column: str = "sequence") -> pd.DataFrame:
    """
    Применяет Encoder к колонке с биграммами (в виде строки через запятую),
    добавляет закодированные данные в столбец 'rnn_encoded'.
    """
    encoded_list = []
    for i, val in enumerate(df[column]):
        try:
            merged = val.replace(",", "")  # склеиваем биграммы в строку длины 52
            en = Encoder(merged)
            encoded_list.append(en.on_off_code)
        except KeyError as e:
            print(f"⚠️ Ошибка кодирования в строке {i}: {e} → {val}")
            encoded_list.append(np.zeros((26, 7), dtype=int))  # fallback для некорректных строк

    df["rnn_encoded"] = encoded_list
    return df


def encode_with_bert_tokenizer(df: pd.DataFrame, column: str = "sequence"):
    """
    Применяет BERT_encode к колонке с биграммами и добавляет token_ids и segment_ids в датафрейм.
    """
    data_for_encoding = []
    for val in df[column]:
        tokens = val.split(',')
        tokens = [t.strip() for t in tokens if t]  # очистка
        text = " ".join(tokens)
        data_for_encoding.append((text, 0))  # фиктивный label
    
    token_ids, segment_ids = BERT_encode(data_for_encoding)
    df["bert_token_ids"] = token_ids
    df["bert_segment_ids"] = segment_ids
    return df


def add_rnn_encoded_column(df: pd.DataFrame, column: str = "sequence", n_nucl: int = 24) -> pd.DataFrame:
    """
    Добавляет столбец rnn_encoded — результат работы Encoder (OHE-кодировка биграмм).
    Если биграмм меньше 26, добавляется padding 'xx'.
    """
    encoded = []
    for i, val in enumerate(df[column]):
        try:
            bigrams = val.split(",")
            # паддинг до 26 биграмм
            if len(bigrams) < n_nucl:
                bigrams += ["xx"] * (n_nucl - len(bigrams))
            elif len(bigrams) > n_nucl:
                bigrams = bigrams[:n_nucl]
            # оборачиваем в строку без запятых
            merged = "".join(bigrams)
            en = Encoder(merged)
            encoded.append(en.on_off_code)
        except KeyError as e:
            print(f"⚠️ Ошибка кодирования RNN в строке {i}: {e} → {val}")
            encoded.append(np.zeros((n_nucl+2, 7)))  # с запасом
    df = df.copy()
    df["rnn_encoded"] = encoded
    return df


def add_bert_encoding_columns(df: pd.DataFrame, column: str = "sequence", n_nucl: int = 24) -> pd.DataFrame:
    """
    Добавляет в DataFrame две колонки:
    - bert_token_ids — токен-идентификаторы (из token_dict)
    - bert_segment_ids — список нулей того же размера.
    
    При необходимости добавляет 'xx'-паддинг до n_nucl биграмм.
    """
    data_for_encoding = []
    
    for i, val in enumerate(df[column]):
        try:
            bigrams = val.strip().split(",")
            # паддинг
            if len(bigrams) < n_nucl:
                bigrams += ["xx"] * (n_nucl - len(bigrams))
            elif len(bigrams) > n_nucl:
                bigrams = bigrams[:n_nucl]

            # преобразуем обратно в строку
            padded_text = ",".join(bigrams)
            data_for_encoding.append((padded_text, 0))  # фиктивный label
        except Exception as e:
            print(f"⚠️ Ошибка при подготовке строки #{i} для BERT: {e}")
            data_for_encoding.append((",".join(["xx"] * n_nucl), 0))

    token_ids, segment_ids = BERT_encode(data_for_encoding)

    df = df.copy()
    df["bert_token_ids"] = token_ids
    df["bert_segment_ids"] = segment_ids
    return df


def run_crisprbert_prediction(df: pd.DataFrame, model, n_predict: int = 100) -> pd.DataFrame:
    """
    Запускает CRISPR-BERT предсказания для первых n_predict строк датафрейма.
    Предполагает наличие 'rnn_encoded', 'bert_token_ids', 'bert_segment_ids'.
    Возвращает df с новой колонкой 'prediction' (вероятность off-target).
    Остальные строки получают "No pred".
    """
    df = df.copy()
    
    # Обрезаем подмножество
    df_subset = df.iloc[:n_predict]
    
    # Собираем входы
    rnn_encoded = np.array(df_subset["rnn_encoded"].tolist())
    token_ids = np.array(df_subset["bert_token_ids"].tolist())
    segment_ids = np.array(df_subset["bert_segment_ids"].tolist())

    # Предсказание
    y_pred = model.predict([rnn_encoded, token_ids, segment_ids])

    # Берём вероятность класса "on-target = 1" (index 1)
    probs = [float(p[1]) for p in y_pred]

    # Заполняем столбец предсказаний
    df["prediction"] = "No pred"
    df.loc[:n_predict - 1, "prediction"] = probs

    return df

In [None]:
# === Настройки ===
DATA_PATH = "models/CRISPR_BERT/datasets/K562.txt"

# === Пайплайн ===
df = load_data_to_df(DATA_PATH, n_rows=1000)
df = prepare_crisprbert_df(df)
df = add_rnn_encoded_column(df)
df = add_bert_encoding_columns(df)

In [80]:
df.head()

Unnamed: 0,sequence,label,sgRNA,off_target,rnn_encoded,bert_token_ids,bert_segment_ids
0,"ag,ac,aa,tt,gg,aa,gg,aa,aa,gg,aa,aa,gg,aa,gg,g...",0,AAATGAGAAGAAGAGGCACAGGG,GCATGAGAAGAAGAGACATAGCC,"[[0, 0, 0, 0, 0, 0, 0], [1, 0, 1, 0, 0, 1, 0],...","[0, 4, 3, 2, 17, 12, 2, 12, 2, 2, 12, 2, 2, 12...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
1,"ag,aa,aa,tg,ga,aa,gg,aa,aa,gg,aa,aa,gg,aa,gg,g...",0,AAATGAGAAGAAGAGGCACAGGG,GAAGAAGAAGAAGAGGAAGAGGA,"[[0, 0, 0, 0, 0, 0, 0], [1, 0, 1, 0, 0, 1, 0],...","[0, 4, 2, 2, 16, 10, 2, 12, 2, 2, 12, 2, 2, 12...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
2,"tt,gg,at,cc,aa,tt,cc,aa,aa,tt,tt,aa,tt,tt,aa,t...",0,TGACATCAATTATTATACATCGG,TGTCATCAATTATTAGGATTCGT,"[[0, 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0],...","[0, 17, 12, 5, 7, 2, 17, 7, 2, 2, 17, 17, 2, 1...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
3,"gt,aa,cc,aa,cc,cc,ga,aa,aa,gg,cc,aa,gg,aa,gg,t...",0,GACACCGAAGCAGAGTTTTTAGG,TACACCAAAGCAGAGTTTGGAGA,"[[0, 0, 0, 0, 0, 0, 0], [0, 1, 1, 0, 0, 0, 1],...","[0, 13, 2, 7, 2, 7, 7, 10, 2, 2, 12, 7, 2, 12,...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
4,"ag,aa,aa,tg,ga,aa,gg,aa,ag,gg,aa,aa,gg,aa,gg,g...",0,AAATGAGAAGAAGAGGCACAGGG,GAAGAAGAGGAAGAGACACAAGG,"[[0, 0, 0, 0, 0, 0, 0], [1, 0, 1, 0, 0, 1, 0],...","[0, 4, 2, 2, 16, 10, 2, 12, 2, 4, 12, 2, 2, 12...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."


In [86]:
WEIGHTS_PATH = "models/CRISPR_BERT/weight/I1.h5"

bert_model = build_bert()
bert_model.load_weights(WEIGHTS_PATH)



Model: "model_22"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_19 (InputLayer)       [(None, 26, 7)]              0         []                            
                                                                                                  
 reshape_12 (Reshape)        (None, 1, 26, 7)             0         ['input_19[0][0]']            
                                                                                                  
 conv2d_24 (Conv2D)          (None, 1, 26, 5)             40        ['reshape_12[0][0]']          
                                                                                                  
 conv2d_25 (Conv2D)          (None, 1, 26, 15)            435       ['reshape_12[0][0]']          
                                                                                           

In [87]:
df_with_preds = run_crisprbert_prediction(df, bert_model, n_predict=100)
df_with_preds[["sgRNA", "off_target", "label", "prediction"]].head(10)



Unnamed: 0,sgRNA,off_target,label,prediction
0,AAATGAGAAGAAGAGGCACAGGG,GCATGAGAAGAAGAGACATAGCC,0,0.0
1,AAATGAGAAGAAGAGGCACAGGG,GAAGAAGAAGAAGAGGAAGAGGA,0,0.0
2,TGACATCAATTATTATACATCGG,TGTCATCAATTATTAGGATTCGT,0,0.0
3,GACACCGAAGCAGAGTTTTTAGG,TACACCAAAGCAGAGTTTGGAGA,0,0.0
4,AAATGAGAAGAAGAGGCACAGGG,GAAGAAGAGGAAGAGACACAAGG,0,0.0
5,GGGTGGGGGGAGTTTGCTCCTGG,GATTTGGGGGAGTTTGCTCAGGC,0,0.0
6,ATGAACACCAGTGAGTAGAGCGG,GTGGGCAACAGTGAGTAGAGCAG,0,5.7e-05
7,GGTCCTGCCGCTGCTTGTCATGG,GGCTCAGCTGGGGCTTGTCATGG,0,0.0
8,AAATGAGAAGAAGAGGCACAGGG,AGATAGGAGGAAGAGGCATTGGG,0,0.0
9,CCTGCCTCCGCTCTACTCACTGG,CCTGCCTCCTCTTTACACATTCA,0,0.0


### Relative activity predictor

In [15]:
from models.RA_predictor.RA_predictor import build_relative_activity_predictor
from keras import backend as K

In [None]:
WEIGHTS_PATH_RA = os.path.join("models", "RA_predictor", "RA_predictor_1.weights.h5")

ra_model = build_relative_activity_predictor()
ra_model.load_weights(WEIGHTS_PATH_RA)

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv_1 (Conv2D)             (None, 7, 26, 32)         544       
                                                                 
 dense_2 (Conv2D)            (None, 7, 26, 32)         16416     
                                                                 
 dense_3 (MaxPooling2D)      (None, 7, 13, 32)         0         
                                                                 
 dropout (Dropout)           (None, 7, 13, 32)         0         
                                                                 
 flatten (Flatten)           (None, 2912)              0         
                                                                 
 dense_4 (Dense)             (None, 256)               745728    
                                                                 
 dropout_1 (Dropout)         (None, 256)              

