## app backend settings and internal functions

In [1]:
from IPython import get_ipython

In [None]:
import pandas as pd
import hvplot.pandas  # Adds .hvplot and .interactive methods to Pandas dataframes
import panel as pn  # Panel is a simple, flexible and enterprise-ready data app framework

pn.extension(sizing_mode="stretch_width", template="fast")
pd.set_option("display.precision", 0)

PALETTE = [
    "#ff6f69",
    "#ffcc5c",
    "#88d8b0",
]
ACCENT_BASE_COLOR = PALETTE[2]

In [None]:
import holoviews as hv
from holoviews import opts
import matplotlib.pyplot as plt
from typing import List, Set, Dict, Tuple, Union, Any
from dataclasses import dataclass
import datashader as ds
import colorcet as cc
from holoviews.operation.datashader import datashade, shade, dynspread, spread, rasterize

from pyteomics import mzml, auxiliary
import numba
import matplotlib.pyplot as plt
import pandas as pd
import scipy.stats as stats
import numpy  as np
from scipy.optimize import curve_fit
from collections import defaultdict

from bokeh.palettes import Magma, Inferno, Plasma, Viridis, Cividis
from holoviews.plotting.util import bokeh_palette_to_palette
from matplotlib.cm import get_cmap
from bokeh.models import HoverTool
# import random, pandas as pd, numpy as np, holoviews as hv, datashader as ds, colorcet as cc
# from hv.datashader import datashade, shade, dynspread, spread, rasterize

hv.extension('bokeh', 'matplotlib', 'plotly')

TIMD_CONST = 1.002371  # TopDown isotope mass difference 55k u see OpenMS::Constants in kyowons branch

base_dir = "/workspaces/TopDown/data/cyto/"

In [None]:
@dataclass
class SpecRef:
    spectrum_id: str
    tolerance: float
    massoffset: float
    chargemass: float
    isotoperangelimits: List[Tuple[int]]
    chargerangelimits: np.ndarray

@dataclass
class TargetRef:
    spectrum_id: str
    peak_index: int 
    mass: float
    charge: int
    mass_matches: np.ndarray
    intensity_matches: np.ndarray
    isotope_matches: np.ndarray

def parse_deconv_spectra_meta(spectrum: Dict[str,Any]) -> SpecRef:
  tolerance, massoffset, chargemass, *peaknotes = spectrum['DeconvMassInfo'].split(';')
  tolerance = float(tolerance.split('=')[1])
  massoffset = float(massoffset.split('=')[1])
  chargemass = float(chargemass.split('=')[1])
  peaknotes[0] = peaknotes[0].split('=')[1]
  peaknotes = [i.split(',') for i in peaknotes if i]  # if i necessary because of trailing ,
  chargeranges, isotoperanges = list(map(list, zip(*peaknotes)))
  chargerangelimits = [tuple(map(int, i.split(':'))) for i in chargeranges]
  # fix 0 charges is too paranoid? chargerangelimits = [ min(1, i[1]) if i[0]<1 else i for i in chargerangelimits]  
  isotoperangelimits = [(0,int(j)) for j in isotoperanges]           
  # TODO check len(spectrum) == len(chargerangelimits) == len(isotoperangelimits)
  specref = SpecRef(spectrum['id'],
                    tolerance,massoffset,chargemass,
                    chargerangelimits,isotoperangelimits)
  return specref

def delta_ppm(m1,m2):
  return (m1 - m2) / m2 * 10**6

def get_match_window(mass,tolerance):
  """
  tolerance in ppm 
  mass in m/z
  returned is window border left and window border right
  """
  tol = tolerance/2
  md = ((tol / 10**6) * mass) 
  return mass-md, mass+md  

def calc_mz(givenmass, z, iso, massoffset, chargemass):
  return (givenmass - massoffset + iso * TIMD_CONST)/z + chargemass

def calc_range(givenmass, z, iso_min, iso_max, massoffset, chargemass):
  return ((givenmass - massoffset + (iso_min-2) * TIMD_CONST)/z + chargemass, 
          (givenmass - massoffset + (iso_max+2) * TIMD_CONST)/z + chargemass )

def discharge_mz(givenmass, z, chargemass):
  return (givenmass - chargemass) * z

v_discharge_mz = np.vectorize(discharge_mz)
# TODO some peaks have 0 matches! => cannot call `vectorize` on size 0 inputs unless `otypes` is set

def get_source_peaks(range_l, range_r, source_spectrum):
  target_idx = np.where(np.logical_and(source_spectrum["m/z array"] > range_l, source_spectrum["m/z array"] < range_r))
  return source_spectrum["m/z array"][target_idx], \
          source_spectrum["intensity array"][target_idx], \
          np.ones(len(target_idx[0]))*-1

def acquire_targets_per_spectrum(deconv_spectrum: Dict[str,Any], source_spectrum: Dict[str,Any]):
  target_matches = list()
  specref = parse_deconv_spectra_meta(deconv_spectrum)
  # TODO replace for-cascade with product 
  # see (http://stephantul.github.io/python/2019/07/20/product/
  # or https://note.nkmk.me/en/python-itertools-product/)
  for i in range(0,len(deconv_spectrum["m/z array"])):
    for z in range(specref.chargerangelimits[i][0], specref.chargerangelimits[i][1]+1):
      range_l, range_r = calc_range(deconv_spectrum["m/z array"][i], z, 
                                    specref.isotoperangelimits[i][0], 
                                    specref.isotoperangelimits[i][1], 
                                    specref.massoffset, specref.chargemass)
      target_range_mz, target_range_int, target_range_iso = get_source_peaks(range_l, range_r, source_spectrum)
      if target_range_mz.size > 0:
        target_range_mass = v_discharge_mz(target_range_mz, z, specref.chargemass)
      else:
        target_range_mass = target_range_mz
      for e in range(specref.isotoperangelimits[i][0], specref.isotoperangelimits[i][1]+1):
          target_mz_iso = calc_mz(source_spectrum["m/z array"][i],z,e, specref.massoffset, specref.chargemass)
          mz_window_l, mz_window_r = get_match_window(target_mz_iso, specref.tolerance)
          np.put(target_range_iso, np.where(np.logical_and(target_range_mz > mz_window_l, target_range_mz < mz_window_r)), e)
      target_matches.append(
          TargetRef(deconv_spectrum["id"],
                    i, deconv_spectrum["m/z array"][i], z,
                    target_range_mass, target_range_int, target_range_iso)
      )
  return target_matches


In [None]:

with mzml.read(base_dir + "190226_Cyto_1_FD_500ng_deconv.mzML") as reader:
	deconv_spectra = [spectrum for spectrum in reader]	

with mzml.read(base_dir + "190226_Cyto_1_FD_500ng_annotated.mzML") as reader:
	annot_spectra = [spectrum for spectrum in reader]

vis_dict = dict()
for s_a, s_o in zip(deconv_spectra, annot_spectra):
  vis_dict[s_a['spectrum title']] = acquire_targets_per_spectrum(s_a,s_o)

## figure code

In [None]:
for ori_spectrum, spectrum in zip(annot_spectra,deconv_spectra):
  if ".165." in spectrum["spectrum title"]:
    break

hv.extension('matplotlib')
# hv.extension('bokeh')
peak_coord = pd.DataFrame(np.concatenate([spectrum["m/z array"][np.newaxis].T, spectrum["intensity array"][np.newaxis].T], axis=1), columns = ['mass','intensity'])
fig_d = hv.Spikes(peak_coord).opts(color='green', title="Deconvolved Spectrum {}".format(spectrum["id"]))
peak_coord = pd.DataFrame(np.concatenate([ori_spectrum["m/z array"][np.newaxis].T, ori_spectrum["intensity array"][np.newaxis].T], axis=1), columns = ['m/z','intensity'])
fig_o = hv.Spikes(peak_coord).opts(color='blue', title="Original Spectrum {}".format(spectrum["id"]))
fig_s = fig_o.opts(aspect=3, padding=0.1) + fig_d.opts(aspect=3, padding=0.1)
# fig_s = fig_o.opts(aspect=2, padding=0.10) + fig_d.opts(aspect=2, padding=0.1)
fig_s.opts(aspect_weight=True, tight=False, fig_inches=300, fig_size=3).cols(1)
# fig_s.opts(aspect_weight=True, tight=False).cols(1)
for title, target_matches in vis_dict.items():
  if ".165." in title:
    break

target_matches[-1]
plot_title = ' '.join([spectrum["id"].split(' ')[-1], 'precursor mass =', str(target_matches[-1].mass)])

hv.extension('matplotlib')
#each peak has its matches
last_peak = filter(lambda t: t.peak_index==19, target_matches)
#for each charge there is an element in target_matches with resp. peak index
vis_peaks = list()
for t in last_peak:
  for idx, m in enumerate(t.mass_matches):
    vis_peaks.append(
        {('y', 'x', 'z'): [[t.charge,m,0],[t.charge,m,t.intensity_matches[idx]]], 
         'type': 'noise' if t.isotope_matches[idx]<0 else 'isomatch'}
    )

fig = hv.Path3D(vis_peaks, vdims='type')
fig.opts(
        ylabel="Charge",
        xlabel="Mass",
        zlabel="Intensity",
        color="blue"
).opts(fig_size=300, title=plot_title, invert_yaxis=False)


## sidebar wigets

You can find the widget reference guides [here](https://panel.holoviz.org/reference/index.html#widgets).

In [None]:
frequency = pn.widgets.FileInput(
    value="static", name="Input mzML file"
).servable(area="sidebar")
window = pn.widgets.StaticText(
    value="blabla", name="Announcement"
).servable(area="sidebar")
filecontent = pn.widgets.DataFrame(
    pd.DataFrame({'spectrum level':[s['ms level'] for s in deconv_spectra]}),
    # pd.DataFrame({'spectrum':['MS1','MS2']}),
    height=300, frozen_columns=1,
).servable(area="sidebar")
filecontent.selection = [164]
rotation = pn.widgets.IntSlider(
    value=40, start=0, end=180, name="3D rotation"
).servable(area="sidebar")

## serve panels

In [None]:
pn.pane.Markdown("""
First the selected spectrum, and from there the peaks deconvolution source(s)
""").servable()

In [None]:
def message():
    return f"""Then details on the selected spectrum **{filecontent.selection}**."""

# imessage = ipipeline.pipe(message, nrows=nrows)
# imessage

In [None]:
vis_spec_df = pn.widgets.DataFrame(
    pd.DataFrame(np.concatenate([spectrum["m/z array"][np.newaxis].T, spectrum["intensity array"][np.newaxis].T], axis=1), columns = ['mass','intensity']),
    height=500, frozen_columns=1,
)

In [None]:
pn.Row(fig_s).servable()
pn.pane.Markdown(message()).servable()
pn.Row(vis_spec_df).servable()
pn.Row(fig).servable()

## Configure the Data App

In [None]:
pn.state.template.param.update(
    site="TopDownViz",
    title="Turn topdown mzML into deconvolved spectra visualisations",
    accent_base_color=ACCENT_BASE_COLOR,
    header_background=ACCENT_BASE_COLOR,
)

You can **serve the app** with `panel serve static_file_topdown.ipynb`. Add `--autoreload` for *hot reloading* while developing. The app is available at [http://localhost:5006/static_file_topdown](http://localhost:5006/HvplotInteractive).

- For previewing the app in Jupyter lab check out the [Panel Jupyter Lab Preview](https://blog.holoviz.org/panel_0.12.0.html#JupyterLab-previews).
- For deployment options check out the [Server Deployment User Guide](https://panel.holoviz.org/user_guide/Server_Deployment.html).