# Scoring

> Scoring functions to calculate kinase score based on substrate sequence

In [None]:
#| default_exp score

## Overview

This module provides functions to calculate kinase scores based on substrate sequences using Position-Specific Scoring Matrices (PSSMs) from either PSPA data or kinase-substrate datasets.

---

**Utility Functions**

These helper functions prepare and parse phosphorylation site sequences.

`cut_seq` — Extracts a substring from a sequence based on positions relative to its center (the phosphorylation site).

```python
cut_seq(
    input_string='AAkUuPSFSTtH',  # site sequence (15-mer or similar)
    min_position=-5,              # start position relative to center
    max_position=4,               # end position relative to center
)
# Returns: 'AkUuPSFSTt'
```

`STY2sty` — Converts uppercase S/T/Y (unphosphorylated) to lowercase s/t/y (phosphorylated) in a sequence.

```python
STY2sty(
    input_string='AAkUuPSFSTtH',  # sequence with uppercase STY
)
# Returns: 'AAkUuPsFsttH'
```

`get_dict` — Parses a site sequence into position-annotated amino acid keys (e.g., `-3P`, `0s`, `1Q`) for PSSM lookup.

```python
get_dict(
    input_string='PSVEPPLsQETFSDL',  # 15-mer phosphosite sequence
)
# Returns: ['-7P', '-6S', '-5V', ..., '0s', '1Q', ..., '7L']
```

---

**Scoring Functions**

These functions aggregate PSSM values across positions to produce a single kinase score.

`sumup` — Simple summation of PSSM values at each position. Used with log-odds (LO) matrices.

```python
sumup(
    values=[0.5, 1.2, -0.3, 0.8],  # PSSM values at each position
    kinase='ATM',                  # kinase name (unused, for API consistency)
)
# Returns: sum of values
```

`multiply` — Log-space multiplication of PSSM probabilities, normalized by number of possible amino acids. Used with probability matrices.

```python
multiply(
    values=[0.04, 0.05, 0.03],  # probability values from PSSM
    kinase='ATM',              # kinase name (unused)
    num_aa=23,                 # number of amino acids (23 for CDDM, 20 for uppercase)
)
# Returns: log2(product) + normalization factor
```

`multiply_pspa` — Like `multiply`, but uses a kinase-specific normalization factor from the PSPA random amino acid library.

```python
multiply_pspa(
    values=[1, 2, 3, 4, 5],  # PSSM values at each position (illustrative)
    kinase='PDHK1',          # kinase name (determines normalization)
)
# Returns: np.float64(22.907...)
```

---

**Single Sequence Prediction**

`predict_kinase` — Scores all kinases against a single phosphosite sequence. Returns a sorted Series of kinase scores.

```python
predict_kinase(
    input_string='PSVEPPLsQETFSDL',  # 15-mer site sequence (lowercase = phosphosite)
    ref=Data.get_pspa(),             # reference PSSM DataFrame (kinases × positions)
    func=multiply_pspa,              # aggregation function (sumup or multiply_*)
    to_lower=False,                  # convert STY→sty before scoring
    to_upper=False,                  # convert all to uppercase before scoring
    verbose=True,                    # print which positions were used
)
# Returns: pd.Series of scores indexed by kinase, sorted descending
```

---

**Batch Prediction (DataFrame)**

`predict_kinase_df` — Scores all kinases against multiple sequences in a DataFrame. Optimized for large-scale scoring.

```python
predict_kinase_df(
    df=Data.get_human_site().head(100),  # DataFrame containing site sequences
    seq_col='site_seq',                   # column name with sequences
    ref=Data.get_pspa(),                  # reference PSSM DataFrame
    func=multiply_pspa,                   # aggregation function
    to_lower=False,                       # convert STY→sty
    to_upper=False,                       # convert all to uppercase
)
# Returns: DataFrame (rows=input sites, cols=kinases) of scores
```

---

**Percentile Scoring**

These functions convert raw scores to percentiles based on a reference distribution.

`get_pct` — Calculates percentile rank for a single sequence against all kinases.

```python
get_pct(
    site='PSVEPPLsQETFSDL',       # phosphosite sequence
    ref=Data.get_pspa_st(),       # reference PSSM
    func=multiply_pspa,           # scoring function
    pct_ref=Data.get_pspa_st_pct(),  # percentile reference distribution
)
# Returns: DataFrame with columns ['log2(score)', 'percentile']
```

`get_pct_df` — Converts a DataFrame of raw scores to percentile ranks (vectorized).

```python
get_pct_df(
    score_df=out,                    # output from predict_kinase_df
    pct_ref=Data.get_pspa_st_pct(),  # reference distribution for percentiles
)
# Returns: DataFrame of percentile ranks (same shape as score_df)
```

---

**Configuration Presets**

`Params` — Returns pre-configured parameter dictionaries for different scoring modes.

```python
Params(
    name='PSPA',  # preset name: 'CDDM', 'CDDM_upper', 'PSPA_st', 'PSPA_y', 'PSPA'
    load=True,    # whether to load the reference DataFrame immediately
)
# Returns: dict with keys 'ref', 'func', and, for some presets, 'to_upper'

# List available presets:
Params() # Returns: ['CDDM', 'CDDM_upper', 'PSPA_st', 'PSPA_y', 'PSPA']
```

| Preset | Reference | Function | Use Case |
|--------|-----------|----------|----------|
| `CDDM` | Log-odds matrix | `sumup` | Known phosphorylated status |
| `CDDM_upper` | Log-odds (uppercase) | `sumup` | All uppercase only, unknown phosphorylated status |
| `PSPA_st` | PSPA S/T kinases | `multiply_pspa` | Known phosphorylated status; S/T only |
| `PSPA_y` | PSPA Y kinases | `multiply_pspa` | Known phosphorylated status; Y only |
| `PSPA` | All PSPA kinases | `multiply_pspa` | Known phosphorylated status |

---

**Typical Workflow**

```python
# Single sequence scoring
scores = predict_kinase(
    input_string='PSVEPPLsQETFSDL',
    **Params('PSPA'),
)

# Batch scoring with percentiles on S/T sites
df = Data.get_human_site()
score_df = predict_kinase_df(
    df=df,
    seq_col='site_seq',
    **Params('PSPA_st'),
)
pct_df = get_pct_df(
    score_df=score_df,
    pct_ref=Data.get_pspa_st_pct(),
)
```

## Setup

In [None]:
#| export
import numpy as np, pandas as pd
from katlas.data import *
from katlas.utils import *
from katlas.pssm.core import *
from typing import Callable
from functools import partial

## Utils

In [None]:
#| export 
def cut_seq(input_string: str, # site sequence
            min_position: int, # minimum position relative to its center
            max_position: int, # maximum position relative to its center
            ):
    
    "Extract sequence based on a range relative to its center position"
    
    # Find the center position of the string
    center_position = len(input_string) // 2

    # Calculate the start and end indices
    start_index = max(center_position + min_position, 0)  # Ensure start_index is not negative
    end_index = min(center_position + max_position + 1, len(input_string))  # Ensure end_index does not exceed string length

    # Extract and return the substring
    return input_string[start_index:end_index]

In [None]:
cut_seq('AAkUuPSFSTtH',-5,4)

'AkUuPSFSTt'
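
The clamping behaviour is easiest to see on short inputs. The sketch below restates the `cut_seq` logic in standalone form (the name `cut_seq_sketch` is only for illustration) and shows a 15-mer trimmed to a -5..+4 window, plus a window that exceeds the sequence on both sides.

```python
def cut_seq_sketch(input_string: str, min_position: int, max_position: int) -> str:
    """Standalone copy of the cut_seq logic, for illustration only."""
    center = len(input_string) // 2
    start = max(center + min_position, 0)                    # clamp at the left edge
    end = min(center + max_position + 1, len(input_string))  # clamp at the right edge
    return input_string[start:end]

trimmed = cut_seq_sketch('PSVEPPLsQETFSDL', -5, 4)
clamped = cut_seq_sketch('PLsQE', -7, 7)
print(trimmed)  # VEPPLsQETF
print(clamped)  # PLsQE
```

For an even-length input the center is taken as `len(input_string) // 2`, so a 12-mer is centered on index 6.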

In [None]:
#| export
def STY2sty(input_string: str):
    "Replace all 'STY' with 'sty' in a sequence"    
    return input_string.replace('S', 's').replace('T', 't').replace('Y', 'y')

In [None]:
STY2sty('AAkUuPSFSTtH') # convert all capital STY to sty in a string

'AAkUuPsFsttH'

In [None]:
#| export
def get_dict(input_string:str, # phosphorylation site sequence
            ):
    
    "Get a list of position+residue keys (e.g. '-1L', '0s') from a site sequence; no need for the star in the middle; make sure it is 15 or 10 length"

    center_index = len(input_string) // 2

    result = []

    for i, char in enumerate(input_string):
        position = i - center_index

        if char.isalpha():
            result.append(f"{position}{char}")

    return result

In [None]:
cols = get_dict("PSVEPPLsQETFSDL")
cols

['-7P',
 '-6S',
 '-5V',
 '-4E',
 '-3P',
 '-2P',
 '-1L',
 '0s',
 '1Q',
 '2E',
 '3T',
 '4F',
 '5S',
 '6D',
 '7L']

## Scoring func

### Multiply

In [None]:
#| export
def multiply(values, # list of values, probabilities of amino acids at certain positions
             kinase=None, # kinase name (unused, kept for API consistency)
             num_aa=23, # number of amino acids, 23 for standard CDDM, 20 for all uppercase CDDM
            ):
    
    "Multiply the probabilities of the amino acids at each position in a phosphorylation site"
    

    # Using the logarithmic property: log(a*b) = log(a) + log(b)
    # Compute the sum of the logarithms of the values and the scale factor
    values = [v+EPSILON for v in values]
    log_sum = np.sum(np.log2(values)) + (len(values) - 1) * np.log2(num_aa)

    return log_sum

$$
\text{Score} = \log_2 \left( \frac{ \prod P_{\text{KinX}}(\text{AA}, \text{Position}) }{ \left( \frac{1}{\#\text{Random AA}} \right)^{\text{length(Position except 0)}} } \right)
$$

The function implements the formula from [Johnson et al. Nature: An atlas of substrate specificities for the human serine/threonine kinome, Supplementary Note2](https://static-content.springer.com/esm/art%3A10.1038%2Fs41586-022-05575-3/MediaObjects/41586_2022_5575_MOESM1_ESM.pdf) (page 160)
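
As a quick check that the log-space computation matches the formula, the sketch below evaluates both forms on made-up probabilities. The values and the window size are assumptions for illustration, and the small `EPSILON` offset used in `multiply` is left out.

```python
import numpy as np

# Hypothetical probabilities for a 5-residue window (positions -2..2);
# one of the values belongs to position 0.
probs = np.array([0.10, 0.08, 0.50, 0.04, 0.12])
num_aa = 23  # assumed size of the random amino acid alphabet

# Direct form: product of probabilities divided by the random expectation
# (1/num_aa) raised to the number of non-center positions.
n_flank = len(probs) - 1
direct = np.log2(np.prod(probs) / (1 / num_aa) ** n_flank)

# Log-space form: sum of logs plus a scale term.
log_space = np.sum(np.log2(probs)) + n_flank * np.log2(num_aa)

print(direct, log_space)
```

The two numbers agree up to floating-point error; the log-space form avoids underflow when many small probabilities are multiplied.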

In [None]:
#| export
multiply_23 = partial(multiply,num_aa=23)

In [None]:
#| export
multiply_20 = partial(multiply,num_aa=20)

A variant of `multiply` in which the scale factor depends on the kinase:

In [None]:
#| export
def multiply_pspa(values, kinase, num_aa_dict=Data.get_num_dict()):
    "Multiply values with a kinase-specific scale factor: the number of random amino acids in the PSPA library."

    # Return NaN if any value is exactly zero (log2(0) is -inf)
    if np.any(np.array(values) == 0):
        return np.nan
    else:
        # Retrieve the divide factor from the dictionary
        divide_factor = num_aa_dict[kinase]

        # Using the logarithmic property: log(a*b) = log(a) + log(b)
        # Compute the sum of the logarithms of the values and the divide factor
        log_sum = np.sum(np.log2(values)) + (len(values) - 1) * np.log2(divide_factor)

        return log_sum

In [None]:
multiply_pspa(values=[1,2,3,4,5],kinase='PDHK1')

np.float64(22.906890595608516)
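
The arithmetic behind this output can be reproduced without the data dictionary. In the sketch below the scale factor is passed in directly; the value 16 is an assumption inferred from the output above, not read from `Data.get_num_dict()`, and `multiply_pspa_sketch` is a hypothetical name.

```python
import numpy as np

def multiply_pspa_sketch(values, divide_factor):
    """Standalone version of the multiply_pspa arithmetic (factor passed in directly)."""
    values = np.asarray(values, dtype=float)
    if np.any(values == 0):
        return np.nan  # a zero value makes the score undefined
    return np.sum(np.log2(values)) + (len(values) - 1) * np.log2(divide_factor)

# divide_factor=16 is inferred from the output above, not looked up.
score = multiply_pspa_sketch([1, 2, 3, 4, 5], divide_factor=16)
zero_case = multiply_pspa_sketch([1, 0, 3], divide_factor=16)
print(score)      # approximately 22.907
print(zero_case)  # nan
```

Here `log2(1*2*3*4*5)` contributes about 6.907 and the four non-center positions contribute `4 * log2(16) = 16`.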

### Sum

In [None]:
#| export
def sumup(values, # list of PSSM values (e.g. log-odds) of amino acids at certain positions
          kinase=None, # kinase name (unused, kept for API consistency)
         ):
    "Sum up the PSSM values of the amino acids at each position in a phosphorylation site sequence"
    return sum(values)

## Predict kinase

In [None]:
#| export
def duplicate_ref_zero(df: pd.DataFrame) -> pd.DataFrame:
    """
    If '0S', '0T', '0Y' exist with non-zero values, create '0s', '0t', '0y' with same values.
    If '0s', '0t', '0y' exist with non-zero values, create '0S', '0T', '0Y' with same values.
    """
    df = df.copy()
    pairs = [('0S', '0s'), ('0T', '0t'), ('0Y', '0y')]

    for upper, lower in pairs:
        if upper in df.columns and (df[upper] != 0).any():
            df[lower] = df[upper]
        elif lower in df.columns and (df[lower] != 0).any():
            df[upper] = df[lower]

    return df
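
To see the mirroring rule in action, the sketch below applies the same logic to a toy reference with lowercase center columns only. The kinase names and values are made up, and `mirror_center_columns` is an illustrative restatement rather than the exported function.

```python
import pandas as pd

def mirror_center_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Illustrative restatement of the mirroring rule in duplicate_ref_zero."""
    df = df.copy()
    for upper, lower in [('0S', '0s'), ('0T', '0t'), ('0Y', '0y')]:
        if upper in df.columns and (df[upper] != 0).any():
            df[lower] = df[upper]
        elif lower in df.columns and (df[lower] != 0).any():
            df[upper] = df[lower]
    return df

# Toy reference: two hypothetical kinases, center columns in lowercase only.
toy_ref = pd.DataFrame(
    {'-1L': [0.2, 0.1], '0s': [0.7, 0.4], '0t': [0.3, 0.6]},
    index=['KinA', 'KinB'],
)
mirrored = mirror_center_columns(toy_ref)
print(list(mirrored.columns))  # ['-1L', '0s', '0t', '0S', '0T']
```

No `0Y`/`0y` column is created because neither exists in the toy input.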

In [None]:
#| export
def preprocess_ref(ref):
    "Convert pS/T/Y in ref columns to s/t/y if any; mirror 0S/T/Y to 0s/t/y."
    ref = ref.copy()
    # if ref contains pS,pT,pY columns, convert them to s,t,y for scoring
    ref.columns=ref.columns.map(pSTY2sty)
    # duplicate 0S/T/Y to 0s/t/y (or the opposite) to ensure equal treatment of zero position
    return duplicate_ref_zero(ref)

In [None]:
#| export
def predict_kinase(input_string: str, # site sequence
                   ref: pd.DataFrame, # reference dataframe for scoring
                   func: Callable, # function to calculate score
                   to_lower: bool=False, # convert capital STY to lower case
                   to_upper: bool=False, # convert all letter to uppercase
                   verbose=True
                   ):
    "Predict kinase given a phosphorylation site sequence"
 
    input_string = check_seq(input_string)

    if to_lower: input_string = STY2sty(input_string)

    if to_upper: input_string = input_string.upper()

    ref = preprocess_ref(ref)
    
    results = []
    
    for kinase, row in ref.iterrows():
        
        # Convert the row into a dictionary, excluding NaN values, to create a PSSM dictionary for a kinase
        r_dict = row.dropna().to_dict()
        
        # Extract position+amino acid name from the input string and filter them against the name in PSSM
        pos_aa_name = get_dict(input_string)
        pos_aa_name = [key for key in pos_aa_name if key in r_dict.keys()]
    
        # Collect corresponding PSSM values for these positions and amino acids
        pos_aa_val = [r_dict[key] for key in pos_aa_name] # NaN entries were already dropped above
        
        # Calculate the score for this kinase using the specified function
        score = func(pos_aa_val, kinase)
        results.append(score)
    
    if verbose:
        print(f'considering string: {pos_aa_name}')

    out = pd.Series(results, index=ref.index).sort_values(ascending=False)
        
    return out.round(3).dropna()
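
The per-kinase loop can be traced on a tiny example. The sketch below uses a made-up log-odds reference with two hypothetical kinases and a 3-mer site, and aggregates with a plain sum; it mirrors the key lookup above but is not the exported function.

```python
import pandas as pd

# Toy log-odds reference: rows are hypothetical kinases, columns are position+residue keys.
toy_ref = pd.DataFrame(
    {'-1L': [0.5, -0.2], '0s': [1.0, 1.0], '1Q': [1.5, None], '1P': [-1.0, 2.0]},
    index=['KinA', 'KinB'],
)

site = 'LsQ'  # 3-mer with the phosphosite at the center
center = len(site) // 2
keys = [f"{i - center}{aa}" for i, aa in enumerate(site)]  # ['-1L', '0s', '1Q']

scores = {}
for kinase, row in toy_ref.iterrows():
    pssm = row.dropna().to_dict()                # drop keys this kinase has no value for
    used = [k for k in keys if k in pssm]        # keep only keys present in the PSSM
    scores[kinase] = sum(pssm[k] for k in used)  # sumup-style aggregation

result = pd.Series(scores).sort_values(ascending=False)
print(result)
```

`KinA` scores 3.0 from all three keys, while `KinB` scores 0.8 because its missing `1Q` entry is skipped rather than penalized.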

PSPA scoring:

In [None]:
pspa_ref = Data.get_pspa()

In [None]:
predict_kinase("PSVEPPLsQETFSDL",ref=pspa_ref,func=multiply_pspa)


CDDM scoring, LO + sum

In [None]:
ref=Data.get_cddm_LO() # Data.get_cddm_LO_upper()

In [None]:
predict_kinase("PSVEPPLsQETFSDL",ref=ref,func=sumup)

CDDM scoring, PSSM + multiply (#23aa)

In [None]:
ref=Data.get_cddm() # Data.get_cddm_upper()
predict_kinase("PSVEPPLsQETFSDL",ref=ref,func=multiply_23)

CDDM scoring, PSSM + multiply (#20aa)

In [None]:
ref=Data.get_cddm_upper()
predict_kinase("PSVEPPLsQETFSDL",ref=ref,func=multiply_20)

## Params

Here we provide different PSSM settings, derived from either PSPA data or kinase-substrate datasets, for kinase prediction:

In [None]:
#| export
def Params(name=None, load=True):
    def lazy(f): return lambda: f().astype('float32')
    
    params = {
        "CDDM": {'ref': lazy(Data.get_cddm_LO), 'func': sumup},
        "CDDM_upper": {'ref': lazy(Data.get_cddm_LO_upper), 'func': sumup, 'to_upper': True},
        "PSPA_st": {'ref': lazy(Data.get_pspa_st), 'func': multiply_pspa},
        "PSPA_y": {'ref': lazy(Data.get_pspa_tyr), 'func': multiply_pspa},
        "PSPA": {'ref': lazy(Data.get_pspa), 'func': multiply_pspa},
    }

    if name is None:
        return list(params.keys())

    cfg = params[name]
    if load and callable(cfg['ref']):
        cfg['ref'] = cfg['ref']()  # actually load now
    return cfg
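
The deferred-loading pattern used by `Params` can be illustrated with a toy registry. Everything in the sketch below (`load_toy_ref`, `toy_params`, the `'TOY'` preset) is hypothetical and only stands in for the real loaders.

```python
calls = []

def load_toy_ref():
    """Hypothetical stand-in for an expensive loader such as Data.get_pspa."""
    calls.append('loaded')
    return {'KinA': [0.1, 0.9]}

def toy_params(name=None, load=True):
    """Minimal preset registry that defers loading until requested."""
    presets = {'TOY': {'ref': load_toy_ref, 'func': sum}}
    if name is None:
        return list(presets)
    cfg = presets[name]
    if load and callable(cfg['ref']):
        cfg['ref'] = cfg['ref']()  # run the loader now
    return cfg

names = toy_params()                      # listing presets loads nothing
deferred = toy_params('TOY', load=False)  # 'ref' is still a callable
loaded = toy_params('TOY')                # 'ref' is the loaded object
print(names, callable(deferred['ref']), calls)  # ['TOY'] True ['loaded']
```

With `load=False` the caller receives a callable under `'ref'` and must invoke it before passing the result to a scoring function.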

In [None]:
Params()

In [None]:
for p in ['PSPA', 'CDDM','CDDM_upper']:
    print(predict_kinase("PSVEPPLsQETFSDL",**Params(p)).head())


## Predict kinase in df

In [None]:
#| export
def multiply_generic(merged_df, kinases, df_index, divide_factor_func):
    """Multiply-based log-sum aggregation across kinases."""
    out = {}
    log2 = np.log2  # local alias for speed
    
    for kinase in tqdm(kinases, desc="Computing multiply_generic"):
        divide_factor = divide_factor_func(kinase)
        df = merged_df[['input_index', kinase]].dropna()
        if df.empty:
            out[kinase] = pd.Series(index=df_index, dtype=float)
            continue
        
        log_values = log2(df[kinase] + EPSILON)
        grouped = df.assign(log_value=log_values).groupby('input_index')['log_value']
        
        # vectorized form
        log_sum = grouped.sum() + (grouped.count() - 1) * log2(divide_factor)
        out[kinase] = log_sum

    return pd.DataFrame(out).reindex(df_index)

In [None]:
#| export
def predict_kinase_df(df, seq_col, ref, func, to_lower=False, to_upper=False):
    """
    Predict kinase scores based on reference PSSM or weight matrix.
    Applies preprocessing, merges long format keys, then aggregates using given func.
    """
    print(f"Input dataframe has {len(df)} rows")
    print("Preprocessing...")

    ref = preprocess_ref(ref)
    df = df.copy()
    df[seq_col] = check_seqs(df[seq_col])  # validate the site sequences

    if to_lower:
        df[seq_col] = df[seq_col].apply(STY2sty)
    if to_upper:
        df[seq_col] = df[seq_col].str.upper()

    # Align sequence length to ref
    pos = ref.columns.str[:-1].astype(int)
    df[seq_col] = df[seq_col].apply(partial(cut_seq, min_position=pos.min(), max_position=pos.max()))

    print("Preprocessing done. Expanding sequences...")

    # Convert sequences to long-form keys
    input_keys_df = (
        df.assign(keys=df[seq_col].apply(get_dict))
          .explode('keys')
          .reset_index(names='input_index')[['input_index', 'keys']]
          .rename(columns={'keys': 'key'})
          .set_index('key')
    )

    print("Merging reference...")
    ref_T = ref.T.astype('float32')
    merged_df = input_keys_df.merge(ref_T, left_index=True, right_index=True, how='inner')
    print("Merge complete.")

    # Dispatch by func
    if func == sumup:
        out = merged_df.groupby('input_index').sum().reindex(df.index)
    elif func in (multiply_pspa, multiply_23, multiply_20):
        num_dict = Data.get_num_dict() if func == multiply_pspa else None
        divisor = (
            (lambda k: num_dict[k])
            if func == multiply_pspa else
            (lambda k: 23 if func == multiply_23 else 20)
        )
        out = multiply_generic(merged_df, ref_T.columns, df.index, divide_factor_func=divisor)
    else:
        raise ValueError(f"Unknown function: {func}")

    return out.round(3)
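
The expand, merge, and aggregate steps are easier to follow on a toy input. The sketch below covers only the `sumup` branch, with two made-up sites and a made-up log-odds reference; `to_keys` is a hypothetical helper that plays the role of `get_dict`.

```python
import pandas as pd

# Toy inputs: two sites and a log-odds reference for two hypothetical kinases.
sites = pd.Series(['LsQ', 'AsP'], name='site_seq')
toy_ref = pd.DataFrame(
    {'-1L': [0.5, -0.2], '0s': [1.0, 1.0], '1Q': [1.5, 0.0], '1P': [-1.0, 2.0]},
    index=['KinA', 'KinB'],
)

def to_keys(seq):
    """Position+residue keys relative to the center character."""
    center = len(seq) // 2
    return [f"{i - center}{aa}" for i, aa in enumerate(seq)]

# One row per (site, key) pair, indexed by key.
long_keys = (
    sites.apply(to_keys)
         .explode()
         .rename('key')
         .rename_axis('input_index')
         .reset_index()
         .set_index('key')
)

# Inner merge keeps only keys that exist in the reference ('-1A' is dropped).
merged = long_keys.merge(toy_ref.T, left_index=True, right_index=True, how='inner')

# sumup-style aggregation: add the matched values per input row.
scores = merged.groupby('input_index').sum().reindex(sites.index)
print(scores)
```

Row 0 (`LsQ`) gets 3.0 for `KinA` and 0.8 for `KinB`; row 1 (`AsP`) gets 0.0 and 3.0, since its `-1A` key has no column in the reference.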

In [None]:
df=Data.get_human_site()

In [None]:
# for p in ['CDDM', 'CDDM_upper', 'PSPA_st', 'PSPA_y', 'PSPA']:
#     out = predict_kinase_df(df.head(10), seq_col='site_seq', **Params(p))
#     print(out.head())

In [None]:
out = predict_kinase_df(df.head(100), seq_col='site_seq', **Params('PSPA'))


## Percentile scoring

In [None]:
#| export
def get_pct(site,ref,func,pct_ref):
    
    "Replicate the percentile results from The Kinase Library."
    
    # As here we try to replicate the results, we use site.upper(); consider removing it for future version.
    score = predict_kinase(site.upper(),ref=ref,func=func)
    
    percentiles = {}
    for kinase in score.index: 
        # Get the reference score distribution from `pct_ref` for this kinase
        ref_values = pct_ref[kinase].values
        # Calculate how many values in `pct_ref` are less than the new score
        less = np.sum(ref_values < score[kinase])
        # Calculate how many values are equal to the new score
        equal = np.sum(ref_values == score[kinase])
        # Calculate the percentile rank
        percentile = (less + 0.5 * equal) / len(ref_values) * 100
        percentiles[kinase] = percentile
        
    pct = pd.Series(percentiles)
    final = pd.concat([score,pct],axis=1)
    final.columns=['log2(score)','percentile']
    return final
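
The percentile formula in the loop is a mid-rank: values strictly below the score count fully and ties count half. A worked example with made-up reference scores:

```python
import numpy as np

ref_values = np.array([1.0, 2.0, 2.0, 3.0, 5.0])  # hypothetical reference scores
score = 2.0

less = np.sum(ref_values < score)    # 1 value below the score
equal = np.sum(ref_values == score)  # 2 values tied with the score
percentile = (less + 0.5 * equal) / len(ref_values) * 100
print(percentile)  # 40.0
```

Here `(1 + 0.5 * 2) / 5 * 100` gives 40.0.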

In [None]:
st_pct = Data.get_pspa_st_pct()
y_pct = Data.get_pspa_tyr_pct()

In [None]:
out = get_pct('PSVEPPLyQETFSDL',**Params('PSPA_y'), pct_ref=y_pct)
out.sort_values('percentile',ascending=False)

In [None]:
get_pct('PSVEPPLsQETFSDL',**Params('PSPA_st'), pct_ref=st_pct)

In [None]:
#| export
def get_pct_df(score_df, # output from predict_kinase_df 
               pct_ref, # a reference df for percentile calculation
              ):
    
    "Replicate the percentile results from The Kinase Library."

    # Create an array to hold percentile ranks
    percentiles = np.zeros(score_df.shape)
    
    # Calculate percentiles for each column in a vectorized manner
    for i, kinase in tqdm(enumerate(score_df.columns),total=len(score_df.columns)):
        ref_values = np.sort(pct_ref[kinase].values)
        
        # Use searchsorted to find indices where the scores would be inserted to maintain order
        indices = np.searchsorted(ref_values, score_df[kinase].values, side='right')
        
        # Calculate percentile ranks
        percentiles[:, i] = indices / len(ref_values) * 100

    # Convert the array to a DataFrame with appropriate indices and columns
    percentiles_df = pd.DataFrame(percentiles, index=score_df.index, columns=score_df.columns).astype(float).round(3)
    
    return percentiles_df
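
The `searchsorted` step can be checked on a small array. The reference scores below are made up; the point is that `side='right'` returns, for each score, the number of reference values less than or equal to it.

```python
import numpy as np

ref_values = np.sort(np.array([1.0, 2.0, 2.0, 3.0, 5.0]))  # hypothetical reference scores
scores = np.array([0.5, 2.0, 4.0, 9.0])

# side='right' gives the insertion index after any equal entries,
# i.e. the count of reference values <= score.
idx = np.searchsorted(ref_values, scores, side='right')
pct = idx / len(ref_values) * 100
print(idx)  # [0 3 4 5]
print(pct)  # approximately 0, 60, 80, 100
```

With ties, this counts every tied reference value as at or below the score, so the tied score 2.0 gets 60 here, whereas the mid-rank formula in `get_pct` would give 40 for the same inputs.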

```python
# substrate score first
score_df = predict_kinase_df(df_sty,'site_seq', **Params('PSPA_st'))

# get percentile reference
pct_ref = Data.get_pspa_st_pct()

# calculate percentile score
pct = get_pct_df(score_df,pct_ref)
```

## End

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()