# Introduction to Parallelization in GRASS GIS
The goal of parallelization is to speed up computation by using multiple cores. This notebooks introduces parallelization concepts, existing parallelized tools, and approaches to parallelizing user scripts.

<div class="alert alert-info">Throughout the workshop, we will be using shell syntax (Bash shell specifically) and Python. Some things are faster to write in Bash, and some in Python. By default, a cell in this notebook will interpret the code as Python, unless we use <code>%%bash</code> magic to let the notebook know a particular cell contains Bash syntax.</div>

First, let's download and unzip the [prepared dataset](https://doi.org/10.5281/zenodo.8206463) by executing the cell below:

In [None]:
%%bash
sh download_dataset.sh

Let's start GRASS to run examples:

In [None]:
import os
import subprocess
import sys

# Ask GRASS GIS where its Python packages are.
sys.path.append(
    subprocess.check_output(["grass", "--config", "python_path"], text=True).strip()
)

# Import GRASS packages
import grass.script as gs
import grass.jupyter as gj

# Start GRASS Session
session = gj.init("opengeohub_2023/part_1");

The examples will run in *opengeohub_2023* project (previously called location) that contains sample data in the *PERMANENT* mapset (subproject). To keep things organized, this part of the workshop will run in a currently empty mapset *part_1*:

<img src="data_structure_embed.svg" alt="GRASS data structure" width="400"/>

### Measuring time
It's useful to be able to measure how much time a particular computation takes. We will be using `time` command in shell and `%%timeit` [IPython magic](https://ipython.readthedocs.io/en/stable/interactive/magics.html) for Python cells.

In [None]:
%%bash
time sleep 1

Command time gives you 3 different numbers, but we are usually interested in the first one (real time), which is the real-life time it takes for a process to run from start to finish. The other two measure CPU time and you may see the "user" time can be larger than the "real" time for parallel tools.

In [None]:
%%timeit  -n10 -r3
import time

def function(x):
    time.sleep(x)

function(0.1)

%%timeit magic will return elapsed time executing the Python cell. It typically runs the code multiple times to get a more accurate estimate. You can modify the number of loops (-n) and numer of repeated runs (-r).

##### *Try it yourself*
Using bash or Python, construct your own timed cell. You can use a sleep function or your own simple function (i.e. Hello World, 2+2). 

In [None]:
# Try it here

### Running GRASS tools in Bash and Python
GRASS tools can be executed from Bash and Python (using the [grass.script package](https://grass.osgeo.org/grass-stable/manuals/libpython/script_intro.html) which is part of [GRASS GIS Python API](https://grass.osgeo.org/grass-stable/manuals/libpython/index.html). In the following example, you can print min and max values of a raster with [r.info](https://grass.osgeo.org/grass-stable/manuals/r.info.html) using the following Bash and Python syntax:

In [None]:
%%bash
r.info map=DEM -r

In [None]:
gs.run_command("r.info", map="DEM", flags="r")

## Using parallelized tools in GRASS GIS

There are many tools in GRASS GIS that are already parallelized ([see the list](https://grass.osgeo.org/grass-stable/manuals/keywords.html#parallel)). Many tools in GRASS Addons are parallelized as well.

Generally, there are two types of implementation in GRASS GIS.
Multithreading in C tools:
   * Threads have low overhead, so they can be spawned more efficiently.
   * Tools use OpenMP API. One of the advantages of OpenMP for software distribution is that code works (compiles and runs in serial) also without OpenMP library present on the system.
   * Memory is shared, so programmer needs to be cautious about race conditions (e.g., writing into the same variable).
   
Multiprocessing in Python tools:
   * There are multiple ways to implement it, typically tools use `subprocess` and `multiprocessing` package.
   * Python tools are often wrappers around GRASS tools implemented in C. For example, tool [r.sun.daily](https://grass.osgeo.org/grass-stable/manuals/addons/r.sun.daily.html) runs [r.sun](https://grass.osgeo.org/grass-stable/manuals/r.sun.html) for multiple days in parallel.
   
Parallelized tools have `nprocs` parameter to specify number of cores to use. For C tools using OpenMP, GRASS GIS needs to be compiled with OpenMP support to take advantage of it. Both implementations work well on a single machine, but can't be scaled to a distributed system. Scaling to a distributed system is covered at the end of this tutorial.


<div class="alert alert-info">The following examples assume an active GRASS session, meaning GRASS is started (or initialized in the notebook) in a specific project and mapset and tools are executed interactively. Towards the end of this notebook, we will cover running tools non-interactively (sometimes called batch mode).</div>

### Example
Let's run our first GRASS tool using multiple cores. Tool [r.neighbors](https://grass.osgeo.org/grass-stable/manuals/r.neighbors.html) computes moving window analysis, in this case we will smooth a digital elevation model. r.neighbors use OpenMP for parallelization.

In [None]:
%%bash
time r.neighbors --q input=DEM output=DEM_smooth size=15 method=average nprocs=1
time r.neighbors --q input=DEM output=DEM_smooth size=15 method=average nprocs=2

The speedup (ratio of serial time to parallelized time with N cores) typically does not increase linearly with  the number of cores and parallel efficiency (speedup / N cores) decreases when adding cores. See, for example, the [benchmarks for r.neighbors](https://grass.osgeo.org/grass-stable/manuals/r.neighbors.html#performance). This behavior is due to the serial parts of the code (see [Amdahl's law](https://en.wikipedia.org/wiki/Amdahl%27s_law)) and computation overhead.

##### *Try it yourself*
What's the speedup for the r.neighbors command above with 2 cores? The efficiency?

In [None]:
# Try it here

Let's use GRASS benchmarking library and matplotlib to look at this behavior in more detail. We will compare the time and parallel efficiency for different number of cores and different neighborhood window size of 3 and 9 cells.

In [None]:
import grass.benchmark as bm
from grass.pygrass.modules import Module
from subprocess import DEVNULL

results = []
module = Module(
        "r.neighbors",
        input="DEM",
        output="benchmark",
        size=3,
        run_=False,
        stdout_=DEVNULL,
        overwrite=True,
    )
results.append(bm.benchmark_nprocs(module, label="size 3", max_nprocs=4, repeat=1))
module.inputs.size = 15
results.append(bm.benchmark_nprocs(module, label="size 15", max_nprocs=4, repeat=1))


In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
plt.figure(figsize=(10,5))
plt.subplot(1, 3, 1)
plt.plot(results[0].nprocs, results[0].times, "-o", label="size_3")
plt.plot(results[1].nprocs, results[1].times, "-^", label="size_15")
plt.ylim(bottom=0)
plt.xlabel("Number of cores")
plt.ylabel("Time [s]")
plt.legend()
# plt.tight_layout()

plt.subplot(1, 3, 2)
plt.plot(results[0].nprocs, np.array(results[0].times[0]) / results[0].times, "-o", label="size_3")
plt.plot(results[1].nprocs, np.array(results[1].times[0]) / results[1].times, "-o", label="size_15")
# plt.plot(results[1].nprocs, results[1].efficiency, "-^", label="size_9")
# plt.ylim(bottom=0)
plt.xlabel("Number of cores")
plt.ylabel("Parallel speedup")
# plt.tight_layout()


plt.subplot(1, 3, 3)
plt.plot(results[0].nprocs, results[0].efficiency, "-o", label="size_3")
plt.plot(results[1].nprocs, results[1].efficiency, "-^", label="size_15")
plt.ylim(bottom=0)
plt.xlabel("Number of cores")
plt.ylabel("Parallel efficiency")
plt.tight_layout()

While computation with larger window size takes longer, it has better parallel efficiency, because a larger proportion of time is spent in the parallel part of the computation. As a rule of thumb, the parallel efficiency should be greater than 0.5 to not waste resources.

## Parallelization of workflows
In a geoprocessing workflow, there are often multiple independent tasks that can be executed in parallel.
The strategy how to divide the workflow into parallel tasks generally falls under either data-based or task-based parallelization.

Task-based parallelism partitions tasks so that independent tasks can be completed simultaneously.

With data-based parallelization, the spatial domain is partitioned for concurrent computations of individual spatial units 
and once processed, spatial units are mosaicked back into the initial spatial domain (if applicable).

<img src="parallelization_strategies.png" alt="data-based and task-based diagrams" width="600"/>

### Data-based parallelization
Data-based parallelism involves spatial domain decomposition, a process of splitting data into smaller datasets that can be processed in parallel.
As part of [GRASS GIS Python API](https://grass.osgeo.org/grass-stable/manuals/libpython/index.html), [GridModule](https://grass.osgeo.org/grass-stable/manuals/libpython/pygrass.modules.grid.html) decomposes input data into rectangular tiles, executes a given tool for each tile in parallel, and merges the results. Effectively, tiling is applied virtually (using computational region), determining the spatial extent of analysis for each parallel process. In some cases such as moving window analysis, tiles need to overlap to get correct results. Note that GridModule can be fairly inefficient due to the overhead from merging results back and is therefore best suited for computationally-itensive tasks such as interpolation.

The following example shows IDW interpolation split into 4 tiles. In this case, specifying an overlap is needed to get correct results without edge artifacts. Here, the number and size of tiles is automatically derived from the number of cores, but can be specified. First we create sample data for this example by taking 10,000 random sample points from the DEM.

In [None]:
%%bash
g.region res=100
r.random -z input=DEM npoints=10000 vector=samples seed=1 --q

We also set the resolution for our IDW computation and sampling to 100 using the g.region tool.

<div class="alert alert-info">Each mapset is using a <strong>computational region</strong> that determines the extent and resolution of raster-based computations.
It can be changed with <a href="https://grass.osgeo.org/grass-stable/manuals/g.region.html">g.region</a> tool.</div>

In [None]:
%%python
import time
from grass.pygrass.modules.grid import GridModule
import time
start = time.time()
grid = GridModule(
    "v.surf.idw",
    input="samples",
    output="idw",
    processes=4,
    overlap=20,
    quiet=True,
)
grid.run()
print(f"Elapsed time: {time.time() - start} s")

The following is the same tool ran in serial:

In [None]:
%%bash
v.surf.idw --q input=samples output=idw

##### *Try it yourself*
Time the serial computation above. What is the speedup?

In [None]:
# Try it here

There are tools that already integrate tiling. For example, addon [r.mapcalc.tiled](https://grass.osgeo.org/grass-stable/manuals/addons/r.mapcalc.tiled.html) uses the tiling concept for raster algebra computation. Using this is better for more complex algebra expression and large input data, otherwise the parallel efficiency of this method can be low.

In [None]:
%%bash
g.region raster=DEM
r.mapcalc.tiled expression="vertical_sobel = -DEM[-1, -1] - 2 * DEM[0, -1] - DEM[1, -1] + DEM[-1, 1] + 2 * DEM[0, 1] + DEM[1, 1]" overlap=1 nprocs=4

### Task-based parallelization
With task-based parallelism, we identify independent tasks and execute them concurrently.
Tasks are typically GRASS processing tools executed as separate processes. Processes, unlike threads, do not share memory. When tasks are limited by disk I/O, parallel processing may have large overhead.


#### Examples in Python
There are multiple ways to execute tasks in parallel using Python, for example, there are libraries `multiprocessing` and `concurrent.futures`.

In the following example viewsheds from different coordinates are computed in parallel using `multiprocessing.Pool` class.

<div class="alert alert-warning">Note that Python multiprocessing.Pool examples do not work in an interactive interpreter (such as Jupyter Notebook). That's why we will first write a Python script with %%writefile and then use %run magic to execute it.</div>

In [None]:
%%writefile example.py
import os
from multiprocessing import Pool
import grass.script as gs

def viewshed(point):
    x, y, cat = point
    gs.run_command("r.viewshed", input="DEM", output=f"viewshed_{cat}", coordinates=(x, y), maxdistance=10000)
    return f"viewshed_{cat}"

if __name__ == "__main__":
    viewpoints = [(594231, 275545, 1),
                  (659805, 259566, 2),
                  (646109, 232901, 3),
                  (602946, 203226, 4)]
    with Pool(processes=2) as pool:
        maps = pool.map(viewshed, viewpoints)
    print(maps)

In [None]:
%%bash
g.region raster=DEM

In [None]:
%run example.py

#### Examples in Bash
In the simplest case, tasks can be executed in parallel from a command line shell by running the geoprocessing tasks in the background (by appending `&`). Command `wait` forces to wait for the background processes to finish.

In [None]:
%%bash
g.region raster=DEM res=100
v.kernel --q input=samples output=kernel_10000 radius=10000 &
v.kernel --q input=samples output=kernel_15000 radius=15000 &
wait

Larger number of tasks can be scheduled to run in parallel by tools such as [GNU Parallel](https://www.gnu.org/software/parallel/) and xargs.
In this simple example, we use a loop to write commands into a file and execute those commands in parallel, using 2 cores. 
Whenever a task is finished, a next one is picked from the queued tasks.


In [None]:
%%bash
for R in 5000 10000 15000
do
    echo v.kernel --q input=samples output=kernel_${R} radius=${R} >> commands.sh
done
time parallel -j 2 < commands.sh

<div class="alert alert-info">While waiting, let's look at the processors working using <code>htop</code>. Click on the blue button New Launcher, create a terminal, and run htop.</div>

See manual pages of GNU Parallel or xargs for more advanced uses. GNU Parallel can be configured to distribute jobs across multiple machines. In that case, use `--exec` interface described below.

### Executing processes on distributed systems
To enable parallelization on distributed systems, tasks or scripts need to be executed non-interactively using the `--exec` [interface](https://grass.osgeo.org/grass-stable/manuals/grass.html) interface.
For that you need to specify Location and Mapset.


For example, here is a simple call to list all available rasters in PERMANENT mapset:


In [None]:
%%bash
grass opengeohub_2023/PERMANENT --exec g.list type=raster mapset=PERMANENT

Non-interactive tasks need to be run in separate mapsets. One of the previous examples that was running within GRASS session in a single mapset can be rewritten so that each task runs in a newly created mapset. Note that by default newly created mapsets use default computational region for that GRASS location (you can use `g.region -s` to modify it). For raster computations, you need to change the computational region for each new mapset if the default one is not desired.

In [None]:
%%bash
for R in 5000 10000 15000
do
    # first create a new mapset with -c flag and set computational region
    grass -c opengeohub_2023/mapset_${R} --exec g.region raster=DEM res=100
    # write the command executing v.kernel in the newly created mapset to a file
    echo grass opengeohub_2023/mapset_${R} --exec v.kernel --q input=samples@part_1 output=kernel_${R} radius=${R} >> exec_commands.sh
done
parallel -j 2 < exec_commands.sh

In some cases, only a temporary mapset or location is needed, see [examples](https://grass.osgeo.org/grass-stable/manuals/grass.html#batch-jobs-with-the-exec-interface).
Besides individual tools, the `--exec` interface can execute an entire script to enable more complex workflows.

## Safe execution of parallel tasks (advanced)

There are certain situations you need to avoid when running GRASS tools in parallel:

 * write output maps/files with identical names
 * modify computational region when running within the same mapset
 * modify MASK when running within the same mapset
 * modify vector attribute database within the same mapset
 * use [r.reclass](https://grass.osgeo.org/grass-stable/manuals/r.reclass.html) to reclassify from the same base map


The following examples show how to deal with some of these situations when running parallel tasks in the same mapset is desired.

### Safely modifying computational region in a single mapset

Sometimes modifying computational region in a script is needed. It is a good practice to not change the global computational region, which effectively modifies a file in a mapset,
but only change the environment variable `GRASS_REGION`.
Here, we modified the previous viewshed example to compute in parallel viewsheds with different extents:

In [None]:
%%writefile example.py
import os
from multiprocessing import Pool
import grass.script as gs

def viewshed(point):
    x, y, cat = point
    # copy current environment, modify and pass it to r.viewshed
    env = os.environ.copy()
    maxdistance = 10000
    # create region based on the viewpoint and maxdistance
    env["GRASS_REGION"] = gs.region_env(e=x + maxdistance, w=x - maxdistance, n=y + maxdistance, s=y - maxdistance, align="DEM")
    gs.run_command("r.viewshed", input="DEM", output=f"viewshed_{cat}", coordinates=(x, y), max_distance=maxdistance, env=env)
    return f"viewshed_{cat}"

if __name__ == "__main__":
    viewpoints = [(594231, 275545, 1),
                  (659805, 259566, 2),
                  (646109, 232901, 3),
                  (602946, 203226, 4)]
    with Pool(processes=2) as pool:
        maps = pool.map(viewshed, viewpoints)
    print(maps)

In [None]:
%run example.py

### Safely modifying vectors with attributes in a single mapset

By default vector maps share a single SQLite database file, however SQLite does not support concurrent write access. That poses a problem when modifying vectors with attributes in parallel. While this can be solved by running the computations in separate mapsets, it is also possible to change the default behavior to write attributes of each vector to the vector's individual SQLite file. This behavior can be activated after a new mapset is created with:

```
 db.connect driver=sqlite database='$GISDBASE/$LOCATION_NAME/$MAPSET/vector/$MAP/sqlite.db'
```

Alternatively, a PostgreSQL or another database backend can be used for attributes to offload the parallel writing to the database system.

## Exercise

Consider this (slightly simplified) example code taken from the [r.sun documentation](https://grass.osgeo.org/grass-stable/manuals/r.sun.html):
```
# slope + aspect
r.slope.aspect elevation=DEM aspect=aspect slope=slope

# calculate global irradiation for day 180
r.sun elevation=DEM aspect=aspect slope=slope glob_rad=global_rad day=180 nprocs=2
# result: output global (total) irradiation [Wh.m-2.day-1] for given day
r.univar global_rad
```

Use [r.sun](https://grass.osgeo.org/grass-stable/manuals/r.sun.html) to compute the solar radiation on the winter and summer soltices (day of year 355 and 172 respectively), the spring equinox (day 80) and your birthday (you can look it up [here](https://asd.gsfc.nasa.gov/Craig.Markwardt/doy2023.html)). Try to parallelize the workflow while considering available resources. What strategies for parallelizing have you used? 

In [None]:
# Your work here (use Bash or Python)

# Urban growth modeling in GRASS GIS: Parallel computing case study
The purpose of this notebook is to demonstrate several parallel computing principles and how they are implemented in GRASS GIS.
We use FUTURES urban growth model implemented as a GRASS GIS addon [r.futures](https://grass.osgeo.org/grass-stable/manuals/addons/r.futures.html).

This notebook uses a [prepared dataset](https://doi.org/10.5281/zenodo.6577922) you downloaded at the beginning of the workshop. This dataset is a GRASS GIS location (project) containing:
 * [NLCD 2001-2019](https://www.mrlc.gov/) (National Land Cover Database; land use and impervious surface descriptor)
 * [US county boundaries](https://www.census.gov/geographies/mapping-files/time-series/geo/tiger-line-file.html)
 * [US-PAD protected areas](https://www.usgs.gov/programs/gap-analysis-project/science/pad-us-data-overview)
 * [USGS DEM](https://www.usgs.gov/3d-elevation-program/about-3dep-products-services)
 
Population files were downloaded from [Zenodo](https://doi.org/10.5281/zenodo.6577903).
 

<div  class="alert alert-info">This notebook combines Python 3 and Bash cells. By default a code cell is in Python.
We use IPython <a href="https://ipython.readthedocs.io/en/stable/interactive/magics.html#cell-magics">cell magic</a> including `%%bash`, `%%time`, `%%timeit` and `%%writefile`.</div>

## Setting up
Import Python packages and initialize GRASS GIS session:

In [None]:
import subprocess
import sys
import pathlib
import json
import pandas as pd
from IPython.display import Image

# Ask GRASS GIS where its Python packages are.
sys.path.append(
    subprocess.check_output(["grass", "--config", "python_path"], text=True).strip()
)

# Import GRASS packages
import grass.script as gs
import grass.jupyter as gj

# Start GRASS Session
session = gj.init("opengeohub_2023/part_2")

## Data preprocessing
Before we start processing, let's take a look at what data we have and our study area. List dataset layers:

In [None]:
%%bash
g.list type=raster,vector -p

Display counties using [grass.jupyter.InteractiveMap](https://grass.osgeo.org/grass83/manuals/libpython/grass.jupyter.html):

In [None]:
m = gj.InteractiveMap()
m.add_vector(name="counties")
m.show()

Set computational region to match counties' extent and aligns with the NLCD raster. Computational region defines the extent and resolution of all raster computations.

In [None]:
gs.run_command("g.region", vector="counties", align="nlcd_2019")

Now, we will prepocess the data to derive spatial predictors used by the urban growth model in this case study. This diagram shows the workflow and highlights the parts that we will parallelize.

![FUTURES data preparation](FUTURES_case_study_data_prep_opengeohub.svg)

### DEM to slope
Compute slope with [r.slope.aspect](https://grass.osgeo.org/grass-stable/manuals/r.slope.aspect.html) which uses OpenMP for parallelization. For fun, measure the time difference between using 1 or more cores:

In [None]:
%%timeit -n2 -r3
gs.run_command(
    "r.slope.aspect", elevation="DEM", slope="slope", flags="e", nprocs=1
)

In [None]:
%%timeit -n2 -r3
gs.run_command(
    "r.slope.aspect", elevation="DEM", slope="slope", flags="e", nprocs=2
)

In [None]:
m = gj.Map()
m.d_rast(map="slope")
m.d_vect(map="counties", fill_color="none")
m.d_legend(raster="slope", range=[0, 10])
m.show()

### Derive predictors and other layers from NLCD data
Most of our predictors we will derive from NLCD data (land cover type and impervious descriptor products). See [NLCD legend](https://www.mrlc.gov/data/legends/national-land-cover-database-class-legend-and-description) for classification. With [r.reclass](https://grass.osgeo.org/grass-stable/manuals/r.reclass.html) we will create separate layers for water, wetland, forest, roads, and developed land.
Note that those rasters are virtual (they behave the same way, but are only pointing to the original NLCD raster),
so reclassification is very fast.

In [None]:
NLCD_years = [2001, 2004, 2006, 2008, 2011, 2013, 2016, 2019]
NLCD_start_end_years = [2001, 2019]
# water (1 or no data, class 11)
gs.write_command(
    "r.reclass", input="nlcd_2019", output="water", rules="-", stdin="11 = 1"
)
# binary wetlands (classes 90 and 95)
gs.write_command(
    "r.reclass",
    input="nlcd_2019",
    output="wetlands",
    rules="-",
    stdin="90 95 = 1 \n * = 0",
)
# developed land (classes 21 to 24)
for year in NLCD_years:
    gs.write_command(
        "r.reclass",
        input=f"nlcd_{year}",
        output=f"urban_{year}",
        rules="-",
        stdin="21 22 23 24 = 1\n* = 0",
    )
for year in NLCD_start_end_years:
    # forest classes 41 to 43
    gs.write_command(
        "r.reclass",
        input=f"nlcd_{year}",
        output=f"forest_{year}",
        rules="-",
        stdin="41 42 43 = 1",
    )
    # roads classes 20-23 in the descriptor layer
    gs.write_command(
        "r.reclass",
        input=f"nlcd_descriptor_{year}",
        output=f"roads_{year}",
        rules="-",
        stdin="20 21 22 23 = 1",
    )
    # urban without roads (classes 24-26 in the descriptor layer)
    gs.write_command(
        "r.reclass",
        input=f"nlcd_descriptor_{year}",
        output=f"urban_no_roads_{year}",
        rules="-",
        stdin="24 25 26 = 1\n* = 0",
    )

Next, we will compute **distance to water, forest, and roads** (from different years). One of the simplest way to compute these independent tasks in parallel is to run them in the background using Bash by appending &.
Command `wait` forces to wait for the background processes to finish. Adding time in front of wait will report the elapsed time.
Once the distance is computed, we will use raster algebra to transform it logarithmically.

In [None]:
%%bash
r.grow.distance input=water distance=dist_to_water -m --q &
r.grow.distance input=forest_2001 distance=dist_to_forest_2001 -m --q &
r.grow.distance input=forest_2019 distance=dist_to_forest_2019 -m --q &
r.grow.distance input=roads_2001 distance=dist_to_roads_2001 -m --q &
r.grow.distance input=roads_2019 distance=dist_to_roads_2019 -m --q &
time wait
r.mapcalc "log_dist_to_water = log(dist_to_water + 1)" --q &
r.mapcalc "log_dist_to_forest_2001 = log(dist_to_forest_2001 + 1)" --q &
r.mapcalc "log_dist_to_forest_2019 = log(dist_to_forest_2019 + 1)" --q &
r.mapcalc "log_dist_to_roads_2001 = log(dist_to_roads_2001 + 1)" --q &
r.mapcalc "log_dist_to_roads_2019 = log(dist_to_roads_2019 + 1)" --q &
time wait

#### *Try it yourself*

For comparison, compute the run time for a single r.grow.distance call (i.e. copy and time a single command from above):

In [None]:
# Try it here

In [None]:
m = gj.Map()
m.d_rast(map="log_dist_to_water")
m.d_vect(map="counties", fill_color="none")
m.show()

As another predictor, we compute **wetland density** (percentage of wetland in 1 km squared circular neighborhood). Module [r.neighbors](https://grass.osgeo.org/grass-stable/manuals/r.neighbors.html) for moving window analysis is internally parallelized using OpenMP, so we can use `nprocs` option:

In [None]:
%%timeit -n1 -r1
gs.run_command(
    "r.neighbors",
    input="wetlands",
    output="wetland_density",
    size=37,
    method="average",
    flags="c",
    nprocs=4,
)

In [None]:
m = gj.Map()
m.d_rast(map="wetland_density")
m.d_vect(map="counties", fill_color="none")
m.show()

FUTURES uses a special predictor called development pressure, which can be computed with [r.futures.devpressure](https://grass.osgeo.org/grass-stable/manuals/addons/r.futures.devpressure.html), which is internally parallelized.
Since we need to compute it for 2 years, if we have enough cores, we can use a hybrid approach which runs both commands as background process and each of them runs in parallel.
To do that we split the number of available processes so that each r.futures.devpressure process gets half of the available processes:

In [None]:
%%bash
r.futures.devpressure input=urban_no_roads_2001 output=devpressure_2001 size=15 gamma=0.5 nprocs=2 scaling_factor=0.1 &
r.futures.devpressure input=urban_no_roads_2019 output=devpressure_2019 size=15 gamma=0.5 nprocs=2 scaling_factor=0.1 &
time wait

In [None]:
m = gj.Map()
m.d_rast(map="devpressure_2001")
m.show()

### Mask
Compute mask to avoid urban growth simulation in water, protected areas, and outside the study area.
Here we demonstrate data-based parallelization using  [r.mapcalc.tiled](https://grass.osgeo.org/grass-stable/manuals/addons/r.mapcalc.tiled.html). Note that with our relatively small dataset, this approach is going to actually slow down the computation in comparison to simple r.mapcalc call due to overhead from patching the results together. However, for larger dataset, we would be able to see speedup.

In [None]:
%%timeit -n1 -r1
gs.run_command(
    "r.mapcalc.tiled",
    expression="masking = if((isnull(protected) &&  isnull(water) && nlcd_2019), 1, null())",
    nprocs=4,
)

In [None]:
%%timeit -n1 -r1
gs.mapcalc("masking = if((isnull(protected) &&  isnull(water) && nlcd_2019), 1, null())")

In [None]:
m = gj.Map()
m.d_rast(map="masking")
m.show()

## FUTURES simulation
FUTURES simulation has several components to compute how much land is going to be developed, where, and with what size of patches:
![FUTURES diagram](FUTURES.svg)

To keep this workshop on schedule, in this part we will demonstrate the computation of **land demand** using [r.futures.demand](https://grass.osgeo.org/grass-stable/manuals/addons/r.futures.demand.html), then we skip a few steps and proceed with the patch growing simulation. If interested, you can find the skipped steps in `FUTURES_potential_and_calibration.ipynb`.



### Land demand
Here we compute how much land will be developed in each step of the simulation.
Logarithmic curves are fit to per-capita land consumption data derived from NLCD time series and observed population for each county. 

Here we wanted to demonstrate a parallelization approach that splits spatial data into spatial units that are then processed in parallel, and that can be run in a distributed way (on an HPC). Technically, each spatial unit will be processed in a separate GRASS mapset. Distributing of the tasks is done here with GNU Parallel, but that would depend on the specific HPC setup.
While US states would be more appropriate spatial units for this approach (given their size), in this small case study we will apply this approach to counties.

![FUTURES simulation_with highlighted parts of the workflow](FUTURES_case_study_simulation_nosketch_opengeohub.svg)

First, we split and rasterize counties for further parallelization steps:

In [None]:
# list of all county ids
county_ids = gs.read_command("v.db.select", map="counties", column="FIPS", format="csv", flags="c").splitlines()
print(county_ids)
# use temporary region
gs.use_temp_region()
for fips in county_ids:
    gs.run_command(
        "v.extract",
        input="counties",
        where=f"FIPS == '{fips}'",
        output=f"county_{fips}",
    )
    gs.run_command("g.region", vector=f"county_{fips}", align="nlcd_2019")
    gs.run_command(
        "v.to.rast",
        input=f"county_{fips}",
        output=f"county_{fips}",
        use="attr",
        attribute_column="FIPS",
    )
gs.del_temp_region()
gs.run_command("g.remove", pattern="county_*", type="vector", flags="f")

In [None]:
m = gj.Map(use_region=True)
m.d_rast(map="county_37183")
m.d_vect(map="counties", attribute_column="FIPS", fill_color="none", label_color="black", xref="center")
m.show()

Then, we create a Python script that takes a county id as an input parameter,
sets the computational region to the county extent, excludes roads from the computation, and runs [r.futures.demand](https://grass.osgeo.org/grass-stable/manuals/addons/r.futures.demand.html),
creating an output CSV needed as input for r.futures.simulation and a plot for visual check.

In [None]:
%%writefile demand_for_county.py
import sys
import grass.script as gs

# input parameter: state FIPS code
fips = sys.argv[1]

gs.run_command("g.mapsets", mapset="part_2", operation="add")
gs.run_command("g.region", raster=f"county_{fips}")
gs.mapcalc("MASK = if (isnull(roads_2019), 1, null())")
gs.run_command(
    "r.futures.demand",
    subregions=f"county_{fips}",
    development=[
        f"urban_{year}" for year in [2001, 2004, 2006, 2008, 2011, 2013, 2016, 2019]
    ],
    observed_population="observed_population_SE_counties_2001-2019.csv",
    projected_population="projected_population_SE_counties_2020-2100_SSP2.csv",
    simulation_times=list(range(2019, 2051)),
    method="logarithmic",
    demand=f"demand_{fips}.csv",
    plot=f"demand_{fips}.png",
    overwrite=True,
)

For each county we will use Bash scripting to generate grass `--exec` command calling the script within a temporary mapset and append each line to a file `demand_jobs.sh`.
Then, we will run these commands in parallel with GNU Parallel.

<div class="alert alert-info">See <a src="https://www.freecodecamp.org/news/bash-scripting-tutorial-linux-shell-script-and-command-line-for-beginners/">Bash scripting basics</a> 
or other sources to understand bash commands, for loops, or command substitution.</div>

In [None]:
%%bash
# remove the job file (for repeated executions of this cell)
rm -f demand_jobs.sh
# for each of the county ids
for S in $(v.db.select map="counties" column="FIPS" format="csv" -c)
do
    # write "grass ..." command into demand_jobs.sh
    # we will use temporary mapset, no need to store it afterwards
    echo grass --tmp-mapset opengeohub_2023 --exec python demand_for_county.py ${S} >> demand_jobs.sh
done
# print
cat demand_jobs.sh
# specify number of parallel jobs, reroute error output
parallel -j 2 < demand_jobs.sh 2> log.txt

In [None]:
Image("demand_37183.png")

<div class="alert alert-info">At this point, we skip several steps of FUTURES modeling workflow to keep this short and relevant. We will use precomputed files included in the repository. If you are interested in the skipped workflow, see FUTURES_potential_and_calibration.ipynb.</div>

### Patch Growing Algorithm
With [r.futures.simulation](https://grass.osgeo.org/grass-stable/manuals/addons/r.futures.simulation.html) we will run the simulation from 2019 until 2050. r.futures.simulation is stochastic, so with different random seeds, we will get somewhat different result. We will use the same parallelization approach as with r.futures.demand.

First, create a Python script with county id and random seed as an input. The script sets computational region to match the county, turns on the mask, and runs the simulation.


In [None]:
%%writefile simulation_for_county.py
import sys
import grass.script as gs

fips, seed = sys.argv[1:3]

# g.mapsets for accessing maps from part_2 mapset
gs.run_command("g.mapsets", mapset="part_2", operation="add")
gs.run_command("g.region", raster=f"county_{fips}")
gs.run_command("r.mask", raster="masking")
gs.run_command(
    "r.futures.simulation",
    developed="urban_2019",
    development_pressure="devpressure_2019",
    compactness_mean=0.4,
    compactness_range=0.1,
    discount_factor=0.5,
    predictors=[
        "log_dist_to_forest_2019",
        "log_dist_to_roads_2019",
        "log_dist_to_water",
        "slope",
        "wetland_density",
    ],
    n_dev_neighbourhood=15,
    devpot_params="best_model.csv",
    num_neighbors=4,
    seed_search="probability",
    development_pressure_approach="gravity",
    gamma=0.5,
    scaling_factor=0.1,
    subregions=f"county_{fips}",
    demand=f"demand_{fips}.csv",
    num_steps=31,
    output=f"out_county_{fips}_seed_{seed}",
    patch_sizes=f"patch_sizes/patch_sizes_{fips}.csv",
    incentive_power=2,
    random_seed=seed,
)
gs.run_command("r.mask", flags="r")


Similarly, we create a list of commands executing this Python script with a given county and seeds (from 1 to 10).
We will create a new mapset for each command. There will be *number of counties* * *number of random seeds* commands.

In [None]:
%%bash
rm -f pga_jobs.sh
for SEED in {1..10}
do
    for COUNTY in $(v.db.select map="counties" column="FIPS" format="csv" -c)
    do
        # to start fresh delete the mapset in case it exists from repeated runs.
        rm -rf opengeohub_2023/pga_${COUNTY}_${SEED}
        echo grass -c opengeohub_2023/pga_${COUNTY}_${SEED} --exec python simulation_for_county.py ${COUNTY} ${SEED} >> pga_jobs.sh
    done
done
cat pga_jobs.sh
parallel -j 4 < pga_jobs.sh 2> log.txt

Afterwards, we patch the results from all the counties together. Tool [r.patch](https://grass.osgeo.org/grass-stable/manuals/r.patch.html) is internally parallelized, so we can use that extra speed up if we have available cores. For each seed we will get the list of layers and patch them. This simple loop can be easily parallelized in Python.

<div class="alert alert-warning">Note that Python multiprocessing.Pool examples do not work in an interactive interpreter (such as Jupyter Notebook). That's why we use %run magic to execute Python code as a script.</div>

In [None]:
%%writefile patch.py

import tqdm
from multiprocessing import Pool
import grass.script as gs


def patch(seed):
    maps = gs.read_command("g.list", type="raster", pattern=f"out_county_*_seed_{seed}",
                           mapset="*", flags="m", separator="comma").strip()
    gs.run_command("r.patch", input=maps, output=f"out_seed_{seed}")

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # a simple way to run it:
        # pool.map(patch, list(range(1, 11)))
        # more complex, but gives you progress bar:
        r = list(tqdm.tqdm(pool.imap(patch, list(range(1, 11))), total=10))

In [None]:
%run patch.py

Set the color ramp of all merged simulation outputs for visualization:

In [None]:
gs.run_command(
    "r.colors",
    map="out_seed_1",
    raster="out_county_37183_seed_1@pga_37183_1",
)
m = gj.InteractiveMap()
m.add_raster(name="out_seed_1", opacity=0.8)
m.show()

## FUTURES results postprocessing
In this last section we will derive some useful outputs from the stochastic simulation runs.

### Future Development Probability
By aggregating the stochastic runs, we can compute the projected development probability. First we reclassify output
to binary developed/undeveloped results. Then we run [r.series](https://grass.osgeo.org/grass-stable/manuals/r.series.html) in parallel to compute how many times a cell was developed
and then divide that by number of runs.

In [None]:
%%writefile reclass.txt
-1 0 = 0
1 thru 100 = 1
* = 0

In [None]:
for seed in range(1, 11):
    gs.run_command(
        "r.reclass",
        input=f"out_seed_{seed}",
        output=f"out_seed_{seed}_binary",
        rules="reclass.txt",
    )
gs.run_command(
    "r.series",
    input=[f"out_seed_{seed}_binary" for seed in range(1, 11)],
    output="probability",
    method="sum",
    weights=[0.1] * 10,
    nprocs=2,
)

In [None]:
# zoom in to see more details
gs.run_command(
    "g.region",
    raster="county_37183",
    save="zoomin",
)
gs.run_command("r.colors", map="urban_2019", color="grey")
m = gj.Map(saved_region="zoomin", width=700)
m.d_background(color="grey")
m.d_rast(map="probability", values="0.2-1")
m.d_rast(map="urban_2019", values=1)
m.show()

### Forest loss analysis
Here we compute future forest loss due to development, demonstrating how to parallelize more complex tasks that require different computational region using Python multiprocessing.
The goal is to compute forest loss for each of 30x30 km tiles in parallel across the landscape to capture large scale forest loss patterns.
To do that we use `GRASS_REGION` environmental variable and set different region for each seed and tile combination.

This Python script includes a function `forest_loss_stats` that for a given tile derives a forest loss layer and counts the number of cells. The function takes as input tile extent and seed, and returns a Python dictionary with tile coordinates, seed, and number of forest cells. Once completed for all tiles, results are written to a JSON file.

The `__main__` function creates the tiles and executes the function for each tile and seed in parallel.

In [None]:
%%writefile forest_loss.py
import os
import sys
import json
import tqdm
from math import ceil
from multiprocessing import Pool

import grass.script as gs
from grass.exceptions import CalledModuleError


def forest_loss_stats(params):
    """Compute projected forest loss for selected area.
    This function can be safely run in parallel."""
    seed, region = params
    # create unique temporary map names
    forest_loss = f"forest_{seed}_{region['n']}_{region['e']}"
    # copy and modify environment to change region based on input
    env = os.environ.copy()
    env["GRASS_REGION"] = gs.region_env(align="forest_2019", **region)
    # pass the environment, so that the computations run with different region than the current one
    # compute lost forest comparing to 2019
    gs.mapcalc(
        f"{forest_loss} = if (forest_2019 && out_seed_{seed} >= 0, 1, 0)",
        env=env,
        quiet=True,
    )
    # get number of cells for 0 (no change) and 1 (forest lost) category
    results = {"0": 0, "1": 0}
    results.update(dict(
        gs.parse_command(
            "r.stats",
            input=forest_loss,
            flags="an",
            parse=(gs.parse_key_val, {"sep": " ", "val_type": float}),
            env=env,
            quiet=True,
        )
    )
                  )
    # add N, E as a center of the region, and add seed to results
    results["n"] = (region["n"] + region["s"]) / 2
    results["e"] = (region["e"] + region["w"]) / 2
    results["seed"] = seed
    # remove temporary maps
    gs.run_command("g.remove", type="raster", name=forest_loss, flags="f", quiet=True)
    # return dictionary with results
    return results


if __name__ == "__main__":
    current = gs.region()
    regions = []
    tile = 30000  # meter^2
    # create a region where each cell is a tile
    gs.run_command("g.region", res=tile, flags="a", save="tiles")
    env = os.environ.copy()
    env["GRASS_REGION"] = gs.region_env(region="tiles")
    grid_region = gs.region(env=env)
    # save extents of each tile
    for row in range(int(grid_region["rows"])):
        for col in range(int(grid_region["cols"])):
            s = float(grid_region["s"]) + row * float(grid_region["nsres"])
            n = float(grid_region["s"]) + (row + 1) * float(grid_region["nsres"])
            w = float(grid_region["w"]) + col * float(grid_region["ewres"])
            e = float(grid_region["w"]) + (col + 1) * float(grid_region["ewres"])
            regions.append(
                {
                    "n": n,
                    "s": s,
                    "w": w,
                    "e": e,
                }
            )

    params = []
    # collect parameters (for each seed and tile)
    for seed in range(1, 11):
        for region in regions:
            params.append((seed, region))

    # execute forest loss computation for each tile and seed in parallel
    results = []
    with Pool(processes=1) as pool:
        with tqdm.tqdm(total=len(params)) as pbar:
            for result in pool.imap(forest_loss_stats, params):
                results.append(result)
                pbar.update()
        # write results as json to file
        with open("forest_results.json", "w") as f:
            json.dump(results, f)


Execute the file:

In [None]:
%run forest_loss.py

Load results, process them with Pandas and write a raster:

In [None]:
with open("forest_results.json", "r") as f:
    results = json.load(f)

df = pd.DataFrame(results)
df = df[df["1"].notna()].groupby(["n", "e"])[["1"]].mean()
df["km"] = df["1"] / 1000000

csv = df.drop(columns=["1"]).to_csv(index=True, header=False)
env = os.environ.copy()
env["GRASS_REGION"] = gs.region_env(region="tiles")
gs.write_command(
    "r.in.xyz",
    stdin=csv,
    input="-",
    x=2,
    y=1,
    output="forest_loss",
    method="mean",
    separator="comma",
    env=env,
)
gs.run_command("r.colors", map="forest_loss", color="grass", flags="n")

Visualize:

In [None]:
m = gj.Map()
m.d_rast(map="forest_loss")
m.d_vect(map="counties", fill_color="none", color="grey")
m.d_legend(raster="forest_loss", title="Loss of forest [km sq]")
m.show()