# How to run pygrass `Modules` in parallel

`Module` objects can be run in parallel in several ways. This document covers two of them:

* using the standard library `multiprocessing` module, with each GRASS module launched as a subprocess
* using the pygrass `ParallelModuleQueue` class

## subprocess

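`subprocess` is the Python module to spawn new processes, connect to their input/output/error pipes, and obtain their return codes. `grass.script.run_command` relies on it to launch a GRASS module as a separate process. To run several such calls at the same time, the code cells in this section distribute them over worker processes created with the standard library `multiprocessing` module.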
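As a small illustration of what `subprocess` does on its own, the sketch below starts several child processes at once and then collects their return codes and output. It does not call GRASS: the commands are stand-ins that run the current Python interpreter (`sys.executable`), and `run_all` is a hypothetical helper name.

```python
import subprocess
import sys


def run_all(commands):
    """Start every command without waiting, then collect (returncode, stdout)."""
    procs = [
        subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        for cmd in commands
    ]
    results = []
    for proc in procs:
        out, _err = proc.communicate()  # waits for this process to finish
        results.append((proc.returncode, out.strip()))
    return results


# Stand-in commands: each child interpreter prints a doubled distance.
commands = [[sys.executable, "-c", "print({} * 2)".format(d)] for d in (20, 40, 60)]
results = run_all(commands)
```

All children are started before the first `communicate()` call, so they run concurrently. A return code of `0` means the child exited without error.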

In [None]:
from multiprocessing import Queue, Process
import grass.script as grass

First, define a function that performs the required analysis. In this case it creates a buffer around the input vector map.

In [None]:
def calculate(inp, dist):
    # function to create a buffer
    out = "{ip}_buffer_{di}".format(ip=inp, di=dist)
    grass.run_command("v.buffer", input=inp, output=out, distance=dist, overwrite=True, quiet=True)
    return out

In [None]:
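def spawn(func):
    # wrap func in a worker loop that can be used as a Process target
    def funct(q_in, q_out):
        while True:
            # block until a (vector, distance) pair is available
            vec, dis = q_in.get()
            # (None, None) is the signal to stop
            if vec is None:
                break
            q_out.put(func(vec, dis))
    return funct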
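The worker loop above follows a common pattern: consume jobs from an input queue until a sentinel arrives, and push each result to an output queue. The sketch below reproduces that control flow with threads and `queue.Queue` instead of processes, so that it runs without a GRASS session. `fake_buffer` and `run_workers` are hypothetical names, and `fake_buffer` only builds the output map name rather than calling `v.buffer`.

```python
import queue
import threading


def fake_buffer(inp, dist):
    # Hypothetical stand-in for calculate(): only builds the output map name.
    return "{ip}_buffer_{di}".format(ip=inp, di=dist)


def worker(func, q_in, q_out):
    # Consume (vector, distance) pairs until the (None, None) sentinel arrives.
    while True:
        vec, dis = q_in.get()
        if vec is None:
            break
        q_out.put(func(vec, dis))


def run_workers(func, jobs, nworkers=3):
    q_in, q_out = queue.Queue(1), queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(func, q_in, q_out))
        for _ in range(nworkers)
    ]
    for thread in threads:
        thread.start()
    for job in jobs:
        q_in.put(job)
    for _ in threads:
        q_in.put((None, None))  # one sentinel per worker
    for thread in threads:
        thread.join()
    # Results arrive in completion order, so sort them for a stable result.
    return sorted(q_out.get() for _ in jobs)


names = run_workers(fake_buffer, [("schools", d) for d in range(20, 80, 20)])
```

One sentinel is needed per worker because each worker stops after reading a single `(None, None)` pair.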

Now create the required inputs.

In [None]:
# two queue objects, one for input and one for output
q_in = Queue(1)
q_out = Queue()
# number of processes to create
nproc = 3
# input vector map and buffer distances
invect = "schools"
inbuffer = range(0, 220, 20)

In [None]:
Queue?

Create a list of processes according to the `nproc` variable. The target of each process is the function returned by `spawn`, which receives `calculate` as its argument.

In [None]:
procs = [Process(target=spawn(calculate), args=(q_in, q_out)) for _ in range(nproc)]

In [None]:
Process?

Mark each process as a daemon and start it.

In [None]:
for proc in procs:
    proc.daemon = True
    proc.start()

Put the inputs (vector map and distance) on the input queue. The zero distance is skipped.

In [None]:
ans = [q_in.put((invect, i)) for i in inbuffer if i > 0]

Send one `(None, None)` pair per process so that every worker terminates.

In [None]:
[q_in.put((None, None)) for proc in procs]

At this point, check that the processes ran correctly and that the maps were created.

In [None]:
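# read the results before joining: a process that has put items on a queue
# may not exit until those items have been consumed
processed = [q_out.get() for _ in ans]
for proc in procs:
    proc.join()
if len(processed) != len(inbuffer) - 1:
    print("An error occurred")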
print(grass.read_command('g.list', type="vector", pattern="schools_buffer*", mapset='.'))
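Results arrive on the output queue in completion order, so comparing names is one way to see exactly which maps are missing rather than only counting them. The following is a minimal sketch in plain Python; `missing_outputs` is a hypothetical helper, and the `processed` list is made-up sample data standing in for the values read from `q_out`.

```python
def missing_outputs(invect, distances, processed):
    """Return the expected buffer map names that are absent from `processed`."""
    expected = {
        "{ip}_buffer_{di}".format(ip=invect, di=d) for d in distances if d > 0
    }
    return sorted(expected - set(processed))


# Sample data: the buffer for distance 60 was never reported.
processed = ["schools_buffer_40", "schools_buffer_20"]
missing = missing_outputs("schools", range(0, 80, 20), processed)
```

An empty list means every expected output name was reported by a worker. It does not replace the `g.list` check, which confirms that the maps exist in the mapset.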

## ParallelModuleQueue

The `ParallelModuleQueue` is designed to run an arbitrary number of pygrass Module or MultiModule processes in parallel. 

In [None]:
from grass.pygrass.modules import Module, ParallelModuleQueue

In [None]:
ParallelModuleQueue?

Initialize the `ParallelModuleQueue` object

In [None]:
queue = ParallelModuleQueue(nprocs=nproc)

Put the `Module` objects in the `queue` and wait until they finish.

In [None]:
val = 0
while val < 200:
    queue.put(Module("r.mapcalc",
                     expression="elevation_{va}=if(elevation@PERMANENT>{va} && elevation@PERMANENT<={va2}, elevation@PERMANENT, null())".format(va=val, va2=val+50),
                     overwrite=True, run_=False, finish_=False)
             )
    val += 50
queue.wait()
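The expression strings produced by the loop above can be inspected before any module is queued. The sketch below builds the same strings in plain Python, without GRASS; `band_expressions` and its parameters are hypothetical names introduced for illustration.

```python
def band_expressions(start=0, stop=200, step=50, raster="elevation@PERMANENT"):
    """Build one r.mapcalc expression per elevation band (lower, lower + step]."""
    template = "elevation_{va}=if({r}>{va} && {r}<={va2}, {r}, null())"
    return [
        template.format(va=val, va2=val + step, r=raster)
        for val in range(start, stop, step)
    ]


expressions = band_expressions()
for expression in expressions:
    print(expression)
```

With the default values this yields four expressions, one for each band starting at 0, 50, 100 and 150.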

Check how the processes terminate

In [None]:
print("Number of running processes: {}".format(queue.get_num_run_procs()))
print("Maximum number of processes: {}".format(queue.get_max_num_procs()))

In [None]:
for mapcalc in queue.get_finished_modules():
    print(mapcalc.popen.returncode)

# Speed test

Run a speed test to compare the `ParallelModuleQueue` and `MultiModule` classes.

In [None]:
from grass.pygrass.modules import MultiModule

In [None]:
%%timeit -n20
listmapcalc = []
val = 0
while val < 200:
    listmapcalc.append(Module("r.mapcalc",
                       expression="elevation_{va}=if(elevation@PERMANENT>{va} && elevation@PERMANENT<={va2}, elevation@PERMANENT, null())".format(va=val, va2=val+50),
                       overwrite=True, run_=False, finish_=False)
    )
    val += 50
mm = MultiModule(module_list=listmapcalc, sync=False)
pro = mm.run()
mapcalc_list = mm.wait()

In [None]:
%%timeit -n20
queue = ParallelModuleQueue(nprocs=nproc)
val = 0
while val < 200:
    queue.put(Module("r.mapcalc",
                     expression="elevation_{va}=if(elevation@PERMANENT>{va} && elevation@PERMANENT<={va2}, elevation@PERMANENT, null())".format(va=val, va2=val+50),
                     overwrite=True, run_=False, finish_=False)
             )
    val += 50
queue.wait()
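`%%timeit` is an IPython cell magic and is not available in a plain Python script. A rough equivalent, assuming the code to be measured is wrapped in a function, uses the standard library `timeit` module. In this sketch `workload` is a hypothetical stand-in for the module-building code and `best_time_per_call` is a hypothetical helper.

```python
import timeit


def workload():
    # Hypothetical stand-in for building and running the modules.
    return sum(i * i for i in range(1000))


def best_time_per_call(func, number=20, repeat=3):
    """Run func `number` times per round and return the best mean time in seconds."""
    totals = timeit.repeat(func, number=number, repeat=repeat)
    return min(totals) / number


seconds = best_time_per_call(workload)
print("best of 3 rounds: {:.6f} s per call".format(seconds))
```

Taking the minimum over several rounds reduces the influence of other activity on the machine.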

# Summary

We have seen

* how to run GRASS modules in parallel using the standard library `multiprocessing` module
* how to use the pygrass `ParallelModuleQueue` class

# Exercise

Create a procedure to extract the summits (use `r.geomorphon`) above 150 m. Convert the summits raster map into a vector point map and run a viewshed analysis (`r.viewshed`) for each point using `ParallelModuleQueue`.