<a href="https://colab.research.google.com/github/stavis1/GST_colloquium/blob/main/GST_colloquium.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#GST Colloquium on Python coding (Built and Presented by the Bob-Mob)

Our hope is that this workshop will provide a useful supplement for the LFSC 507 class on Python coding. The main goal is to provide a reference of useful "tips and tricks" for ways to write faster, cleaner code. Go ahead and save a copy of this worksheet for future reference if you think that will be helpful.

The optimization tips focus on real examples we've run into during our research and should be applicable to a variety of applications. Most of us don't need a super-well optimized Python script, but certain bad habits can be cumbersome to write and slow to run, especially when working with a large volume of data. We've trimmed the data in our examples so that they run quickly, but keep in mind the relative speed increase will generally scale proportionally with the data size. We've also tried to include negative examples that are completely arbitrary and any similarities to coding attempts made by people in our group are entirely coincidental.




### Getting Started
Let's start by getting the colab notebook up and running. We're going to install/import the packages we'll need later as well as defining a fancy function decorator called "time_this" that lets us record the time our operations will take.

In [None]:
#We need to download some example data that is hosted on figshare
#The "!" tells jupyter to pass the line to the bash shell, this allows us to run
#command line tools like pip or wget from within jupyter notebook.
!wget https://figshare.com/ndownloader/files/45443476 -O data.zip
!unzip -o data.zip

#colab comes with most common packages installed already but we want to use a
#few uncommon ones so we install these ourselves.
!pip install brain-isotopic-distribution
!pip install multiprocesspandas
!pip install xlsx2csv

#it is generally recommended best practice to import the packages that come with
#python first before importing third party packages.
from functools import cache
from io import StringIO
from multiprocessing import Pool
from multiprocessing import Manager
import os
import re
import time
import traceback
import warnings

from brainpy import isotopic_variants
import matplotlib.pyplot as plt
from multiprocesspandas import applyparallel
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from scipy.stats import zscore
from scipy.stats import binom
from sklearn.metrics import mean_squared_error
from sortedcontainers import SortedList
from xlsx2csv import Xlsx2csv

#This defines a function decorator, a concept which is used in §1.3.
#It times a function call and prints the elapsed time while suppressing warnings
def time_this(func):
  #the way this works is that time_this() creates and returns a new function
  #that replaces whatever function follows @time_this.
  def inner(*args, **kwargs):
    #this context manager suppresses all warnings. DO NOT DO THIS NORMALLY
    #We have extensively tested this code and data and know that the warnings
    #can be safely ignored for this tutorial but that is not true for most code
    with warnings.catch_warnings():
      warnings.simplefilter("ignore")
      #time.time() gives unix time in units of seconds
      start_time = time.time()
      #the try: ... except: block is a technique for gracefully handling errors.
      #first the code under try: is run and if an exception is raised the code
      #under except: is run.
      try:
        #*args, **kwargs allows arbitrary arguments to be passed to the user
        #defined function.
        result = func(*args, **kwargs)
      except:
        #we want to still time a function call even if it fails so we catch any
        #exception generated and use the traceback package to print the error
        #message
        traceback.print_exc()
        #if func() fails it does not return anything so result is undefined
        #in order to continue our script we need to define result
        result = None
      elapsed_time = time.time() - start_time
    #prepending f to a string allows you to place expressions within braces {}
    #which are evaluated and the result is placed within the string
    print(f'Running {func.__name__} took {elapsed_time} seconds.')
    return (elapsed_time, result)
  #we return the function inner which is then evaluated with the original
  #arguments. Using @time_this is equivalent to running
  #time_this(func)(*args, **kwargs)
  return inner


##1 Data Structures

The way data are stored in memory can have an enormous effect on the speed with which they can be processed for particular tasks. Choosing the correct data structure can be the difference between a runtime of hours or minutes in some situations. Here we present two simple and broadly useful examples of this, but there are many specialized data structures that can dramatically improve performance on specific tasks. If you have a difficult performance problem it is well worth doing some googling to see if a optimized data structure or algorithm exists for your task.

Python has a variety of data types (objects). Some are single points of data like strings or integers, and other combine groups of data. Lists are a very common data stucture that are simply an ordered collection of objects. Accessing data from lists, however, can be quite slow if you don't already know the position of the object of interest. Two options that often allow for more efficient data lookups include sorted lists and hashmaps (dictionaries).

###1.1 Sorted lists and Binary searches

Suppose, you need to find the closest value in a list to some query. Ordinarily the only way to do this is to iterate through each object in the list, compare it to the query, and store the values close to the query. This is a slow process, especially with large lists. Instead we can use an efficient binary search strategy using sorted lists. A sorted list, as the name suggests, is simply a list sorted by value for numeric lists or alphabetically for strings. The binary search algorithm starts by comparing the query against the middle value in the sorted list. If your query is larger than this value then we know that the target value is in the larger half of the list. We can then start the process over using just the larger half, meaning that we compare the query against the middle value in this new list and infer from that what half of the new list our target value must be in. This process continues until either an exact match to the query is found or two adjacent numbers are found which bracket the query, meaning one is smaller and one is larger.

For example, suppose you needed to find the values closest to 7.66 from a list of numbers:

<figure>
<left>
<img src='https://figshare.com/ndownloader/files/45442030' width="600"/>
</figure>

With this approach we can efficiently do a variety of things including:
*  find all values within a range
*  find the closest value to a query
*  insert a value into a sorted list
*  find all values larger (or smaller) than some query

The sortedcontainers package in Python implements several sorted versions of data structures which allow for this kind of efficient lookup. It is worth noting that under the hood sortedcontainers is implemented with a more complex list-of-lists data structure that makes some operations faster, but within the lists values are found using binary search.



####1.1.1 Example: fuzzy matching

One application of binary searches we found useful is in finding the intersection between two sets of data. MS-DIAL is a lipidomics untargeted analysis program that will attempt to identify lipids from LC-MS data. Suppose we wanted to check what lipid identifications that were the same when running two different versions of MS-DIAL. To do that we used sorted lists.

In [None]:
#The export from MS-DIAL has some metadata in the first 4 rows that we need to
#skip on import.
MSD492 = pd.read_csv('MS-DIAL_4-9-2.tsv', sep = '\t', skiprows = 4)
MSD46 = pd.read_csv('MS-DIAL_4-6.tsv', sep = '\t', skiprows = 4)

First, let's take a look at what data is provided by MS-DIAL. We will be using the retention time "Average RT(min)" and the mass-to-charge ratio "Average Mz" to find chromatographic features that are likely the same between processing runs and then we will be capturing the lipid name assigned to the feature.

In [None]:
print(MSD492.shape)
MSD492.head()

In [None]:
#We define the tolerances for how close an entry has to be in order to be
#considered a match
#RT stands for retention time which is how long the analyte took to elute off
#the chromatography column
εrt = 1
#m/z means mass to charge ratio which is what mass spectrometers measure
εmz = 0.01

#This approach uses hashmap and sorted data strucutres wherever possible to
#speed up data access operations
@time_this
def sortedlist_search(query, target):
  query = query.copy()
  #We need to first set up the data structures. We use the dataframe index to
  #keep track of what lipid each element refers to. In order to keep that info
  #associated with the varaible of interest (m/z or RT) we make sorted lists out
  #of (variable, index) tuples. By default any iterable will be sorted by its
  #first element.
  rt_idx = SortedList(zip(target['Average Rt(min)'], target.index))
  mz_idx = SortedList(zip(target['Average Mz'], target.index))
  #We also set up a dictionary to map indices to metabolite names.
  idx_name = {i:n for i,n in zip(target.index, target['Metabolite name'])}

  #We want to do this mapping for all elements in the query dataframe so we will
  #apply this function to each RT,m/z pair
  def find_hits(rt, mz):
    #irange returns all elements within a range.
    #We make sets of only the indices of those elements.
    #Because we put (variable, index) tuples into the sorted lists
    rt_hits = set(h[1] for h in rt_idx.irange((rt-εrt,), (rt+εrt,)))
    mz_hits = set(h[1] for h in mz_idx.irange((mz-εmz,), (mz+εmz,)))
    #Because sets are hashmap data structures, checking whether an element is in
    #the set is fast. This allows sets to do intersection operations quickly.
    hits = rt_hits.intersection(mz_hits)
    #A single query element may map to multiple target entries so we return all
    #of the hits as a pipe separated list.
    names = '|'.join(idx_name[h] for h in hits)
    return names

  #As we will discuss in §2.5 applying a function within a list comprehension is
  #the best way to iterate an arbitrary user defined function over a dataframe
  query['matches'] = [find_hits(rt,mz) for rt,mz in zip(query['Average Rt(min)'], query['Average Mz'])]
  return query

#This approach only uses subsetting pandas data frames
@time_this
def linear_search(query, target):
  query = query.copy()

  #the structure here is the same as above where we define a lookup function and
  #then apply that to all RT, m/z pairs
  def find_hits(rt, mz):
    #We don't want to re-calculate the upper and lower bounds so we'll save them
    #as variables here
    rt_low = rt-εrt
    rt_high = rt+εrt
    #Because we don't have a sorted data structure we have to check every single
    #row of the dataframe to see if it is in the RT window
    subset = target[[r > rt_low and r < rt_high for r in target['Average Rt(min)']]]
    #.shape[0] is the number of rows. If it is 0, meaning no hits then it will
    #be computed as false and we will skip further processing steps
    if subset.shape[0]:
      mz_low = mz-εmz
      mz_high = mz+εmz
      subset = subset[[m > mz_low and m < mz_high for m in subset['Average Mz']]]
      if subset.shape[0]:
        names = '|'.join(subset['Metabolite name'])
      else:
        names = ''
    else:
      names = ''
    return names

  query['matches'] = [find_hits(rt,mz) for rt,mz in zip(query['Average Rt(min)'], query['Average Mz'])]
  return query

In [None]:
slist_time, slist_results = sortedlist_search(MSD492, MSD46)
linear_time, linear_results = linear_search(MSD492, MSD46)
ratio = linear_time/slist_time
print(f'Binary search was {ratio} times faster than the linear search')

Now we can see how well the matching process worked

In [None]:
print(slist_results[['Metabolite name', 'matches']])

###1.2 Hashmaps

Hashmaps are data structures that store key-value pairs in a way that allows you to very quickly look up a value (some data) based on a key (a label for the data). Dictionaries in python are hashmaps which can store any object as a value and can use any hashable object as a key. Sets are specialized hashmaps which only store true or false as values. These can be used to efficiently look up whether or not a key is in the set. What does it mean for an object to be hashable? Essentially, the object has to be able to be converted into a number. This can be easily done for any combination of text/numbers. For example, python's default hash of 42 is 42, while the hash of "GST colloquium" is 1233321040512913595


More precisely, a hash is a function that maps objects of arbitrary size and complexity to a finite range of numbers. In the context of a hashmap data structure the finite range represents locations in memory and the hash function calculates where in memory a value is stored based on its key. This means that the time it takes to find a value is only dependent on the time it takes to calculate the hash of a key. This contrasts with looking up values in a list which requires iterating over each item in the list until you find what you're looking for, an operation that becomes more expensive on average as the list grows in size.


<img align="right" src="https://figshare.com/ndownloader/files/45442087" alt="Figure 1. Hashmap Data Call" title="Figure 1: Accessing Data in a Hashmap" width="440" hspace="20" vspace="10"> The tradeoff that you have to make with hashmaps is that there are some cases in which two keys have the same hash so their values must be stored in a list and iterated over for lookup. To minimize this problem the range of memory locations needs to be significantly larger than the number of values stored. This means that hash maps will generally be larger in memory than more compact data structures like lists. This is not as much of a problem as it first appears because Python does not store the actual value at the memory location calculated by the hash function. Instead the memory location it calculates is the index of a list that stores memory addresses which point to where the values are actually stored (see figure). This means that each block of memory indexed by the hashmap can be very small while still being able to point to values of arbitrary size. Therefore the memory overhead is only likely to be siginficant if you're storing a large number of small things.





**TLDR:** Storing data in hashmaps (like a python dictionary) can significantly improve data access speed, thereby speeding up the whole script.


###1.3: Memoization

Repeatedly calling a function on the same input is an unnecessary waste of computational resources. Ideally, if we wish to run a function on a collection of inputs that have repeats what we would first do is collect only the unique inputs, run the function on those and then use a dictionary to map inputs to outputs for the whole collection. The code for doing this is somewhat tedious to write and requires the creation of several intermediate variables. A more elegant solution is to have the function itself build the dictionary of inputs and outputs as it is run. This way, if it sees a duplicate input it can skip evaluating the function and instead immediately return the saved output. This process is called <u>memoization</u>.

Python provides an easy way of doing this with the @cache decorator from the built-in functools package. Decorators are a way of adding extra functionality to python functions. For example @time_this is a decorator defined and used in this tutorial which, when applied to a function, times how long it takes to run while handling errors and warnings. In this case all we need to do in order to memoize a function is to apply @cache to the function definition.

The implementation of @cache uses a dictionary to store the input-output mapping. This means that inputs must be hashable, i.e. capable of being used as keys in a dictionary. Mutable objects, meaning objects that can be changed after being created like lists, pandas dataframes, or numpy arrays are not hashable and cannot be used as inputs. On the other hand tuples, strings, and frozensets can all be used as inputs. This means that some amount of creative data conversion may be necessary to use memoization.

**TLDR:** Memoization stores the output of a function in a dictionary for fast evaluation of duplicate inputs.

####1.3.1 Example: isotope abundance calculations

Naturally occuring rare heavy isotopes (atoms of the same element but with different masses) mean that biological molecules consist of different isotopic forms with the same chemical properties, but different masses. The relative abundance of these heavy versions is dependent on the chemical formula. This means that we can compare the observed isotopolog abundances against what is expected from the natural frequency of heavy isotopes as a quality control check for metabolite identifications. However, this is somewhat expensive to compute and many metabolites have the same chemical formula so this is an ideal use case for memoization.

In [None]:
metabolites = pd.read_csv('metabolomics_formula_fit.tsv', sep = '\t')

We will take a look at what data is provided by Compound Discoverer. The formula column will be used to calculate the expected isotope distribution. In order to actually do the comparison we would have to extract the observed isotopic packet from the raw data. This is unnecessary for demonstrating @cache so we won't show that step.

In [None]:
print(metabolites.shape)
metabolites.head()

In [None]:
#This is what we could do to avoid repeated function evaluations if we didn't
#have the ability to memoize a funciton
@time_this
def unique_get_isopacket(metabolites):
  #This function will be identical for all versions of this task.
  #It takes a chemical formula as a string of text and returns an expected
  #isotopic distribution.
  def nocache_exp_isopacket(formula):
    #re is the regex package it is used for finding patterns in text
    #this line finds element symbols and numbers after them (e.g. C18)
    #then makes a dictionary of symbol:number
    formula = {e:int(n) for e,n in re.findall(r'([A-Z][a-z]?)(\d+)', formula)}
    #isotopic_variants uses a computationally intensive recursive algorithm to
    #predict isotope packets from a chemical formula. This is the slowest part
    #of the code which we wish to avoid doing repeatedly
    packet = isotopic_variants(formula, npeaks = 3)
    #isotopic_variants predicts both m/z and intensity values but we only want
    #the latter so we have to extract that information here
    intensity = [p.intensity for p in packet]
    return intensity
  #set is a hashmap data structure which will only hold unique elements
  formulae = set(metabolites['Formula'])
  #we now apply nocache_exp_isopacket to these unique elements and make a
  #dictionary mapping formulae to isotopic packets
  isopacket_map = {f:nocache_exp_isopacket(f) for f in formulae}
  #then we can use this dictionary to create a list of the expected isotopic
  #packets in order with repeats for further processing.
  intensities = [isopacket_map[f] for f in metabolites['Formula']]

#This verison uses @cache to easily memoize the function. The basic processing
#is the same but instead of explicitly building intermediate data structures
#to hold the unique elements and input -> output map we let @cache handle
#repeated elements in an efficient way
@time_this
def cache_get_isopacket(metabolites):
  @cache #this line is the only thing you need to do to memoize a function
  def cache_exp_isopacket(formula):
    formula = {e:int(n) for e,n in re.findall(r'([A-Z][a-z]?)(\d+)', formula)}
    packet = isotopic_variants(formula, npeaks = 3)
    intensity = [p.intensity for p in packet]
    return intensity

  #note that we can just apply the function directly to the collection of
  #repetative inputs
  intensities = [cache_exp_isopacket(f) for f in metabolites['Formula']]
  return intensities

#This function works exactly the same way as cache_get_isopacket but without
#using @cache for memoization
@time_this
def nocache_get_isopacket(metabolites):
  def nocache_exp_isopacket(formula):
    formula = {e:int(n) for e,n in re.findall(r'([A-Z][a-z]?)(\d+)', formula)}
    packet = isotopic_variants(formula, npeaks = 3)
    intensity = [p.intensity for p in packet]
    return intensity

  intensities = [nocache_exp_isopacket(f) for f in metabolites['Formula']]
  return intensities


In [None]:
unique_time, unique_intensities = unique_get_isopacket(metabolites)
cache_time, cache_intensities = cache_get_isopacket(metabolites)
nocache_time, nocache_intensities = nocache_get_isopacket(metabolites)
unique_ratio = nocache_time/unique_time
cache_ratio = nocache_time/cache_time
print(f'The unique values approach was {unique_ratio} times faster than the unmemoized approach')
print(f'The memoized approach was {cache_ratio} times faster than the unmemoized approach')

The output is a list of intensity distributions, one per metabolite.

In [None]:
cache_intensities[:20]

##2 Pandas and Numpy

###2.1 Faster .xlsx import
Tabular data is often parsed in Python as Pandas dataframes. The Pandas package has a built-in reader for Excel files (.xlsx), but this is a rather slow process, especially for large files. This is because .xlsx files are actually zipped folders of mostly XML files which need to be decompressed before going through the somewhat inefficient process of XML parsing. This setup gives Excel the flexibility to store images, graphs, formatting, and functions. However, if all you want is a table of data, this flexibility comes at significant performance costs. It is therefore generally preferable to store tabular data as a text file (.csv or .tsv) unless you absolutely need to store these extra types of information.

If you have no choice but to use large .xlsx files there is still a faster way to import the data as a pandas dataframe. The xlsx2csv package is sigificantly faster at parsing .xlsx files than pandas. So, what we can do is parse the .xlsx file to an in-memory .csv (meaning that the file is stored only in RAM and not written to our hard drive) then use the very efficient pandas .csv parser to get the data into a dataframe. This trick comes from [stackoverflow](https://stackoverflow.com/questions/28766133/faster-way-to-read-excel-files-to-pandas-dataframe#comment136215400_28769537).


####2.1.1 Example: metabolomics data
metabolomics_data.xlsx is an example of the files produced during metabolomics experiments in our lab. It is an export of unfiltered search results from Compound Discoverer which we use for downstream analysis.

For practicality purposes we have reduced the size of the .xlsx file for this demonstration significantly. In its original form (256,547 rows by 506 columns) pandas .read_excel() took 16 minutes to load the file, the Xlsx2csv trick took 7 minutes, and reading in the .tsv version of the file with pandas .read_csv() took 16 seconds. As you can see below, the shortened file is parsed by .read_excel() and Xlsx2csv about 10x faster. This remarkable increase in speed between the original document and its current version is not merely due to cutting the number of rows in half. Rather, there was significant formatting information included in the original export that was stripped during our file processing (converting .xlsx to .tsv back to .xlsx). We expect that these improvements are primarily due to the removed .xlsx formatting having massively complicated the XML representation of the data. This can be seen in the relatively modest improvement in load time for the tsv reader as compared to the impressive improvements for the two direct .xlsx readers.

**TLDR:** Here's a faster way to load Excel files, but the best practice is to just use .csv or .tsv files instead of .xlsx whenever possible.

In [None]:
#The simple .tsv import
@time_this
def tsv_import(file_path):
  df = pd.read_csv(file_path, sep = '\t')
  return df

#The in-memory .csv conversion trick described above
@time_this
def fast_import(file_path):
    #buffer is the in-memory variable in which the converted .csv is placed
    buffer = StringIO()
    #here we do the actual .xlsx parsing and conversion to .csv
    #note that skip_hidden_rows is true by default unlike other .xlsx readers
    Xlsx2csv(file_path,
             outputencoding="utf-8",
             skip_hidden_rows = False).convert(buffer,sheetid=0)
    buffer.seek(0)
    #now we import the .csv with pandas exactly as if it were a regular file
    df = pd.read_csv(buffer, low_memory=False, skiprows=1)
    return df

#the pandas native approach to excel import
@time_this
def pandas_import(file_path):
  df = pd.read_excel(file_path, 0)
  return df

In [None]:
tsv_time, tsv_df = tsv_import('metabolomics_data.tsv')
fast_time, fast_df = fast_import('metabolomics_data.xlsx')
pandas_time, pandas_df = pandas_import('metabolomics_data.xlsx')
tsv_ratio = pandas_time/tsv_time
fast_ratio = pandas_time/fast_time
print(f'Importing a .tsv was {tsv_ratio} times faster than .read_excel()')
print(f'The Xlsx2csv trick was {fast_ratio} times faster than .read_excel()')

###2.2 Using vectorized operations
Numpy carries out most data processing operations using highly optimized pre-compiled code that is orders of magnitude faster than anything Python can natively accomplish. The optimizations include single instruction, multiple data (SIMD) steps in which a single operation that would normally be applied serially to values in an array can be applied simultaniously to blocks of 4-8 values. When doing mathematical operations it is therefore best to let numpy handle as much of the processing as possible. This means that we should avoid writing explicit loops or accessing individual values within numpy arrays and instead perform operations on whole arrays at once using numpy functions.

This strategy holds true for other packages like Pandas, Scipy, and Scikit-Learn as well because Numpy provides most of the underlying data processing functionality for these packages.  

The proteomics dataset we use here comes from [this paper](https://bmcmicrobiol.biomedcentral.com/articles/10.1186/s12866-021-02370-4).

####2.2.1 Example: normalization
Proteomic normalization is important to minimize non-biological variation and ensure reliability of your dataset. Normalization is used to minimize variation coming from factors such as sample preparation inconsistencies, injection volume errors, and fluctuations in instrument sensitivity, which can happen with extended use (i.e. sensitivity drift with instrument cleanliness, or column age). By accounting for these variations, normalization allows for the comparison of biological variances rather than technical ones. Normalization creates a baseline from which conditions can be compared in a more robust way.


In [None]:
#The data processing produces one file per condition that we read into a list
#of dataframes
prot_files = [f for f in os.listdir() if f.endswith('Proteins.txt')]
dataframes = [pd.read_csv(f, sep = '\t') for f in prot_files]
key_cols = ['Accession','Description','MW [kDa]','# AAs']
#We need to merge these data into a single dataframe for further processing
proteins = dataframes.pop()
for df in dataframes:
  #on = key_cols is the list of columns that we use as a key for which rows to merge
  #how = 'outer' means we use the union of these keys
  proteins = proteins.merge(df, how = 'outer', on = key_cols)
#we want a list of the columns containing protein abundances for processing
abundance_cols = sorted([c for c in proteins.columns if c.startswith('Abundance')], key = lambda x: x[-1:1])
#For this example we only want to work with the key and abundance columns
#so we get rid of all the other data in this dataframe.
#The * operator unpacks an interable so [1,*[2,3,4]] becomes [1,2,3,4].
proteins = proteins[[*key_cols, *abundance_cols]]

Lets take a look at our input data:

In [None]:
print(proteins.shape)
proteins.head()

And we'll visualize the batch effects.

In [None]:
def plot_proteins(proteins):
  fig, ax = plt.subplots()
  data = np.log10(proteins.copy()[abundance_cols])
  data = [sorted([v for v in data[c] if np.isfinite(v)]) for c in abundance_cols]
  ax.violinplot(data, showmeans = True)
  ax.set_ylabel('Log Intensity')
  ax.set_xlabel('Sample')
  ax.set_xticks(range(1,len(abundance_cols)+1),
                abundance_cols, rotation = 90)

plot_proteins(proteins)

In [None]:
@time_this
def vector_norm(values):
  #processing is done in the log space because intensity is log-normally
  #distributed across proteins
  values = np.log(values)
  #we calculate the mean and standard deviation of all values in all samples
  gmean = np.nanmean(values)
  gstd = np.nanstd(values)
  #intensities are z-scored by column (axis = 0), which corresponds to files
  values = zscore(values, axis = 0, nan_policy = 'omit')
  #we un-do the z-scoring with the grand mean and standard deviation so that
  #all files now have the same log-mean and log-standard deviation
  values = (values*gstd)+gmean
  #the data are taken back out of log space for downstream processing
  values = np.exp(values)
  return values

@time_this
def loop_norm(values):
  #this is how you do a nested loop within a list comprehension
  logvalues = [np.log(value) for sample in values for value in sample]
  #as above we pre-calculate variables that depend on all values
  gmean = np.nanmean(logvalues)
  gstd = np.nanstd(logvalues)
  #in this verison we construct a list of lists
  new_values = []
  for i in range(values.shape[1]):
    sample = values[:,i]
    new_sample = []
    #in order to do the normalization elementwise we have to precalculate
    #sample-specific values
    smean = np.nanmean(sample)
    sstd = np.nanstd(sample)
    for value in sample:
      #all the processing from here out is the same but done on one element
      #at a time
      value = np.log(value)
      zscore = (value-smean)/sstd
      value = (zscore*gstd)+gmean
      value = np.exp(value)
      new_sample.append(value)
    new_values.append(new_sample)
  #the .T is a transpose function which swaps rows for columns
  return np.array(new_values).T


In [None]:
vec_proteins = proteins.copy()
vec_time, vec_proteins[abundance_cols] = vector_norm(vec_proteins[abundance_cols].to_numpy())
loop_proteins = proteins.copy()
loop_time, loop_proteins[abundance_cols] = loop_norm(loop_proteins[abundance_cols].to_numpy())
ratio = loop_time/vec_time
print(f'Vectorized operation was {ratio} times faster than explicit python loops')

Now we can take a look at what this procedure did to our data. The observed values now have a common mean and standard deviation.

In [None]:
plot_proteins(vec_proteins)

###2.3 Building and accessing dataframes

Pandas dataframes are the most popular representation of data tables in the python ecosystem. Athough other dataframe implementations exist like polars and pyspark which have some performance advantages over pandas we will focus our next two examples on pandas dataframes because of their popularity. We will look at a couple of considerations to bear in mind when working with pandas dataframes.


####2.3.1 Example: protein summary statistics
The goal of this example is to add two columns to a dataframe which respectively hold the mean and standard deviation of the numerical data.

It is often tempting to iterate over the values in a dataframe for processing. As we learned in the previous section pandas uses numpy under the hood for data storage and processing so using vectorized operations wherever possible is by far the best choice for both performance and readablility. When this is not possible, often because you need to apply some user defined function across rows in a dataframe then it does become necessary to iterate over rows. Pandas provides a couple of tools to do this including .apply() and .iterrows(). Like much of pandas these tools are very flexible but they pay for that flexibility with performance overhead. A faster solution is usually to apply your function using a list comprehension, which is a compact method of defining lists using loops in python that avoids the overhead associated with repeated calls to list.append(). What should at all costs be avoided is repeatedly accessing and/or modifying individual values in a dataframe using .at(), .loc[] or .iloc[].  

In [None]:
#This is the ideal approach for data processing if what you are doing can be
#done with numpy. axis = 1 means that an operation is applied rowwise while
#axis = 0 is applied columnwise
@time_this
def numpy_stats(df):
  df['mean'] = np.nanmean(df[abundance_cols], axis = 1)
  df['std'] = np.nanstd(df[abundance_cols], axis = 1)
  return df

#If the function can't be done with numpy (dictionary lookups are a common
#example of this class of tasks) then this strategy is probably your best bet.
#What we do here is zip together the columns we care about processing and then
#apply a function over them within a list comprehension. Remember that zip
#collects elements into tuples e.g. zip([1,2,3],[4,5,6]) -> ((1,4),(2,5),(3,6)).
#We turn the zip into a list so that it can be used twice.
@time_this
def listcomp_stats(df):
  abundance_iter = list(zip(*[df[c] for c in abundance_cols]))
  df['mean'] = [np.nanmean(row) for row in abundance_iter]
  df['std'] = [np.nanstd(row) for row in abundance_iter]
  return df

#Iterrows behaves similarly to the above code but is generally slower. It is
#somewhat easier to read than the zip(*[df[c] for c in cols]) ideom above so for
#uses where performance is not a concern it could be preferred.
@time_this
def iterrows_stats(df):
  means = []
  stds = []
  #iterrows gives both the index and row at each iteration
  for i,row in df[abundance_cols].iterrows():
    #sequential calls to .append() are also slower than using list comprehension
    means.append(np.nanmean(row))
    stds.append(np.nanstd(row))
  df['mean'] = means
  df['std'] = stds
  return df

#Here we construct empty columns in the data frame and fill them element by
#element using df.at to access individual values.
#Accessing, and especially modifying, individual entries in a dataframe is
#extremely slow and should be done only as a very last resort. If you are
#filling or modifying an entire column or row then there is never a need to use
#this approach.
@time_this
def at_stats(df):
  #these two lines initialize empty columns
  df['mean'] = ''
  df['std'] = ''
  for i,row in df[abundance_cols].iterrows():
    df.at[i,'mean'] = np.nanmean(row)
    df.at[i,'std'] = np.nanstd(row)
  return df


In [None]:
numpy_time, numpy_df = numpy_stats(proteins.copy())
listcomp_time, listcomp_df = listcomp_stats(proteins.copy())
iterrows_time, iterrows_df = iterrows_stats(proteins.copy())
at_time, at_df = at_stats(proteins.copy())
np_ratio = listcomp_time/numpy_time
iter_ratio = iterrows_time/listcomp_time
at_ratio = at_time/listcomp_time
print(f'Vectorized operation was {np_ratio} times faster than applying a function in a list comprehension')
print(f'List comprehensions were {iter_ratio} times faster than terrows')
print(f'List comprehensions were {at_ratio} times faster than df.at')

We can now see that what this did was add two columns to the dataframe.

In [None]:
#before
proteins.head()

In [None]:
#after
numpy_df.head()

####2.3.2 Example: Wide to long conversion
The goal for this section is to show how to construct a dataframe from multiple component pieces.

Instruments and mass spec software pipelines often export data in a 'wide' format where all data about a specific analyte are found in a single row of a table and multiple replicate observations means multiple columns. In many cases, especially in the context of statisitcal packages, it is useful to represent data in a 'long' format (in the R ecosystem this is often called the tidy format). The long format has one row for each numerical observation with information about the analyte and sample for that observation being stored in metadata columns. Pandas implements a very fast approach to doing this but the DIY approach to the problem would be to build up the long dataframe by iterating over replicate columns. When building a dataframe iteratively by pieces it is tempting to make repeated calls to pd.concat() or the now depreciated df.append(). There is a significant amount of overhead to these calls which makes that approach slow. If you need to build a dataframe by concatenating component pieces the best choice is to use pd.concat() once on a list of all the pieces you wish to include.

A (dis-)honorable mention goes to the manual method of wide to long conversion: lots and lots of copy-pasting in Excel. Please be smart. Don't do this.

In [None]:
#Pandas implements a function specifically for this task so it should be used in
#this context.
@time_this
def melt(df):
  df = df.melt(id_vars = key_cols, value_vars = abundance_cols)
  return df

Frequently dataframes are built from several component pieces in a context where Pandas does not have a specific function for the task. This often takes the form of repeatedly appending new rows to a growing dataframe. We will use this task to demonstrate the advantage of making only a single call to pd.concat() in these situations.

In [None]:
#Here we make a list of dataframes, one per analyte column, that each have the
#metadata columns and a single analyte intensity column. Finally this list is
#concatenated together with a single call to pd.concat()
@time_this
def loop_melt(df):
  dfs = [ ]
  for a in abundance_cols:
    tmp_df = df[[*key_cols,a]].copy()
    #we have to rename the analyte column so that they all end up as a single
    #column in the concatenated dataframe
    tmp_df.columns = [*key_cols,'Abundance']
    #to keep track of what analyte each row represents we make a list of the
    #column name with as many elements as the dataframe has rows and add that
    #as a column
    tmp_df['sample'] = [a]*df.shape[0]
    dfs.append(tmp_df)
  df = pd.concat(dfs)
  return df

#this works the same way as loop_melt() but at each iteration the temp_df is
#concatenated to a growing dataframe. These repeated calls to pd.concat() are
#expensive and result in noticably poorer performance.
@time_this
def accumulator_melt(df):
  new_df = pd.DataFrame()
  for a in abundance_cols:
    tmp_df = df[[*key_cols,a]].copy()
    tmp_df.columns = [*key_cols,'Abundance']
    tmp_df['sample'] = [a]*df.shape[0]
    new_df = pd.concat([new_df, tmp_df])
  return new_df

In [None]:
melt_time, melt_df = melt(proteins.copy())
loop_time, loop_melt_df = loop_melt(proteins.copy())
accum_time, accum_melt_df = accumulator_melt(proteins.copy())
ratio = accum_time/loop_time
print(f'A single call to pd.concat() was {ratio} times faster than repeated calls')

We can now take a look at what this accomplished.

In [None]:
#before
print(proteins.shape)
proteins.head()

In [None]:
#after
print(loop_melt_df.shape)
loop_melt_df.head()

##3 Black Box Optimizers

###3.1 scipy.optimize
Optimizers are useful tools for a variety of tasks. If you ever find yourself doing a "guess and check" procedure, consider automating it with an optimizer function. The scipy package contains a variety of off-the-shelf black box optimizers that are easily applicable to a variety of situations. All you need is a way of numerically summarizing task "success". This is accomplished with a loss function (function to calculate error). The optimize function will start with an initial guess you provide and iteratively adjust the value searching for an error minimum.

**Possible Pitfall:** If your initial guess is too far off, the scipy optimizer can give pathological results, without nessesarily throwing an error.

####3.1.1 Example: isotopic packet fitting
In one Hettich-lab project, it was nessesary to estimate the level of lipid deuteration by fitting a binomial distribution to isotope abundance data. Given the setup of the experiment, the binomial probability would correspond to the deuteration percent in the lipid. Someone whose name is definitely **not** Matthew built an excel spreadsheet to generate binomial distributions from a given probability. Matthew--\*ahem\*--this nameless person proceeded to generate binomial distributions from a variety of probabilities, plot them against the observed data, and generate new probabilites based on the perceived goodness of fit.

This can be done more precisely and much faster using scipy.optimize.minimize. Basically the data are fed into minimize() which then changes the binomial probablility a bunch of times until it identifies a minimum point. In this case, we are calculating the root-mean squared error in our loss function.

In [None]:
# --- Most of these functions are involved in the isotope distribution calculation and not the actual fitting procedure
      # Jump to 'optimize' to see the actual optimization function

def deu_binom(totalhydrogen, pD, nonexH = 3): #Generate binomial distribution; need to adjust nonexH based on how many hydrogen in the formula are expected to back exchange with the solvent and be unlabelable
    lit = []
    for n in range(totalhydrogen-nonexH+1):
        lit.append(binom.pmf(n, totalhydrogen-nonexH, pD)) #Calculates a binomal probability distribution given a number of "observations" (hydrogen) and a "success" (deuterium) probability
    arr = np.array(lit)
    return arr

@cache
def natisodist(formula): # Generate natural isotope distribution for convolution with the deuterium distribution; uses memoize to potentially speed processing
    chemform = formula.strip(" ") #remove any spaces from the formula that may be present
    parsed = re.findall(r'([A-Z][a-z]?)(\d*)',chemform) #parses the chemical formulae
    composition = {p[0]:int(p[1]) if p[1] else 1 for p in parsed} #forms a dictionary of the numbers of each element
    nH = composition.pop('H')
    theoretical_isotopic_cluster = isotopic_variants(composition, npeaks=5, charge=1) #calculates the natural isotope distribution of the non-hydrogen elements
    array = np.empty(0)
    for peak in theoretical_isotopic_cluster:
        array = np.append(array, peak.intensity)
    return (nH, array)

def natcor_deudist(formula, pD, scaled = True): #Combine (convolute) natural isotope distribution with deuterium distibution
    pD =  pD[0] if hasattr(pD, '__len__') else pD
    nH, natdist = natisodist(formula) #given a formula calculate the natural isotope distribution and get the total number of hydrogen
    deudist = deu_binom(nH, pD) #calcuate the deuterium distribution given the number of hydrogen and the pD
    totdist = np.convolve(natdist, deudist) #Convolves the natural isotope distribution with the deuterium distribution
    if scaled:
        totdist = totdist/max(list(totdist))
    return(totdist)


def error(pD,formula,actualdata):
    '''
    calculate expected dist from elements given using above function.
    calculate RMS error from actualdata as compared to calculated data.
    '''
    expecteddata = natcor_deudist(formula,pD)
    actualdataext = np.append(actualdata, [0,0,0,0,0,0]) # Add extra zeros to the end of the observed data to avoid potential length mismatch with theoretical distribution
    actualdatatrim = actualdataext[:len(expecteddata)] # Trim to appropriate length
    return(mean_squared_error(actualdatatrim, expecteddata, squared=False)) #Calcuate RMSE between observed data and the calculated distribution

def optimize(pD, formula, actualdata):
    '''
    Use error function for least squared optimization of pD.
    Takes RMS error calculated by err and varies pD to minimize RMSE
    Returns optimization output
    Needs pD to be in the ballpark of the best value, otherwise behaviour can be weird
    Assumes Carbon, Hydrogen, Oxygen, Nitrogen, and actualdata are defined prior to using optimize
    '''
    results = minimize(error,
                       x0 = [pD], # This defines the initial guess
                       bounds = [[0,1]], # minimize allows you to set bounds on the range of x values to be tried
                       method = 'Nelder-Mead', # There are several different methods available. Some amount of testing can be required to find the method that works best with your specific data
                       args=(formula, actualdata)) # These are extra arguments not used by minimize and instead passed on to the error function
    param = results.x
    return(results)

@time_this
def deuterium_fitting(df):
    int_cols = [*df.columns[7:]]
    int_iter = list(zip(*[df[c] for c in int_cols]))
    pDs = list(df["pD"])
    formula = list(df["Formula"])
    test = [optimize(pD, form, row) for row, pD, form in zip(int_iter, pDs, formula)]
    maxiso = int(str(df.iloc[:,-1].name)[1:])
    df["BestFit pD"] = [t['x'][0] for t in test]
    df["RMSE"] = [t['fun'] for t in test]
    df["fun termination"] = [t['message'] for t in test]
    def exp_dist(formula, x):
        dist = natcor_deudist(formula, x)
        dist = np.concatenate((dist,np.zeros(maxiso - len(dist))))
        return dist
    dists = np.array([exp_dist(f, t['x'][0]) for f,t in zip(formula, test)])
    dists = pd.DataFrame(dists, columns= [f'isotope_{i}' for i in range(dists.shape[1])])
    df = pd.concat([df,dists], axis = 1)
    return df



In [None]:
df = pd.read_csv("DL_Example_3-25-24.csv", sep=',', encoding="UTF-8") #Load data

df_time, dfproc = deuterium_fitting(df) #Process data using above code

# Plot example data. One of the fit lipids is selected and its deuterium distribution plotted
data = dfproc.loc[103,"M0":"M75"]
bestfit = dfproc.loc[103,"isotope_0":"isotope_75"]
initial_guess = natcor_deudist(dfproc.loc[103,"Formula"], dfproc.loc[103,"pD"])
x = range(76)
fig, axes = plt.subplots(1,2,figsize = (8,3.04), sharey = True)
axes[0].plot(x,data, color = "#003F5C", label = "Observed Distribution")
axes[0].plot(x,initial_guess, "--", color = "#D45087", label = "Initial Guess")
axes[0].set_ylabel("Relative Abundnace")
axes[0].set_xlabel("# Isotopes")
axes[1].plot(x,data, color = "#003F5C")
axes[1].plot(x,bestfit, "--", color = "#F4814E", label = "Best Fit Distribution")
axes[1].set_xlabel("# Isotopes")
fig.legend(loc = "center", bbox_to_anchor = (0.5,-0.1))
fig.tight_layout()

In [None]:
# --- Generate Visualizations ---
#pD = dfproc.loc[103,"pD"] 90
indx = 90
formula = dfproc.loc[indx,"Formula"]
data = dfproc.loc[indx,"M0":"M75"]
pDi = 0.5
history = [(pDi, error(pDi, formula, data))]

def callback(t):
    history.append((t,error(t, formula, data)))
    return

def optimizeviz(pD, formula, actualdata):
    '''
    Use error function for least squared optimization of pD.
    Takes RMS error calculated by err and varies pD to minimize RMSE
    Returns optimization output
    Needs pD to be in the ballpark of the best value, otherwise behaviour can be weird
    Assumes Carbon, Hydrogen, Oxygen, Nitrogen, and actualdata are defined prior to using optimize
    '''
    results = minimize(error,
                       x0 = [pD], # This defines the initial guess
                       bounds = [[0,1]],
                       method = 'Nelder-Mead', # There are several different methods available. Some amount of testing can be required to find the method that works best with your specific data
                       callback = callback,
                       args=(formula, actualdata)) # These are extra arguments not used by minimize and instead passed on to the error function
    param = results.x
    return(results)

optimizeviz(pDi, formula, data)

fit = natcor_deudist(formula, 5.711e-01)
history

x = range(25,55)
fig, axes = plt.subplots(1,4,figsize = (12,3.04), sharey = True)
for ax, title, i in zip(axes, [f'Iteration {i}' for i in range(0,7,2)], range(0,7,2)):
  ax.plot(x,data[25:55], color = "#003F5C")
  ax.plot(x,natcor_deudist(formula, history[i][0])[25:55], "--", color = "#D45087")
  ax.set_xlabel("# Isotopes")
  ax.set_title(title)
axes[0].set_ylabel("Relative Abundance")
fig.legend(["Observed Distribution", "Iterator Fit"], loc = "center", bbox_to_anchor = (0.5,-0.09))
fig.tight_layout()


##4 Multiprocessing

Most modern computers that we are likely to do any amount of bioinformatics on have a processor with more than one core, meaning they can process more than one instruction at a time. Python is not typically capable of taking advantage of this ability because it is constrained to run only one instruction at a time. This constriant has the advantage of making python code less prone to bugs and far easier to write, but sometimes we need the speed that parallelism provides. Python provides the multiprocessing package that provides a suite of tools for creating, running and communicating with multiple processes concurrently. A process is the in-memory representation of your code, associated data, and the instructions being run by the CPU.  

There is a distinction to be made between logical and physical CPU cores. A physical core is a self-contained computational unit capable of carrying out all the tasks we expect of a CPU. Having multiple physical cores always means that you can do multiple computations at once. Some physical cores are capable of doing two computations at a time if, and only if, those two tasks are different, e.g. adding two numbers while bit shifting another. This capacity is represented by the OS as "logical cores". If you're doing many different tasks having two logical cores is almost as good as having two physical cores but if there is significant overlap in tasks there's much less of an advantage. Google colab has only a single physical core with two logical cores. This is not a good processor for demonstrating the advantages of multiprocessing so be aware that you can expect far more dramatic performance improvements on other systems than what you see here.

In the ideal case, splitting up a task across N cores will run in 1/N the time that the task took to run serially. However, this is never achieved in practice because the parallelization process itself requires extra work to carry out. In python specifically there is a lot of extra work to be done because the entire interpreter (the program that reads and runs your code) needs to be duplicated for each process.

Additionally, in order to communicate between processes, e.g. to pass inputs to or results from a paralellized function, the data need to be serialized into a string of bytes then unpacked at the other end. This process can be quite slow for very large variables like dataframes or arrays. On Linux (and google colab runs on Linux) there is a workaround for this. The default way for a new process to be created on Linux is for the child process to share its parent memory until one of the processes modifies a chunk of that memory, at which point only that chunk is copied into a process-specific memory pool. This means that any variables available in the global scope when a child process is created remains available to that child process. On other operating systems this behavior is simulated by re-running the script from the top in each child process which can have significant performance costs if you aren't careful.

For the following examples we will be doing the same task as shown in §3.1.1.

**TLDR:** Multiprocessing can dramatically reduce the run-speed of many programs. The catch is that passing large inputs to functions is quite slow.



###4.1 multiprocessing.Pool
Python's built-in multiprocessing package has many tools for paralell processing that allow an advanced user to set up complex archetectures. We, however, want something simple and the Pool module provides that. Pool will set up a pool of worker processes then, with a function and an iterable of inputs, will distribute the inputs across worker processes to be evaluated in parallel.
We care about two methods that Pool provides: .map(func, jobs) works for functions with a single input, and .starmap(func, jobs) works for functions with multiple inputs where jobs = [(input1a, input1b, ...), ..., (inputNa, inputNb, ...)]

####4.1.1 Example: Data serialization comparison

Here we will compare serial execution against parallel execution. We will also look at just passing the dataframe index to the function and allowing the function to extract the necessary data itself. If your funciton requres a large input this kind of strategy can get around having to communicate those data betewen processes. The amount of data we're passing here is not particularly large so we don't really see any speed improvement but this is just an illustrative example of a strategy that could be important in other contexts.

On my desktop computer parallel execution took ~9 seconds making it roughly 4 times faster than serial execution.

In [None]:
#We redefine optimize() from §3.2 to make the inputs easier to manage.
#Otherwise, this works the same as above.
#we run the same data analysis as §3.2 but we paralellize the processing
def optimize(row, niso):
    pD, formula = row[:2]
    actualdata = row[2:]
    results = minimize(error,
                       x0 = pD,
                       bounds = [[0,1]],
                       method = 'Nelder-Mead',
                       args=(formula, actualdata))
    param = results.x
    dist = natcor_deudist(formula, param[0])
    dist = np.concatenate((dist,np.zeros(niso - len(dist))))
    return (param[0], results.fun, results.message, *dist)

@time_this
def serial_row(lipid_data):
  lipids = lipid_data.copy()
  start_cols = list(lipids.columns)
  input_cols = ['pD', 'Formula', *lipids.columns[7:]]
  maxiso = int(str(lipids.iloc[:,-1].name)[1:])
  jobs = zip(*[lipids[c] for c in input_cols])
  results = [optimize(j, maxiso) for j in jobs]
  results = pd.DataFrame(np.array(results)).T
  lipids = pd.concat((lipids,results))
  new_cols = ["BestFit pD",
              "RMSE",
              "func termination",
              *[f'isotope_{i}' for i in range(results.shape[1] - 3)]]
  lipids.columns = start_cols + new_cols
  return lipids

#this function passes the input data to optimize() directly which means it must
#be serialized to send it between processes
@time_this
def mp_row(lipid_data):
  lipids = lipid_data.copy()
  start_cols = list(lipids.columns)
  input_cols = ['pD', 'Formula', *lipids.columns[7:]]
  maxiso = int(str(lipids.iloc[:,-1].name)[1:])
  jobs = zip(zip(*[lipids[c] for c in input_cols]), [maxiso]*lipids.shape[0])

  if __name__ == '__main__':
    with Pool() as p:
      results = p.starmap(optimize, jobs)
  results = pd.DataFrame(np.array(results)).T
  lipids = pd.concat((lipids,results))
  new_cols = ["BestFit pD",
              "RMSE",
              "func termination",
              *[f'isotope_{i}' for i in range(results.shape[1] - 3)]]
  lipids.columns = start_cols + new_cols
  return lipids

#this is the intermediary function that is called by the index-only
#parallelization code. All it does is grab the necessary data from our list of
#inputs and then call optimize on that input. This runs in the worker process.
def idx_opt(i):
  row = jobs[i]
  return optimize(row,maxiso)

#Here is the code for the index only parallelization. It works the same as the
#previous parallelized code but it first sets up the list of inputs as a global
#variable that can be accessed by the worker processses once they are created.
#We are on linux so this process is 'free' but on windows and mac the jobs list
#would have to be re-created in each worker process, increasing overhead.
@time_this
def mp_idx(lipid_data):
  lipids = lipid_data.copy()
  start_cols = list(lipids.columns)
  input_cols = ['pD', 'Formula', *lipids.columns[7:]]
  global maxiso
  maxiso = int(str(lipids.iloc[:,-1].name)[1:])
  global jobs
  jobs = list(zip(*[lipids[c] for c in input_cols]))

  if __name__ == '__main__':
    with Pool() as p:
      results = p.map(idx_opt, range(len(jobs)))
  results = pd.DataFrame(np.array(results)).T
  lipids = pd.concat((lipids,results))
  new_cols = ["BestFit pD",
              "RMSE",
              "func termination",
              *[f'isotope_{i}' for i in range(results.shape[1] - 3)]]
  lipids.columns = start_cols + new_cols
  return lipids

In [None]:
lipids = pd.read_csv('DL_Example_3-25-24.csv')
ser_time, ser_results = serial_row(lipids)
row_time, row_results = mp_row(lipids)
idx_time, idx_results = mp_idx(lipids)
row_ratio = ser_time/row_time
idx_ratio = ser_time/idx_time
print(f'Parallelization with the full function input was {row_ratio} faster than serial execution')
print(f'Parallelization with index only input was {idx_ratio} faster than serial execution')

####4.1.2 Example: Error handling
One downside of multiprocess.Pool is that if a fatal error is encountered in a function call only the process that encountered the error will termninate. This means that all the other jobs are run by the remaining worker processes before the error is communincated to the main process and your script finally stops. Debugging then becomes slow and painful. What follows is a code snippit that will skip function evaluation for all jobs if one worker encounters and error. This trick comes from [here.](https://superfastpython.com/multiprocessing-pool-stop-all-tasks/)

In [None]:
#Don't bother running this cell. It is only included to make it easy to
#copy-paste it into your own code
from multiprocessing import Pool
from multiprocessing import Manager
#this is a built-in package for printing out more informative error messages
import traceback

#the function we wish to parallelize
def task(input):
  #event is a shared object that can be seen by all child processes.
  #if .is_set() is True that means there's been an error so function evaluation
  #is skipped
  if event.is_set():
    return
  #the try: ... except: block is a technique for gracefully handling errors.
  #first the code under try: is run and if an exception is raised the code under
  #except: is run.
  try:
    #your code here
    return
  except:
    #we print the error message to the console
    traceback.print_exc()
    #and we set the event so that .is_set() returns True and all remaining
    #function evaluations are skipped
    event.set()

if __name__ == '__main__':
  jobs = [] #a list of inputs to task()
  #the manager takes care of communicating the state of event between processes
  with Manager() as manager:
    shared_event = manager.Event()

    #this function will be called first when starting a child process it makes
    #event visible as a global variable within that process
    def init_worker(shared_event):
      global event
      event = shared_event

    #Pool() sets up the pool of worker processes. The first function executed by
    #these workers is init_worker(shared_event). The use of a context mananger
    #i.e. with ...: allows Pool to terminate workers and clean up shared
    #resources when finished.
    with Pool(initializer=init_worker, initargs=(shared_event,)) as p:
      results = p.map(task, jobs)

In [None]:
#this works the same as mp_idx() but will raise the generic exception for only
#the first input
def fail_optimize(i):
  row = jobs[i]
  pD, formula = row[:2]
  actualdata = row[2:]
  results = minimize(error,
                      x0 = pD,
                      bounds = [[0,1]],
                      method = 'Nelder-Mead',
                      args=(formula, actualdata))
  param = results.x
  dist = natcor_deudist(formula, param[0])
  dist = np.concatenate((dist,np.zeros(maxiso - len(dist))))
  results = (param[0], results.fun, results.message, *dist)
  if i == 0:
    raise Exception()
  return results

#runs fail_optimize() on a single job but has the code to skip evaluation if a
#process has failed
def safe_task(input):
  if event.is_set():
    return
  try:
    return fail_optimize(input)
  except:
    traceback.print_exc()
    event.set()
    return None


#this runs fail_optimize() on all lipids but implements the code in the previous
#cell to skip evaluation if a worker process encounters an exception
@time_this
def run_safe(lipid_data):
  if __name__ == '__main__':
    lipids = lipid_data.copy()
    start_cols = list(lipids.columns)
    input_cols = ['pD', 'Formula', *lipids.columns[7:]]
    global maxiso
    maxiso = int(str(lipids.iloc[:,-1].name)[1:])
    global jobs
    jobs = list(zip(*[lipids[c] for c in input_cols]))
    with Manager() as manager:
      shared_event = manager.Event()

      def init_worker(shared_event):
        global event
        event = shared_event

      with Pool(initializer=init_worker, initargs=(shared_event,)) as p:
        return p.map(safe_task, range(len(jobs)))

#does the same as safe_task() but will not skip evaluation
def unsafe_task(input):
    return fail_optimize(input)

#does the same as run_safe() but will not skip evaluation
@time_this
def run_unsafe(lipid_data):
  if __name__ == '__main__':
    lipids = lipid_data.copy()
    start_cols = list(lipids.columns)
    input_cols = ['pD', 'Formula', *lipids.columns[7:]]
    global maxiso
    maxiso = int(str(lipids.iloc[:,-1].name)[1:])
    global jobs
    jobs = list(zip(*[lipids[c] for c in input_cols]))
    with Pool() as p:
      return p.map(unsafe_task, range(len(jobs)))

In [None]:
lipids = pd.read_csv('DL_Example_3-25-24.csv')
safe_time, safe_results = run_safe(lipids)
unsafe_time, unsafe_results = run_unsafe(lipids)
ratio = unsafe_time/safe_time
print(f'The error was detected {ratio} times faster with the safe execution trick.')

The error messages look different between the two outputs because the safe_task() function handles the exception and does not pass it along to Pool(). The extra information show in run_unsafe() is just showing that Pool() recieved an exception and failed to continue.

####4.1.3 Example: Multiprocesspandas

The multiprocesspandas package implements a drop-in replacement for the pandas .apply() called .apply_parallel() which allows you to easily apply a function to either rows or columns of a data frame in parallel. Parallelization reqires some amount of overhead so for simple functions that are fast to compute this overhead may result in the total runtime being longer than taking a serial approach. In these cases overhead can be minimized by trying different values for the n_chunks parameter, although the default should be ideal in most circumstances.

Internally multiprocesspandas usess multiprocessing.pool so the considerations outlined in §4 apply here as well. Which, in light of colab only having 2 logical cores is why we see very little improvement from parallelization in this situation.

On my desktop computer parallel execution took ~9 seconds making it roughly 4 times faster than serial execution.

In [None]:
#This block implements .apply_parallel from multiprocesspandas
#note that the import statement is "from multiprocesspandas import applyparallel"
#which adds the .apply_parallel() method to pandas dataframes
@time_this
def parallel_fit(lipid_data):
    lipids = lipid_data.copy()
    start_cols = list(lipids.columns)
    input_cols = ['pD', 'Formula', *lipids.columns[7:]]
    maxiso = int(str(lipids.iloc[:,-1].name)[1:])
    results = lipids[input_cols].apply_parallel(optimize, niso = maxiso)
    results = pd.DataFrame(np.array(results)).T
    lipids = pd.concat((lipids,results))
    new_cols = ["BestFit pD",
                "RMSE",
                "func termination",
                *[f'isotope_{i}' for i in range(results.shape[1] - 3)]]
    lipids.columns = start_cols + new_cols
    return lipids

#.apply_parallel() is a drop-in replacement for the pandas method .apply() so we
#implement the serial .apply() here for comparison
@time_this
def serial_fit(lipid_data):
    lipids = lipid_data.copy()
    start_cols = list(lipids.columns)
    input_cols = ['pD', 'Formula', *lipids.columns[7:]]
    maxiso = int(str(lipids.iloc[:,-1].name)[1:])
    results = lipids[input_cols].apply(optimize, niso = maxiso, axis = 1)
    results = pd.DataFrame(np.array(results)).T
    lipids = pd.concat((lipids,results))
    new_cols = ["BestFit pD",
                "RMSE",
                "func termination",
                *[f'isotope_{i}' for i in range(results.shape[1] - 3)]]
    lipids.columns = start_cols + new_cols
    return lipids

In [None]:
lipids = pd.read_csv('DL_Example_3-25-24.csv')

parallel_time, parallel = parallel_fit(lipids)
serial_time, serial = serial_fit(lipids)
ratio = serial_time/parallel_time
print(f'Parallel execution was {ratio} times faster than serial execution')