In [None]:
from pathlib import Path

import h5py
import torch
import torch.nn.functional as F
import numpy as np
import numba as nb
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import vizta
import spectrum_utils.plot as sup
import spectrum_utils.spectrum as sus

Path("figures").mkdir(exist_ok=True)
pal = vizta.mpl.set_theme(context="poster", style="talusbio")
b_color, y_color, *_ = sns.color_palette("talusbio")
sup.colors["b"] = b_color
sup.colors["y"] = y_color

base_path = Path("../data/colab/fragment-prediction")
preds = torch.load(base_path / "test.pt")

In [None]:
@nb.njit
def vecs2seqs(vecs, charge):
    """Convert Prosit vectors to peptide sequenes"""
    alphabet = list("ACDEFGHIKLMNPQRSTVWY") + ["M[Oxidation]"]
    for idx, (seq_idx, z) in enumerate(zip(vecs, charge)):
        yield "".join([alphabet[i - 1] for i in seq_idx if i]) + f"/{z}"

@nb.njit
def flip_and_shift_b_ions(ions, n_ions):
    """Flip the b_ions and shift them one down.

    This let's them match the order of our Transformer model.
    """
    for idx, n_ion in enumerate(n_ions):
        ions[idx, 1:n_ion+1, 1, :] = ions[idx, n_ion-1::-1, 1, :]

    return ions


def reformat(array):
    array = array.reshape([-1, 29, 2, 3])
    n_ions = (array[:, :, 0, 0] >= 0).sum(axis=1)
        
    # Need an extra space because we want to shift b ions.
    array = np.pad(
        array,
        ((0, 0), (0, 1), (0, 0), (0, 0)), 
        "constant", 
        constant_values=-1,
    )
    array = flip_and_shift_b_ions(array, n_ions)
    array[:, 0, 1, :] = -1
    return torch.tensor(array).flatten(start_dim=1)

with h5py.File("../data/proteometools/test.hdf5") as data:
    n_rows = data["scan_number"].shape[0]
    step = int(np.floor(n_rows / 100_000))
    spectral_angle = data["spectral_angle"][::step]
    masses = data["masses_raw"][::step]
    intensities = data["intensities_raw"][::step]
    seq = vecs2seqs(
        data["sequence_integer"][::step],
        np.argmax(data["precursor_charge_onehot"][::step], axis=1) + 1,
    )
    intensities = reformat(intensities)
    masses = reformat(masses)
    
    seq = pd.Series(seq)

In [None]:
def masked_spectral_angle(true, pred):
    """This is an PyTorch adaptation of the Prosit implementation here:
    https://github.com/kusterlab/prosit/blob/dd16c47f8c3f4cfbd7ae84a1ca92a4d117e5e9ec/prosit/losses.py#L4-L16
    """
    true = true.flatten(start_dim=1)
    pred = pred.flatten(start_dim=1)
    epsilon = torch.finfo(torch.float32).eps
    pred_masked = ((true + 1) * pred) / (true + 1 + epsilon)
    true_masked = ((true + 1) * true) / (true + 1 + epsilon)
    pred_norm = F.normalize(true_masked, p=2, dim=-1)
    true_norm = F.normalize(pred_masked, p=2, dim=-1)
    product = torch.sum(pred_norm * true_norm, dim=1)
    arccos = torch.acos(product)
    return 2 * arccos / np.pi


pred_sa = 1 - masked_spectral_angle(intensities, preds).detach().numpy()

In [None]:
seq[seq.str.contains("EDITH")]

In [None]:
idx = 56582
edith_mz = masses[idx, ...]
edith_int = intensities[idx, ...]
edith_pred = preds[idx, ...]
edith_seq = seq[idx]

is_peak = edith_int >= 0
edith_mz = edith_mz[is_peak]
edith_int = edith_int[is_peak]
edith_pred = edith_pred[is_peak]

edith_seq, edith_mz.shape, edith_int.shape

edith_spec = (
    sus.MsmsSpectrum("Measured Intensities", mz=edith_mz, intensity=edith_int, precursor_mz=0, precursor_charge=4)
    .annotate_proforma(edith_seq, 20, "ppm", ion_types="by", max_ion_charge=3)
)

edith_pred_spec = (
    sus.MsmsSpectrum("Predicted Intensities", mz=edith_mz, intensity=edith_pred, precursor_mz=0, precursor_charge=4)
    .annotate_proforma(edith_seq, 20, "ppm", ion_types="by", max_ion_charge=3)
)

def annotate_ion_type(annotation, ion_types="aby"):
    if annotation.ion_type[0] in ion_types:
        if abs(annotation.isotope) == 1:
            iso = "+i" if annotation.isotope > 0 else "-i"
        elif annotation.isotope != 0:
            iso = f"{annotation.isotope:+}i"
        else:
            iso = ""
        nl = {"-NH3": "*", "-H2O": "o"}.get(annotation.neutral_loss, "")
        return f"{annotation.ion_type}{iso}{'+' * annotation.charge}{nl}"
    else:
        return ""

In [None]:
_, ax = plt.subplots(1, 1, figsize=(12.45, 9.12))
dc_angles = pd.DataFrame({"Model": "Depthcharge\nTransformer", "sa": 1 - pred_sa})
prosit_angles = pd.DataFrame({"Model": "Prosit GRU (2019)", "sa": 1 - spectral_angle})
angles = pd.concat([dc_angles, prosit_angles])
sns.ecdfplot(data=angles, x="sa", hue="Model", ax=ax, stat="count")
ax.set_xlabel("1 - Spectral Angle")
ax.set_ylabel("Number of Peptides")
ax.grid(False)
plt.tight_layout()
plt.savefig("figures/intensities.png", dpi=300, transparent=True)

_, ax = plt.subplots(1, 1, figsize=(26, 10.43))
sup.mirror(edith_spec, edith_pred_spec, dict(grid=False, annot_fmt=annotate_ion_type), ax=ax)
ax.set_ylim(-1.35, 1.35)
ax.annotate("Measured", (20, 1.2), va="top")
ax.annotate("Depthcharge Predicted", (20, -1.2), va="bottom")
ax.set_title(edith_seq, ha="left", loc="left")
ax.grid(False)
plt.tight_layout()
plt.savefig("figures/spectrum.png", dpi=300, transparent=True)