diff --git a/pymzml/run.py b/pymzml/run.py index 2f76456a..a7d46872 100755 --- a/pymzml/run.py +++ b/pymzml/run.py @@ -84,12 +84,15 @@ def __init__( build_index_from_scratch=False, skip_chromatogram=True, index_regex=None, - **kwargs + resolution_dict=None, + mz_resolution_reference=200, + **kwargs, ): """Initialize and set required attributes.""" self.index_regex = index_regex self.build_index_from_scratch = build_index_from_scratch self.skip_chromatogram = skip_chromatogram + self.mz_resolution_reference = mz_resolution_reference if MS_precisions is None: MS_precisions = {} if "MS1_Precision" in kwargs.keys(): @@ -98,11 +101,20 @@ def __init__( MS_precisions[2] = kwargs["MSn_Precision"] MS_precisions[3] = kwargs["MSn_Precision"] + if resolution_dict is None: + resolution_dict = {} + self.resolution_dict = { + None: 70_000, + 1: 70_000, + 2: 35_000, + } + self.resolution_dict.update(resolution_dict) + # Parameters self.ms_precisions = { None: 0.0001, # if spectra does not contain ms_level information - # e.g. UV-chromatograms (thanks pyeguy) then ms_level is - # returned as None + # e.g. UV-chromatograms (thanks pyeguy) then ms_level is + # returned as None 0: 0.0001, 1: 5e-6, 2: 20e-6, @@ -153,7 +165,11 @@ def __next__(self): event, element = next(self.iter, ("END", "END")) if event == "end": if element.tag.endswith("}spectrum"): - spectrum = spec.Spectrum(element) + spectrum = spec.Spectrum( + element=element, + resolution_dict=self.resolution_dict, + mz_resolution_reference=self.mz_resolution_reference, + ) if has_ref_group: spectrum._set_params_from_reference_group( self.info["referenceable_param_group_list_element"] @@ -196,6 +212,7 @@ def __getitem__(self, identifier): except: pass spectrum = self.info["file_object"][identifier] + spectrum._resolution_dict.update(self.resolution_dict) spectrum.calling_instance = self if isinstance(spectrum, spec.Spectrum): spectrum.measured_precision = self.ms_precisions[spectrum.ms_level] @@ -227,7 +244,7 @@ def _open_file(self, path_or_file): path_or_file, self.info["encoding"], build_index_from_scratch=self.build_index_from_scratch, - index_regex=self.index_regex + index_regex=self.index_regex, ) def _guess_encoding(self, mzml_file): diff --git a/pymzml/spec.py b/pymzml/spec.py index 8c36cbc2..50be6048 100755 --- a/pymzml/spec.py +++ b/pymzml/spec.py @@ -41,6 +41,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import copy import math import re import sys @@ -376,7 +377,13 @@ class Spectrum(MS_Spectrum): """ - def __init__(self, element=ElementTree.Element(""), measured_precision=5e-6): + def __init__( + self, + element=ElementTree.Element(""), + measured_precision=5e-6, + resolution_dict=None, + mz_resolution_reference=200, + ): __slots__ = [ "_centroided_peaks", @@ -400,7 +407,14 @@ def __init__(self, element=ElementTree.Element(""), measured_precision=5e-6): "internal_precision" "noise_level_estimate", "selected_precursors", ] - + if resolution_dict is None: + resolution_dict = { + None: 70_000, + 1: 70_000, + 2: 35_000, + } + self._resolution_dict = resolution_dict + self._mz_resolution_reference = mz_resolution_reference self._centroided_peaks = None self._centroided_peaks_sorted_by_i = None self._extreme_values = None @@ -454,6 +468,19 @@ def __del__(self): if self.element: self.element.clear() + def __deepcopy__(self, memo): + cls = self.__class__ + result = cls.__new__(cls) + memo[id(self)] = result + for k, v in self.__dict__.items(): + # skip underscore prefix keys from __dict__ + try: + setattr(result, k, copy.deepcopy(v, memo)) + except: + # skip copying of buffered reader + pass + return result + def __add__(self, other_spec): """ Adds two pymzml spectra @@ -478,13 +505,16 @@ def __add__(self, other_spec): ... s += spec """ - assert isinstance(other_spec, Spectrum) - if self._peak_dict["reprofiled"] is None: - reprofiled = self._reprofile_Peaks() - self.set_peaks(reprofiled, "reprofiled") + if isinstance(other_spec, Spectrum) is False: + raise IOError("Require pymzml Spectrum to add!") + new_spec = copy.deepcopy(self) + other_spec.peaks("centroided") + if new_spec._peak_dict["reprofiled"] is None: + reprofiled = new_spec._reprofile_Peaks() + new_spec.set_peaks(reprofiled, "reprofiled") for mz, i in other_spec.peaks("reprofiled"): - self._peak_dict["reprofiled"][mz] += i - return self + new_spec._peak_dict["reprofiled"][mz] += i + return new_spec def __sub__(self, other_spec): """ @@ -609,7 +639,7 @@ def __getitem__(self, accession): if not accession.startswith("MS:"): try: accession = self.calling_instance.OT[accession]["id"] - except TypeError: + except (TypeError, AttributeError): accession = "---" search_string = './/*[@accession="{0}"]'.format(accession) elements = [] @@ -851,6 +881,17 @@ def ms_level(self): ) # put hardcoded MS tags in minimum.py??? return self._ms_level + @ms_level.setter + def ms_level(self, ms_level): + """ + Property to access the ms level. + + Returns: + ms_level (int): + """ + if isinstance(ms_level, int): + self._ms_level = ms_level + @property def scan_time(self): """ @@ -910,8 +951,12 @@ def selected_precursors(self): selected_precursor_mzs = self.element.findall( ".//*[@accession='MS:1000744']" ) - selected_precursor_is = self.element.findall(".//*[@accession='MS:1000042']") - selected_precursor_cs = self.element.findall(".//*[@accession='MS:1000041']") + selected_precursor_is = self.element.findall( + ".//*[@accession='MS:1000042']" + ) + selected_precursor_cs = self.element.findall( + ".//*[@accession='MS:1000041']" + ) precursors = self.element.findall( "./{ns}precursorList/{ns}precursor".format(ns=self.ns) ) @@ -1043,17 +1088,15 @@ def peaks(self, peak_type): Returns: peaks (list or ndarray): list or numpy array of mz/i tuples or arrays """ + if self._peak_dict["raw"] is None: + mz_params = self._get_encoding_parameters("m/z array") + i_params = self._get_encoding_parameters("intensity array") + mz = self._decode(*mz_params) + i = self._decode(*i_params) + self._peak_dict["raw"] = np.stack((mz, i), axis=-1) + if self._peak_dict[peak_type] is None: - if self._peak_dict["raw"] is None: - mz_params = self._get_encoding_parameters("m/z array") - i_params = self._get_encoding_parameters("intensity array") - mz = self._decode(*mz_params) - i = self._decode(*i_params) - arr = np.stack((mz, i), axis=-1) - self._peak_dict[peak_type] = arr - if peak_type == "raw": - pass - elif peak_type == "centroided": + if peak_type == "centroided": self._peak_dict["centroided"] = self._centroid_peaks() elif peak_type == "reprofiled": self._peak_dict["reprofiled"] = self._reprofile_Peaks() @@ -1062,13 +1105,13 @@ def peaks(self, peak_type): else: raise KeyError - if not isinstance(self._peak_dict[peak_type], np.ndarray): - peaks = self._array(self._peak_dict[peak_type]) - else: - peaks = self._peak_dict[peak_type] if peak_type == "reprofiled": peaks = list(self._peak_dict[peak_type].items()) peaks.sort(key=itemgetter(0)) + elif not isinstance(self._peak_dict[peak_type], np.ndarray): + peaks = self._array(self._peak_dict[peak_type]) + else: + peaks = self._peak_dict[peak_type] return peaks @lru_cache() @@ -1190,7 +1233,9 @@ def _centroid_peaks(self): try: profile_ot = self.calling_instance.OT.name.get("profile spectrum", None) if profile_ot is None: - profile_ot = self.calling_instance.OT.name.get("profile mass spectrum", None) + profile_ot = self.calling_instance.OT.name.get( + "profile mass spectrum", None + ) acc = profile_ot["id"] is_profile = ( True @@ -1208,13 +1253,15 @@ def _centroid_peaks(self): if is_profile is not None or self.reprofiled: # check if spec is a profile spec tmp = [] if self._peak_dict["reprofiled"] is not None: + # reprofile case i_array = [i for mz, i in self.peaks("reprofiled")] mz_array = [mz for mz, i in self.peaks("reprofiled")] else: + # profile spec case i_array = self.i mz_array = self.mz for pos, i in enumerate(i_array[:-1]): - if pos <= 1: + if pos < 1: continue if 0 < i_array[pos - 1] < i > i_array[pos + 1] > 0: x1 = float(mz_array[pos - 1]) @@ -1252,22 +1299,38 @@ def _reprofile_Peaks(self): reprofiled_peaks (list): list of reprofiled m/z, i tuples """ tmp = ddict(int) + ms_level = self.ms_level for mz, i in self.peaks("centroided"): - # Let the measured precision be 2 sigma of the signal width - # When using normal distribution - # FWHM = 2 sqt(2 * ln(2)) sigma = 2.3548 sigma - s = mz * self.measured_precision * 2 # in before 2 - s2 = s * s - floor = mz - 5.0 * s # Gauss curve +- 3 sigma - ceil = mz + 5.0 * s + sigma = ( + mz + / self._resolution_dict.get(ms_level) + * math.sqrt(mz / self._mz_resolution_reference) + / 2.4477 + ) + floor = mz - (3 * 2.4477 * sigma) + ceil = mz + (3 * 2.4477 * sigma) # * vor + but anyways :) ip = self.internal_precision / 4 - # more spacing, i.e. less points describing the gauss curve - # -> faster adding for _ in range(int(round(floor * ip)), int(round(ceil * ip)) + 1): if _ % int(5) == 0: a = float(_) / float(ip) - y = i * math.exp(-1 * ((mz - a) * (mz - a)) / (2 * s2)) + y = i * math.exp(-1 * ((a - mz) * (a- mz)) / (2 * sigma * sigma)) tmp[a] += y + # x_values = + # Let the measured precision be 2 sigma of the signal width + # When using normal distribution + # FWHM = 2 sqt(2 * ln(2)) sigma = 2.3548 sigma + # s = mz * self.measured_precision * 2 # in before 2 + # s2 = s * s + # floor = mz - 5.0 * s # Gauss curve +- 3 sigma + # ceil = mz + 5.0 * s + # ip = self.internal_precision / 4 + # # more spacing, i.e. less points describing the gauss curve + # # -> faster adding + # for _ in range(int(round(floor * ip)), int(round(ceil * ip)) + 1): + # if _ % int(5) == 0: + # a = float(_) / float(ip) + # y = i * math.exp(-1 * ((mz - a) * (mz - a)) / (2 * s2)) + # tmp[a] += y self.reprofiled = True self.set_peaks(None, "centroided") return tmp @@ -1350,7 +1413,8 @@ def remove_noise( noise_level = self.estimated_noise_level(mode=mode) if len(self.peaks("centroided")) != 0: self._peak_dict["centroided"] = self.peaks("centroided")[ - self.peaks("centroided")[:, 1] / noise_level >= signal_to_noise_threshold + self.peaks("centroided")[:, 1] / noise_level + >= signal_to_noise_threshold ] if len(self.peaks("raw")) != 0: self._peak_dict["raw"] = self.peaks("raw")[ @@ -1740,6 +1804,7 @@ def __init__(self, element, measured_precision=5e-6, param=None): self._measured_precision = measured_precision self.element = element self.noise_level_estimate = {} + self._resolution_dict = {} # Property variables self._time = None self._ms_level = None diff --git a/tests/main_spec_test.py b/tests/main_spec_test.py index 1972e66f..593c2bd3 100755 --- a/tests/main_spec_test.py +++ b/tests/main_spec_test.py @@ -169,23 +169,23 @@ def test_div(self): def test_add_specs_to_empty_spec(self): spec1 = Spectrum() spec2 = Spectrum() - spec2.set_peaks([(100, 200)], "raw") - spec1 += spec2 - centroided_mz = spec1.peaks("centroided")[:, 0] - centroided_i = spec1.peaks("centroided")[:, 1] + spec2.set_peaks([(100, 2e6)], "raw") + spec3 = spec1 + spec2 + centroided_mz = spec3.peaks("centroided")[:, 0] + centroided_i = spec3.peaks("centroided")[:, 1] assert np.allclose(centroided_mz, [100], rtol=5e-6) - assert np.allclose(centroided_i, [200], atol=0.002) + assert np.allclose(centroided_i, [2e6], atol=0.002) def test_add_tow_custom_specs(self): spec1 = Spectrum() spec2 = Spectrum() - spec1.set_peaks([(100, 200)], "raw") - spec2.set_peaks([(100, 200), (200, 300)], "raw") - spec1 += spec2 - centroided_mz = spec1.peaks("centroided")[:, 0] - centroided_i = spec1.peaks("centroided")[:, 1] + spec1.set_peaks([(100, 2e6)], "raw") + spec2.set_peaks([(100, 2e6), (200, 3e6)], "raw") + spec3 = spec1 + spec2 + centroided_mz = spec3.peaks("centroided")[:, 0] + centroided_i = spec3.peaks("centroided")[:, 1] assert np.allclose(centroided_mz, [100, 200], rtol=5e-6) - assert np.allclose(centroided_i, [400, 300], atol=0.002) + assert np.allclose(centroided_i, [4e6, 3e6], atol=0.002) def test_average_spectra(self): spec0 = Spectrum() @@ -513,15 +513,35 @@ def test_scan_time(self): self.assertEqual(scan_time, 0.023756566) def test_get_all_arrays_in_spec(self): - assert self.spec.get_all_arrays_in_spec() == ['m/z array', 'intensity array'] + assert self.spec.get_all_arrays_in_spec() == ["m/z array", "intensity array"] def test_get_array(self): # import pdb;pdb.set_trace() - assert (self.spec.mz == self.spec.get_array('m/z array')).all() + assert (self.spec.mz == self.spec.get_array("m/z array")).all() def test_get_tims_tof_ion_mobility(self): assert self.spec.get_tims_tof_ion_mobility() is None + def test_spectrum_set_resolution(self): + res_dict = { + 1: 70_000, + 2: 35_000, + } + spec = Spectrum(resolution_dict=res_dict, mz_resolution_reference=200) + assert spec._mz_resolution_reference == 200 + assert spec._resolution_dict[1] == 70_000 + assert spec._resolution_dict[2] == 35_000 + + def test_spectrum_set_resolution(self): + res_dict = { + 1: 70_000, + 2: 35_000, + } + spec = Spectrum(resolution_dict=res_dict, mz_resolution_reference=200) + spec.set_peaks(np.array([[100, 200], [200, 200]]), "raw") + spec.ms_level = 1 + peaks = spec.peaks("reprofiled") + if __name__ == "__main__": unittest.main(verbosity=3)