<a href="https://colab.research.google.com/github/yc386/orthrus_metaproteomics/blob/main/orthrus_cloud_stable_v100/unit_test_matching_Bayes_ranking.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%pip install -q ipytest pytest
import ipytest
ipytest.autoconfig()

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.6 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.4/1.6 MB[0m [31m14.4 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.6/1.6 MB[0m [31m31.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m21.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import pandas as pd
import re
from itertools import chain
import logging as log
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import accuracy_score, precision_score, f1_score


# ----------------------------- matching & Bayes ranking ------------------------------- #


#prepare overlapping sequence tags for string matching
def get_seq_tags(sequence: str, k: int):
    return set(sequence[i:i+k] for i in range(len(sequence) - k + 1))


#change chunk size here for memory if needed
def matching_count_v5(fasta_df: pd.DataFrame, casanovo_df: pd.DataFrame, k: int, chunk_size: int=10000):
    log.info("Generating sequence tags (k=%d)...", k)
    sequence_set = get_seq_tags(''.join(chain.from_iterable(casanovo_df['sequence_naked'].astype(str))), k)
    log.info("Generated %d unique tags from Casanovo outputs.", len(sequence_set))
    result_df = pd.DataFrame()
    for start in range(0, len(fasta_df), chunk_size):
        chunk = fasta_df.iloc[start:start+chunk_size].copy()
        chunk['seq_tags'] = chunk['Sequence'].astype(str).str.replace('I', 'L').apply(lambda x: get_seq_tags(x, k))
        chunk['matched_count'] = chunk['seq_tags'].apply(lambda seq_tags: len(seq_tags & sequence_set))
        chunk = chunk.assign(matched=chunk['matched_count'].apply(lambda x: 1 if x >= 2 else 0))
        result_df = pd.concat([result_df, chunk], ignore_index=True)
    log.info(
        "Tag matching complete: total matched tag counts=%d",
        int(result_df["matched_count"].sum()))
    return result_df


#get tryptic peptides per database entry
def count_tryptic_peptides(sequence: str):
    pattern=r'(?<=[KR])'
    peptides = re.split(pattern, sequence)
    filtered_peptides = [peptide for peptide in peptides if len(peptide) >= 6]
    return len(filtered_peptides)


#prepare a dataframe for NB classification
def prep_Bayes(df: pd.DataFrame):
    df1=df.assign(length=df['Sequence'].astype(str).str.len(),
                 tryptic_count=df['Sequence'].apply(count_tryptic_peptides),
                 tag_count=df['seq_tags'].apply(len))
    df2=df1.assign(SAF=df1['matched_count']/df1['length'],
                 try_ratio=df1['tryptic_count']/df1['tag_count'])
    return df2


# bayes ranking
def get_bayes_ranking_test(df: pd.DataFrame, threshold: float = 0.95):
    m=prep_Bayes(df)
    required_columns = {'SAF', 'try_ratio', 'matched'}
    if not required_columns.issubset(m.columns):
        missing = required_columns - set(m.columns)
        raise ValueError(f"Missing columns in DataFrame: {missing}")
    m1 = m[m['tag_count']>0]
    X = m1[['SAF', 'try_ratio']].to_numpy()
    y = m1['matched'].to_numpy()
    scaler = MinMaxScaler()
    X_scaled = scaler.fit_transform(X.reshape(-1, 1)).reshape(*X.shape)
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=7)
    gnb = GaussianNB()
    gnb.fit(X_train, y_train)
    y_pred = gnb.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    log.info("GaussianNB ▶ accuracy=%.4f, precision=%.4f, f1=%.4f", accuracy, precision, f1)
    whole_pred = gnb.predict(X_scaled)
    class_probabilities = gnb.predict_proba(X_scaled)
    m2 = m1.assign(pred=class_probabilities[:, 1])
    m3 = m2[m2['pred']>=threshold]
    log.info("Shortlisted %d proteins at ≥ %.2f.", m3.shape[0], threshold)
    return m3

In [None]:
%%ipytest -v
import pandas as pd
import pytest


def test_get_seq_tags_basic():
    assert get_seq_tags("ABCDE", 2) == {"AB", "BC", "CD", "DE"}
    assert get_seq_tags("ABCDE", 6) == set()
    assert get_seq_tags("", 1) == set()

@pytest.mark.parametrize(
    "seq,expected",
    [
        ("AAAAAA", 1),
        ("AAAARBBB", 0),
        ("AAAAAKBBBBBB", 2),
    ],
)


def test_count_tryptic_peptides(seq, expected):
    assert count_tryptic_peptides(seq) == expected


@pytest.fixture()
def sample_data():
    casanovo_df = pd.DataFrame({
        "sequence_naked": ["ACDEFGHIK", "LMNPQRSTV", "ACDLMN"]
    })
    fasta_df = pd.DataFrame({
        "Accession": ["prot1", "prot2", "prot3"],
        "Sequence": ["ACDEFGHIKLMNP", "TTTTTIKKKKKK", "QRSTVACDLMNXX"]
    })
    return fasta_df, casanovo_df, 3  # k=3


def test_matching_count_v5_structure(sample_data):
    fasta_df, casanovo_df, k = sample_data
    out = matching_count_v5(fasta_df, casanovo_df, k, chunk_size=2)
    for col in ["seq_tags", "matched_count", "matched"]:
        assert col in out.columns
    assert all(isinstance(x, set) for x in out["seq_tags"])
    assert set(out["matched"].unique()).issubset({0, 1})


def test_prep_Bayes_columns(sample_data):
    fasta_df, casanovo_df, k = sample_data
    out = matching_count_v5(fasta_df, casanovo_df, k)
    prepped = prep_Bayes(out)
    assert {"length","tryptic_count","tag_count","SAF","try_ratio"}.issubset(prepped.columns)
    assert (prepped["tag_count"] == prepped["seq_tags"].apply(len)).all()


def test_get_bayes_ranking_end_to_end(sample_data):
    fasta_df, casanovo_df, k = sample_data
    out = matching_count_v5(fasta_df, casanovo_df, k)
    ranked = get_bayes_ranking_test(out, threshold=0.0)
    assert "pred" in ranked.columns
    assert (ranked["tag_count"] > 0).all()


def test_bayes_missing_columns_error(sample_data):
    fasta_df, casanovo_df, k = sample_data
    out = matching_count_v5(fasta_df, casanovo_df, k).drop(columns=["matched"])
    with pytest.raises(ValueError):
        get_bayes_ranking_test(out)

platform linux -- Python 3.12.11, pytest-8.4.2, pluggy-1.6.0
rootdir: /content
plugins: typeguard-4.4.4, langsmith-0.4.27, anyio-4.10.0
collected 8 items

t_74883817bd6b42bc9459ed7eaf71c200.py [32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                               [100%][0m

