# Urban growth modeling in GRASS GIS: Parallel computing case study
The purpose of this notebook is to demonstrate several parallel computing pronciples and how they are implemented in GRASS GIS.
We use FUTURES urban growth model implemented as a GRASS GIS addon.

This notebook requires prepared dataset available here. This dataset is a GRASS GIS Location containing:
 * NLCD 2001-2019
 * US county boundaries
 * US-PAD protected areas
 * USGS DEM

The required software includes
 * _GRASS GIS v8.2_ with the following addons: _r.futures, r.mapcalc.tiled, r.sample.category_
 * _R_ with the following packages: _lme4, optparse, MuMIn, snow_
 * _GNU Parallel_

The expected computing resources for the notebook as intended are 8 cores and 32GB memory. You can change those values as needed.

In [None]:
nprocs = 8
memory = 32

This notebook combines Python 3 and Bash cells. By default a code cell is in Python. We use IPython [cell magick](https://ipython.readthedocs.io/en/stable/interactive/magics.html#cell-magics) including `%%bash`, `%%time`, and `%%writefile`.

## Setting up
Change the current directory to wherever you unzipped the data:

In [None]:
import os

os.chdir("/data/FUTURES/")

Import Python packages and initialize GRASS GIS session:

In [None]:
import subprocess
import sys
import pathlib
import json
import pandas as pd
from IPython.display import Image

# Ask GRASS GIS where its Python packages are.
sys.path.append(
    subprocess.check_output(["grass", "--config", "python_path"], text=True).strip()
)

# Import GRASS packages
import grass.script as gs
import grass.jupyter as gj

# Start GRASS Session
session = gj.init(".", "FUTURES", "PERMANENT")

Install GRASS addons:

In [None]:
gs.run_command("g.extension", extension="r.futures")
gs.run_command("g.extension", extension="r.mapcalc.tiled")
gs.run_command("g.extension", extension="r.sample.category")

## Data preprocessing

List dataset layers:

In [None]:
%%bash
g.list type=raster,vector -p

### Process county boundaries
Extract states in South-East US (Alabama, Florida, Georgia, North carolina, South Carolina, Tennessee)
and convert

In [None]:
%%bash
v.extract tl_2021_us_county output=SE_counties where="STATEFP in ('01', '12', '13', '37', '45', '47')" --q
v.db.addcolumn SE_counties column="state integer" --q
v.db.addcolumn SE_counties column="county integer" --q
v.db.update SE_counties col=state qcol="CAST(STATEFP AS integer)" --q
v.db.update SE_counties col=county qcol="CAST(GEOID AS integer)" --q

In [None]:
m = gj.GrassRenderer()
m.d_vect(map="SE_counties")
m.show()

Split and rasterize states for further parallelization steps:

In [None]:
states = [1, 12, 13, 37, 45, 47]
gs.use_temp_region()
for state in states:
    gs.run_command("v.extract", input="SE_counties", where=f"state == '{state}'", output=f"state_{state}")
    gs.run_command("g.region", vector=f"state_{state}", align="nlcd_2019")
    gs.run_command("v.to.rast", input=f"state_{state}", output=f"state_{state}", use="attr", attribute_column="county")
gs.del_temp_region()

### DEM to slope
Compute slope with r.slope.aspect which uses OpenMP for parallelization:

In [None]:
gs.run_command("r.slope.aspect", elevation="DEM", slope="slope", flags="e", nprocs=nprocs)

In [None]:
m = gj.GrassRenderer()
m.d_rast(map="slope")
m.show()

### Protected land
Rasterize protected areas to later include them in a mask. We use GridModule to split the computation in tiles:

In [None]:
%%python
from grass.pygrass.modules.grid import GridModule

grid = GridModule("v.to.rast", input="protected", output="protected", type="area", use="val",
                  processes=nprocs, patch_backend="r.patch")
grid.run()

In [None]:
m = gj.GrassRenderer()
m.d_rast(map="protected")
m.show()

### Process NLCD data
Most of our predictors we will derive from NLCD data (land cover type and impervious descriptor products). With r.reclass we create water, forest, roads, urban rasters.
Note that those rasters are virtual (they behave the same way, but are only pointing to the original NLCD raster),
so reclassification is very fast.

In [None]:
NLCD_years = [2001, 2004, 2006, 2008, 2011, 2013, 2016, 2019]
NLCD_descriptor_years = [2001, 2019]
gs.write_command("r.reclass", input="nlcd_2019", output="water", rules="-", stdin="11 = 1")
gs.write_command("r.reclass", input="nlcd_2019", output="forest", rules="-", stdin="41 42 43 = 1")
gs.write_command("r.reclass", input="nlcd_descriptor_2019", output="roads", rules="-", stdin="20 21 22 23 = 1")
for year in NLCD_years:
    gs.write_command("r.reclass", input=f"nlcd_{year}", output=f"urban_{year}", rules="-", stdin="21 22 23 24 = 1\n* = 0")
for year in NLCD_descriptor_years:
    gs.write_command("r.reclass", input=f"nlcd_descriptor_{year}", output=f"urban_no_roads_{year}", rules="-", stdin="24 25 26 = 1\n* = 0")

In Bash use background processing (append &) to compute distance to water, forest, and roads in parallel since these are independent computations. Command wait forces to wait for the background processes to finish.
Once the distance is computed, we use raster algebra to transform it logarithmically.

In [None]:
%%bash
r.grow.distance input=water distance=dist_to_water -m &
r.grow.distance input=forest distance=dist_to_forest -m &
r.grow.distance input=roads distance=dist_to_roads -m &
wait
r.mapcalc "log_dist_to_water = log(dist_to_water + 1)" &
r.mapcalc "log_dist_to_forest = log(dist_to_forest + 1)" &
r.mapcalc "log_dist_to_roads = log(dist_to_roads + 1)" &
wait

In [None]:
m = gj.GrassRenderer()
m.d_rast(map="log_dist_to_forest")
m.show()

FUTURES uses a special predictor called development pressure, which can be computed with r.futures.devpressure, which is internally parallelized.
Since we need to compute it for 2 years, we use a hybrid approach which runs both command as background process and each of them runs in parallel.
To do that we split the number of available processes so that each r.futures.devpressure process gets half of the available processes:

In [None]:
%%bash -s "{int(nprocs/2)}"
r.futures.devpressure input=urban_no_roads_2001 output=devpressure_2001 size=20 gamma=1 nprocs=$1 &
r.futures.devpressure input=urban_no_roads_2019 output=devpressure_2019 size=20 gamma=1 nprocs=$1 &
wait

In [None]:
m = gj.GrassRenderer()
m.d_rast(map="devpressure_2001")
m.show()

### Mask
Compute mask to avoid urban growth simulation in water, protected areas, and outside the study area. We use r.mapcalc.tiled:

In [None]:
gs.run_command("r.mapcalc.tiled", expression="masking = if((isnull(protected) &&  isnull(water) && nlcd_2019), 1, null())", nprocs=nprocs)

In [None]:
m = gj.GrassRenderer()
m.d_rast(map="masking")
m.show()

## Demand computation
Here we compute how much land will be developed in each step of the simulation.
Logarithmic curves are fit to per-capita land consumption data derived from NLCD time series and observed population for each county.

This computation is parallelized by state. First we create a Python script that takes the state as an input parameter,
sets the computational region to the state extent, excludes roads from the computation, and runs r.futures.demand,
creating an output CSV and a plot specific to that state.

In [None]:
%%writefile demand_for_state.py
import sys
import grass.script as gs

# input parameter: state FIPS code
state = sys.argv[1]

gs.run_command("g.region", raster=f"state_{state}")
gs.mapcalc("MASK = if (isnull(roads), 1, null())")
gs.run_command("r.futures.demand", subregions=f"state_{state}",
               development=[f"urban_{year}" for year in [2001, 2004, 2006, 2008, 2011, 2013, 2016, 2019]],
               observed_population="observed_population_SE_counties_NLCD_2001-2019.csv",
               projected_population="Hauer_2020_2100_SE_counties_SSP2_projections_demand.csv",
               simulation_times=list(range(2019, 2101)), method="logarithmic",
               demand=f"demand_{state}.csv", plot=f"demand_{state}.png", overwrite=True)

For each state generate grass command calling the script within a temporary mapset and append the the line to a file `demand_jobs.sh`.
Run these commands in parallel with GNU Parallel.

In [None]:
%%bash -s "{nprocs}"
rm -f demand_jobs.sh
for S in 1 12 13 37 45 47
do
    echo grass --tmp-mapset FUTURES --exec python demand_for_state.py ${S} >> demand_jobs.sh
done
cat demand_jobs.sh
parallel -j ${1} < demand_jobs.sh 2> log.txt

Visualize results for one state:

In [None]:
Image("demand_37.png")

## Development Potential
### Predictor Sampling
First we need a layer representing newly developed areas between 2001 and 2019 (value 1) and areas that didn't transition:

In [None]:
gs.run_command("r.mapcalc.tiled", expression="urban_change = if(urban_2001 == 0, if(urban_2019 == 1, 1, 0), null())", nprocs=nprocs)

Similarly to Demand computation, we create a script that is then executed with different states.
Since we need to patch the results together, we won't use temporary mapset.
Setting the mask ensures area outiside of it won't be sampled.

In [None]:
%%writefile sampling_for_state.py
import sys
import grass.script as gs

state = sys.argv[1]

gs.run_command("g.region", raster=f"state_{state}")
gs.run_command("r.mask", raster=f"state_{state}")
# create an identical, virtual map of counties with different name to later simplify patching of attribute tables
gs.write_command("r.reclass", input=f"state_{state}", output="counties", rules="-", stdin="* = *")
gs.run_command("r.sample.category", input="urban_change", output=f"sample_{state}",
               sampled=["counties", "slope", "devpressure_2001", "log_dist_to_forest", "log_dist_to_water", "log_dist_to_roads"],
               npoints=[10000, 5000],
               random_seed=1)
gs.run_command("r.mask", flags="r")

In [None]:
%%bash -s "{nprocs}"
rm -f sampling_jobs.sh
for S in 1 12 13 37 45 47
do
    # remove mapset to start fresh after previous run
    rm -rf FUTURES/sampling_${S}
    echo grass -c FUTURES/sampling_${S} --exec python sampling_for_state.py ${S} >> sampling_jobs.sh
done
parallel -j ${1} < sampling_jobs.sh 2> log.txt

Here we patch the results:

In [None]:
gs.run_command("v.patch", input=[f"sample_{state}@sampling_{state}" for state in states], output="samples", flags="e")

Zoom in to see samples in newly developed and undeveloped areas:

In [None]:
gs.run_command("g.region", n=1496445, s=1473765, e=1373865, w=1346175, save="zoomin")
m = gj.GrassRenderer(use_region="zoomin")
m.d_rast(map="urban_change")
m.d_vect(map="samples_test", size=10, fill_color="red", icon="basic/pin")
m.show()

### Potential
Runs r.futures.potential to select the "best" model and compute regression coefficients.
The best model selection runs in parallel (parallelized in R).

In [None]:
gs.run_command("r.futures.potential", input="samples", output="best_model.csv",
               columns=["slope", "devpressure_2001", "log_dist_to_forest", "log_dist_to_water", "log_dist_to_roads"],
               developed_column="urban_change",
               subregions_column="counties",
               random_column="devpressure_2001",
               min_variables=3,
               nprocs=nprocs, flags="d",
               dredge_output="all_models.csv")

In [None]:
pd.read_csv("all_models.csv", index_col=0)

In [None]:
pd.read_csv("best_model.csv", index_col=0)

### Calibration
Here we derive the distribution of the historical patch sizes per county. r.futures.calib is internally parallelized, however
with 8 cores, we can't use the internal parallelization because we are also parallelizing on the level of states.
If you have more cores, use `nprocs=` with more cores than 1.

In [None]:
%%writefile calibration_for_state.py
import sys
import grass.script as gs

state = sys.argv[1]

gs.run_command("g.region", raster=f"state_{state}")
gs.run_command("r.futures.calib", flags="sl",
               development_start="urban_2001", development_end="urban_2019",
               subregions=f"state_{state}", patch_threshold=1800,
               patch_sizes=f"patch_sizes_{state}.csv", nprocs=1)

In [None]:
%%bash -s "{nprocs}"
rm -f calibration_jobs.sh
for S in 1 12 13 37 45 47
do
    echo grass --tmp-mapset FUTURES/ --exec python calibration_for_state.py ${S} >> calibration_jobs.sh
done
parallel -j ${1} < calibration_jobs.sh 2> log.txt

### Patch Growing Algorithm
Putting all the intermediate results together, we can now finally run the simulation from 2019 until 2050.

We restrict the memory consumption to 10 GB and we will run 3 simulations in parallel. r.futures.pga
will be executed many times (number of stochastic runs * number of states = 10 * 6 = 60).
If more memory is available, more cores can be used.

In [None]:
%%writefile simulation_for_state.py
import sys
import grass.script as gs

state, seed = sys.argv[1:3]

gs.run_command("g.region", raster=f"state_{state}")
gs.run_command("r.mask", raster="masking")
gs.run_command("r.futures.pga", developed="urban_2019", development_pressure="devpressure_2019",
               compactness_mean=0.3, compactness_range=0.1, discount_factor=0.5,
               predictors=["log_dist_to_forest", "log_dist_to_roads", "log_dist_to_water", "slope"],
               n_dev_neighbourhood=20, devpot_params="best_model.csv", num_neighbors=4, seed_search="probability",
               development_pressure_approach="gravity", gamma=1, scaling_factor=1,
               subregions=f"state_{state}", demand=f"demand_{state}.csv", num_steps=31,
               output=f"out_state_{state}_seed_{seed}", patch_sizes=f"patch_sizes_{state}.csv", memory=10, random_seed=seed)
gs.run_command("r.mask", flags="r")

In [None]:
%%bash
rm -f pga_jobs.sh
for SEED in {1..10}
do
    for STATE in 1 12 13 37 45 47
    do
        rm -rf FUTURES/pga_${STATE}_${SEED}
        echo grass -c FUTURES/pga_${STATE}_${SEED} --exec python simulation_for_state.py ${STATE} ${SEED} >> pga_jobs.sh
    done
done
time parallel -j 3 < pga_jobs.sh 2> log.txt

Afterwards, we patch the results together. Tool r.patch is internally parallelized, so we can use that extra speed up is we have available cores.

In [None]:
%%bash -s "{nprocs}"
rm -f patch_jobs.sh
for SEED in {1..10}
do
    MAPS=$(grass --tmp-mapset FUTURES/ --exec \
           g.list type=raster pattern="out_state_*_seed_${SEED}" mapset="*" -m separator=comma)
    rm -rf FUTURES/results_${SEED}
    echo grass -c FUTURES/results_${SEED} --exec r.patch input=${MAPS} output="out_seed_${SEED}" nprocs=1 >> patch_jobs.sh
done
parallel -j ${1} < patch_jobs.sh 2> log.txt

In [None]:
m = gj.GrassRenderer()
m.d_rast(map="out_seed_1@results_1")
m.show()

### Future Development Probability
By aggregating the stochastic runs we can compute the projected development probability. First we reclassify output
to binary developed/undeveloped results. Then we run r.series in parallel to compute how many times a cell was developed
and then divide that by number of runs.

In [None]:
%%writefile reclass.txt
-1 0 = 0
1 thru 100 = 1
* = 0

In [None]:
for seed in range(1, 11):
    gs.run_command("r.reclass", input=f"out_seed_{seed}@results_{seed}", output=f"out_seed_{seed}", rules="reclass.txt")
gs.run_command("r.series", input=[f"out_seed_{seed}" for seed in range(1, 11)], output="sum", method="sum", nprocs=nprocs)
gs.run_command("r.mapcalc.tiled", expression="probability = float(sum) / 10", width=width, height=height, nprocs=nprocs)

In [None]:
m = gj.GrassRenderer()
m.d_rast(map="probability")
m.show()

## Validation


In [None]:
%%writefile reclass.txt
-1 = 0
0 thru 100 = 1
* = 0

In [None]:
gs.write_command("r.reclass", input=f"nlcd_2019", output="urban_2019", rules="-", stdin="21 22 23 24 = 1\n* = 0")
for seed in range(1, 11):
    gs.run_command("r.reclass", input=f"out_seed_{seed}@results_{seed}", output=f"out_seed_{seed}", rules="reclass.txt")

In [None]:
import os
import sys
import json
from multiprocessing import Pool


def compute(params):
    state, seed = params
    env = os.environ.copy()
    env["GRASS_REGION"] = gs.region_env(raster=f"state_{state}")
    results = gs.read_command(
        "r.futures.validation", reference="urban_2019", simulated=f"out_seed_{seed}", format="json", env=env, quiet=True,
    )
    results = json.loads(results)
    results["state"] = state
    return results

params = []
for seed in range(1, 11):
    for state in states:
        params.append((state, seed))

with Pool(processes=nprocs) as pool:
    results = pool.map_async(compute, params).get()

pd.DataFrame(results).groupby("state")["total_allocation", "total_quantity", "kappa"].mean()

In [None]:
df = pd.read_csv("best_model.csv", index_col=0)
df.loc[37183]

## Forest fragmentation analysis

In [None]:
from math import ceil
from multiprocessing import Pool
from grass.exceptions import CalledModuleError


def forest_fragmentation(seed):
    forest = f'forest_2050_{seed}'
    fragmentation = f'fragment_{seed}'
    # forest in 2050
    try:
        gs.mapcalc(f"{forest} = if(isnull(forest), 0, if(isnull(out_seed_{seed}), 1, if (out_seed_{seed} == -1, 1, 0)))")
        gs.run_command('r.forestfrag', flags='a', input=forest, output=fragmentation, window=25)
        gs.run_command('g.remove', type='raster', name=[forest], flags='f')
        return True
    except CalledModuleError:
        return False

    
with Pool(processes=nprocs) as pool:
    success = pool.map_async(forest_fragmentation, list(range(1, 11))).get()
print(success)

In [None]:
def fragmentation_stats(params):
    env = os.environ.copy()
    seed, region = params
    env["GRASS_REGION"] = gs.region_env(**region)
    results = gscript.parse_command('r.stats', input=f'fragment_{seed}', flags='cn', parse=(gs.parse_key_val, {'sep': ' ', 'val_type': int}), env=env)
    results["n"] = (region["n"] + region["s"]) / 2
    results["e"] = (region["e"] + region["w"]) / 2
    results["seed"] = seed
    return 


region = gs.region()
regions = []
grid = 1000
row = col = 0
for row in range(ceil(region["rows"] / grid) + 1):
    for col in range(ceil(region["cols"] / grid) + 1):
        s = region["s"] + row * grid * region["nsres"]
        n = region["s"] + (row + 1) * grid * region["nsres"]
        w = region["w"] + col * grid * region["ewres"]
        e = region["w"] + (col + 1) * grid * region["ewres"]
        regions.append(
            {
                "n": n,
                "s": s,
                "w": w,
                "e": e,
                "nsres": region["nsres"]),
                "ewres": region["ewres"]),
            }
        )

params = []
for seed in range(1, 11):
    for region in regions:
        params.append((seed, region))

with Pool(processes=nprocs) as pool:
    results = pool.map_async(fragmentation_stats, params).get()
print(results)