diff --git a/docs/source/pymzml_spec.rst b/docs/source/pymzml_spec.rst index 950d493e..900e462f 100755 --- a/docs/source/pymzml_spec.rst +++ b/docs/source/pymzml_spec.rst @@ -30,8 +30,8 @@ Chromatogram :exclude-members: __repr__, __str__ -MS_Spectrum +MsData ----------- -.. autoclass:: pymzml.spec.MS_Spectrum +.. autoclass:: pymzml.msdata.MsData :members: diff --git a/example_scripts/access_spectra_and_chromatograms.py b/example_scripts/access_spectra_and_chromatograms.py new file mode 100644 index 00000000..2b8af2b0 --- /dev/null +++ b/example_scripts/access_spectra_and_chromatograms.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import sys +import pymzml + + +def main(mzml_file): + """ + Example script demonstrating how to access both spectra and chromatograms + using pymzML. + + Usage: + ./access_spectra_and_chromatograms.py + """ + print("Initializing Reader...") + # Initialize with skip_chromatogram=False to include chromatograms during iteration + run = pymzml.run.Reader(mzml_file, skip_chromatogram=False) + + # Access the first spectrum using indexing (traditional way) + print("\nAccessing first spectrum using indexing (run[0]):") + try: + spectrum = run[0] + print(f"Spectrum ID: {spectrum.ID}") + print(f"MS Level: {spectrum.ms_level}") + print(f"Retention Time: {spectrum.scan_time_in_minutes()} minutes") + print(f"Number of peaks: {len(spectrum.peaks('raw'))}") + except Exception as e: + print(f"Error accessing spectrum: {e}") + + # Access the first spectrum using the new get_spectrum method + print("\nAccessing first spectrum using get_spectrum(0):") + try: + spectrum = run.get_spectrum(0) + print(f"Spectrum ID: {spectrum.ID}") + print(f"MS Level: {spectrum.ms_level}") + print(f"Retention Time: {spectrum.scan_time_in_minutes()} minutes") + print(f"Number of peaks: {len(spectrum.peaks('raw'))}") + except Exception as e: + print(f"Error accessing spectrum: {e}") + + # Access the TIC chromatogram using string identifier + print("\nAccessing TIC chromatogram using run['TIC']:") + try: + chromatogram = run["TIC"] + print(f"Chromatogram ID: {chromatogram.ID}") + print(f"Number of data points: {len(chromatogram.peaks())}") + + # Print the first few data points + print("\nFirst 5 data points (time, intensity):") + for i, (time, intensity) in enumerate(chromatogram.peaks()): + if i >= 5: + break + print(f" {time:.4f}, {intensity:.2f}") + except Exception as e: + print(f"Error accessing chromatogram: {e}") + + # Access the first chromatogram using the new get_chromatogram method + print("\nAccessing first chromatogram using get_chromatogram(0):") + try: + chromatogram = run.get_chromatogram(0) + print(f"Chromatogram ID: {chromatogram.ID}") + print(f"Number of data points: {len(chromatogram.peaks())}") + + # Print the first few data points + print("\nFirst 5 data points (time, intensity):") + for i, (time, intensity) in enumerate(chromatogram.peaks()): + if i >= 5: + break + print(f" {time:.4f}, {intensity:.2f}") + except Exception as e: + print(f"Error accessing chromatogram: {e}") + + # Demonstrate iterating through all items (spectra and chromatograms) + print("\nIterating through first few items (spectra and chromatograms):") + count = 0 + for item in run: + if count >= 5: + break + if isinstance(item, pymzml.spec.Spectrum): + print(f" Spectrum {item.ID}, MS level {item.ms_level}, RT {item.scan_time_in_minutes():.2f} min") + elif hasattr(item, 'time') and hasattr(item, 'i'): + print(f" Chromatogram {item.ID}, {len(item.peaks())} data points") + count += 1 + + print("\nDone!") + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print(main.__doc__) + print("Please provide a path to an mzML file.") + sys.exit(1) + + mzml_file = sys.argv[1] + if not os.path.exists(mzml_file): + print(f"Error: File '{mzml_file}' not found.") + sys.exit(1) + + main(mzml_file) diff --git a/pymzml/__init__.py b/pymzml/__init__.py index 07cdd5df..c60e1ff1 100755 --- a/pymzml/__init__.py +++ b/pymzml/__init__.py @@ -24,7 +24,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ -__all__ = ["run", "spec", "obo", "minimum", "plot", "file_classes"] +__all__ = ["run", "spec", "chromatogram", "obo", "minimum", "plot", "file_classes"] import os import sys @@ -40,7 +40,9 @@ # Imports of individual modules import pymzml.run import pymzml.spec +import pymzml.chromatogram from pymzml.spec import MSDecoder +from pymzml.chromatogram import Chromatogram import pymzml.obo import pymzml.plot import pymzml.utils diff --git a/pymzml/chromatogram.py b/pymzml/chromatogram.py new file mode 100644 index 00000000..ccc6d9d4 --- /dev/null +++ b/pymzml/chromatogram.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python3 +# -*- coding: latin-1 -*- +""" +The chromatogram class offers a python object for mass spectrometry chromatogram data. +The chromatogram object holds the basic information of the chromatogram and offers +methods to interrogate properties of the chromatogram. +Data, i.e. time and intensity decoding is performed on demand +and can be accessed via their properties, e.g. :py:attr:`~pymzml.chromatogram.Chromatogram.peaks`. + +The Chromatogram class is used in the :py:class:`~pymzml.run.Reader` class. +There each chromatogram is accessible as a chromatogram object. +""" + +# Python mzML module - pymzml +# Copyright (C) 2010-2019 M. Kösters, C. Fufezan +# The MIT License (MIT) + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import re +import numpy as np +from .msdata import MsData +from .obo import OboTranslator + + +class Chromatogram(MsData): + """ + Class for Chromatogram access and handling. + """ + + def __init__(self, element, measured_precision=5e-6, *, obo_version=None): + """ + Arguments: + element (xml.etree.ElementTree.Element): chromatogram as xml Element + + Keyword Arguments: + measured_precision (float): in ppm, i.e. 5e-6 equals to 5 ppm. + obo_version (str, optional): obo version number. + """ + self._measured_precision = measured_precision + self.element = element + self.noise_level_estimate = {} + # Property variables + self._time = None + self._ms_level = None + self._i = None + self._t_mass_set = None + self._peaks = None + self._t_mz_set = None + self._centroided_peaks = None + self._reprofiled_peaks = None + self._deconvoluted_peaks = None + self._profile = None + self._extreme_values = None + self._centroided_peaks_sorted_by_i = None + self._transformed_mz_with_error = None + self._transformed_mass_with_error = None + self._precursors = None + self._ID = None + self._chromatogram_type = None + self._precursor_mz = None + self._product_mz = None + self._polarity = None + self.obo_translator = OboTranslator.from_cache(obo_version) + + if self.element: + self.ns = ( + re.match(r"\{.*\}", element.tag).group(0) + if re.match(r"\{.*\}", element.tag) + else "" + ) + + self._decode = self._decode_to_numpy + # assign function to create numpy array to list??? + self._array = np.array + + def __repr__(self): + """ + Returns representative string for a chromatogram object class + """ + return "<__main__.Chromatogram object with native ID {0} at {1}>".format( + self.ID, hex(id(self)) + ) + + def __str__(self): + """ + Returns representative string for a chromatogram object class + """ + return "<__main__.Chromatogram object with native ID {0} at {1}>".format( + self.ID, hex(id(self)) + ) + + @property + def ID(self): + """ + Access the native id of the chromatogram. + + Returns: + ID (str): native ID of the chromatogram + """ + if self._ID is None: + if self.element: + self._ID = self.element.get("id") + return self._ID + + @property + def mz(self): + """ + Chromatogram has no property mz. This property is included for + compatibility with the Spectrum class. + + Returns: + time (list): list of time values from the chromatogram + """ + print("Chromatogram has no property mz.\nReturn retention time instead") + return self.time + + @property + def time(self): + """ + Returns the list of time values. If the time values are encoded, the + function _decode() is used to decode the encoded data.\n + The time property can also be set, e.g. for theoretical data. + However, it is recommended to use the profile property to set time and + intensity tuples at same time. + + Returns: + time (list): list of time values from the analyzed chromatogram + + """ + if self._time is None: + params = self._get_encoding_parameters("time array") + self._time = self._decode(*params) + return self._time + + @property + def i(self): + """ + Returns the list of intensity values from the analyzed chromatogram. + + Returns: + i (list): list of intensity values from the analyzed chromatogram + """ + if self._i is None: + params = self._get_encoding_parameters("intensity array") + self._i = self._decode(*params) + return self._i + + @property + def profile(self): + """ + Returns the list of peaks of the chromatogram as tuples (time, intensity). + + Returns: + peaks (list): list of time, i tuples + + Example: + + >>> import pymzml + >>> run = pymzml.run.Reader( + ... spectra.mzMl.gz, + ... MS_precisions = { + ... 1 : 5e-6, + ... 2 : 20e-6 + ... } + ... ) + >>> for entry in run: + ... if isinstance(entry, pymzml.chromatogram.Chromatogram): + ... for time, intensity in entry.peaks: + ... print(time, intensity) + + Note: + The peaks property can also be set, e.g. for theoretical data. + It requires a list of time/intensity tuples. + + """ + if self._profile is None: + if self._time is None and self._i is None: + self._profile = [] + for pos, t in enumerate(self.time): + self._profile.append([t, self.i[pos]]) + # much faster than zip ... list(zip(self.mz, self.i)) + elif self._time is not None and self._i is not None: + self._profile = [] + for pos, t in enumerate(self.time): + self._profile.append([t, self.i[pos]]) + elif self._profile is None: + self._profile = [] + return self._array(self._profile) + + @profile.setter + def profile(self, tuple_list): + """ + Set the chromatogram profile. + + Args: + tuple_list (list): list of tuples (time, intensity) + """ + if len(tuple_list) == 0: + return + self._time = [] + self._i = [] + for time, i in tuple_list: + self._time.append(time) + self._i.append(i) + self._peaks = tuple_list + self._reprofiledPeaks = None + self._centroidedPeaks = None + return self + + def peaks(self): + """ + Return the list of peaks of the chromatogram as tuples (time, intensity). + + Returns: + peaks (list): list of time, intensity tuples + + Example: + + >>> import pymzml + >>> run = pymzml.run.Reader( + ... spectra.mzMl.gz, + ... MS_precisions = { + ... 1 : 5e-6, + ... 2 : 20e-6 + ... } + ... ) + >>> for entry in run: + ... if isinstance(entry, pymzml.chromatogram.Chromatogram): + ... for time, intensity in entry.peaks: + ... print(time, intensity) + + Note: + The peaks property can also be set, e.g. for theoretical data. + It requires a list of time/intensity tuples. + + """ + return self.profile + + @property + def chromatogram_type(self): + """ + Returns the chromatogram type. + + Returns: + chromatogram_type (str): chromatogram type + """ + if self._chromatogram_type is None: + for element in self.element.iter(): + if element.tag.endswith("}cvParam"): + accession = element.get("accession") + # Check for chromatogram type accessions + if accession in [ + "MS:1000235", # total ion current chromatogram + "MS:1000627", # selected ion current chromatogram + "MS:1000628", # basepeak intensity chromatogram + "MS:1000810", # chromatogram + "MS:1000811", # chromatogram created by spectrum aggregation + "MS:1000812", # single ion monitoring chromatogram + "MS:1000813", # multiple reaction monitoring chromatogram + "MS:1000814", # selected reaction monitoring chromatogram + "MS:1000815", # consecutive reaction monitoring chromatogram + "MS:1001472", # selected ion monitoring chromatogram + "MS:1001473", # selected reaction monitoring chromatogram + "MS:1001474", # consecutive reaction monitoring chromatogram + "MS:1001475", # targeted SIM chromatogram + "MS:1001476", # automatic SIM chromatogram + "MS:1001477", # targeted SRM chromatogram + "MS:1001478", # automatic SRM chromatogram + "MS:1001479", # targeted CRM chromatogram + "MS:1001480", # automatic CRM chromatogram + ]: + self._chromatogram_type = element.get("name") + break + return self._chromatogram_type + + @property + def polarity(self): + """ + Returns the polarity of the chromatogram. + + Returns: + polarity (str): polarity (positive scan or negative scan) + """ + if self._polarity is None: + for element in self.element.iter(): + if element.tag.endswith("}cvParam"): + accession = element.get("accession") + # Check for polarity accessions + if accession in [ + "MS:1000129", # negative scan + "MS:1000130", # positive scan + ]: + self._polarity = element.get("name") + break + return self._polarity + + @property + def precursor_mz(self): + """ + Returns the precursor m/z value for SRM/MRM chromatograms. + + Returns: + precursor_mz (float): precursor m/z value + """ + if self._precursor_mz is None: + precursor = self.element.find(f".//{self.ns}precursor") + if precursor is not None: + isolation_window = precursor.find(f".//{self.ns}isolationWindow") + if isolation_window is not None: + for element in isolation_window.iter(): + if ( + element.tag.endswith("}cvParam") + and element.get("accession") == "MS:1000827" + ): # isolation window target m/z + self._precursor_mz = float(element.get("value")) + break + return self._precursor_mz + + @property + def product_mz(self): + """ + Returns the product m/z value for SRM/MRM chromatograms. + + Returns: + product_mz (float): product m/z value + """ + if self._product_mz is None: + product = self.element.find(f".//{self.ns}product") + if product is not None: + isolation_window = product.find(f".//{self.ns}isolationWindow") + if isolation_window is not None: + for element in isolation_window.iter(): + if ( + element.tag.endswith("}cvParam") + and element.get("accession") == "MS:1000827" + ): # isolation window target m/z + self._product_mz = float(element.get("value")) + break + return self._product_mz + + def get_chromatogram_properties(self): + """ + Returns a dictionary with the main properties of the chromatogram. + + Returns: + properties (dict): dictionary with chromatogram properties + """ + properties = { + "id": self.ID, + "chromatogram_type": self.chromatogram_type, + "polarity": self.polarity, + "precursor_mz": self.precursor_mz, + "product_mz": self.product_mz, + } + return properties diff --git a/pymzml/file_classes/indexedGzip.py b/pymzml/file_classes/indexedGzip.py index a3ac7e1d..bdfe68dd 100755 --- a/pymzml/file_classes/indexedGzip.py +++ b/pymzml/file_classes/indexedGzip.py @@ -33,6 +33,7 @@ from xml.etree.ElementTree import XML from .. import spec +from .. import chromatogram from ..utils.GSGR import GSGR @@ -89,7 +90,7 @@ def __getitem__(self, identifier): data = self.Reader.read_block(identifier) element = XML(ns_prefix + data.decode("utf-8") + ns_suffix) if "chromatogram" in element[0].tag: - return spec.Chromatogram(list(element)[0], measured_precision=5e-6) + return chromatogram.Chromatogram(list(element)[0], measured_precision=5e-6) else: return spec.Spectrum(list(element)[0], measured_precision=5e-6) diff --git a/pymzml/file_classes/standardGzip.py b/pymzml/file_classes/standardGzip.py index fd808ac0..b5108f61 100755 --- a/pymzml/file_classes/standardGzip.py +++ b/pymzml/file_classes/standardGzip.py @@ -32,6 +32,7 @@ from .. import regex_patterns from .. import spec +from .. import chromatogram class StandardGzip(object): @@ -100,7 +101,9 @@ def __getitem__(self, identifier): elif element.tag.endswith("}chromatogram"): if element.get("id") == identifier: self.file_handler.seek(old_pos, 0) - return spec.Chromatogram(element, measured_precision=5e-6) + return chromatogram.Chromatogram( + element, measured_precision=5e-6 + ) if __name__ == "__main__": diff --git a/pymzml/file_classes/standardMzml.py b/pymzml/file_classes/standardMzml.py index 18223a53..364389e9 100755 --- a/pymzml/file_classes/standardMzml.py +++ b/pymzml/file_classes/standardMzml.py @@ -35,6 +35,7 @@ from xml.etree.ElementTree import XML, iterparse from .. import spec +from .. import chromatogram from .. import regex_patterns @@ -97,7 +98,7 @@ def __getitem__(self, identifier): if element.tag.endswith("}chromatogram"): if element.get("id") == "TIC": found = True - spectrum = spec.Chromatogram( + spectrum = chromatogram.Chromatogram( element, measured_precision=5e-6 ) elif event == "STOP": @@ -116,7 +117,7 @@ def __getitem__(self, identifier): if data.startswith(">> spec.get_element_by_path(['scanList', 'scan', 'scanWindowList', + ... 'scanWindow', 'cvParam']) + + """ + return_ele = None + if len(hooks) > 0: + path_array = ["."] + for hook in hooks: + path_array.append("{ns}{hook}".format(ns=self.ns, hook=hook)) + path = "/".join(path_array) + return_ele = self.element.findall(path) + + return return_ele + + def _register(self, decoded_tuple): + d_type, array = decoded_tuple + if d_type == "mz": + self._mz = array + elif d_type == "i": + self._i = array + elif d_type == "time": + self._time = array + else: + raise Exception("Unknown data Type ({0})".format(d_type)) + + def _get_encoding_parameters(self, array_type): + """ + Find the correct parameter for decoding and return them as tuple. + + Arguments: + array_type (str): data type of the array, e.g. m/z, time or + intensity + Returns: + data (str) : encoded data + comp (str) : compression method + d_type (str) : data type + d_array_length (str) : length of the data array + """ + numpress_encoding = False + + b_data_string = "./{ns}binaryDataArrayList/{ns}binaryDataArray/{ns}cvParam[@name='{name}']/..".format( + ns=self.ns, name=array_type + ) + float_type_string = "./{ns}cvParam[@accession='{Acc}']" + + b_data_array = self.element.find(b_data_string) + if not b_data_array: + # non-standard data array + b_data_string = "./{ns}binaryDataArrayList/{ns}binaryDataArray/{ns}cvParam[@value='{value}']/..".format( + ns=self.ns, value=array_type + ) + b_data_array = self.element.find(b_data_string) + + comp = [] + if b_data_array: + for cvParam in b_data_array.iterfind("./{ns}cvParam".format(ns=self.ns)): + if "compression" in cvParam.get("name"): + if "numpress" in cvParam.get("name").lower(): + numpress_encoding = True + comp.append(cvParam.get("name")) + d_array_length = self.element.get("defaultArrayLength") + if not numpress_encoding: + try: + # 32-bit float + d_type = b_data_array.find( + float_type_string.format( + ns=self.ns, + Acc=self.obo_translator["32-bit float"]["id"], + ) + ).get("name") + except: + try: + # 64-bit Float + d_type = b_data_array.find( + float_type_string.format( + ns=self.ns, + Acc=self.obo_translator["64-bit float"]["id"], + ) + ).get("name") + except: + try: + # 32-bit integer + d_type = b_data_array.find( + float_type_string.format( + ns=self.ns, + Acc=self.obo_translator["32-bit integer"]["id"], + ) + ).get("name") + except: + try: + # 64-bit integer + d_type = b_data_array.find( + float_type_string.format( + ns=self.ns, + Acc=self.obo_translator["64-bit integer"]["id"], + ) + ).get("name") + except: + # null-terminated ASCII string + d_type = b_data_array.find( + float_type_string.format( + ns=self.ns, + Acc=self.obo_translator[ + "null-terminated ASCII string" + ]["id"], + ) + ).get("name") + else: + # compression is numpress, dont need data type here + d_type = None + data = b_data_array.find("./{ns}binary".format(ns=self.ns)) + if data is not None: + data = data.text + else: + data = None + d_array_length = 0 + d_type = "64-bit float" + if data is not None: + data = data.encode("utf-8") + else: + data = "" + return (data, d_array_length, d_type, comp) + + @property + def measured_precision(self): + """ + Set the measured and internal precision. + + Returns: + value (float): measured Precision (e.g. 5e-6) + """ + return self._measured_precision + + @measured_precision.setter + def measured_precision(self, value): + self._measured_precision = value + self.internal_precision = int(round(50000.0 / (value * 1e6))) + return + + def _decode_to_numpy(self, data, d_array_length, data_type, comp): + """ + Decode the b64 encoded and packed strings from data as numpy arrays. + + Returns: + data (np.ndarray): Returns the unpacked data as a tuple. Returns an + empty list if there is no raw data or raises an + exception if data could not be decoded. + + d_array_length just for compatibility + """ + out_data = b64dec(data) + if len(out_data) != 0: + if "zlib" in comp or "zlib compression" in comp: + out_data = zlib.decompress(out_data) + if ( + "ms-np-linear" in comp + or "ms-np-pic" in comp + or "ms-np-slof" in comp + or "MS-Numpress linear prediction compression" in comp + or "MS-Numpress short logged float compression" in comp + ): + out_data = self._decodeNumpress_to_array(out_data, comp) + if data_type == "32-bit float": + # one character code may be sufficient too (f) + f_type = np.float32 + out_data = np.frombuffer(out_data, f_type) + elif data_type == "64-bit float": + # one character code may be sufficient too (d) + f_type = np.float64 + out_data = np.frombuffer(out_data, f_type) + elif data_type == "32-bit integer": + # one character code may be sufficient too (i) + i_type = np.int32 + out_data = np.frombuffer(out_data, i_type) + elif data_type == "64-bit integer": + # one character code may be sufficient too (l) + i_type = np.int64 + out_data = np.frombuffer(out_data, i_type) + # TODO elif data_type == "null-terminated ASCII string": + else: + raise ValueError(f"Unsupported data type: {data_type}") + else: + out_data = np.array([]) + return out_data + + def _decode_to_tuple(self, data, d_array_length, float_type, comp): + """ + Decode b64 encoded and packed strings. + + Returns: + data (tuple): Returns the unpacked data as a tuple. + Returns an empty list if there is no raw data or + raises an exception if data could not be decoded. + """ + dec_data = b64dec(data) + if len(dec_data) != 0: + if "zlib" in comp or "zlib compression" in comp: + dec_data = zlib.decompress(dec_data) + if set(["ms-np-linear", "ms-np-pic", "ms-np-slof"]) & set(comp): + self._decodeNumpress(data, comp) + # else: + # print( + # 'New data compression ({0}) detected, cant decompress'.format( + # comp + # ) + # ) + # sys.exit(1) + if float_type == "32-bit float": + f_type = "f" + elif float_type == "64-bit float": + f_type = "d" + fmt = "{endian}{array_length}{float_type}".format( + endian="<", array_length=d_array_length, float_type=f_type + ) + ret_data = unpack(fmt, dec_data) + else: + ret_data = [] + return ret_data + + def _decodeNumpress_to_array(self, data, compression): + """ + Decode golomb-rice encoded data (aka numpress encoded data). + + Arguments: + data (str) : Encoded data string + compression (str) : Decompression algorithm to be used + (valid are 'ms-np-linear', 'ms-np-pic', 'ms-np-slof') + + Returns: + array (list): Returns the unpacked data as an array of floats. + + """ + result = [] + comp_ms_tags = [self.calling_instance.OT[comp]["id"] for comp in compression] + data = np.frombuffer(data, dtype=np.uint8) + if "MS:1002312" in comp_ms_tags: + from .decoder import MSDecoder + + result = MSDecoder.decode_linear(data) + elif "MS:1002313" in comp_ms_tags: + from .decoder import MSDecoder + + result = MSDecoder.decode_pic(data) + elif "MS:1002314" in comp_ms_tags: + from .decoder import MSDecoder + + result = MSDecoder.decode_slof(data) + return result + + def _median(self, data): + """ + Compute median. + + Arguments: + data (list): list of numeric values + + Returns: + median (float): median of the input data + """ + return np.median(data) + + def to_string(self, encoding="latin-1", method="xml"): + """ + Return string representation of the xml element the + spectrum was initialized with. + + Keyword Arguments: + encoding (str) : text encoding of the returned string.\n + Default is latin-1. + method (str) : text format of the returned string.\n + Default is xml, alternatives are html and text. + + Returns: + element (str) : xml string representation of the spectrum. + """ + return ElementTree.tostring(self.element, encoding=encoding, method=method) diff --git a/pymzml/run.py b/pymzml/run.py index 1c521c37..16a9494a 100644 --- a/pymzml/run.py +++ b/pymzml/run.py @@ -46,6 +46,7 @@ from pathlib import Path from . import spec +from . import chromatogram from . import obo from . import regex_patterns from .file_interface import FileInterface @@ -166,7 +167,9 @@ def __next__(self): if element.tag.endswith("}chromatogram"): if self.skip_chromatogram: continue - spectrum = spec.Chromatogram(element, obo_version=self.OT.version) + spectrum = chromatogram.Chromatogram( + element, obo_version=self.OT.version + ) # if has_ref_group: # spectrum._set_params_from_reference_group( # self.info['referenceable_param_group_list_element'] @@ -183,26 +186,29 @@ def __next__(self): def __getitem__(self, identifier): """ - Access spectrum with native id 'identifier'. + Access spectrum or chromatogram with native id 'identifier'. Arguments: identifier (str or int): last number in the id tag of the spectrum - element + element or a chromatogram identifier like 'TIC' Returns: spectrum (Spectrum or Chromatogram): spectrum/chromatogram object with native id 'identifier' """ try: - if int(identifier) > self.get_spectrum_count(): + if isinstance(identifier, int) and identifier > self.get_spectrum_count(): raise Exception("Requested identifier is out of range") except: pass - spectrum = self.info["file_object"][identifier] - spectrum.obo_translator = self.OT - if isinstance(spectrum, spec.Spectrum): - spectrum.measured_precision = self.ms_precisions[spectrum.ms_level] - return spectrum + + element = self.info["file_object"][identifier] + element.obo_translator = self.OT + + if isinstance(element, spec.Spectrum): + element.measured_precision = self.ms_precisions[element.ms_level] + + return element def __enter__(self): return self @@ -455,6 +461,76 @@ def get_chromatogram_count(self): """ return self.info["chromatogram_count"] + def get_spectrum(self, identifier): + """ + Access spectrum with the given identifier. + + Arguments: + identifier (str or int): Either a string identifier or an index (0-based) + to access spectra in order. + + Returns: + spectrum (Spectrum): spectrum object with the given identifier + + Note: + This method provides the same functionality as using the indexing syntax + (e.g., run[0]), but with a more explicit method name. + """ + return self[identifier] + + def get_chromatogram(self, identifier): + """ + Access chromatogram with the given identifier. + + Arguments: + identifier (str or int): Either a string identifier like 'TIC' or + an index (0-based) to access chromatograms in order. + + Returns: + chromatogram (Chromatogram): chromatogram object with the given identifier + + Note: + This method is only useful when skip_chromatogram is set to False + if you want to access chromatograms by index. If skip_chromatogram is True, + you can still access chromatograms by string identifiers (e.g., 'TIC'). + """ + if isinstance(identifier, str): + return self[identifier] + + if isinstance(identifier, int): + if self.get_chromatogram_count() is None: + raise Exception("No chromatograms found in the file") + + if identifier >= self.get_chromatogram_count(): + raise Exception( + f"Chromatogram index {identifier} is out of range (0-{self.get_chromatogram_count()-1})" + ) + + # Reset the file pointer and iterate to find the chromatogram + temp_skip_chromatogram = self.skip_chromatogram + self.skip_chromatogram = False + + self.info["file_object"].close() + self.info["file_object"] = self._open_file( + self.path_or_file, build_index_from_scratch=False + ) + self.iter = self._init_iter() + + chrom_count = 0 + try: + for element in self: + if isinstance(element, chromatogram.Chromatogram): + if chrom_count == identifier: + return element + chrom_count += 1 + finally: + # Restore original skip_chromatogram setting + self.skip_chromatogram = temp_skip_chromatogram + + raise Exception(f"Chromatogram with index {identifier} not found") + + raise ValueError("Identifier must be a string or an integer") + def close(self): self.info["file_object"].close() diff --git a/pymzml/spec.py b/pymzml/spec.py index 3001e8f8..304b7570 100755 --- a/pymzml/spec.py +++ b/pymzml/spec.py @@ -69,10 +69,15 @@ PROTON = 1.00727646677 ISOTOPE_AVERAGE_DIFFERENCE = 1.002 +# Import Chromatogram from chromatogram.py for backward compatibility +# Import MsData from msdata.py +from .msdata import MsData -class MS_Spectrum(object): + +class MS_Spectrum(MsData): """ General spectrum class for data handling. + This class is kept for backward compatibility. """ def _read_accessions(self): @@ -392,7 +397,7 @@ def to_string(self, encoding="latin-1", method="xml"): return ElementTree.tostring(self.element, encoding=encoding, method=method) -class Spectrum(MS_Spectrum): +class Spectrum(MsData): """ Spectrum class which inherits from class :py:attr:`pymzml.spec.MS_Spectrum` @@ -1733,7 +1738,7 @@ def centroidedPeaks(self): return self.peaks("centroided") -class Chromatogram(MS_Spectrum): +class Chromatogram(MsData): """ Class for Chromatogram access and handling. """ diff --git a/pymzml/utils/SQListeConnector.py b/pymzml/utils/SQListeConnector.py index 3fdf1582..fbc5bbe9 100755 --- a/pymzml/utils/SQListeConnector.py +++ b/pymzml/utils/SQListeConnector.py @@ -1,6 +1,7 @@ import sqlite3 import xml.etree.ElementTree as et from pymzml import spec +from pymzml import chromatogram from pymzml.run import Reader @@ -46,7 +47,7 @@ def __getitem__(self, key): if "spectrum" in element.tag: spectrum = spec.Spectrum(element) elif "chromatogram" in element.tag: - spectrum = spec.Chromatogram(element) + spectrum = chromatogram.Chromatogram(element) return spectrum def get_spectrum_count(self): diff --git a/tests/access_spectra_and_chromatograms_test.py b/tests/access_spectra_and_chromatograms_test.py new file mode 100644 index 00000000..b5bddf18 --- /dev/null +++ b/tests/access_spectra_and_chromatograms_test.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Test cases for the new functionality in pymzml.run.Reader +related to accessing spectra and chromatograms. +""" +import os +import sys + +# Add parent directory to Python path +sys.path.append(os.path.abspath(os.path.dirname(os.path.dirname(__file__)))) + +import unittest +import pymzml.run as run +from pymzml.spec import Spectrum +import test_file_paths + + +class AccessSpectraAndChromatogramsTest(unittest.TestCase): + """ + Test cases for the new functionality in pymzml.run.Reader + related to accessing spectra and chromatograms. + """ + + def setUp(self): + """Set up test cases.""" + self.paths = test_file_paths.paths + + # Use a file with chromatograms for testing + # mini.chrom.mzML is at index 3 + for i, path in enumerate(self.paths): + if "mini.chrom.mzML" in path and not path.endswith(".gz") and not path.endswith(".idx.gz"): + self.chrom_file = path + break + else: + # Fallback to a known index if the file name is not found + self.chrom_file = self.paths[3] # mini.chrom.mzML + + # Initialize readers with different settings + self.reader_with_chromatograms = run.Reader( + self.chrom_file, + skip_chromatogram=False + ) + + self.reader_skip_chromatograms = run.Reader( + self.chrom_file, + skip_chromatogram=True + ) + + def test_get_spectrum_method(self): + """Test the get_spectrum method.""" + # Check if the file has spectra + spec_count = self.reader_with_chromatograms.get_spectrum_count() + if spec_count is None or spec_count == 0: + self.skipTest("Test file does not contain spectra") + + # Test that get_spectrum(0) returns the same as reader[0] + try: + spectrum_by_index = self.reader_with_chromatograms[0] + spectrum_by_method = self.reader_with_chromatograms.get_spectrum(0) + + self.assertIsInstance(spectrum_by_index, Spectrum) + self.assertIsInstance(spectrum_by_method, Spectrum) + self.assertEqual(spectrum_by_index.ID, spectrum_by_method.ID) + + # Test accessing a spectrum by ID + spectrum_id = spectrum_by_index.ID + if isinstance(spectrum_id, str): + spectrum_by_id = self.reader_with_chromatograms[spectrum_id] + self.assertEqual(spectrum_by_index.ID, spectrum_by_id.ID) + except IndexError: + self.skipTest("Could not access spectrum at index 0") + + def test_get_chromatogram_method(self): + """Test the get_chromatogram method.""" + # Check if the file has chromatograms + chrom_count = self.reader_with_chromatograms.get_chromatogram_count() + if chrom_count is None or chrom_count == 0: + self.skipTest("Test file does not contain chromatograms") + + # Test accessing chromatogram by index + try: + chrom_by_index = self.reader_with_chromatograms.get_chromatogram(0) + self.assertTrue(hasattr(chrom_by_index, 'time') and hasattr(chrom_by_index, 'i')) + + # If we successfully got a chromatogram by index, try to get it by ID + chrom_id = chrom_by_index.ID + if chrom_id: + chrom_by_id = self.reader_with_chromatograms[chrom_id] + self.assertTrue(hasattr(chrom_by_id, 'time') and hasattr(chrom_by_id, 'i')) + self.assertEqual(chrom_by_id.ID, chrom_id) + except Exception as e: + self.skipTest(f"Could not access chromatogram at index 0: {e}") + + # Test that the chromatogram count is correct + self.assertIsNotNone(self.reader_with_chromatograms.get_chromatogram_count()) + + def test_skip_chromatogram_parameter(self): + """Test the skip_chromatogram parameter.""" + # Check if the file has both spectra and chromatograms + spec_count = self.reader_with_chromatograms.get_spectrum_count() + chrom_count = self.reader_with_chromatograms.get_chromatogram_count() + + if spec_count is None or spec_count == 0: + self.skipTest("Test file does not contain spectra") + if chrom_count is None or chrom_count == 0: + self.skipTest("Test file does not contain chromatograms") + + # With skip_chromatogram=False, we should see both spectra and chromatograms + # Reset the reader to ensure we start from the beginning + self.reader_with_chromatograms.close() + self.reader_with_chromatograms = run.Reader( + self.chrom_file, + skip_chromatogram=False + ) + + # Collect items + spectra_found = False + chromatograms_found = False + count = 0 + max_items = 20 # Increase the limit to ensure we see both types + + for item in self.reader_with_chromatograms: + if isinstance(item, Spectrum): + spectra_found = True + elif hasattr(item, 'time') and hasattr(item, 'i'): + chromatograms_found = True + + if spectra_found and chromatograms_found: + break + + count += 1 + if count >= max_items: + break + + # Check that we found both types + self.assertTrue(spectra_found, "No spectra found when iterating with skip_chromatogram=False") + self.assertTrue(chromatograms_found, "No chromatograms found when iterating with skip_chromatogram=False") + + # With skip_chromatogram=True, we should only see spectra + # Reset the reader to ensure we start from the beginning + self.reader_skip_chromatograms.close() + self.reader_skip_chromatograms = run.Reader( + self.chrom_file, + skip_chromatogram=True + ) + + # Collect items + items_without_chromatograms = [] + for item in self.reader_skip_chromatograms: + items_without_chromatograms.append(item) + if len(items_without_chromatograms) >= 10: # Limit to first 10 items + break + + # Check that we only have spectra (if any items were found) + if items_without_chromatograms: + only_spectra = all(isinstance(item, Spectrum) for item in items_without_chromatograms) + self.assertTrue(only_spectra, "Found non-spectrum items when iterating with skip_chromatogram=True") + + def test_chromatogram_index_out_of_range(self): + """Test that accessing a chromatogram with an out-of-range index raises an exception.""" + # Check if the file has chromatograms + chrom_count = self.reader_with_chromatograms.get_chromatogram_count() + if chrom_count is None or chrom_count == 0: + self.skipTest("Test file does not contain chromatograms") + + with self.assertRaises(Exception): + self.reader_with_chromatograms.get_chromatogram(100) # Assuming there are fewer than 100 chromatograms + + def test_chromatogram_invalid_identifier(self): + """Test that accessing a chromatogram with an invalid identifier raises an exception.""" + # Check if the file has chromatograms + chrom_count = self.reader_with_chromatograms.get_chromatogram_count() + if chrom_count is None or chrom_count == 0: + self.skipTest("Test file does not contain chromatograms") + + with self.assertRaises(Exception): + self.reader_with_chromatograms.get_chromatogram("NonExistentChromatogram") + + def tearDown(self): + """Clean up after tests.""" + self.reader_with_chromatograms.close() + self.reader_skip_chromatograms.close() + + +if __name__ == "__main__": + unittest.main(verbosity=3) diff --git a/tests/chromatogram_properties_test.py b/tests/chromatogram_properties_test.py new file mode 100644 index 00000000..87936bfb --- /dev/null +++ b/tests/chromatogram_properties_test.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Test cases for the new properties in the Chromatogram class. +""" +import os +import sys + +# Add parent directory to Python path +sys.path.append(os.path.abspath(os.path.dirname(os.path.dirname(__file__)))) + +import unittest +import pymzml.run as run +from pymzml.chromatogram import Chromatogram +import test_file_paths + + +class ChromatogramPropertiesTest(unittest.TestCase): + """ + Test cases for the new properties in the Chromatogram class. + """ + + def setUp(self): + """Set up test cases.""" + self.paths = test_file_paths.paths + + # Use a file with chromatograms for testing + # mini.chrom.mzML is at index 3 + for i, path in enumerate(self.paths): + if "mini.chrom.mzML" in path and not path.endswith(".gz") and not path.endswith(".idx.gz"): + self.chrom_file = path + break + else: + # Fallback to a known index if the file name is not found + self.chrom_file = self.paths[3] # mini.chrom.mzML + + # Initialize reader with chromatograms + self.reader = run.Reader( + self.chrom_file, + skip_chromatogram=False + ) + + def test_chromatogram_type(self): + """Test the chromatogram_type property.""" + # Get the first chromatogram + chromatogram = self.reader.get_chromatogram(0) + + # Test that chromatogram_type is accessible + chromatogram_type = chromatogram.chromatogram_type + + # The type might be None depending on the test file, but the property should be accessible + self.assertIsNotNone(chromatogram, "Chromatogram should not be None") + + # Print the chromatogram type for debugging + print(f"Chromatogram type: {chromatogram_type}") + + def test_polarity(self): + """Test the polarity property.""" + # Get the first chromatogram + chromatogram = self.reader.get_chromatogram(0) + + # Test that polarity is accessible + polarity = chromatogram.polarity + + # The polarity might be None depending on the test file, but the property should be accessible + self.assertIsNotNone(chromatogram, "Chromatogram should not be None") + + # Print the polarity for debugging + print(f"Polarity: {polarity}") + + def test_precursor_mz(self): + """Test the precursor_mz property.""" + # Get the first chromatogram + chromatogram = self.reader.get_chromatogram(0) + + # Test that precursor_mz is accessible + precursor_mz = chromatogram.precursor_mz + + # The precursor_mz might be None depending on the test file, but the property should be accessible + self.assertIsNotNone(chromatogram, "Chromatogram should not be None") + + # Print the precursor_mz for debugging + print(f"Precursor m/z: {precursor_mz}") + + def test_product_mz(self): + """Test the product_mz property.""" + # Get the first chromatogram + chromatogram = self.reader.get_chromatogram(0) + + # Test that product_mz is accessible + product_mz = chromatogram.product_mz + + # The product_mz might be None depending on the test file, but the property should be accessible + self.assertIsNotNone(chromatogram, "Chromatogram should not be None") + + # Print the product_mz for debugging + print(f"Product m/z: {product_mz}") + + def test_get_chromatogram_properties(self): + """Test the get_chromatogram_properties method.""" + # Get the first chromatogram + chromatogram = self.reader.get_chromatogram(0) + + # Test that get_chromatogram_properties returns a dictionary + properties = chromatogram.get_chromatogram_properties() + + self.assertIsInstance(properties, dict, "get_chromatogram_properties should return a dictionary") + + # Check that the dictionary contains the expected keys + expected_keys = ["id", "chromatogram_type", "polarity", "precursor_mz", "product_mz"] + for key in expected_keys: + self.assertIn(key, properties, f"Properties dictionary should contain key '{key}'") + + # Print the properties for debugging + print("Chromatogram properties:") + for key, value in properties.items(): + print(f" {key}: {value}") + + def test_all_chromatograms(self): + """Test all chromatograms in the file.""" + # Get the number of chromatograms + chrom_count = self.reader.get_chromatogram_count() + + if chrom_count is None or chrom_count == 0: + self.skipTest("Test file does not contain chromatograms") + + print(f"\nTesting {chrom_count} chromatograms:") + + # Test each chromatogram + for i in range(chrom_count): + chromatogram = self.reader.get_chromatogram(i) + + # Print information about the chromatogram + print(f"\nChromatogram {i}:") + print(f" ID: {chromatogram.ID}") + print(f" Type: {chromatogram.chromatogram_type}") + print(f" Polarity: {chromatogram.polarity}") + print(f" Precursor m/z: {chromatogram.precursor_mz}") + print(f" Product m/z: {chromatogram.product_mz}") + + # Verify that the chromatogram has time and intensity data + self.assertIsNotNone(chromatogram.time, "Chromatogram should have time data") + self.assertIsNotNone(chromatogram.i, "Chromatogram should have intensity data") + + # Verify that the peaks method returns data + peaks = chromatogram.peaks() + self.assertIsNotNone(peaks, "Chromatogram peaks should not be None") + + # Print the first few data points + print(" First 3 data points (time, intensity):") + for j, (time, intensity) in enumerate(peaks): + if j >= 3: + break + print(f" {time:.4f}, {intensity:.2f}") + + def tearDown(self): + """Clean up after tests.""" + self.reader.close() + + +if __name__ == "__main__": + unittest.main(verbosity=3) diff --git a/tests/file_io_indexed_gzip_test.py b/tests/file_io_indexed_gzip_test.py index d9ee11ce..0bfc4bc3 100755 --- a/tests/file_io_indexed_gzip_test.py +++ b/tests/file_io_indexed_gzip_test.py @@ -7,7 +7,8 @@ from pymzml.file_classes.indexedGzip import IndexedGzip import unittest import random -from pymzml.spec import Spectrum, Chromatogram +from pymzml.spec import Spectrum +from pymzml.chromatogram import Chromatogram import struct import re import test_file_paths diff --git a/tests/file_io_standard_gzip_test.py b/tests/file_io_standard_gzip_test.py index c6d8e220..34d0ac24 100755 --- a/tests/file_io_standard_gzip_test.py +++ b/tests/file_io_standard_gzip_test.py @@ -7,7 +7,8 @@ from pymzml.file_classes.standardGzip import StandardGzip import unittest import random -from pymzml.spec import Spectrum, Chromatogram +from pymzml.spec import Spectrum +from pymzml.chromatogram import Chromatogram import re import struct import test_file_paths diff --git a/tests/file_io_standard_mzml_test.py b/tests/file_io_standard_mzml_test.py index dcf56324..570c7c8e 100755 --- a/tests/file_io_standard_mzml_test.py +++ b/tests/file_io_standard_mzml_test.py @@ -6,7 +6,8 @@ import os from pymzml.file_classes.standardMzml import StandardMzml import unittest -from pymzml.spec import Spectrum, Chromatogram +from pymzml.spec import Spectrum +from pymzml.chromatogram import Chromatogram import test_file_paths diff --git a/tests/main_spec_test.py b/tests/main_spec_test.py index 4042460e..edd43274 100755 --- a/tests/main_spec_test.py +++ b/tests/main_spec_test.py @@ -3,7 +3,8 @@ sys.path.append(os.path.abspath(".")) import pymzml.run as run -from pymzml.spec import Spectrum, Chromatogram +from pymzml.spec import Spectrum +from pymzml.chromatogram import Chromatogram import random import statistics as stat import unittest diff --git a/tox.ini b/tox.ini index 58e20857..ad2585c3 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py37,py38,py39,coverage,example_scripts,docu +envlist = py37,py38,py39,py312,py313,coverage,example_scripts,docu [testenv] deps = @@ -48,6 +48,7 @@ deps = commands = pip install -Ur{toxinidir}/requirements.txt python example_scripts/access_run_info.py + python example_scripts/access_spectra_and_chromatograms.py python example_scripts/compare_spectra.py python example_scripts/extract_ion_chromatogram.py python example_scripts/extreme_values.py