# Parallel Processing Basics
This notebook is a follow-up to the getting started tutorial.   It shows how to run a parallel workflow and builds on the data saved in that tutorial.

In [1]:
from mspasspy.db.database import Database
import mspasspy.client as msc
dbclient=msc.Client()
db = dbclient.get_database('getting_started')

In [None]:
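from mspasspy.algorithms.window import WindowData
from mspasspy.algorithms.basic import ator
# The next imports supply names used in later cells.  The module
# paths assume a recent MsPASS release.
from mspasspy.algorithms.signals import detrend
from mspasspy.algorithms.resample import ScipyResampler,ScipyDecimator,resample
from mspasspy.ccore.algorithms.basic import TimeWindow
from mspasspy.ccore.utility import ErrorSeverity
from mspasspy.db.normalize import (normalize,
                                   ObjectIdMatcher,
                                   MiniseedMatcher,
                                   OriginTimeMatcher,
                                )
from mspasspy.io.distributed import read_distributed_data
from obspy.geodetics import gps2dist_azimuth,kilometers2degrees
from obspy.taup import TauPyModel
from obspy import UTCDateTime
import time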

### Parallel Workflow with dask distributed
We next introduce an important variant of the above for running a job on a large cluster with multiple nodes.  The method above is appropriate for testing a workflow on a desktop before running it on a larger data set on a cluster.   For the cluster case the authors of dask recommend the newer scheduler they call [dask distributed](https://distributed.dask.org/en/stable/). Besides better performance on a cluster, dask distributed adds the ability to monitor a job in real time and to profile it through the dask [diagnostic monitor](https://distributed.dask.org/en/stable/diagnosing-performance.html).   The next block enables this capability in this notebook. 

In [None]:
from dask.distributed import Client
scheduler_client=Client()
scheduler_client

The status page for this notebook is now available and can be accessed via port 8787.   Because of that requirement you may have had to restart this container with the `-p 8787:8787` option.   Without that port mapping you would not be able to connect to the diagnostics page.   In our experience the hyperlink above will not work either.   You will instead probably need to use the default localhost address `127.0.0.1:8787/status`.  You might be able to click on [this link](http://127.0.0.1:8787/status).   If that doesn't work, cut and paste the above url into your browser. 

Now that we have dask diagnostics running let's run a variation of the above workflow that will allow you to watch dask work.  Note this workflow differs from the above in three ways:
1.  It doesn't repeat the initializations.  Note that was done here only because of the structure of this notebook and would not be normal.
2.  We use a different approach to launch the computations.  We link the bag (`bg`) to the dask distributed scheduler we just created.   Without that line the diagnostics monitor will not display.
3.  We intentionally commented out the line to save the data.  We did that to allow you to run this next box repeatedly and not produce duplicate data.  

As item 3 says, you can run the next code box repeatedly, so run it and watch the real-time display.   Experiment with the different menu options as described in the dask documentation link above.   When the job completes you might also want to look at the profiling output.   We won't dwell on the details of dask diagnostics, but refer you to the dask documentation.  The main point here is that those tools can help you improve the performance of a workflow you need to run on a large amount of data.

In [None]:
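def shift_by_Ptime(d):
    """
    Shift the time base of d so that time 0 is the P arrival time.
    Live data lacking the Ptime key are killed and an error is logged.
    """
    if d.live:
        if d.is_defined('Ptime'):
            tshift = d['Ptime']
            d = ator(d,tshift)
        else:
            d.kill()
            d.elog.log_error("shift_by_Ptime",
                             "Required key Ptime was not defined",
                             ErrorSeverity.Invalid)
    return d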

In [None]:
ttmodel = TauPyModel(model="iasp91")
station_normalizer = MiniseedMatcher(db)
# we loaded data with this offset relative to origin time
t0offset=763.0 - 4.0*60.0  
# TODO:  This matcher is currently broken. Workaround below
source_normalizer = OriginTimeMatcher(db,
                                      t0offset=t0offset,
                                      source_time_key='time',
                                     )
resampler=ScipyResampler(20.0)
decimator=ScipyDecimator(20.0)
stime=-100.0
etime=600.0    
cursor = db.wf_miniseed.find({})  # this is a loop over the full dataset
t0 = time.time()
dataset = read_distributed_data(db,
                                collection='wf_TimeSeries',
                                normalize=[station_normalizer],
                               )
#dataset = dataset.map(load_source_data,db)
#dataset = dataset.map(detrend,type="constant")
#dataset = dataset.map(resample,decimator,resampler)
#dataset = dataset.map(set_Ptime,model=ttmodel)
#dataset = dataset.map(shift_by_Ptime)
#dataset = dataset.map(WindowData,stime,etime)
dataset = dataset.map(terminator)
result=dataset.compute()
t=time.time()
nlive=0
ndead=0
for x in result:
    if x:
        nlive += 1
    else:
        ndead += 1
print("Total processing time=",t-t0)
print("Number of live data saved=",nlive)
print("number of data killed=",ndead)

Compare the above with the earlier serial job.  The for loop command and read_data line are replaced by the following line:
```
bg = read_distributed_data(db,cursor,normalize=['source'])
```
The `read_distributed_data` function creates a container called a dask "bag".  A convenient way to view a bag is as a list of things that doesn't need to fit in memory.   The "things", in our case, are mspass TimeSeries objects.   The `read_distributed_data` line is followed by a series of lines that, in python jargon, apply the "map method" of the "bag" object/container. "Map" is one of the two keywords in the modern "map-reduce" model of big data science.  You can find many web pages and tutorials discussing map-reduce in general and map-reduce for dask in particular.   For now, we emphasize that arg0 of the map method is a function name.  Each call to map applies the named function to every datum in the bag, and that function is assumed to return another datum that is always of the same type.   All the processing functions in the loop above use that model.  For example, the `resample` function takes an input TimeSeries of any sample rate and returns a resampled representation of that datum at 20 sps.  
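
As a minimal sketch of the map method, the following uses plain python dictionaries as stand-ins for TimeSeries objects.   The `scale` function and the `amplitude` key are hypothetical and are not part of MsPASS.  The sketch shows that arg0 of `map` is the function and that any further arguments are passed on to that function for every datum.   The `scheduler="synchronous"` option is used only so the sketch runs in a single process.
```python
import dask.bag as dbag

def scale(d, factor, key="amplitude"):
    """Return a copy of d with d[key] multiplied by factor."""
    out = dict(d)
    out[key] = out[key] * factor
    return out

# Dictionaries stand in for TimeSeries objects (an assumption of this sketch)
data = [{"sta": "AAA", "amplitude": 1.0}, {"sta": "BBB", "amplitude": 2.5}]
toy_bag = dbag.from_sequence(data, npartitions=2)
# arg0 is the function; 2.0 and key= are handed to scale for each datum
toy_bag = toy_bag.map(scale, 2.0, key="amplitude")
scaled = toy_bag.compute(scheduler="synchronous")
print(scaled)
```
Note that `scale` returns the same type it receives, which is what allows several map calls to be chained.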

With that background, note the workflow runs a sequence of algorithms through the map method driven by the same function names as above in the same order. For example, consider this line in the serial job that runs the normalize function we used earlier:
```
d = normalize(d,station_matcher)
```
The comparable operator above is this:
```
bg = bg.map(normalize,station_matcher)
```
The key point we want to make here is that it is straightforward to convert any loop like the serial job to a parallel version using dask.  There are three deviations:
1.  The call to the ator function required us to use a python `lambda` function.  That is often a useful trick to handle variable arguments passed through header values.   If you are unfamiliar with lambda functions there are numerous articles on this topic on the web.
2.  We added a call to `db.save_data` so we can work on these data further below.   That line is commented out in the code box that follows so the box can be run repeatedly without producing duplicate data.
3.  We use a terminator lambda function to return only the value of the boolean "live" attribute.    
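
The `lambda` trick in item 1 can be illustrated without dask or MsPASS.   In this sketch dictionaries stand in for data objects and `shift_time` is a hypothetical function, not the MsPASS `ator`.   Python's built-in `map` plays the role of the bag's map method.
```python
def shift_time(d, tshift):
    """Return a copy of d with its start time t0 reduced by tshift."""
    out = dict(d)
    out["t0"] = out["t0"] - tshift
    return out

# Dictionaries stand in for data objects with header values
data = [{"t0": 100.0, "Ptime": 90.0}, {"t0": 250.0, "Ptime": 245.0}]

# Serial loop: the shift is read from each datum inside the loop body
serial = []
for d in data:
    serial.append(shift_time(d, d["Ptime"]))

# Map form: the second argument differs for each datum, so it cannot be
# given as a fixed extra argument.  The lambda reads it from the datum.
mapped = list(map(lambda d: shift_time(d, d["Ptime"]), data))

print(mapped == serial)
```
A fixed argument, such as the window times passed to `WindowData`, needs no lambda because the same value applies to every datum.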

A feature of dask potentially confusing to newcomers is that all the calls to the bag "map method" are "lazy".   What that means is nothing is actually computed until we call the bag's "compute method".   A simple way to understand the call to compute is that it converts a bag to a python list and returns the result.  We store that list here as `res`.  The entire data set may not fit in the memory of your local machine.  That is why we used the last lambda function. It reduces the bag to a list of booleans that are unlikely to cause a memory problem. 
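
The lazy behavior can be seen in a small sketch that does not need MsPASS.   Dictionaries stand in for data objects, and the `mark_live` function and `calls` list are hypothetical names used only to record when work is actually done.   The `scheduler="synchronous"` option is an assumption made so the bookkeeping list is updated in this process.
```python
import dask.bag as dbag

calls = []  # records every datum the function is actually applied to

def mark_live(d):
    """Return a copy of d with a boolean "live" flag (even ids are live)."""
    calls.append(d["id"])
    out = dict(d)
    out["live"] = d["id"] % 2 == 0
    return out

data = [{"id": i} for i in range(6)]
toy_bag = dbag.from_sequence(data, npartitions=3)
toy_bag = toy_bag.map(mark_live)
# Terminator: keep only the boolean so the returned list stays small
toy_bag = toy_bag.map(lambda d: d["live"])
calls_before_compute = len(calls)  # map is lazy, so nothing has run yet

# compute runs the chain and returns a python list
res = toy_bag.compute(scheduler="synchronous")
calls_after_compute = len(calls)
nlive = sum(1 for x in res if x)
print(calls_before_compute, calls_after_compute, res, nlive)
```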

In [None]:
cursor=db.wf_TimeSeries.find(query)
t0 = time.time()
bg = read_distributed_data(db,cursor,normalize=['source'])
bg = bg.map(normalize,station_matcher)
bg = bg.map(detrend,type="constant")
bg = bg.map(resample,decimator,resampler)
bg = bg.map(set_Ptime,model=ttmodel)
bg = bg.map(lambda d : ator(d,d["Ptime"]))
bg = bg.map(WindowData,stime,etime)
bg = bg.map(lambda d : d.live)
#bg = bg.map(db.save_data,data_tag="Pwave_windowed_data")
# persist returns a new bag held by the distributed scheduler; keep it
bg = scheduler_client.persist(bg)
res=bg.compute()
t=time.time()    
n=len(res)  # must be set before it is used in the per-waveform time
print("Parallel job processing time with dask distributed=",t-t0)
print("Time per waveform=",(t-t0)/n)
nlive=0
for x in res:
    if x:
        nlive += 1
print("Processing completed ",nlive," of ",n," waveforms handled")