In [137]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from pprint import pprint


# Working directory: the mutation spreadsheet is read from here, and the FASTA,
# BLAST XML and Excel files used by later cells are joined onto this path.
workdir = r"./dataset/2023.11.20_mutation"
# Table of selected mutations; later cells read its "Gene.refGene", "Chr",
# "Start", "End", "Ref" and "Alt" columns row by row.
info = pd.read_excel(os.path.join(workdir, "selected_mutation_list_SJT002HCC.xlsx"))

## Fetch reference sequences from Ensembl

In [138]:
import requests
from Bio.SeqUtils import MeltingTemp as mt


def padlock_thre(Tm_left, Tm_right, left, right, Tm_dif_thre=10, Tm_sing_thre=45, bind_sing_thre=10):
    """Decide whether a pair of binding arms is acceptable, or how to resize them.

    Args:
        Tm_left: melting temperature of the current left arm.
        Tm_right: melting temperature of the current right arm.
        left: current length of the left arm.
        right: current length of the right arm.
        Tm_dif_thre: the two Tm values count as balanced when their absolute
            difference is strictly below this value.
        Tm_sing_thre: each arm must have a Tm strictly above this value.
        bind_sing_thre: when an arm is shortened to this length or less, the
            search is stopped.

    Returns:
        ``(left, right, flag)`` - the (possibly adjusted) arm lengths and a
        boolean telling the caller to stop iterating.
    """
    if abs(Tm_left - Tm_right) < Tm_dif_thre:
        # Balanced arms: lengthen whichever arm is still too cold, left arm first.
        if not Tm_left > Tm_sing_thre:
            return left + 1, right, False
        if not Tm_right > Tm_sing_thre:
            return left, right + 1, False
        return left, right, True

    # Unbalanced arms: move one base from the hotter arm to the colder one.
    if Tm_left > Tm_right:
        left, right = left - 1, right + 1
        shortened_arm = left
    else:
        left, right = left + 1, right - 1
        shortened_arm = right

    # Stop once the arm that was shortened has reached the minimum length.
    if shortened_arm <= bind_sing_thre:
        return left, right, True
    return left, right, False


def fetch_and_label_sequences(positions, gap=50, strand=1, left_length=20, right_length=20, Tm_dif_thre=10, Tm_sing_thre=45, bind_sing_thre=10):
    """Fetch the GRCh37 sequence around each position and choose two binding arms.

    For every position the Ensembl REST ``sequence/region`` endpoint is queried
    for ``chr:start..end`` expanded by ``gap`` bases on both sides. The left
    arm is the ``left`` bases ending at the last base of the queried region;
    the right arm is the ``right`` bases that follow the region. Arm lengths
    are adjusted with ``padlock_thre`` until it reports that iteration should
    stop.

    Args:
        positions: iterable of dicts with keys ``"gene"``, ``"chr"``,
            ``"start"`` and ``"end"``. A leading ``"chr"`` is stripped from
            the chromosome name before querying.
        gap: number of flanking bases requested on each side of the region.
        strand: strand passed to Ensembl in the region string.
        left_length: initial left-arm length.
        right_length: initial right-arm length.
        Tm_dif_thre: forwarded to ``padlock_thre``.
        Tm_sing_thre: forwarded to ``padlock_thre``.
        bind_sing_thre: forwarded to ``padlock_thre``.

    Returns:
        A list with one dict per position: the decoded JSON response with the
        keys ``"gene"``, ``"binding_left"``, ``"Tm_left"``, ``"binding_right"``
        and ``"Tm_right"`` added.

    Raises:
        requests.HTTPError: when Ensembl answers with an error status.
        requests.Timeout: when Ensembl does not answer within the timeout.
        ValueError: when an arm length no longer fits inside the fetched
            sequence (for example the right arm would need more than ``gap``
            flanking bases).
    """
    server = "https://rest.ensembl.org"
    coord_system_version = "GRCh37"
    sequences_info = []

    for position in tqdm(positions):
        left = left_length
        right = right_length
        chromosome = position["chr"].replace("chr", "")
        adjusted_start = int(position["start"])
        adjusted_end = int(position["end"])
        ext = f"/sequence/region/human/{chromosome}:{adjusted_start}..{adjusted_end}:{strand}?"

        options = ";".join([
                # 'content-type=text/x-fasta',
                "content-type=application/json",
                f"coord_system_version={coord_system_version}",
                f"expand_3prime={gap}",
                f"expand_5prime={gap}",
            ])

        # A timeout keeps one stalled connection from hanging the whole batch.
        response = requests.get(server + ext + options, timeout=60)
        # Raises for 4xx/5xx responses and is a no-op otherwise.
        response.raise_for_status()

        decoder = response.json()
        seq = decoder["seq"]

        # Exclusive end index of the queried region inside the expanded sequence.
        left_end = gap + 1 + adjusted_end - adjusted_start
        # Index of the first base after the region. Explicit non-negative
        # indices are used because seq[-gap:-gap + right] collapses to an empty
        # string as soon as right >= gap (the end index stops being negative).
        right_start = max(len(seq) - gap, 0)
        right_room = len(seq) - right_start

        while True:
            if not (0 < left <= left_end and 0 < right <= right_room):
                raise ValueError(
                    f"binding arms (left={left}, right={right}) do not fit the sequence "
                    f"fetched for {position['gene']} with gap={gap}; increase gap"
                )
            binding_left = seq[left_end - left:left_end]
            binding_right = seq[right_start:right_start + right]
            # NOTE(review): mt.R_DNA_NN1 is presumably the RNA/DNA hybrid
            # nearest-neighbour table - confirm it matches the assay.
            Tm_left = mt.Tm_NN(binding_left, nn_table=mt.R_DNA_NN1)
            Tm_right = mt.Tm_NN(binding_right, nn_table=mt.R_DNA_NN1)
            left, right, flag = padlock_thre(Tm_left, Tm_right, left, right,
                                             Tm_dif_thre=Tm_dif_thre, Tm_sing_thre=Tm_sing_thre,
                                             bind_sing_thre=bind_sing_thre)
            if flag:
                break

        decoder['gene'] = position['gene']
        decoder["binding_left"] = binding_left
        decoder["Tm_left"] = Tm_left
        decoder["binding_right"] = binding_right
        decoder["Tm_right"] = Tm_right
        sequences_info.append(decoder)

    return sequences_info

In [139]:
# One lookup dict per row of `info`, in row order: gene name, chromosome and
# the start/end coordinates of the mutated region.
positions = [
    {
        key: info.loc[row_label, column]
        for key, column in (
            ("gene", "Gene.refGene"),
            ("chr", "Chr"),
            ("start", "Start"),
            ("end", "End"),
        )
    }
    for row_label in info.index
]

# Number of flanking bases requested on each side of every region.
gap = 50
sequences = fetch_and_label_sequences(positions, gap=gap, strand=1)

  0%|          | 0/61 [00:00<?, ?it/s]

100%|██████████| 61/61 [01:28<00:00,  1.46s/it]


## Apply the mutations to the fetched sequences

In [140]:
from Bio.SeqUtils import MeltingTemp as mt


def binding_mutation(mutation, seq, adjusted_start, adjusted_end, left, right):
    """Apply one mutation to ``seq`` and cut the two binding arms around it.

    Args:
        mutation: dict with keys ``"ref"`` and ``"alt"``; ``"-"`` as ``ref``
            marks an insertion and ``"-"`` as ``alt`` marks a deletion.
        seq: sequence containing the mutated region.
        adjusted_start: index in ``seq`` where the region starts.
        adjusted_end: index in ``seq`` just past the region.
        left: requested left-arm length.
        right: requested right-arm length.

    Returns:
        ``(mut_seq, binding_left, binding_right)``. The left arm ends with the
        alternative allele (or, for a deletion, with the bases before the
        region); the right arm is the first ``right`` bases after the region.
    """
    ref, alt = mutation["ref"], mutation["alt"]
    downstream = seq[adjusted_end:]
    binding_right = downstream[:right]

    # Deletion: drop the region; the left arm is the tail of what precedes it.
    if ref != "-" and alt == "-":
        upstream = seq[:adjusted_start]
        return upstream + downstream, upstream[-left:], binding_right

    # Insertion keeps the region and adds `alt` after it; substitution
    # replaces the region with `alt`.
    upstream = seq[:adjusted_end] if ref == "-" else seq[:adjusted_start]
    from_upstream = left - len(alt)
    if from_upstream > 0:
        binding_left = upstream[-from_upstream:] + alt
    else:
        # `alt` alone is at least as long as the arm: use its tail.
        binding_left = alt[-left:]
    return upstream + alt + downstream, binding_left, binding_right


def perform_mutation(
    positions,
    mutations,
    sequences,
    gap=50,
    left_length=20,
    right_length=20,
    Tm_thre=45, 
    Tm_dif_thre=10,
    bind_single_thre=10,
):
    """Build the mutated sequence and its binding arms for every mutation.

    The three input lists are read in parallel by index, one entry per
    mutation. Arm lengths start at ``left_length`` / ``right_length`` and are
    adjusted with ``padlock_thre`` until it reports that iteration should stop.

    Args:
        positions: dicts with ``"start"`` and ``"end"`` of each region.
        mutations: dicts with ``"ref"`` and ``"alt"``.
        sequences: dicts with ``"seq"``, ``"gene"``, ``"id"`` and ``"molecule"``.
        gap: number of flanking bases in front of the region inside ``seq``.
        left_length: initial left-arm length.
        right_length: initial right-arm length.
        Tm_thre: passed to ``padlock_thre`` as ``Tm_sing_thre``.
        Tm_dif_thre: passed to ``padlock_thre`` as ``Tm_dif_thre``.
        bind_single_thre: passed to ``padlock_thre`` as ``bind_sing_thre``.

    Returns:
        A list of dicts with keys ``gene``, ``id``, ``molecule``, ``seq`` (the
        mutated sequence), ``binding_left``, ``Tm_left``, ``binding_right``
        and ``Tm_right``.
    """
    mutation_seqs = []
    for idx, mutation in enumerate(mutations):
        position = positions[idx]
        reference = sequences[idx]

        # Region boundaries expressed as indices into the fetched sequence.
        region_start = gap
        region_end = int(position["end"] - position["start"] + gap + 1)
        seq = reference["seq"]

        left, right = left_length, right_length
        finished = False
        while not finished:
            mut_seq, binding_left, binding_right = binding_mutation(
                mutation, seq, region_start, region_end, left, right
            )
            Tm_left = mt.Tm_NN(binding_left, nn_table=mt.R_DNA_NN1)
            Tm_right = mt.Tm_NN(binding_right, nn_table=mt.R_DNA_NN1)
            left, right, finished = padlock_thre(
                Tm_left,
                Tm_right,
                left,
                right,
                Tm_dif_thre=Tm_dif_thre,
                Tm_sing_thre=Tm_thre,
                bind_sing_thre=bind_single_thre,
            )

        mutation_seqs.append(
            {
                "gene": reference["gene"],
                "id": reference["id"],
                "molecule": reference["molecule"],
                "seq": mut_seq,
                "binding_left": binding_left,
                "Tm_left": Tm_left,
                "binding_right": binding_right,
                "Tm_right": Tm_right,
            }
        )

    return mutation_seqs

In [141]:
# Ref/Alt alleles for every row of `info`, in the same row order as `positions`.
mutations = [
    {
        "ref": info.loc[line, "Ref"],
        "alt": info.loc[line, "Alt"],
    }
    for line in info.index
]

# Pass the same `gap` the sequences were fetched with instead of repeating the
# literal 50: perform_mutation derives the region's offsets inside each fetched
# sequence from it, so the two values must never drift apart.
mut_sequences = perform_mutation(
    positions=positions, mutations=mutations, sequences=sequences, gap=gap
)

## Generate FASTA files of the binding arms

In [146]:
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord


def generate_binding_fastq(sequences, outpath):
    """Write the joined binding arms of every entry to ``outpath`` as FASTA.

    Despite the ``fastq`` in the name, the records are written in "fasta"
    format. Each record's sequence is ``binding_left + binding_right``, its
    id is the entry's ``"id"`` and its description is the entry's ``"gene"``.

    Args:
        sequences: iterable of dicts with keys ``"binding_left"``,
            ``"binding_right"``, ``"id"`` and ``"gene"``.
        outpath: path of the file to (over)write.
    """
    # Lazy generator: records are built one at a time as they are written.
    records = (
        SeqRecord(
            Seq(entry["binding_left"] + entry["binding_right"]),
            id=entry["id"],
            description=entry['gene'],
        )
        for entry in sequences
    )
    with open(outpath, "w") as handle:
        SeqIO.write(records, handle, "fasta")

In [147]:
# Export the binding arms of the reference and of the mutated sequences to two
# FASTA files inside `workdir`.
generate_binding_fastq(sequences=sequences, outpath=os.path.join(workdir, 'non-mut-binding_Tm_revised.fasta'))
generate_binding_fastq(sequences=mut_sequences, outpath=os.path.join(workdir, 'mut-binding_Tm_revised.fasta'))

## Annotate the sequences with BLAST results

In [148]:
from Bio.Blast import NCBIXML
from copy import deepcopy

def extract_blast(sequences, blast_results):
    """Attach BLAST hit summaries from an XML report to a copy of ``sequences``.

    The n-th record of the report is attached to the n-th entry, so the report
    must list its queries in the same order as ``sequences``. The input list
    is deep-copied and left untouched.

    Args:
        sequences: list of dicts, one per BLAST query.
        blast_results: path of the BLAST XML file to read.

    Returns:
        The copied list; every entry that has a record in the report gains
        ``"align_num"`` (number of alignments) and ``"descrip"``, a dict that
        maps the 1-based hit rank to ``"<hit title>| pm=<frame>"``.
    """
    annotated = deepcopy(sequences)
    with open(blast_results, "r") as handle:
        for query_idx, record in enumerate(NCBIXML.parse(handle)):
            entry = annotated[query_idx]
            entry["align_num"] = len(record.alignments)

            hits = {}
            for rank, alignment in enumerate(record.alignments, start=1):
                # Second element of the first HSP's frame tuple; presumably the
                # subject strand (1 / -1) - confirm against the Biopython docs.
                pm = alignment.hsps[0].frame[1]
                hits[rank] = record.descriptions[rank - 1].title + f"| pm={pm}"
            entry["descrip"] = hits

    return annotated

In [150]:
# Annotate both sequence lists with the hits from their BLAST XML reports.
# The reports are read from `workdir`; nothing in this notebook creates them,
# so they presumably come from running BLAST on the FASTA files written above
# - confirm before re-running.
sequences_with_blast = extract_blast(sequences=sequences, blast_results=os.path.join(workdir, 'non_mut_revised.xml'))
mut_sequences_with_blast = extract_blast(sequences=mut_sequences, blast_results=os.path.join(workdir, 'mut_revised.xml'))

## Reverse-complement ("minus") the binding arms flagged by BLAST

In [207]:
def trans(seq):
    """Return the reverse complement of an upper-case A/C/G/T sequence.

    Raises:
        KeyError: if ``seq`` contains any character other than A, C, G or T.
    """
    pairing = {"A": "T", "T": "A", "C": "G", "G": "C"}
    complement = [pairing[base] for base in seq]
    return "".join(complement[::-1])


def specificity_judge(sequence_with_blast):
    """Summarise the BLAST hits of one entry.

    A hit counts as on-target when any of the entry's gene names (the
    ``"gene"`` value split on ``';'``) occurs in the hit description,
    compared case-insensitively as a substring.

    Args:
        sequence_with_blast: dict with ``"gene"`` and ``"descrip"``; the
            descriptions are expected to end with ``"| pm=<int>"``.

    Returns:
        ``(perform_minus, specify)``:
        ``perform_minus`` is True when at least one on-target hit has
        ``pm == 1``; ``specify`` is False when at least one hit is not
        on-target. With no hits the result is ``(False, True)``.
    """
    gene = sequence_with_blast['gene']
    gene_names = gene.split(';') if ';' in gene else [gene]

    perform_minus = False
    specify = True
    for desc in sequence_with_blast['descrip'].values():
        desc_upper = desc.upper()
        on_target = False
        for name in gene_names:
            if name.upper() not in desc_upper:
                continue
            on_target = True
            # The pm value is the integer after the last '=' of the last
            # '|'-separated field.
            if int(desc.split('|')[-1].split('=')[-1]) == 1:
                perform_minus = True
        if not on_target:
            specify = False
    return perform_minus, specify
    
    
def minus(sequences_with_blast, mut_sequences_with_blast):
    """Build the final reference and mutant records, flipping arms when needed.

    The two lists are read in parallel by index. Whether a pair is flipped is
    decided from the reference entry alone (``specificity_judge``) and applied
    to both the reference and the mutant record. Flipping swaps the two arms,
    reverse-complements each with ``trans`` and swaps the two Tm values;
    ``seq`` is copied unchanged either way.

    Args:
        sequences_with_blast: BLAST-annotated reference entries.
        mut_sequences_with_blast: BLAST-annotated mutant entries.

    Returns:
        ``(sequences_final, mut_sequences_final)`` - two lists of dicts with
        keys ``gene``, ``specificity``, ``perform_minus``, ``binding_left``,
        ``binding_right``, ``Tm_left``, ``Tm_right`` and ``seq``.
    """

    def _oriented(entry, specificity, flip):
        # Copy the fields of interest, swapping the arms when `flip` is set.
        record = {
            'gene': entry['gene'],
            'specificity': specificity,
            'perform_minus': flip,
        }
        if flip:
            record['binding_left'] = trans(entry['binding_right'])
            record['binding_right'] = trans(entry['binding_left'])
            record['Tm_left'] = entry['Tm_right']
            record['Tm_right'] = entry['Tm_left']
        else:
            record['binding_left'] = entry['binding_left']
            record['binding_right'] = entry['binding_right']
            record['Tm_left'] = entry['Tm_left']
            record['Tm_right'] = entry['Tm_right']
        record['seq'] = entry['seq']
        return record

    sequences_final = []
    mut_sequences_final = []
    for idx in range(len(sequences_with_blast)):
        reference = sequences_with_blast[idx]
        mutant = mut_sequences_with_blast[idx]

        perform_minus, reference_specific = specificity_judge(reference)
        # Only the specificity of the mutant is used; its own flip verdict is ignored.
        mutant_specific = specificity_judge(mutant)[1]

        sequences_final.append(_oriented(reference, reference_specific, perform_minus))
        mut_sequences_final.append(_oriented(mutant, mutant_specific, perform_minus))

    return sequences_final, mut_sequences_final

In [208]:
# Final reference / mutant records, with arms flipped where the reference
# entry's BLAST hits call for it.
sequences_final, mut_sequences_final = minus(sequences_with_blast, mut_sequences_with_blast)

In [211]:
# Fields exported for every record, in column order.
interest = ['gene', 'binding_left','binding_right','Tm_left','Tm_right','specificity','perform_minus', 'seq']

# Column-wise table: each reference record is followed directly by its mutant
# counterpart, whose gene name gets a '_mut' suffix.
df_dict = {column: [] for column in interest}

for idx, sequence in enumerate(sequences_final):
    mut_sequence = mut_sequences_final[idx]
    for column in interest:
        mutant_value = mut_sequence[column]
        if column == 'gene':
            mutant_value = mutant_value + '_mut'
        df_dict[column].extend([sequence[column], mutant_value])
    

In [None]:
# Assemble the table, add the full binding site (left arm + right arm), put
# the columns in export order and save it to Excel in `workdir`.
df = (
    pd.DataFrame(df_dict)
    .assign(binding=lambda frame: frame['binding_left'] + frame['binding_right'])
    .loc[:, ['gene', 'binding', 'binding_left','binding_right','Tm_left','Tm_right','specificity','perform_minus', 'seq']]
)
df.to_excel(os.path.join(workdir, 'binding_site.xlsx'))
df

## Filter sequences by BLAST results

In [153]:
def thre_by_blast(sequences, ):
    """Return the entries that have at least one off-target BLAST hit.

    A hit is on-target when any of the entry's gene names occurs in the hit
    description (case-insensitive substring match). The ``"gene"`` value is
    split on ``';'`` so that multi-gene annotations such as ``"A;B"`` are
    matched name by name, the same way ``specificity_judge`` treats them;
    comparing the unsplit string would report every such entry as
    non-specific.

    Args:
        sequences: iterable of dicts with ``"gene"`` (str) and ``"descrip"``
            (dict whose values are hit descriptions).

    Returns:
        A list of the input entries (same objects, input order) for which at
        least one description mentions none of the gene names. Entries without
        hits are never returned.
    """
    non_specific = []
    for sequence in sequences:
        gene_names = [name.upper() for name in sequence['gene'].split(';')]
        has_off_target = any(
            not any(name in desc.upper() for name in gene_names)
            for desc in sequence['descrip'].values()
        )
        if has_off_target:
            non_specific.append(sequence)

    return non_specific

In [None]:
# Reference sequences with at least one BLAST hit that does not mention their gene.
non_specific = thre_by_blast(sequences=sequences_with_blast)
pprint(non_specific)

In [None]:
# Same check for the mutated sequences; note that this overwrites the
# `non_specific` list computed for the reference sequences.
non_specific = thre_by_blast(sequences=mut_sequences_with_blast)
pprint(non_specific)