# core

> Core utilities for in-silico protein digestion and peptide analysis

This module provides the foundational functions for proteomics workflows, including FASTA file parsing, enzymatic digestion simulation, and peptide property calculations.

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
from __future__ import annotations
from typing import Dict, Union, Iterable
from pyteomics import parser
from pyteomics import mass
from pathlib import Path
from typing import Union
from Bio import SeqIO
from tqdm import tqdm
import pandas as pd
import numpy as np
import re
import os


In [None]:
#| export
# Get the repository root
try:
    # On GitHub Actions the checkout directory is published in this variable.
    REPO_ROOT = Path(os.environ['GITHUB_WORKSPACE'])
except KeyError:
    # Local development: take the nearest directory, starting at the current
    # working directory and walking upwards, that contains 'settings.ini'.
    # When no ancestor has one, fall back to the working directory itself.
    REPO_ROOT = next(
        (
            folder
            for folder in (Path.cwd(), *Path.cwd().parents)
            if (folder / 'settings.ini').exists()
        ),
        Path.cwd(),
    )

# Fixture directory used by the example/test cells of this notebook.
TEST_DATA = REPO_ROOT / 'test_data'

# Report what was resolved so a wrong working directory is easy to spot.
print(f"Repo root: {REPO_ROOT}")
print(f"Test data dir: {TEST_DATA}")
print(f"Test data exists: {TEST_DATA.exists()}")

Repo root: /Users/mtinti/git_projects/protein_cutter
Test data dir: /Users/mtinti/git_projects/protein_cutter/test_data
Test data exists: True


In [None]:
#| export
def load_fasta(fasta_path: Union[str, Path]) -> Dict[str, str]:
    """
    Load protein sequences from a FASTA file into a dictionary.
    
    Parses the file with ``SeqIO.parse(handle, "fasta")`` and maps each
    record's ``id`` to its sequence as a plain string. If two records share
    the same ``id`` the later one replaces the earlier one.
    
    Parameters
    ----------
    fasta_path : str or Path
        Path to the FASTA file.
    
    Returns
    -------
    Dict[str, str]
        Dictionary with record IDs as keys and sequences as values.
    
    Raises
    ------
    FileNotFoundError
        If the FASTA file does not exist.
    ValueError
        If no sequences are found in the file.
    
    Examples
    --------
    ```python
        proteins = load_fasta("uniprot_human.fasta")
        print(f"Loaded {len(proteins)} proteins")
        
        # Access a specific protein
        sequence = proteins["P12345"]
    ```
    """
    # The working directory is only reported inside error messages, where it
    # helps to diagnose relative paths; it is no longer printed on every call.
    path = os.path.abspath(os.curdir)

    fasta_path = Path(fasta_path)
    if not fasta_path.exists():
        raise FileNotFoundError(f"FASTA file not found: {fasta_path} {path}")

    with fasta_path.open('r') as handle:
        protein_dict = {
            record.id: str(record.seq)
            for record in SeqIO.parse(handle, "fasta")
        }

    if not protein_dict:
        raise ValueError(f"No sequences found in FASTA file: {fasta_path} {path}")

    return protein_dict

In [None]:
# NOTE(review): looks like a leftover scratch cell (prints the constant 1) - confirm it can be removed.
print(1)

1


In [None]:
#| export
# Load the bundled example FASTA; later cells index this dict by accession.
# NOTE(review): this cell carries the `#| export` directive, so the load would
# also run when the exported module is imported - confirm that is intended.
protein_dict = load_fasta(TEST_DATA / 'test_sequence.fa')

/Users/mtinti/git_projects/protein_cutter/nbs


In [None]:
#| hide
# Sanity check: the record 'P15497' was loaded and its first five residues are MKAVV.
assert(protein_dict['P15497'][0:5]=='MKAVV')

In [None]:
#| export
import re

# Cleavage rules keyed by protease name; each value is the regular expression
# that get_cleavage_sites() searches for in a protein sequence.
protease_dict = {
    # every K or R
    'trypsin_full': '[KR]',
    # K or R only when followed by a residue other than P
    "trypsin": "([KR](?=[^P]))",
}



In [None]:
#| export
import re

def get_cleavage_sites(sequence: str, protease: str) -> list[int]:
    """
    Locate every match of a protease rule in a sequence.

    Parameters
    ----------
    sequence : str
        Amino acid sequence to scan.
    protease : str
        Key of ``protease_dict`` selecting the regular expression to use.

    Returns
    -------
    list of int
        0-based start index of each regex match, in order of occurrence.

    Raises
    ------
    ValueError
        If ``protease`` is not a key of ``protease_dict``.
    """
    rule = protease_dict.get(protease)
    if rule is None:
        raise ValueError(f"Unknown protease: {protease}. Available: {list(protease_dict.keys())}")

    return [hit.start() for hit in re.finditer(rule, sequence)]

#Re-implementation of parser.xcleave from pyteomics
# to have full control on regular expression
# and to implement excision of n-term M
def xcleave(
    sequence: str,
    protease: str,
    missed_cleavages: int = 0,
    #N-terminal methionine excision (NME)
    nme: bool = True
    ) -> list[tuple[int, str]]:
    
    """
    Cleave a protein sequence and return peptides with their start positions.
    
    The cleavage rule is the regular expression stored in ``protease_dict``
    under ``protease``. A rule whose regex starts with a lookahead ``(?=`` is
    treated as cutting BEFORE the matched position (N-terminal cleavage);
    any other rule cuts AFTER the matched residue (C-terminal cleavage,
    e.g. the trypsin rules).
    
    Parameters
    ----------
    sequence : str
        Protein amino acid sequence to digest.
    protease : str
        Protease name from protease_dict.
    missed_cleavages : int
        Number of allowed missed cleavage sites. Default is 0.
    nme : bool
        N-terminal methionine excision. When True, every peptide that starts
        at position 0 with 'M' and is longer than one residue is also
        reported without that leading M (start index 1). This covers the
        peptides spanning missed cleavages and a sequence without any
        cleavage site. Default is True.
    
    Returns
    -------
    list of tuple (int, str)
        Unique (start index, peptide sequence) tuples, sorted by start index
        and then by peptide length. Zero-length fragments are never produced
        by a cut; a sequence without cleavage sites is returned whole.
    
    Raises
    ------
    ValueError
        If ``protease`` is not a key of ``protease_dict`` (raised by
        ``get_cleavage_sites``).
    """
    sites = get_cleavage_sites(sequence, protease)

    if sites:
        # N-terminal rules cut at the match position, C-terminal rules one
        # residue later.
        shift = 0 if protease_dict[protease].startswith('(?=') else 1

        # A set removes coinciding cut points (e.g. a C-terminal site on the
        # last residue, or an N-terminal site at position 0), which would
        # otherwise yield empty peptides and, with missed cleavages,
        # duplicated ones.
        cut_points = sorted({0, len(sequence), *(site + shift for site in sites)})
        n_cuts = len(cut_points)

        peptides = [
            (cut_points[i], sequence[cut_points[i]:cut_points[i + mc + 1]])
            for mc in range(missed_cleavages + 1)
            for i in range(n_cuts - mc - 1)
        ]
    else:
        # Nothing to cut: the intact sequence is the only peptide.
        peptides = [(0, sequence)]

    if nme:
        # The list of Met-excised variants is built completely before it is
        # appended, so the comprehension never sees its own output.
        peptides.extend([
            (1, pep[1:])
            for start, pep in peptides
            if start == 0 and len(pep) > 1 and pep[0] == 'M'
        ])

    # set(): a Met-excised variant can coincide with a regular peptide that
    # already starts at position 1. (start, length) identifies a peptide
    # uniquely, so the ordering is deterministic.
    return sorted(set(peptides), key=lambda item: (item[0], len(item[1])))

In [None]:
#| hide
def test_get_cleavage_sites():
    """Check the match positions of both trypsin rules on one probe sequence."""
    probe = "PEPTIDERANGEKATRATKPAA"
    expected_by_protease = {
        # the K at index 18 is followed by P, so this rule does not match it
        "trypsin": [7, 12, 15],
        # every K/R is matched, including the K before P
        "trypsin_full": [7, 12, 15, 18],
    }
    for protease_name, expected_sites in expected_by_protease.items():
        np.testing.assert_equal(
            get_cleavage_sites(probe, protease_name), expected_sites
        )

test_get_cleavage_sites()

In [None]:
#| hide
# Demo: show match positions and zero-missed-cleavage peptides for each
# protease rule on a short sequence that starts with M.
test_sequence = "MPEPTIDERANGEKATRATKPAA"

print(f"Sequence: {test_sequence}\n")

# Keys of protease_dict to exercise.
enzymes_to_test = ['trypsin', 'trypsin_full']

for enzyme in enzymes_to_test:
    print(f"{enzyme} (regex: {protease_dict[enzyme]}):")
    # Start index of every regex match.
    sites = get_cleavage_sites(test_sequence, enzyme)
    print(f"  Cleavage sites: {sites}")
    # nme keeps its default (True), so the first peptide is listed with and without M.
    result = xcleave(test_sequence, enzyme, missed_cleavages=0)
    print(f"  Peptides: {[pep for _, pep in result]}")
    print()

Sequence: MPEPTIDERANGEKATRATKPAA

trypsin (regex: ([KR](?=[^P]))):
  Cleavage sites: [8, 13, 16]
  Peptides: ['MPEPTIDER', 'PEPTIDER', 'ANGEK', 'ATR', 'ATKPAA']

trypsin_full (regex: [KR]):
  Cleavage sites: [8, 13, 16, 19]
  Peptides: ['MPEPTIDER', 'PEPTIDER', 'ANGEK', 'ATR', 'ATK', 'PAA']



In [None]:
#parser.icleave??

In [None]:
#| export

def digest(
    sequence: str,
    protein_id: str,
    enzyme: str = "trypsin_full",
    missed_cleavages: int = 1,
    charge_states: list[int] | None = None,
    mass_range: tuple[float, float] = (800.0, 4000.0),
    min_pep_length: int = 5,
    max_pep_length: int = 35,
    fixed_mods: dict[str, float] | None = None,
    sort_by_mass: bool = False ) -> pd.DataFrame:
    
    """
    Perform in silico digestion of a protein sequence.
    
    Digests a protein with ``xcleave`` and returns a DataFrame containing
    peptide information including masses, m/z values for multiple charge
    states, and flanking amino acids. Useful for creating digestion
    reports, calculating theoretical peptide coverage, and emPAI calculations.
    
    ``xcleave`` is called with its default ``nme=True``, so peptides at the
    protein N-terminus that start with M are also reported without it.
    
    Parameters
    ----------
    sequence : str
        Protein amino acid sequence to digest.
    protein_id : str
        Protein identifier to include in the output DataFrame.
    enzyme : str
        Protease name; must be a key of ``protease_dict``.
        Default is trypsin_full.
    missed_cleavages : int
        Number of allowed missed cleavage sites. Default is 1.
    charge_states : list of int or None
        Charge states for m/z calculation. Default is 1 through 6.
    mass_range : tuple of float
        Min and max monoisotopic mass filter in Daltons (inclusive).
        Default is 800.0 to 4000.0 Da.
    min_pep_length : int
        Minimum peptide length to retain. Default is 5.
    max_pep_length : int
        Maximum peptide length to retain. Default is 35.
    fixed_mods : dict or None
        Fixed modifications as residue to total mass mapping.
        Default is Carbamidomethyl on Cys with mass 160.0306 Da.
        Pass an empty dict for no modifications.
    sort_by_mass : bool
        Sort output by monoisotopic mass. Default is False.
    
    Returns
    -------
    pd.DataFrame
        DataFrame with the following columns:
        
        - start_index: Start position in protein sequence (0-based)
        - end_index: End position inclusive (0-based)
        - pep_seq: Peptide sequence
        - protein_id: Protein identifier
        - pep_length: Peptide length
        - prev_aa: Previous amino acid or dash at N-terminus
        - next_aa: Next amino acid or dash at C-terminus
        - extended_seq: Sequence with flanking amino acids
        - rep_extended_seq: Extended sequence with parentheses notation
        - mass_mono: Monoisotopic mass in Daltons
        - mz_N: Mass-to-charge ratio for each charge state N
    
    Raises
    ------
    ValueError
        If fixed_mods contains unknown amino acid residues, or if
        ``enzyme`` is not a key of ``protease_dict``.
    
    Notes
    -----
    The default fixed modification assumes Cys residues are alkylated
    with iodoacetamide during sample preparation, adding 57.02 Da
    to the Cys mass of 103.01 Da for a total of 160.03 Da.
    
    The length filter is applied before any mass is computed, so peptides
    outside the length window (including any zero-length fragment) are
    never passed to the mass calculation.
    
    Examples
    --------
    Basic digestion with default parameters:
    ```python
        sequence = "MKTAYIAKQRQISFVKSHFSRQLEERLGLIEVQAPILSR"
        df = digest(sequence, protein_id="P12345")
        print(f"Generated {len(df)} peptides")
    ```
    """

    if charge_states is None:
        charge_states = [1, 2, 3, 4, 5, 6]

    # Default fixed mod: CAM on Cys (103.00919 + 57.02146 = 160.03065)
    if fixed_mods is None:
        fixed_mods = {"C": 160.0306}

    # Residue mass table with the fixed modifications baked in
    custom_aa_mass = mass.std_aa_mass.copy()

    # Reject residues the mass table does not know about
    unknown = set(fixed_mods) - set(custom_aa_mass)
    if unknown:
        raise ValueError(f"Unknown residues in fixed_mods: {sorted(unknown)}")

    custom_aa_mass.update(fixed_mods)

    # Digest the protein: list of (start_index, pep_seq)
    cleavage_results = xcleave(
        sequence,
        enzyme,
        missed_cleavages=missed_cleavages,
    )

    df = pd.DataFrame(cleavage_results, columns=["start_index", "pep_seq"])
    df["protein_id"] = protein_id

    # Inclusive end index (0-based)
    df["pep_length"] = df["pep_seq"].str.len()
    df["end_index"] = df["start_index"] + df["pep_length"] - 1

    # Flanking amino acids ("-" marks a protein terminus)
    seq_len = len(sequence)
    df["prev_aa"] = df["start_index"].apply(lambda i: sequence[i - 1] if i > 0 else "-")
    df["next_aa"] = df["end_index"].apply(lambda j: sequence[j + 1] if (j + 1) < seq_len else "-")

    # Extended representations
    df["extended_seq"] = df["prev_aa"] + df["pep_seq"] + df["next_aa"]
    df["rep_extended_seq"] = "(" + df["prev_aa"] + ")" + df["pep_seq"] + "(" + df["next_aa"] + ")"

    # Length filter first: masses are only computed for peptides that can
    # still end up in the result.
    df = df[df["pep_length"].between(min_pep_length, max_pep_length)].copy()

    # Neutral monoisotopic mass (fixed mods enter through aa_mass).
    # astype(float) keeps the column numeric even when no peptide is left.
    df["mass_mono"] = df["pep_seq"].apply(
        lambda s: mass.fast_mass2(s, aa_mass=custom_aa_mass)
    ).astype(float)

    # Mass window (inclusive on both ends)
    df = df[df["mass_mono"].between(mass_range[0], mass_range[1])].copy()

    # m/z per charge state, computed by pyteomics for the surviving peptides.
    # `charge=z` binds the loop value at lambda creation time.
    for z in charge_states:
        df[f"mz_{z}"] = df["pep_seq"].apply(
            lambda s, charge=z: mass.fast_mass2(s, aa_mass=custom_aa_mass, charge=charge)
        ).astype(float)

    # Column order
    cols = [
        "start_index",
        "end_index",
        "pep_seq",
        "protein_id",
        "pep_length",
        "prev_aa",
        "next_aa",
        "extended_seq",
        "rep_extended_seq",
        "mass_mono",
    ] + [f"mz_{z}" for z in charge_states]
    df = df[cols]

    if sort_by_mass:
        # mergesort is stable: equal masses keep their digestion order
        df = df.sort_values("mass_mono", kind="mergesort")

    return df


In [None]:
#| hide
# Digest the example protein with no missed cleavages and preview the ten
# lightest peptides that pass digest()'s default length and mass filters.
df = digest(
    sequence=protein_dict['P15497'],
    protein_id='P15497',
    enzyme='trypsin_full',
    missed_cleavages=0,
    sort_by_mass=True
  )
df.head(10)

Unnamed: 0,start_index,end_index,pep_seq,protein_id,pep_length,prev_aa,next_aa,extended_seq,rep_extended_seq,mass_mono,mz_1,mz_2,mz_3,mz_4,mz_5,mz_6
27,176,182,AHVETLR,P15497,7,R,Q,RAHVETLRQ,(R)AHVETLR(Q),824.450451,825.457727,413.232502,275.824093,207.119889,165.897367,138.415685
36,230,236,PVLEDLR,P15497,7,K,Q,KPVLEDLRQ,(K)PVLEDLR(Q),840.470518,841.477794,421.242535,281.164116,211.124906,169.10138,141.085696
23,156,162,VQELQDK,P15497,7,K,L,KVQELQDKL,(K)VQELQDK(L),858.444697,859.451973,430.229625,287.155509,215.618451,172.696216,144.081393
20,141,149,VAPLGEEFR,P15497,9,K,E,KVAPLGEEFRE,(K)VAPLGEEFR(E),1016.529095,1017.536372,509.271824,339.850308,255.13955,204.313095,170.428792
24,163,171,LSPLAQELR,P15497,9,K,D,KLSPLAQELRD,(K)LSPLAQELR(D),1025.586944,1026.594221,513.800749,342.869591,257.404013,206.124665,171.938434
37,237,247,QGLLPVLESLK,P15497,11,R,V,RQGLLPVLESLKV,(R)QGLLPVLESLK(V),1195.717624,1196.724901,598.866089,399.579818,299.936682,240.150801,200.293547
38,248,259,VSILAAIDEASK,P15497,12,K,K,KVSILAAIDEASKK,(K)VSILAAIDEASK(K),1215.671068,1216.678344,608.84281,406.230966,304.925043,244.14149,203.619121
32,205,216,EGGGSLAEYHAK,P15497,12,K,A,KEGGGSLAEYHAKA,(K)EGGGSLAEYHAK(A),1217.567665,1218.574942,609.791109,406.863165,305.399193,244.52081,203.935221
5,35,45,DFATVYVEAIK,P15497,11,K,D,KDFATVYVEAIKD,(K)DFATVYVEAIK(D),1254.649604,1255.656881,628.332079,419.223811,314.669677,251.937197,210.115544
18,130,138,WHEEVEIYR,P15497,9,K,Q,KWHEEVEIYRQ,(K)WHEEVEIYR(Q),1259.593486,1260.600763,630.80402,420.871772,315.905648,252.925974,210.939524


In [None]:
#| hide
# The peptide starting at index 176 must be reported with its flanking residues (R...Q).
assert('RAHVETLRQ' in df[df['start_index']==176]['extended_seq'].values )

In [None]:
#| export
def digest_to_empai_set(
    sequence: str,
    enzyme: str = "trypsin_full",
    missed_cleavages: int = 0,
    mz_range: tuple[float, float] = (200.0, 4000.0),
    min_charge: int = 1,          #  ignore 1+ ?? yes for now
    max_charge_cap: int = 6,      
    min_pep_length: int = 5,
    max_pep_length: int = 52,
    fixed_mods: dict[str, float] | None = None,
    fixed_mod_labels: dict[str, str] | None = None,
    basic_residues: Iterable[str] = ("K", "R", "H"),
) -> set[str]:
    """
    Digest a protein and return identifiable peptide entries for emPAI-like use.

    Produces Spectronaut-like strings:
      _PEPTIDE_.z
    where z is the charge state, and fixed mods are rendered in the peptide string,
    e.g. C[Carbamidomethyl (C)].

    Charge logic
    ------------
    Every charge from min_charge to max_charge_cap (inclusive) is tested for
    every peptide.

    A sequence-aware upper bound was drafted but is NOT implemented:
      max_charge_seq = (#K + #R + #H) + 1   (N-terminus)
      max_charge = min(max_charge_seq, max_charge_cap)
    ``basic_residues`` is accepted for that draft and currently has no effect.

    Filters
    -------
    - peptide length in [min_pep_length, max_pep_length]
    - m/z in [mz_range[0], mz_range[1]], checked per charge state
    - m/z computed with fixed modifications via aa_mass

    Parameters
    ----------
    sequence : str
        Protein amino acid sequence to digest.
    enzyme : str
        Protease name passed to xcleave (default trypsin_full).
    missed_cleavages : int
        Number of allowed missed cleavage sites (default 0).
    mz_range : (float, float)
        Min/max m/z window to consider "identifiable".
    min_charge : int
        Minimum charge state to consider (default 1).
    max_charge_cap : int
        Maximum charge state to consider (default 6).
    min_pep_length : int
        Minimum peptide length to retain (default 5).
    max_pep_length : int
        Maximum peptide length to retain (default 52).
    fixed_mods : dict[str, float] | None
        Residue -> new monoisotopic mass for that residue (fixed modification).
        Default is CAM on Cys: {'C': 160.0306}.
    fixed_mod_labels : dict[str, str] | None
        Residue -> Spectronaut-style label to insert into peptide string.
        Default is {'C': 'Carbamidomethyl (C)'}.
    basic_residues : Iterable[str]
        Reserved for the drafted sequence-aware charge bound; unused.

    Returns
    -------
    set[str]
        e.g., {"_DVAGAVEFWTDR_.2", "_DASGPAMTEIGEQPWGR_.3", ...}

    Raises
    ------
    ValueError
        If fixed_mods contains residues missing from the standard mass table.
    """
    if fixed_mods is None:
        fixed_mods = {"C": 160.0306}  # CAM-Cys

    if fixed_mod_labels is None:
        fixed_mod_labels = {"C": "Carbamidomethyl (C)"}

    # Custom AA masses with fixed mods baked in
    custom_aa_mass = mass.std_aa_mass.copy()
    unknown = set(fixed_mods) - set(custom_aa_mass)
    if unknown:
        raise ValueError(f"Unknown residues in fixed_mods: {sorted(unknown)}")
    custom_aa_mass.update(fixed_mods)

    # Digest
    cleavage_results = xcleave(
        sequence,
        enzyme,
        missed_cleavages=missed_cleavages,
    )

    mz_min, mz_max = mz_range
    out: set[str] = set()

    # One alternation over all labelled residues (longest key first), so the
    # peptide is rewritten in a single pass. Chained str.replace calls would
    # let a later residue key match letters inside an already inserted label.
    label_keys = sorted((aa for aa in fixed_mod_labels if aa), key=len, reverse=True)
    label_pattern = (
        re.compile("|".join(re.escape(aa) for aa in label_keys))
        if label_keys else None
    )

    def render_fixed_mods(peptide: str) -> str:
        # Insert Spectronaut-like fixed mod labels: C -> C[Carbamidomethyl (C)]
        if label_pattern is None:
            return peptide
        return label_pattern.sub(
            lambda hit: f"{hit.group(0)}[{fixed_mod_labels[hit.group(0)]}]",
            peptide,
        )

    first_charge = int(min_charge)
    last_charge = int(max_charge_cap)

    for _, pep_seq in cleavage_results:
        if not (min_pep_length <= len(pep_seq) <= max_pep_length):
            continue

        sn_seq = render_fixed_mods(pep_seq)

        for z in range(first_charge, last_charge + 1):
            mz = mass.fast_mass2(pep_seq, aa_mass=custom_aa_mass, charge=z)
            if mz_min <= mz <= mz_max:
                out.add(f"_{sn_seq}_.{z}")

    return out

In [None]:
#| export
def collapse_empai_entries(empai_entries: set[str]) -> set[str]:
    """
    Collapse Spectronaut-like _PEPTIDE_.z entries to stripped peptide sequences.

    For each entry this:
      - removes modification annotations in square brackets
      - removes the charge annotation (everything from the first '.')
      - removes leading/trailing underscores

    Bracket annotations are removed first so that a '.' inside a label
    (for example ``[+57.021]``) is not mistaken for the charge separator.

    Parameters
    ----------
    empai_entries : set[str]
        Entries such as ``_PEPC[Carbamidomethyl (C)]TIDE_.2``.

    Returns
    -------
    set[str]
        Unique stripped peptide sequences, e.g. ``{"PEPCTIDE"}``.
    """
    stripped = set()

    for entry in empai_entries:
        # Drop modification annotations: [Something]
        core = re.sub(r"\[[^\]]+\]", "", entry)

        # Drop the charge suffix, then the underscores framing the peptide
        core = core.split('.')[0].strip("_")

        stripped.add(core)

    return stripped

In [None]:
#| hide
# Build the emPAI entry set of every loaded protein with the default settings,
# keyed by protein ID.
empay_dict = {}
for n in tqdm(protein_dict):
        empay_dict[n]=digest_to_empai_set(protein_dict[n])

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 223.24it/s]


In [None]:
#| hide
# AHVETLR at charge 1 must be among the entries kept for P15497.
assert('_AHVETLR_.1' in empay_dict['P15497'])

In [None]:
#| hide
# AHVETLR at charge 6 must have been dropped (its mz_6 in the digest table above is ~138, below the 200 lower bound).
assert('_AHVETLR_.6' not in empay_dict['P15497'])

In [None]:
#| hide
# Collapsing the charge-resolved entries must give back the bare peptide sequence.
observable = collapse_empai_entries(empay_dict['P15497'])
assert('AHVETLR' in observable)

In [None]:
#| hide
'''>sp|O43504|LTOR5_HUMAN Ragulator complex protein LAMTOR5 OS=Homo sapiens OX=9606 GN=LAMTOR5 PE=1 SV=1
MEATLEQHLEDTMKNPSIVGVLCTDSQGLNLGCRGTLSDEHAGVISVLAQQAAKLTSDPT
DIPVVCLESDNGNIMIQKHDGITVAVHKMAS'''

'>sp|O43504|LTOR5_HUMAN Ragulator complex protein LAMTOR5 OS=Homo sapiens OX=9606 GN=LAMTOR5 PE=1 SV=1\nMEATLEQHLEDTMKNPSIVGVLCTDSQGLNLGCRGTLSDEHAGVISVLAQQAAKLTSDPT\nDIPVVCLESDNGNIMIQKHDGITVAVHKMAS'

In [None]:
#| hide
# Display the stripped peptides of the LAMTOR5 sequence quoted in the previous cell.
collapse_empai_entries(
    digest_to_empai_set('MEATLEQHLEDTMKNPSIVGVLCTDSQGLNLGCRGTLSDEHAGVISVLAQQAAKLTSDPTDIPVVCLESDNGNIMIQKHDGITVAVHKMAS'))

{'EATLEQHLEDTMK',
 'GTLSDEHAGVISVLAQQAAK',
 'HDGITVAVHK',
 'LTSDPTDIPVVCLESDNGNIMIQK',
 'MEATLEQHLEDTMK',
 'NPSIVGVLCTDSQGLNLGCR'}

In [None]:
#| hide
# Display the charge-resolved entries of the same sequence, including the labels rendered on Cys.
digest_to_empai_set('MEATLEQHLEDTMKNPSIVGVLCTDSQGLNLGCRGTLSDEHAGVISVLAQQAAKLTSDPTDIPVVCLESDNGNIMIQKHDGITVAVHKMAS')

{'_EATLEQHLEDTMK_.1',
 '_EATLEQHLEDTMK_.2',
 '_EATLEQHLEDTMK_.3',
 '_EATLEQHLEDTMK_.4',
 '_EATLEQHLEDTMK_.5',
 '_EATLEQHLEDTMK_.6',
 '_GTLSDEHAGVISVLAQQAAK_.1',
 '_GTLSDEHAGVISVLAQQAAK_.2',
 '_GTLSDEHAGVISVLAQQAAK_.3',
 '_GTLSDEHAGVISVLAQQAAK_.4',
 '_GTLSDEHAGVISVLAQQAAK_.5',
 '_GTLSDEHAGVISVLAQQAAK_.6',
 '_HDGITVAVHK_.1',
 '_HDGITVAVHK_.2',
 '_HDGITVAVHK_.3',
 '_HDGITVAVHK_.4',
 '_HDGITVAVHK_.5',
 '_LTSDPTDIPVVC[Carbamidomethyl (C)]LESDNGNIMIQK_.1',
 '_LTSDPTDIPVVC[Carbamidomethyl (C)]LESDNGNIMIQK_.2',
 '_LTSDPTDIPVVC[Carbamidomethyl (C)]LESDNGNIMIQK_.3',
 '_LTSDPTDIPVVC[Carbamidomethyl (C)]LESDNGNIMIQK_.4',
 '_LTSDPTDIPVVC[Carbamidomethyl (C)]LESDNGNIMIQK_.5',
 '_LTSDPTDIPVVC[Carbamidomethyl (C)]LESDNGNIMIQK_.6',
 '_MEATLEQHLEDTMK_.1',
 '_MEATLEQHLEDTMK_.2',
 '_MEATLEQHLEDTMK_.3',
 '_MEATLEQHLEDTMK_.4',
 '_MEATLEQHLEDTMK_.5',
 '_MEATLEQHLEDTMK_.6',
 '_NPSIVGVLC[Carbamidomethyl (C)]TDSQGLNLGC[Carbamidomethyl (C)]R_.1',
 '_NPSIVGVLC[Carbamidomethyl (C)]TDSQGLNLGC[Carbamidomethyl (C)]R_

In [None]:
#HMVLYFPATGNEPVVDGFFFVEGR

In [None]:
#| export
def fasta_to_peptide_set(
    fasta_path: Union[str, Path],
    enzyme: str = 'trypsin_full',
    missed_cleavages: int = 0,
    min_pep_length: int = 6,
    max_pep_length: int = 52,
    show_progress: bool = True) -> set[str]:

    """
    Collect the unique peptides obtained by digesting every FASTA record.
    
    Each record of the file is cleaved with ``xcleave`` and every peptide
    whose length lies inside the requested window is added to one shared
    set. The set can then be used for fast membership checks, e.g. to tell
    canonical peptides from novel ones.
    
    Parameters
    ----------
    fasta_path : str or Path
        Path to the input FASTA file.
    enzyme : str
        Protease name passed to ``xcleave``. Default is trypsin_full.
    missed_cleavages : int
        Number of allowed missed cleavage sites. Default is 0.
    min_pep_length : int
        Minimum peptide length to include (inclusive). Default is 6.
    max_pep_length : int
        Maximum peptide length to include (inclusive). Default is 52.
    show_progress : bool
        Wrap the record iterator in a tqdm progress bar. Default is True.
    
    Returns
    -------
    set of str
        Unique peptide sequences from all digested records.
    
    Raises
    ------
    FileNotFoundError
        If the FASTA file does not exist.
    
    Notes
    -----
    Records are streamed from the file one at a time; only the resulting
    peptide set is held in memory, so memory use grows with the number of
    unique peptides.
    
    Examples
    --------
    Build a canonical peptide set from UniProt:
    ```python
        canonical = fasta_to_peptide_set("uniprot_human.fasta")
        print(f"Generated {len(canonical)} unique peptides")
    ```
    """

    source = Path(fasta_path)
    if not source.exists():
        raise FileNotFoundError(f"FASTA file not found: {source}")

    unique_peptides = set()

    with source.open('r') as handle:
        entries = SeqIO.parse(handle, "fasta")
        if show_progress:
            from tqdm import tqdm
            entries = tqdm(entries, desc="Digesting proteins")

        for entry in entries:
            fragments = xcleave(
                str(entry.seq),
                enzyme,
                missed_cleavages=missed_cleavages
            )
            # Keep only peptides inside the inclusive length window.
            unique_peptides.update(
                fragment
                for _, fragment in fragments
                if min_pep_length <= len(fragment) <= max_pep_length
            )

    return unique_peptides

In [None]:
#| hide
# Digest the bundled example FASTA with the default settings.
peptide_set = fasta_to_peptide_set(TEST_DATA / 'test_sequence.fa')

Digesting proteins: 2it [00:00, 10994.24it/s]


In [None]:
#peptide_set

In [None]:
#| hide
# Both peptides are expected in the digest of the example FASTA.
assert('VAPLGEEFR' in peptide_set )
assert('QAVAPLGEEFR' in peptide_set)

In [None]:
#| export
def flag_proprietary_peptides_from_set(
    input_path: Union[str, Path],
    output_path: Union[str, Path],
    uniprot_peptides: set[str],
    sequence_col: str = 'PEP.StrippedSequence',
    new_col_name: str = 'is_novel',
    sep: str = None,
    show_progress: bool = True ) -> int:
    """
    Flag peptides as novel based on absence from a reference peptide set.
    
    Streams a delimited peptide report line by line and writes a copy with
    one extra column appended. The new column holds ``True`` when the
    peptide in ``sequence_col`` is NOT in ``uniprot_peptides`` and
    ``False`` when it is.
    
    Rows are split on ``sep`` as plain text (no quoting rules are applied)
    and are otherwise copied unchanged. Empty lines are skipped.
    
    Parameters
    ----------
    input_path : str or Path
        Path to input peptide file (TSV or CSV from Spectronaut or similar).
    output_path : str or Path
        Path for output file with the new flag column added. Must not be
        the same file as ``input_path``. Missing parent directories are
        created.
    uniprot_peptides : set of str
        Reference set of canonical peptide sequences. Peptides NOT in
        this set are flagged as novel. Typically generated using
        fasta_to_peptide_set on a UniProt FASTA file.
    sequence_col : str
        Column name containing peptide sequences.
        Default is PEP.StrippedSequence.
    new_col_name : str
        Name for the new boolean flag column. Default is is_novel.
    sep : str or None
        Field separator. If None, auto-detects based on file extension
        where .csv uses comma and other extensions use tab.
    show_progress : bool
        Display progress bar during processing. Default is True.
    
    Returns
    -------
    int
        Number of data rows written (header and empty lines excluded).
    
    Raises
    ------
    FileNotFoundError
        If the input file does not exist.
    ValueError
        If sequence_col is not found in the file, new_col_name
        already exists, or output_path points to the input file.
    IndexError
        If a non-empty data row has too few fields to contain sequence_col.
    
    Notes
    -----
    The flag logic is:
    
    - is_novel = True: Peptide NOT in reference set (potentially proprietary)
    - is_novel = False: Peptide found in reference set (canonical)
    """

    input_path = Path(input_path)
    output_path = Path(output_path)

    if not input_path.exists():
        raise FileNotFoundError(f"File not found: {input_path}")

    # Opening the output for writing truncates it, so annotating a file in
    # place would wipe the input before a single row has been read.
    if output_path.resolve() == input_path.resolve():
        raise ValueError(
            f"output_path must differ from input_path: {input_path}"
        )

    # Auto-detect separator from input file
    if sep is None:
        sep = ',' if input_path.suffix == '.csv' else '\t'

    peptide_count = 0

    with input_path.open('r') as infile:
        # Validate the header before anything is created on disk.
        header_line = infile.readline().rstrip('\n\r')
        columns = header_line.split(sep)

        if new_col_name in columns:
            raise ValueError(f"Column '{new_col_name}' already exists in file")

        if sequence_col not in columns:
            raise ValueError(f"Column '{sequence_col}' not found. Available: {columns[:10]}...")

        seq_col_idx = columns.index(sequence_col)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with output_path.open('w') as outfile:
            outfile.write(f"{header_line}{sep}{new_col_name}\n")

            # The header has been consumed; the handle now yields data rows.
            lines = infile
            if show_progress:
                from tqdm import tqdm
                lines = tqdm(infile, desc="Annotating peptides")

            for line in lines:
                line = line.rstrip('\n\r')
                if not line:
                    # e.g. a trailing blank line at the end of the report
                    continue

                peptide = line.split(sep)[seq_col_idx]
                is_novel = peptide not in uniprot_peptides

                outfile.write(f"{line}{sep}{is_novel}\n")
                peptide_count += 1

    return peptide_count

In [None]:
#| hide
# Two-peptide reference set used to exercise the novelty flag below.
uniprot_peps = set(['DASGPAMTEIGEQPWGR', 'DVAGAVEFWTDR'])
# Load the example Spectronaut peptide report.
spc_out = pd.read_csv(TEST_DATA / 'test_spectronaut_pep.tsv',sep='\t')
# Bare column access: raises if the column is missing; the value itself is not used.
spc_out['PEP.StrippedSequence']
spc_out.head()

Unnamed: 0,PG.MolecularWeight,PG.ProteinAccessions,PG.Genes,PG.Organisms,PG.WBGene,PG.Locus,PG.Status,PEP.StrippedSequence,EG.PrecursorId,[1] 020_2025-DUN_DH-GB-2T1-A.raw.PG.IsSingleHit,...,[6] 020_2025-DUN_DH-GB-SLBP1+2-C.raw.EG.TotalQuantity (Settings),[7] 020_2025-DUN_DH-GB-SLBP1+2-TET-A.raw.EG.TotalQuantity (Settings),[8] 020_2025-DUN_DH-GB-SLBP1+2-TET-B.raw.EG.TotalQuantity (Settings),[9] 020_2025-DUN_DH-GB-SLBP1+2-TET-C.raw.EG.TotalQuantity (Settings),[10] 020_2025-DUN_DH-GB-SLBP1-A.raw.EG.TotalQuantity (Settings),[11] 020_2025-DUN_DH-GB-SLBP1-B-Rep-2.raw.EG.TotalQuantity (Settings),[12] 020_2025-DUN_DH-GB-SLBP1-C-Rep.raw.EG.TotalQuantity (Settings),[13] 020_2025-DUN_DH-GB-SLBP1-TET-A.raw.EG.TotalQuantity (Settings),[14] 020_2025-DUN_DH-GB-SLBP1-TET-B.raw.EG.TotalQuantity (Settings),[15] 020_2025-DUN_DH-GB-SLBP1-TET-C.raw.EG.TotalQuantity (Settings)
0,13796.3,Phleomycin,,Unknown,,,,DVAGAVEFWTDR,_DVAGAVEFWTDR_.2,False,...,29607.361328125,24625.78125,27913.017578125,33634.2265625,32040.609375,34004.15234375,31561.0546875,32193.07421875,33442.484375,30433.17578125
1,13796.3,Phleomycin,,Unknown,,,,DASGPAMTEIGEQPWGR,_DASGPAMTEIGEQPWGR_.2,False,...,16402.740234375,16614.609375,21239.26171875,26206.439453125,22974.56640625,27617.26953125,26588.005859375,26862.169921875,25289.5390625,21857.587890625
2,13796.3,Phleomycin,,Unknown,,,,DASGPAMTEIGEQPWGR,_DASGPAMTEIGEQPWGR_.3,False,...,1493.2821044921875,1043.6531982421875,2921.56005859375,3539.15234375,1961.1495361328125,3309.3720703125,3433.963134765625,3639.20458984375,3179.865478515625,2923.409423828125
3,13796.3,Phleomycin,,Unknown,,,,LTSAVPVLTAR,_LTSAVPVLTAR_.2,False,...,14565.984375,12778.6640625,15191.779296875,18907.958984375,17571.548828125,22430.3671875,21747.65625,20893.990234375,19671.70703125,15862.9970703125
4,13796.3,Phleomycin,,Unknown,,,,DPAGNCVHFVAEEQD,_DPAGNC[Carbamidomethyl (C)]VHFVAEEQD_.2,False,...,2699.176513671875,2801.34814453125,2980.920166015625,2851.24560546875,3146.41015625,3558.781982421875,4068.1572265625,4052.650390625,4167.5390625,3744.2060546875


In [None]:
#| hide
# Annotate the example report against the two-peptide reference set, then
# reload the written file to inspect the appended is_novel column.
flag_proprietary_peptides_from_set(TEST_DATA / "test_spectronaut_pep.tsv", 
                  TEST_DATA / "test_spectronaut_pep_annoated.tsv", 
                  uniprot_peps                 
                  )
spc_out = pd.read_csv(TEST_DATA / 'test_spectronaut_pep_annoated.tsv',sep='\t')
spc_out.head(10)

Annotating peptides: 10it [00:00, 25130.64it/s]


Unnamed: 0,PG.MolecularWeight,PG.ProteinAccessions,PG.Genes,PG.Organisms,PG.WBGene,PG.Locus,PG.Status,PEP.StrippedSequence,EG.PrecursorId,[1] 020_2025-DUN_DH-GB-2T1-A.raw.PG.IsSingleHit,...,[7] 020_2025-DUN_DH-GB-SLBP1+2-TET-A.raw.EG.TotalQuantity (Settings),[8] 020_2025-DUN_DH-GB-SLBP1+2-TET-B.raw.EG.TotalQuantity (Settings),[9] 020_2025-DUN_DH-GB-SLBP1+2-TET-C.raw.EG.TotalQuantity (Settings),[10] 020_2025-DUN_DH-GB-SLBP1-A.raw.EG.TotalQuantity (Settings),[11] 020_2025-DUN_DH-GB-SLBP1-B-Rep-2.raw.EG.TotalQuantity (Settings),[12] 020_2025-DUN_DH-GB-SLBP1-C-Rep.raw.EG.TotalQuantity (Settings),[13] 020_2025-DUN_DH-GB-SLBP1-TET-A.raw.EG.TotalQuantity (Settings),[14] 020_2025-DUN_DH-GB-SLBP1-TET-B.raw.EG.TotalQuantity (Settings),[15] 020_2025-DUN_DH-GB-SLBP1-TET-C.raw.EG.TotalQuantity (Settings),is_novel
0,13796.3,Phleomycin,,Unknown,,,,DVAGAVEFWTDR,_DVAGAVEFWTDR_.2,False,...,24625.78125,27913.017578125,33634.2265625,32040.609375,34004.15234375,31561.0546875,32193.07421875,33442.484375,30433.17578125,False
1,13796.3,Phleomycin,,Unknown,,,,DASGPAMTEIGEQPWGR,_DASGPAMTEIGEQPWGR_.2,False,...,16614.609375,21239.26171875,26206.439453125,22974.56640625,27617.26953125,26588.005859375,26862.169921875,25289.5390625,21857.587890625,False
2,13796.3,Phleomycin,,Unknown,,,,DASGPAMTEIGEQPWGR,_DASGPAMTEIGEQPWGR_.3,False,...,1043.6531982421875,2921.56005859375,3539.15234375,1961.1495361328125,3309.3720703125,3433.963134765625,3639.20458984375,3179.865478515625,2923.409423828125,False
3,13796.3,Phleomycin,,Unknown,,,,LTSAVPVLTAR,_LTSAVPVLTAR_.2,False,...,12778.6640625,15191.779296875,18907.958984375,17571.548828125,22430.3671875,21747.65625,20893.990234375,19671.70703125,15862.9970703125,True
4,13796.3,Phleomycin,,Unknown,,,,DPAGNCVHFVAEEQD,_DPAGNC[Carbamidomethyl (C)]VHFVAEEQD_.2,False,...,2801.34814453125,2980.920166015625,2851.24560546875,3146.41015625,3558.781982421875,4068.1572265625,4052.650390625,4167.5390625,3744.2060546875,True
5,13796.3,Phleomycin,,Unknown,,,,DPAGNCVHFVAEEQD,_DPAGN[Deamidation (NQ)]C[Carbamidomethyl (C)]...,False,...,321.7603454589844,339.8725891113281,290.42327880859375,221.0309295654297,392.0259704589844,333.7810363769531,484.3554382324219,213.44088745117188,299.136962890625,True
6,13796.3,Phleomycin,,Unknown,,,,GLDELYAEWSEVVSTNFR,_GLDELYAEWSEVVSTNFR_.3,False,...,1599.41552734375,2850.877197265625,3140.3427734375,1978.1346435546875,1610.1181640625,1285.8214111328125,2614.275634765625,2498.137939453125,2051.8603515625,True
7,13796.3,Phleomycin,,Unknown,,,,DFVEDDFAGVVR,_DFVEDDFAGVVR_.2,False,...,16198.3369140625,17502.146484375,20858.0625,21224.94140625,22481.759765625,20827.43359375,18854.673828125,18972.21875,15566.208984375,True
8,13796.3,Phleomycin,,Unknown,,,,DFVEDDFAGVVR,_DFVEDDFAGVVR_.3,False,...,Filtered,163.9825439453125,181.18267822265625,Filtered,47.85918045043945,71.77069854736328,62.8223991394043,164.4523468017578,67.01526641845703,True
9,21496.45,>sp|Puromycin,,Unknown,,,,TEYKPTVR,_TEYKPTVR_.2,False,...,Filtered,Filtered,Filtered,Filtered,Filtered,Filtered,Filtered,Filtered,Filtered,True


In [None]:
#| export
def flag_proprietary_from_pg(
    input_path: Union[str, Path],
    output_path: Union[str, Path],
    accession_col: str = 'PG.ProteinAccessions',
    new_col_name: str = 'is_novel',
    uniprot_prefixes: tuple[str, ...] = ('>fl', '>sp'),
    keep_only_novel: bool = True,
    sep: Union[str, None] = None,
    show_progress: bool = True) -> dict[str, int]:

    """
    Flag peptides as novel based on protein accession prefixes.

    Streams a delimited peptide report line by line and appends a boolean
    column indicating whether each row maps exclusively to proprietary
    ORFs (True) or includes at least one UniProt entry (False). A row is
    considered to have a UniProt mapping when any of ``uniprot_prefixes``
    occurs anywhere inside the value of ``accession_col``.

    Parameters
    ----------
    input_path : str or Path
        Path to input peptide file (TSV or CSV from Spectronaut or similar).
    output_path : str or Path
        Path for output file with the new flag column added. Missing
        parent directories are created. Must differ from ``input_path``.
    accession_col : str
        Column name containing protein accessions.
        Default is PG.ProteinAccessions.
    new_col_name : str
        Name for the new boolean flag column. Default is is_novel.
    uniprot_prefixes : tuple of str
        Markers identifying UniProt entries in accession strings.
        Default is ('>fl', '>sp'). A single string is accepted and
        treated as one marker.
    keep_only_novel : bool
        If True, only write rows where the flag is True to the output file.
        If False, write all rows with the flag column added.
        Default is True.
    sep : str or None
        Field separator. If None, auto-detects based on file extension
        where .csv (any letter case) uses comma and other extensions
        use tab.
    show_progress : bool
        Display a tqdm progress bar during processing. Default is True.

    Returns
    -------
    dict with str keys and int values
        Summary statistics containing:

        - total: Total number of data rows processed
        - proprietary_only: Rows with no UniProt marker
        - has_uniprot: Rows with at least one UniProt marker
        - written: Number of data rows written to the output file

    Raises
    ------
    FileNotFoundError
        If the input file does not exist.
    ValueError
        If accession_col is not found in the header, new_col_name
        already exists, output_path points at the input file, or a data
        row has too few fields to contain the accession column.

    Notes
    -----
    The flag logic is:

    - flag = True: Accessions contain NO UniProt marker (proprietary only)
    - flag = False: Accessions contain at least one UniProt marker

    Matching is a case-insensitive substring test; both the accession
    string and the markers are lower-cased before comparison. This
    approach complements flag_proprietary_peptides_from_set which checks
    peptide sequences directly against a reference database.

    Rows are split with a plain ``str.split(sep)``; quoted fields that
    contain the separator are not supported. Completely empty lines are
    skipped and do not contribute to any statistic.
    """
    input_path = Path(input_path)
    output_path = Path(output_path)

    if not input_path.exists():
        raise FileNotFoundError(f"File not found: {input_path}")

    # Opening the output in 'w' mode truncates it immediately, which would
    # wipe the input before a single row has been read.
    if input_path.resolve() == output_path.resolve():
        raise ValueError(
            f"output_path must differ from input_path: {input_path}"
        )

    # Auto-detect separator from the input file extension
    if sep is None:
        sep = ',' if input_path.suffix.lower() == '.csv' else '\t'

    # A bare string would otherwise be iterated character by character.
    if isinstance(uniprot_prefixes, str):
        uniprot_prefixes = (uniprot_prefixes,)
    # Lower-case the markers once so matching is truly case-insensitive.
    markers = tuple(prefix.lower() for prefix in uniprot_prefixes)

    stats = {
        'total': 0,
        'proprietary_only': 0,
        'has_uniprot': 0,
        'written': 0,
    }

    with input_path.open('r') as infile:
        header_line = infile.readline().rstrip('\n\r')
        columns = header_line.split(sep)

        # Validate the header before touching the output location.
        if new_col_name in columns:
            raise ValueError(f"Column '{new_col_name}' already exists in file")

        if accession_col not in columns:
            raise ValueError(f"Column '{accession_col}' not found. Available: {columns[:10]}...")

        accession_col_idx = columns.index(accession_col)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with output_path.open('w') as outfile:
            outfile.write(f"{header_line}{sep}{new_col_name}\n")

            lines = infile
            if show_progress:
                from tqdm import tqdm
                lines = tqdm(infile, desc="Flagging proprietary peptides")

            # The header is line 1, so data rows start at line 2.
            for line_no, line in enumerate(lines, start=2):
                line = line.rstrip('\n\r')
                if not line:
                    # Blank lines (typically a trailing newline) carry no data.
                    continue

                fields = line.split(sep)
                if len(fields) <= accession_col_idx:
                    raise ValueError(
                        f"Line {line_no} has {len(fields)} field(s); "
                        f"column '{accession_col}' is missing"
                    )

                accessions_lower = fields[accession_col_idx].lower()
                is_novel = not any(marker in accessions_lower for marker in markers)

                stats['total'] += 1
                stats['proprietary_only' if is_novel else 'has_uniprot'] += 1

                # Novel rows are always written; others only when not filtering.
                if is_novel or not keep_only_novel:
                    outfile.write(f"{line}{sep}{is_novel}\n")
                    stats['written'] += 1

    return stats

In [None]:
#| hide
# Regenerate the annotated report first so the preview below never depends on
# a file left behind by an earlier run.
res = flag_proprietary_from_pg(
    input_path=TEST_DATA / "test_spectronaut_pep.tsv",
    output_path=TEST_DATA / "test_spectronaut_pep_annoated.tsv",
    keep_only_novel=False,
)
# With keep_only_novel=False every processed row is written.
assert res == {'total': 10, 'proprietary_only': 9, 'has_uniprot': 1, 'written': 10}

# Load the freshly written file to inspect the appended flag column.
spc_out = pd.read_csv(TEST_DATA / 'test_spectronaut_pep_annoated.tsv', sep='\t')
spc_out.head(10)

Flagging proprietary peptides: 10it [00:00, 22477.51it/s]


In [None]:
# Display the summary statistics dict returned by the last executed
# flag_proprietary_from_pg call.
res

{'total': 10, 'proprietary_only': 9, 'has_uniprot': 1, 'written': 9}

In [None]:
#| hide
# Filtered mode: only the proprietary-only rows are written, so 'written'
# must equal 'proprietary_only'.
res = flag_proprietary_from_pg(
    TEST_DATA / "test_spectronaut_pep.tsv",
    TEST_DATA / "test_spectronaut_pep_annoated.tsv",
    keep_only_novel=True,
)
assert res == {
    'total': 10,
    'proprietary_only': 9,
    'has_uniprot': 1,
    'written': 9,
}

In [None]:
#| export
def load_peptides_from_fasta(
    fasta_path: Union[str, Path],
    show_progress: bool = True) -> set[str]:

    """
    Collect the distinct peptide sequences stored in a FASTA file.

    Every line that does not begin with `>` is stripped of surrounding
    whitespace and, if anything remains, kept as one peptide. Lines are
    never joined, so a sequence wrapped over several lines yields one
    entry per line.

    Parameters
    ----------
    fasta_path : str or pathlib.Path
        Location of the FASTA file to read.
    show_progress : bool, default=True
        If `True`, print one message before reading and one afterwards
        reporting how many unique peptides were loaded.

    Returns
    -------
    set of str
        The unique, whitespace-stripped sequence lines of the file.

    Raises
    ------
    FileNotFoundError
        If `fasta_path` does not exist.

    Notes
    -----
    - Each peptide is expected to occupy exactly one line.
    - Duplicates collapse automatically because a set is returned.
    - Header lines (starting with `>`) and blank lines are ignored.

    Examples
    --------
    Load peptides from a FASTA file:

    ```python
    peptides = load_peptides_from_fasta("peptides.fasta")
    len(peptides)
    ```
    """
    source = Path(fasta_path)

    if not source.exists():
        raise FileNotFoundError(f"FASTA file not found: {source}")

    if show_progress:
        print(f"Loading peptides from {source.name}...")

    with source.open('r') as handle:
        # Drop headers on the raw line, then strip and discard empty remains.
        stripped_lines = (raw.strip() for raw in handle if not raw.startswith('>'))
        unique_peptides = {sequence for sequence in stripped_lines if sequence}

    if show_progress:
        print(f"Loaded {len(unique_peptides):,} unique peptides")

    return unique_peptides

In [None]:
#| hide
# Smoke test: the fixture FASTA must yield this peptide.
pep = load_peptides_from_fasta(TEST_DATA / "pep.fa")
assert 'MLFCSCQDQACAER' in pep

In [None]:
#| hide
# Run nbdev's export step for this project.
import nbdev
nbdev.nbdev_export()