In [25]:
# Loading libraries
import subprocess
import joblib
from Bio import SeqIO
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import RandomForestClassifier

In [92]:
def score_sequence(sequence_file, hmm_file):
    evalues = {}
    sequences = {}
    
    result = subprocess.run(['hmmsearch', "--incE", "1000000", hmm_file, sequence_file], text=True, capture_output=True, check=True)
    output = result.stdout
    if "[No targets detected that satisfy reporting thresholds]" in output:
        return [], [], []
    lines = output.split("E-value")[3].split("Domain annotation for each sequence (and alignments):")[0].split("\n")
    
    lines = lines[2:-3]
    evalues = []
    scores = []
    sequences = []
    for line in lines:
        evalues.append(float(line.split()[0]))
        sequences.append(line.split()[-1])
        scores.append(float(line.split()[1]))
    return evalues, sequences, scores

def build_scoring_matrix(sequences_files, HMM_files):
    scoring_matrix = {}
    scoring_dfs = []
    all_labels = []

    for hmm_index, hmm_file in enumerate(HMM_files):
        scoring_matrix[hmm_file.split(".")[0]] = {}
        all_evalues = []
        all_scores = []
        row_names = []
        labels = []
        lengths = []
        for seq_index, seq_file in enumerate(sequences_files):
         
            evalues, seq_names, scores = score_sequence(seq_file, hmm_file)
            all_evalues.extend(evalues)
            row_names.extend(seq_names)
            all_scores.extend(scores)   
            ground_truth = [seq_file.split(".")[0]] * len(evalues)
            labels.extend(ground_truth)
            for single_seq in SeqIO.parse(seq_file, "fasta"):
                if single_seq.id in row_names:
                    lengths.append(len(single_seq.seq))     

        scoring_matrix[hmm_file.split(".")[0]]["Evalue"] = all_evalues
        scoring_matrix[hmm_file.split(".")[0]][ " score/length"] = np.array(all_scores)/np.array(lengths)
        scoring_matrix[hmm_file.split(".")[0]]["labels"] = labels
        scoring_matrix[hmm_file.split(".")[0]]["sequences"] = row_names
        
    for key, value in scoring_matrix.items():
        temp_df = pd.DataFrame(value).set_index('sequences')  # set 'names' as index
        temp_df.columns = [f'{col}{key}' for col in temp_df.columns]  # rename columns
        scoring_dfs.append(temp_df)
    scoring_data_frame = pd.concat(scoring_dfs, axis=1)
    #print(list(scoring_data_frame.keys()))
# replace None values with np.nan
    scoring_data_frame = scoring_data_frame.replace({None: np.nan})
 
    column_label_names = ["labels"+hmm_file.split(".")[0] for file in sequences_files]
    scoring_data_frame["labels"] = None
    scoring_data_frame["labels"] = scoring_data_frame[column_label_names].bfill(axis=1).iloc[:, 0]
    scoring_data_frame = scoring_data_frame.drop(column_label_names, axis=1)
    return scoring_data_frame

In [169]:
def SNAREClassifier(seq_path, score_null_replace = 0.01, eval_null_replace = 0.9):
    # prot_sequence = SeqIO.read(seq_path, "fasta")

    # Loading the HMM models
    HMM_files = ['Qa.hmm', 'Qb.hmm', 'Qc.hmm', 'R.hmm', 'Snap.hmm']

    # Loading the random forest model
    rf1 = joblib.load('rf_snap.pkl')

    # Scoring the sequence
    scoring_data_frame = build_scoring_matrix([seq_path], HMM_files)

    # Data preprocessing

    # Define the columns to fill null values in
    eval_columns = [col for col in scoring_data_frame.columns if 'Evalue' in col]
    score_columns = [col for col in scoring_data_frame.columns if ' score' in col]

    # Fill null values in the evalue columns
    for column in eval_columns:
        scoring_data_frame[column] = scoring_data_frame[column].fillna(eval_null_replace)

    # Fill null values in the score columns
    for column in score_columns:
        scoring_data_frame[column] = scoring_data_frame[column].fillna(score_null_replace)

   # Define the class mapping
    class_mapping1 = {0: 'Qa', 1: 'Qb', 2: 'Qc', 3: 'R', 4: 'SNAP'}

    # Predicting the labels
    columns_to_drop = ['labels', 'labelsQa', 'labelsQb', 'labelsQc', 'labelsR']
    prediction1 = rf1.predict(scoring_data_frame.drop(columns_to_drop, axis=1))
    prediction_proba1 = rf1.predict_proba(scoring_data_frame.drop(columns_to_drop, axis=1))

    if prediction1[0] == 0:
        HMM_files = ['blueQa.hmm', 'greenQa.hmm', 'redQa.hmm', 'pinkOr.hmm', 'purpleQa.hmm', 'yellowQa.hmm']
        rfQa2 = joblib.load('rfQa2.pkl')

        scoring_data_frame = build_scoring_matrix([seq_path], HMM_files)

        rename_dictQa = {
            ' score/lengthblueQa': ' score/lengthblue',
            ' score/lengthgreenQa': ' score/lengthgreen',
            ' score/lengthpinkOrQa': ' score/lengthpinkOr',
            ' score/lengthpurpleQa': ' score/lengthpurple',
            ' score/lengthredQa': ' score/lengthred',
            ' score/lengthyellowQa': ' score/lengthyellow',
            'EvalueblueQa': 'Evalueblue',
            'EvaluegreenQa': 'Evaluegreen',
            'EvaluepinkOrQa': 'EvaluepinkOr',
            'EvaluepurpleQa': 'Evaluepurple',
            'EvalueredQa': 'Evaluered',
            'EvalueyellowQa': 'Evalueyellow'
        }

        scoring_data_frame = scoring_data_frame.rename(columns=rename_dictQa)

        eval_columns = [col for col in scoring_data_frame.columns if 'Evalue' in col]
        score_columns = [col for col in scoring_data_frame.columns if ' score' in col]

        for column in eval_columns:
            scoring_data_frame[column] = scoring_data_frame[column].fillna(eval_null_replace)
        for column in score_columns:
            scoring_data_frame[column] = scoring_data_frame[column].fillna(score_null_replace)

        columns_to_dropQa = ['labels', 'labelsblueQa', 'labelsgreenQa', 'labelspinkOr', 'labelspurpleQa', 'labelsredQa']
        predictionQa = rfQa2.predict(scoring_data_frame.drop(columns_to_dropQa, axis=1))
        prediction_probaQa = rfQa2.predict_proba(scoring_data_frame.drop(columns_to_dropQa, axis=1))

        class_mappingQa = {0: 'IIIb', 1: 'IIIa', 2: 'IV', 3: 'IVFungi', 4: 'I', 5: 'II'}
        class_label2 = class_mappingQa[predictionQa[0]]
        class_proba2 = prediction_probaQa[0, predictionQa[0]]

    elif prediction1[0] == 1:
        HMM_files = ['blueQb.hmm', 'greenQb.hmm', 'redQb.hmm', 'purpleQb.hmm']
        rfQb = joblib.load('rfQb.pkl')

        scoring_data_frame = build_scoring_matrix([seq_path], HMM_files)

        rename_dictQb = {
            ' score/lengthblueQb': ' score/lengthblue',
            ' score/lengthgreenQb': ' score/lengthgreen',
            ' score/lengthpurpleQb': ' score/lengthpurple',
            ' score/lengthredQb': ' score/lengthred',
            'EvalueblueQb': 'Evalueblue',
            'EvaluegreenQb': 'Evaluegreen',
            'EvaluepurpleQb': 'Evaluepurple',
            'EvalueredQb': 'Evaluered'
        }

        scoring_data_frame = scoring_data_frame.rename(columns=rename_dictQb)

        eval_columns = [col for col in scoring_data_frame.columns if 'Evalue' in col]
        score_columns = [col for col in scoring_data_frame.columns if ' score' in col]

        for column in eval_columns:
            scoring_data_frame[column] = scoring_data_frame[column].fillna(eval_null_replace)

        for column in score_columns:
            scoring_data_frame[column] = scoring_data_frame[column].fillna(score_null_replace)

        columns_to_dropQb = ['labels', 'labelsblueQb', 'labelsgreenQb', 'labelsredQb']
        predictionQb = rfQb.predict(scoring_data_frame.drop(columns_to_dropQb, axis=1))
        prediction_probaQb = rfQb.predict_proba(scoring_data_frame.drop(columns_to_dropQb, axis=1))

        class_mappingQb = {0: 'II ER-Golgi', 1: 'IIIb', 2: 'I', 3: 'II Intra-Golgi'}
        class_label2 = class_mappingQb[predictionQb[0]]
        class_proba2 = prediction_probaQb[0, predictionQb[0]]

    elif prediction1[0] == 2:
        HMM_files = ['blueQc.hmm', 'purpleQc.hmm', 'redQc.hmm', 'greenQc.hmm']
        rfQc = joblib.load('rfQc.pkl')

        scoring_data_frame = build_scoring_matrix([seq_path], HMM_files)

        rename_dictQc = {
            ' score/lengthblueQc': ' score/lengthblue',
            ' score/lengthgreenQc': ' score/lengthgreen',
            ' score/lengthpurpleQc': ' score/lengthpurple',
            ' score/lengthredQc': ' score/lengthred',
            'EvalueblueQc': 'Evalueblue',
            'EvaluegreenQc': 'Evaluegreen',
            'EvaluepurpleQc': 'Evaluepurple',
            'EvalueredQc': 'Evaluered'
        }

        scoring_data_frame = scoring_data_frame.rename(columns=rename_dictQc)

        eval_columns = [col for col in scoring_data_frame.columns if 'Evalue' in col]
        score_columns = [col for col in scoring_data_frame.columns if ' score' in col]

        for column in eval_columns:
            scoring_data_frame[column] = scoring_data_frame[column].fillna(eval_null_replace)

        for column in score_columns:
            scoring_data_frame[column] = scoring_data_frame[column].fillna(score_null_replace)

        columns_to_dropQc = ['labels', 'labelsblueQc', 'labelspurpleQc', 'labelsredQc']
        predictionQc = rfQc.predict(scoring_data_frame.drop(columns_to_dropQc, axis=1))
        prediction_probaQc = rfQc.predict_proba(scoring_data_frame.drop(columns_to_dropQc, axis=1))

        class_mappingQc = {0: 'I', 1: 'IIIc', 2: 'IIIb', 3: 'II'}
        class_label2 = class_mappingQc[predictionQc[0]]
        class_proba2 = prediction_probaQc[0, predictionQc[0]]


    
    elif prediction1[0] == 3:
        HMM_files = ['yellowR.hmm', 'purpleR.hmm', 'redR.hmm', 'blueR.hmm', 'greenR.hmm']
        rfR = joblib.load('rfR.pkl')

        scoring_data_frame = build_scoring_matrix([seq_path], HMM_files)

        rename_dictR = {
            ' score/lengthblueR': ' score/lengthblue',
            ' score/lengthgreenR': ' score/lengthgreen',
            ' score/lengthpurpleR': ' score/lengthpurple',
            ' score/lengthredR': ' score/lengthred',
            ' score/lengthyellowR': ' score/lengthyellow',
            'EvalueblueR': 'Evalueblue',
            'EvaluegreenR': 'Evaluegreen',
            'EvaluepurpleR': 'Evaluepurple',
            'EvalueredR': 'Evaluered',
            'EvalueyellowR': 'Evalueyellow'
        }

        # Rename the columns
        scoring_data_frame = scoring_data_frame.rename(columns=rename_dictR)

        eval_columns = [col for col in scoring_data_frame.columns if 'Evalue' in col]
        score_columns = [col for col in scoring_data_frame.columns if ' score' in col]

        for column in eval_columns:
            scoring_data_frame[column] = scoring_data_frame[column].fillna(eval_null_replace)

        for column in score_columns:
            scoring_data_frame[column] = scoring_data_frame[column].fillna(score_null_replace)


        columns_to_dropR = ['labels', 'labelsyellowR', 'labelspurpleR', 'labelsredR', 'labelsblueR']
        predictionR = rfR.predict(scoring_data_frame.drop(columns_to_dropR, axis=1))
        prediction_probaR = rfR.predict_proba(scoring_data_frame.drop(columns_to_dropR, axis=1))

        class_mappingR = {0: 'III', 1: 'Reg', 2: 'II', 3: 'IV', 4: 'I'}
        class_label2 = class_mappingR[predictionR[0]]
        class_proba2 = prediction_probaR[0, predictionR[0]]

    elif prediction1[0] == 4:
        HMM_files = ['blueSnap.hmm', 'greenSnap.hmm', 'redSnap.hmm']
        rfSnap = joblib.load('rfSnap.pkl')
        scoring_data_frame = build_scoring_matrix([seq_path], HMM_files)
        
        rename_dictSnap = {
            ' score/lengthblueSnap': ' score/lengthblue',
            ' score/lengthgreenSnap': ' score/lengthgreen',
            ' score/lengthredSnap': ' score/lengthred',
            'EvalueblueSnap': 'Evalueblue',
            'EvaluegreenSnap': 'Evaluegreen',
            'EvalueredSnap': 'Evaluered'
        }
        scoring_data_frame = scoring_data_frame.rename(columns=rename_dictSnap)
        eval_columns = [col for col in scoring_data_frame.columns if 'Evalue' in col]
        score_columns = [col for col in scoring_data_frame.columns if ' score' in col]
        for column in eval_columns:
            scoring_data_frame[column] = scoring_data_frame[column].fillna(eval_null_replace)

        for column in score_columns:
            scoring_data_frame[column] = scoring_data_frame[column].fillna(score_null_replace)

        columns_to_dropSnap = ['labels', 'labelsblueSnap', 'labelsgreenSnap']
        predictionSnap = rfSnap.predict(scoring_data_frame.drop(columns_to_dropSnap, axis=1))
        prediction_probaSnap = rfSnap.predict_proba(scoring_data_frame.drop(columns_to_dropSnap, axis=1))
        class_mappingSnap = {0: 'SN29', 1: 'sec9 Fungi', 2: 'SN25'}
        class_label2 = class_mappingSnap[predictionSnap[0]]
        class_proba2 = prediction_probaSnap[0, predictionSnap[0]]

        

    # Get the class label
    class_label1 = class_mapping1[prediction1[0]]

    # Get the probability of the predicted class
    class_proba1 = prediction_proba1[0, prediction1[0]]

    return f"Main group: {class_label1} with probability {class_proba1},  Sub group: {class_label2} with probability {class_proba2}"
    

 

In [None]:
SNAREClassifier("testQa.fasta")

In [None]:
import os

def classify_sequences(fasta_file):
    # Read the fasta file
    sequences = SeqIO.parse(fasta_file, "fasta")

    # Apply the SNAREClassifier function to each sequence
    for sequence in sequences:
        # Delete the temporary fasta file if it exists
        if os.path.exists("temp.fasta"):
            os.remove("temp.fasta")
        
        # Write the sequence to a temporary fasta file
        with open("temp.fasta", "w") as temp_file:
            SeqIO.write(sequence, temp_file, "fasta")
        
        # Apply the SNAREClassifier function
        result = SNAREClassifier("temp.fasta")
        
        # Print or store the result
        print(f"Result for sequence {sequence.id}: {result}\n ")

classify_sequences("test.fasta")