## Parallel Computation Lab

(40 points)

This lab will expand upon the in-class activity in the Parallel Computation lab. Process is a class that executes a target function (such as greeting) with a set of arguments. While developers have full control over the parallel execution of their Process (or Processes), they have to manage each one individually using the start() and join() methods. That can get tedious quickly.

So we learned about Map (and MapReduce in lecture), which allows a function to be applied to a list of elements in parallel. The benefit is that we don't have to manage the parallel execution; we only have to set up a function to be mapped over a list of things. The downside is that we lose the customization that we could have had with Process, so there is a trade-off. However, Map has many uses in GIS when applied to a list of raster cells, vector features, or files (depending on the level of parallelism, which is also called the granularity of parallelism).
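As a minimal sketch of the contrast described above, the two styles look roughly like this. It assumes a standalone script run with the standard `multiprocessing` module. In a notebook, the same code may hit the `__main__` import error described later in this notebook. The names `greeting`, `square_root`, `run_processes`, and `run_map` are illustrative only. With `Process`, you create, start, and join every worker yourself. `Pool.map` does the scheduling and returns the results in input order.

```python
import functools
import math
from multiprocessing import Pool, Process


def greeting(name):
    # Target function for a manually managed Process
    print(f"Hello from {name}")


def square_root(val):
    # Function to be mapped over a list of values
    return math.sqrt(val)


def run_processes(names):
    # With Process, each worker must be created, started and joined by hand
    workers = [Process(target=greeting, args=(name,)) for name in names]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


def run_map(values, processes=2):
    # With Pool.map, the pool distributes the values and collects results in order
    with Pool(processes) as pool:
        mapped = pool.map(square_root, values)
    # Combine the mapped values with a reduce step, as in the lab template
    return functools.reduce(lambda a, b: a + b, mapped)


if __name__ == "__main__":
    # The guard keeps child processes from re-running this block on import
    run_processes(["worker 1", "worker 2"])
    print(run_map([1, 4, 9, 16]))  # prints 10.0
```

The trade-off from the paragraph shows up directly. `run_processes` can give each worker different arguments and behaviour, but every worker has to be managed. `run_map` is shorter but applies one function uniformly.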

In this lab, we will experiment with incorporating parallelism at different levels of a geospatial problem using map and reduce tasks. You will explore the performance benefits or detriments of adding or subtracting parallelism using a generic case study. This code can then be reused for a variety of GIS problems later in your career if a need for parallelism ever comes up.

Using the template Jupyter Notebook, examine the performance trade-offs of parallelism. I encourage everyone to try running these experiments on notebooks.msi.umn.edu. However, you are free to run them on your own computer, laptop, or environment. Just document where you ran them, because it could influence the interpretation of the results.


### Name: Matt Braaksma
### Email: braak014@umn.edu

### Where are you running this notebook?
My personal computer.

In [40]:
import multiprocessing
cpu_count = multiprocessing.cpu_count()
print("Number of cores on this machine", cpu_count)

Number of cores on this machine 12


In [41]:
import psutil

# Get CPU information
cpu_info = psutil.cpu_times()
cpu_percent = psutil.cpu_percent()

print("CPU Information:")
print(f"User time: {cpu_info.user}")
print(f"System time: {cpu_info.system}")
print(f"Idle time: {cpu_info.idle}")
print(f"CPU Usage: {cpu_percent}%")

# Get memory information
memory_info = psutil.virtual_memory()

print("\nMemory Information:")
print(f"Total memory: {memory_info.total / (1024 ** 3):.2f} GB")
print(f"Available memory: {memory_info.available / (1024 ** 3):.2f} GB")
print(f"Memory usage: {memory_info.percent}%")

CPU Information:
User time: 130044.66
System time: 66989.96
Idle time: 1802627.64
CPU Usage: 23.7%

Memory Information:
Total memory: 16.00 GB
Available memory: 3.47 GB
Memory usage: 78.3%


### Setup Code
Please execute the following code cell to create the raster tiles. With the settings below (nfiles, nrows, ncols), it creates 8 rasters that are 2,000 x 2,000 cells each.

In [42]:
# What if we had a bunch of raster tiles?
# Let's make some!!
nfiles = 8
nrows = 2000 
ncols = 2000

def maketiles(nfiles,nrows,ncols):
    for f_i in range(nfiles):
        # Each file gets the six-line ESRI ASCII grid header followed by the cell values
        with open("tmp_raster"+str(f_i)+".asc","w") as f:
            f.write("ncols "+str(ncols)+"\n")
            f.write("nrows "+str(nrows)+"\n")
            f.write("xllcorner 0.0\n")
            f.write("yllcorner 0.0\n")
            f.write("cellsize 1.0\n")
            f.write("NODATA_value -999\n")

            # Cell (i, j) of file f_i holds the value i + j + f_i
            for i in range(nrows):
                for j in range(ncols):
                    f.write(str(i+j+f_i)+" ")
                f.write("\n")

maketiles(nfiles,nrows,ncols)
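The nested Python loops above write one cell at a time, which is slow for large tiles. The following is a possible vectorized sketch using NumPy's `np.savetxt`. The names `make_tile`, `write_ascii_grid`, and `maketiles_vectorized` are hypothetical. It builds each tile with broadcasting and writes the same six-line header, so `np.loadtxt(..., skiprows=6)` should read the result unchanged.

```python
import numpy as np


def make_tile(f_i, nrows, ncols):
    # Same values as maketiles: cell (i, j) of file f_i holds i + j + f_i
    rows = np.arange(nrows).reshape(-1, 1)
    cols = np.arange(ncols).reshape(1, -1)
    return rows + cols + f_i


def write_ascii_grid(path, grid, cellsize=1.0, nodata=-999):
    # Six-line ESRI ASCII grid header, matching the one written by maketiles
    nrows, ncols = grid.shape
    header = (
        f"ncols {ncols}\n"
        f"nrows {nrows}\n"
        "xllcorner 0.0\n"
        "yllcorner 0.0\n"
        f"cellsize {cellsize}\n"
        f"NODATA_value {nodata}"
    )
    # comments="" stops savetxt from prefixing each header line with "# "
    np.savetxt(path, grid, fmt="%d", header=header, comments="")


def maketiles_vectorized(nfiles, nrows, ncols, prefix="tmp_raster"):
    # One whole-array write per file instead of one write per cell
    for f_i in range(nfiles):
        write_ascii_grid(f"{prefix}{f_i}.asc", make_tile(f_i, nrows, ncols))
```

Calling `maketiles_vectorized(nfiles, nrows, ncols)` should produce files equivalent to `maketiles`. The only difference is that rows do not end with a trailing space, which `np.loadtxt` ignores either way.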

### Template Code
The code cell below is the template that we will be using. This is the 'serial version' of the code that we will be manipulating to add parallelism. Start your exploration by timing the execution of this cell when using serial processing. This becomes 'the time to beat' when adding parallelism.

Also feel free to refer to the in-class activity notebook for information or code examples (such as Pool/map) as well.

In [43]:
%%time
# Note we can use what is called a Jupyter Magic command to time the execution of this cell, which will allow us to tell if we are improving performance when adding parallelism.
# Using the %%time command we can record the amount of execution time that it takes to execute this cell.

# We will use the reduce function in functools.
import functools
# We use numpy for calculations
import numpy as np
# We use Pool for parallel processing
# from multiprocessing import Pool

# This is a function that will be applied to each cell of our raster using the 'map' function
def my_map(val):
    #Let's just take the square root of the value in this example.
    # This functionality could be swapped out for any type of operation you would like
    return np.sqrt(val)

# This is a function that will be applied to each of the mapped values using the 'functools.reduce' function
def my_reduce(a,b):
    # Let's just take the sum of the values in this example.
    # Like map, this functionality could be swapped out.
    return a+b

# This function will process each tile array
def process_tile_array(tile_array):
    # This rather complex looking statement will flatten the 2D tile_array to a 1D array.
    # Then we map the my_map function to each element in the 1D array.
    # Next, we cast it to a list and back to a np.array (long story as to why)
    mapped_grid = np.array(list(map(my_map,tile_array.flatten())))

    # Now we apply our my_reduce function using the functools.reduce to the mapped_grid
    reduced_value = functools.reduce(my_reduce,mapped_grid)
    print("reduced_value",reduced_value)
    
    return reduced_value

# This function will process a file (when given a file number) by running process_tile_array
def process_file(f_i):
    ascii_grid = np.loadtxt("tmp_raster"+str(f_i)+".asc",skiprows=6)
    val = process_tile_array(ascii_grid)
    return val


def process_tiles(nfiles):
    # We want to apply the reduce function across all the files
    global_reduce = None

    # Loop over all the number of files
    for f_i in range(nfiles):
        ascii_grid = np.loadtxt("tmp_raster"+str(f_i)+".asc",skiprows=6)
        val = process_tile_array(ascii_grid)
        
        if global_reduce is None:
            global_reduce = val
        else:
            global_reduce = my_reduce(val, global_reduce)
        
    print("global_reduce",global_reduce)

process_tiles(nfiles)


reduced_value 174392717.6301426
reduced_value 174442128.78514895
reduced_value 174491513.74298924
reduced_value 174540873.48921126
reduced_value 174590208.42358
reduced_value 174639518.84948972
reduced_value 174688805.02044958
reduced_value 174738067.1580947
global_reduce 1396523833.099106
CPU times: user 15.3 s, sys: 443 ms, total: 15.7 s
Wall time: 15.8 s
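For reference, the per-cell map and reduce in `process_tile_array` adds up the square roots of every cell in a tile. The sketch below shows a whole-array NumPy equivalent, which can serve as a sanity check on each `reduced_value`. The helper names `reduce_with_map` and `reduce_vectorized` are hypothetical. The two results may differ in the last few digits, because `np.sum` uses pairwise summation rather than a left-to-right reduce.

```python
import functools

import numpy as np


def reduce_with_map(tile_array):
    # The template's approach: one Python-level sqrt call and one addition per cell
    mapped = map(np.sqrt, tile_array.flatten())
    return functools.reduce(lambda a, b: a + b, mapped)


def reduce_vectorized(tile_array):
    # The same map (sqrt) and reduce (sum) expressed as whole-array NumPy operations
    return np.sqrt(tile_array).sum()


# Small synthetic tile where cell (i, j) holds i + j, like tmp_raster0.asc
tile = np.add.outer(np.arange(200), np.arange(200)).astype(float)
print(reduce_with_map(tile), reduce_vectorized(tile))
```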


### Question 1: (2 points)
Record the wall time for serial execution (seconds): 15.8 s

## Map the Tile Array

This stage will look at the finest granularity of parallelism: parallelizing the processing of each individual cell in our 2D raster tile array.

 1. Create a Pool of 4 processes in the process_tile_array function
 2. Replace the python map function with the map function from Pool to process the tile_array in parallel
 
(Please note that very few code changes are needed to introduce parallelism in this stage. As a hint, look at the in-class activity code for creating a pool and applying map to a pool of processors.)
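At this granularity, every cell of the tile passes through the pool. `Pool.map` groups the cells into chunks, but each value is still serialized to a worker and back. The following is a hedged sketch of a coarser alternative. It splits the tile into row blocks with `np.array_split` and maps a whole-block function instead. The names `block_sum_sqrt` and `process_tile_in_blocks` are hypothetical. The pool is passed in so that it can be reused across tiles. The sketch is written for a script using the standard `multiprocessing` module. The `multiprocess` fork used in this notebook should accept the same calls.

```python
import functools

import numpy as np
from multiprocessing import Pool


def block_sum_sqrt(block):
    # Map (sqrt) and reduce (sum) a whole block of rows inside one worker call
    return float(np.sqrt(block).sum())


def process_tile_in_blocks(tile_array, pool, nblocks=4):
    # Split the 2D tile into nblocks groups of rows (sizes may differ by one row)
    blocks = np.array_split(tile_array, nblocks, axis=0)
    # One task per block, so only nblocks arrays are sent to the workers
    partial_sums = pool.map(block_sum_sqrt, blocks)
    # Reduce the per-block results in the parent process
    return functools.reduce(lambda a, b: a + b, partial_sums)


if __name__ == "__main__":
    tile = np.add.outer(np.arange(2000), np.arange(2000)).astype(float)
    with Pool(2) as pool:
        print(process_tile_in_blocks(tile, pool))
```

The map and reduce steps stay the same as in the template. Only the unit of work handed to each worker changes, from one cell to a block of rows.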

In [None]:
%%time

import multiprocess as mp

# NOTE: Due to the following error, multiprocess was used instead of multiprocessing based on the fact that 'the multiprocessing module has a major limitation when 
# it comes to IPython use: Functionality within this package requires that the __main__ module be importable by the children. [...] This means that some examples, 
# such as the multiprocessing.pool.Pool examples will not work in the interactive interpreter. [from the documentation]. Fortunately, there is a fork of the 
# multiprocessing module called multiprocess which uses dill instead of pickle to serialization and overcomes this issue conveniently. 
# (https://stackoverflow.com/questions/41385708/multiprocessing-example-giving-attributeerror)

# Process SpawnPoolWorker-23:
# Traceback (most recent call last):
#   File "/Users/mbraaksma/mambaforge/envs/geovenv1/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
#     self.run()
#   File "/Users/mbraaksma/mambaforge/envs/geovenv1/lib/python3.10/multiprocessing/process.py", line 108, in run
#     self._target(*self._args, **self._kwargs)
#   File "/Users/mbraaksma/mambaforge/envs/geovenv1/lib/python3.10/multiprocessing/pool.py", line 114, in worker
#     task = get()
#   File "/Users/mbraaksma/mambaforge/envs/geovenv1/lib/python3.10/multiprocessing/queues.py", line 367, in get
#     return _ForkingPickler.loads(res)
# AttributeError: Can't get attribute 'my_map' on <module '__main__' (built-in)>

# This is a function that will be applied to each cell of our raster using the 'map' function
def my_map(val):
    #Let's just take the square root of the value in this example.
    # This functionality could be swapped out for any type of operation you would like
    return np.sqrt(val)

# This is a function that will be applied to each of the mapped values using the 'functools.reduce' function
def my_reduce(a,b):
    # Let's just take the sum of the values in this example.
    # Like map, this functionality could be swapped out.
    return a+b

# This function will process each tile array
def process_tile_array(tile_array):
    
    # Use a Pool to parallelize the mapping step
    # (set to 2 processes for Question 4; Question 2 used 4)
    with mp.Pool(2) as pool:
        # Flatten the 2D array to a 1D array and map `my_map` in parallel
        mapped_grid = pool.map(my_map, tile_array.flatten())
    
    # This rather complex looking statement will flatten the 2D tile_array to a 1D array.
    # Then we map the my_map function to each element in the 1D array.
    # Next, we cast it to a list and back to a np.array (long story as to why)
    # mapped_grid = np.array(list(map(my_map,tile_array.flatten())))

    # Now we apply our my_reduce function using the functools.reduce to the mapped_grid
    reduced_value = functools.reduce(my_reduce,mapped_grid)

    print("reduced_value", reduced_value)
    return reduced_value

# This function will process a file (when given a file number) by running process_tile_array
def process_file(f_i):
    ascii_grid = np.loadtxt("tmp_raster"+str(f_i)+".asc",skiprows=6)
    val = process_tile_array(ascii_grid)
    return val


def process_tiles(nfiles):
    # We want to apply the reduce function across all the files
    global_reduce = None

    # Loop over all the number of files
    for f_i in range(nfiles):
        ascii_grid = np.loadtxt("tmp_raster"+str(f_i)+".asc",skiprows=6)
        val = process_tile_array(ascii_grid)
        
        if global_reduce is None:
            global_reduce = val
        else:
            global_reduce = my_reduce(val, global_reduce)
        
    print("global_reduce",global_reduce)

process_tiles(nfiles)


reduced_value 174392717.6301426
reduced_value 174442128.78514895
reduced_value 174491513.74298924
reduced_value 174540873.48921126
reduced_value 174590208.42358
reduced_value 174639518.84948972
reduced_value 174688805.02044958
reduced_value 174738067.1580947
global_reduce 1396523833.099106
CPU times: user 8min 32s, sys: 11.5 s, total: 8min 43s
Wall time: 9min 51s


### Question 2: (2 points)
Record the wall time for parallel execution using 4 pooled processes launched in process_tile_array: 9min 14s

### Question 3: (2 points)
Is this wall time what you expected? Why or why not? 

I suspected that it would not be faster and might even be slower, because a new pool of processes has to be started for every tile. However, I did not expect it to be **that** much slower: from about 16 seconds to over 9 minutes. That is a shocking difference, and it makes me wonder if I made a mistake somewhere.

### Question 4: (4 points)
Change the number of parallel processes from 4 to 2. 

Record the time using 2 pool processes (2 points): 9min 51s

Write a one sentence summary explaining why there might be a difference in execution times (3 points): The difference, although minimal, would be due to the fact that fewer processes are being used here so it takes slightly longer to compute. 

## Let's move Pool

Notice that we are starting up a new Pool of processors for every raster tile we process.
This may be hurting our performance.
So let's launch only one pool of processors and reuse it for each tile.
In theory this should be faster, so let's test it.

 1. Move the Pool() function from process_tile_array to process_tiles
 2. Alter the code so that the parallel map function in process_tile_array can use the pool from process_tiles. Hint: You will likely need to change the way the functions are defined.
 3. Let's use 4 processors again, so change the pool size back to 4.
 4. Run the newer version, verify the results are correct and record the time.

In [None]:
%%time

import functools
import multiprocess as mp
import numpy as np

# Define the mapping function to be applied to each cell
def my_map(val):
    return np.sqrt(val)

# Define the reducing function to aggregate mapped results
def my_reduce(a, b):
    return a + b

# Modify process_tile_array to accept the pool as an argument
def process_tile_array(tile_array, pool):
    # Use pool.map to process the flattened array in parallel
    mapped_grid = pool.map(my_map, tile_array.flatten())
    
    # Reduce the results to a single value
    reduced_value = functools.reduce(my_reduce, mapped_grid)
    return reduced_value

# Main function to process all tiles 
def process_tiles(nfiles):
    # Create the pool with 4 processes
    with mp.Pool(4) as pool:
        global_reduce = None
        
        # Loop over all files
        for f_i in range(nfiles):
            # Load the raster file
            ascii_grid = np.loadtxt(f"tmp_raster{f_i}.asc", skiprows=6)
            
            # Process the tile array 
            val = process_tile_array(ascii_grid, pool)
            
            # Perform the reduction across tiles
            if global_reduce is None:
                global_reduce = val
            else:
                global_reduce = my_reduce(global_reduce, val)
        
        print("Global reduced value:", global_reduce)

process_tiles(nfiles)

Global reduced value: 1396523833.099106
CPU times: user 8min 32s, sys: 10.7 s, total: 8min 42s
Wall time: 9min 17s


### Question 5: (2 points)
Record the wall time for parallel execution using 4 pooled processes **launched in process_tiles**: 9min 17s

### Question 6: (4 points)
Is this wall time what you expected? Why or why not? 

This took almost exactly as much time as starting the pooled processes in the process_tile_array function (9min 17s versus 9min 14s). This makes sense because nothing has functionally changed about the process: the pooled workers still map over every cell in the same way. The only difference is that the pool is created once and passed into the function. It is still surprising to me that this level of parallelism takes so much longer than the serial process, but it makes sense that it matches the earlier parallel run.

## Let's try processing entire files in parallel.

Once we moved Pool to process_tiles, we saw that beautiful for loop processing each tile over the range of nfiles and thought to ourselves: we can parallelize that loop!

 1. Using the original serial template code (i.e., remove the parallel map code you just wrote), let's parallelize the file loop
 2. Create a Pool of 4 processes in process_tiles
 3. Change the process_tiles function to use the pool's parallel map function for all the files (nfiles).
    (Hint: look at the unused process_file function)
 4. Change the remaining code in process_tiles to accept the output from our parallel map function and still calculate the correct global_reduce.
    (Hint: the for loops and structure of the code will need to change to add this level of parallelism)
 5. Run the newer version, verify the results are correct and record the time.
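One possible variation on steps 3 and 4 is to reduce the per-file results as they arrive instead of waiting for the full list. The sketch below does this with `Pool.imap_unordered`. The names `tile_value` and `parallel_reduce` are hypothetical, and `tile_value` is only a stand-in for `process_file`. The completion order is not fixed. That is acceptable for a commutative, associative reduction like the sum used here, although floating-point totals may differ in the last digits. Returning values instead of printing inside the workers also avoids interleaved lines when several workers write at once.

```python
import functools
import operator
from multiprocessing import Pool


def tile_value(f_i):
    # Hypothetical stand-in for process_file: returns a per-file value, no printing
    return float(f_i) ** 0.5


def parallel_reduce(func, items, pool, reducer=operator.add):
    # imap_unordered yields each result as soon as any worker finishes it,
    # so the reduce can start before all files are done
    results = pool.imap_unordered(func, items)
    return functools.reduce(reducer, results)


if __name__ == "__main__":
    with Pool(4) as pool:
        print(parallel_reduce(tile_value, range(8), pool))
```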

In [48]:
%%time

import functools
import multiprocess as mp
import numpy as np

def my_map(val):
    return np.sqrt(val)

def my_reduce(a,b):
    return a+b

def process_tile_array(tile_array):
    mapped_grid = np.array(list(map(my_map,tile_array.flatten())))
    reduced_value = functools.reduce(my_reduce,mapped_grid)
    print("reduced_value",reduced_value)
    return reduced_value

def process_file(f_i):
    ascii_grid = np.loadtxt("tmp_raster"+str(f_i)+".asc",skiprows=6)
    val = process_tile_array(ascii_grid)
    return val


def process_tiles(nfiles):
    # We want to apply the reduce function across all the files
    global_reduce = None

    # Create a list of file indices
    file_indices = list(range(nfiles))
    
    # Use a single pool to parallelize the processing of files
    with mp.Pool(4) as pool:
        # Use pool.map to process each file in parallel
        results = pool.map(process_file, file_indices)
    
    # Reduce the results from all files
    global_reduce = functools.reduce(my_reduce, results)
    print("Global reduced value:", global_reduce)

process_tiles(nfiles)


reduced_valuereduced_valuereduced_valuereduced_value    174540873.48921126174442128.78514895174392717.6301426174491513.74298924



reduced_value 174688805.02044958
reduced_value 174738067.1580947
reduced_value 174639518.84948972
reduced_value 174590208.42358
Global reduced value: 1396523833.099106
CPU times: user 9.83 ms, sys: 35.1 ms, total: 45 ms
Wall time: 4.42 s


### Question 7: (2 points)
Record the wall time for parallel execution using 4 pooled processes to process the files in parallel: 4.42 s

### Question 8: (4 points)
Is this wall time what you expected? Why or why not? 

I expected the wall time to be faster, but the size of the difference is still striking. This version is now about 3.6 times faster than the original serial process (4.42 s versus 15.8 s). Clearly, this illustrates that the placement of the parallelization is crucial and requires careful consideration. I knew this was important, but I never realized that putting it in the wrong spot could be so detrimental (assuming I did not make an error). Understanding the workflow and where the bottlenecks occur is key.

### Question 9: (4 points)
Explain (in your own words) why processing files in parallel was different from processing numpy arrays in parallel. Does Amdahl's Law (https://cvw.cac.cornell.edu/Parallel/amdahl) provide any clues to explain the difference in execution times?

Interacting with the file system is generally much slower than working with data already loaded into memory. NumPy is optimized for fast, in-memory calculations, so adding parallel processes often slowed things down due to extra setup time and the need to share data between processes. In contrast, loading files from disk is slow but can be sped up with parallel processing because each file can be read independently without dependencies. The key is to identify which part of the workflow is slow and focus parallel processing there. Amdahl's Law shows that the NumPy processing had a large serial portion, limiting speedup, while file processing was easier to parallelize due to minimal shared overhead. 
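Amdahl's Law predicts a speedup of S(N) = 1 / ((1 - p) + p / N) for a workload whose fraction p can be parallelized over N workers. The sketch below applies it to the recorded wall times. The helper names `amdahl_speedup` and `implied_parallel_fraction` are hypothetical. The law ignores process start-up and data-transfer costs, which likely dominated the cell-level runs. It is therefore only a reasonable model for the file-level run, where the measured speedup on 4 processes corresponds to a parallel fraction of roughly 0.96.

```python
def amdahl_speedup(parallel_fraction, n_workers):
    # Amdahl's Law: S(N) = 1 / ((1 - p) + p / N)
    return 1.0 / ((1.0 - parallel_fraction) + parallel_fraction / n_workers)


def implied_parallel_fraction(observed_speedup, n_workers):
    # Solve Amdahl's Law for p, given a measured speedup S on N workers
    return (1.0 - 1.0 / observed_speedup) / (1.0 - 1.0 / n_workers)


# Wall times recorded in this lab (seconds)
serial_time = 15.8
file_parallel_time = 4.42  # 4 processes, one file per task

speedup = serial_time / file_parallel_time
print(f"observed speedup on 4 processes: {speedup:.2f}")  # 3.57
print(f"implied parallel fraction: {implied_parallel_fraction(speedup, 4):.2f}")  # 0.96
print(f"best case with p = 1: {amdahl_speedup(1.0, 4):.1f}")  # 4.0
```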

### Question 10: (10 points)

Open problem: Now that you have experience with parallelism using the multiprocessing module, take some time to create your own parallel geospatial method. Feel free to use the code in the Activity and Lab sections. Examples include FocalMean (Raster), Mean Center (Vector), etc. You are free to make this a relatively simple method or a more complex one.
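FocalMean is one of the suggested examples. The following is a hedged, NumPy-only sketch. It assumes NumPy 1.20 or later for `sliding_window_view`. The names `focal_mean` and `focal_mean_tiles` are hypothetical. Padding the edges with NaN is only one of several possible edge rules. The sketch computes a 3 x 3 focal mean per tile and maps it over tiles with an optional pool, which keeps the parallelism at the tile level, the level that paid off in this lab.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def focal_mean(grid, size=3):
    # Pad the edges with NaN so edge cells average only their real neighbours
    # (size is assumed to be odd)
    half = size // 2
    padded = np.pad(grid.astype(float), half, mode="constant", constant_values=np.nan)
    # windows[i, j] is the size x size neighbourhood centred on grid[i, j]
    windows = sliding_window_view(padded, (size, size))
    return np.nanmean(windows, axis=(-2, -1))


def focal_mean_tiles(tiles, pool=None):
    # Tile-level parallelism: each worker processes whole tiles;
    # with pool=None the tiles are processed serially as a timing baseline
    mapper = map if pool is None else pool.map
    return list(mapper(focal_mean, tiles))
```

In this notebook, one might pass `multiprocess.Pool(4)` as `pool` and time the call against `pool=None`, following the same pattern as the serial and parallel runs below.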


In [65]:

import numpy as np
from multiprocess import Pool
from pyspatialstats.focal_stats import focal_std


def process_tile(file_index):
    # Load the raster data, skipping the six-line ASCII grid header
    ascii_grid = np.loadtxt("tmp_raster"+str(file_index)+".asc",skiprows=6)
    
    # Apply the focal standard deviation function
    processed_tile = focal_std(ascii_grid, window=3, fraction_accepted=0.7, std_df=0)
    
    return processed_tile

# Process all tiles, either serially or with a single pool of worker processes
def process_tiles_in_parallel(nfiles, num_processes, parallel=True):
    file_indices = list(range(nfiles))

    if not parallel:
        # Serial baseline: apply the same per-file function one file at a time
        results = [process_tile(file_index) for file_index in file_indices]
        print("Focal STD Complete for serial processes.")

    else:
        # Parallel version: each worker processes whole files
        with Pool(num_processes) as pool:
            results = pool.map(process_tile, file_indices)
        print(f"Focal STD Complete for {num_processes} processes.")


In [66]:
%%time
process_tiles_in_parallel(nfiles, num_processes=2)

Focal STD Complete for 2 processes.
CPU times: user 80.1 ms, sys: 349 ms, total: 429 ms
Wall time: 1.25 s


In [67]:
%%time
process_tiles_in_parallel(nfiles, num_processes=4)

Focal STD Complete for 4 processes.
CPU times: user 87.2 ms, sys: 380 ms, total: 467 ms
Wall time: 841 ms


In [68]:
%%time
process_tiles_in_parallel(nfiles, num_processes=None, parallel=False)

Focal STD Complete for serial processes.
CPU times: user 1.53 s, sys: 127 ms, total: 1.65 s
Wall time: 1.68 s


### Question 11: (4 points)
    
Run your new parallel geospatial method using 2 and 4 processes and record the wall time. Is this wall time what you expected?

The wall time is what I expected: it decreases from serial (1.68 s) to 2 processes (1.25 s) to 4 processes (841 ms). This pattern makes sense because I parallelized the code at the file level, where most of the work happens, and that level benefits significantly from parallel execution. Since the files have no interdependencies, the work scales well as more processes are added. As a result, the speedup from parallel processing is fairly unsurprising, especially given that the overhead of managing multiple processes is outweighed by the efficiency gained from reading and processing the files in parallel.