# 33) HW4 Solution

## Assignment Steps

1. (30%) Write your own MST reduce(to-one) algorithm to sum the rank IDs of all the ranks in the whole communicator. This version should use recursion (as we've seen in class for the MST broadcast).

In [None]:
# Example implementation of MSTReduce (Fig. 3(b)) from
# Chan, E., Heimlich, M., Purkayastha, A. and van de Geijn, R. (2007),
# Collective communication: theory, practice, and experience. Concurrency
# Computat.: Pract. Exper., 19: 1749–1783. doi:10.1002/cpe.1206
#
# In this minimum spanning tree (MST) reduce algorithm:
#   - Divide the ranks into two (almost) equal groups
#   - The group that does not contain the root picks one of its ranks
#     (called srce) to act as its local root
#   - Recurse on the two groups, with root and srce being the "roots" of their
#     respective groups
#   - Finally, srce sends its partial result to root, which adds it to its own
#

function mstreduce!(buf, root, mpicomm;
    left = 0, right = MPI.Comm_size(mpicomm)-1)

    # If there is no one else, let's get outta here!
    left == right && return
    # Shorthand for:
    #=
    if left == right
        return
    end
    =#

    # Determine the split
    mid = div(left + right, 2) # integer division

    # Whom do I send to?
    srce = (root <= mid) ? right : left
    # Shorthand for:
    #=
    if root <= mid
        srce = right
    else
        srce = left
    end
    =#

    # Figure out who we are
    mpirank = MPI.Comm_rank(mpicomm)

    # Recursion:
    # I'm in the left group and the root is my new root
    if mpirank <= mid && root <= mid
        mstreduce!(buf, root, mpicomm; left=left, right=mid)
    # I'm in the left group and the srce is my new root
    elseif mpirank <= mid && root > mid
        mstreduce!(buf, srce, mpicomm; left=left, right=mid)
    # I'm in the right group and the srce is my new root
    elseif mpirank > mid && root <= mid
        mstreduce!(buf, srce, mpicomm; left=mid + 1, right=right)
    # I'm in the right group and the root is my new root
    elseif mpirank > mid && root > mid
        mstreduce!(buf, root, mpicomm; left=mid + 1, right=right)
    end

    # If I'm the srce or the root, send or recv (respectively)
    req = MPI.REQUEST_NULL
    if mpirank == srce
        req = MPI.Isend(buf, root, 7, mpicomm)
    elseif mpirank == root
        # Allocate somewhere to put the data when we get it
        recv_buf = similar(buf)
        MPI.Recv!(recv_buf, srce, 7, mpicomm)
        # update buffer with the received data
        buf .= buf .+ recv_buf
    end


    # Make sure all my sends are done before I get outta dodge
    if req != MPI.REQUEST_NULL
        MPI.Wait!(req)
    end

end
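
The communication pattern of the recursion above can be checked without MPI. The following is a minimal serial sketch, not part of the homework code: `mst_messages` and `simulate_mstreduce` are hypothetical helper names that replay the same split/recurse logic and record each message as `(depth, sender, receiver)`.

```julia
# Serial sketch (no MPI needed). `mst_messages` and `simulate_mstreduce` are
# hypothetical helper names, not part of the homework code.

# Collect the messages of the recursive MST reduce as (depth, sender, receiver).
# Each message is pushed after the recursive calls, mirroring mstreduce!, so
# the list order is a valid order in which to apply the messages.
function mst_messages(root, left, right; depth = 0, msgs = Tuple{Int,Int,Int}[])
    left == right && return msgs
    mid = div(left + right, 2)
    srce = (root <= mid) ? right : left
    if root <= mid
        mst_messages(root, left, mid; depth = depth + 1, msgs = msgs)
        mst_messages(srce, mid + 1, right; depth = depth + 1, msgs = msgs)
    else
        mst_messages(srce, left, mid; depth = depth + 1, msgs = msgs)
        mst_messages(root, mid + 1, right; depth = depth + 1, msgs = msgs)
    end
    push!(msgs, (depth, srce, root))
    return msgs
end

# Apply the messages to an array holding one value per rank (the rank ID).
function simulate_mstreduce(nranks, root)
    vals = collect(0:nranks-1)                   # vals[r + 1] belongs to rank r
    msgs = mst_messages(root, 0, nranks - 1)
    for (_, sender, receiver) in msgs
        vals[receiver + 1] += vals[sender + 1]   # receiver accumulates the partial sum
    end
    return vals[root + 1], msgs
end

total, msgs = simulate_mstreduce(8, 4)
println("sum at root = ", total)
for (depth, sender, receiver) in msgs
    println("depth ", depth, ": rank ", sender, " -> rank ", receiver)
end
```

Messages recorded at the same depth belong to disjoint groups of ranks, so in the MPI version they can proceed concurrently. For `p` ranks the number of distinct depths is `ceil(log2(p))`.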

2. (20%) Extend the testing code `naivereduce_test.jl` to compare the execution times of the naive implementation and your MST implementation (similar to what we did in class to compare the naive and MST broadcasts).

The test that compares all the methods is given below, in Part 4.
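
Before looking at measured times, it can help to count communication steps on the critical path. The sketch below is a rough model under stated assumptions, not a measurement: it assumes the naive version has the root receive from each of the other `p - 1` ranks one after another, while the MST version halves the active group at each level. `naive_steps` and `mst_steps` are hypothetical helper names.

```julia
# Back-of-the-envelope step counts (hypothetical helper names).
# Assumption: the naive reduce has the root receive from the other p - 1 ranks
# one after another; the MST reduce needs one step per level of the tree.
naive_steps(p) = p - 1
mst_steps(p) = ceil(Int, log2(p))

for p in (2, 4, 8, 16)
    println("p = ", p, ": naive = ", naive_steps(p), " steps, MST = ", mst_steps(p), " steps")
end
```

For a one-element buffer like the one used here, per-message latency is likely to dominate, so the step count is a plausible first-order proxy for run time. Measured results on a real cluster can deviate from it.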

3. (15%) Use the tuckoo cluster to test the execution with up to 16 MPI ranks. Use the SLURM batch job script provided in `batch_scripts/batch.jello` as a template and modify it accordingly to launch your program.

```{literalinclude} ../julia_codes/hw4_sol/batch.jreduce
:language: bash
:linenos: true
```

4. (35%) In the attached `Report.ipynb`, write a commentary and implement your own post-processing/data analysis of your execution results, producing 1 or 2 figures with detailed captions presenting the results of your study.

In [None]:
using MPI
include("../julia_codes/hw4_sol/naivereduce.jl")
include("../julia_codes/hw4_sol/mstreduce.jl")
include("../julia_codes/hw4_sol/mstreduce_iter.jl")

let
  # Initialize MPI
  MPI.Init()

  # store communicator
  mpicomm = MPI.COMM_WORLD

  # Get some MPI info
  mpirank = MPI.Comm_rank(mpicomm)
  mpisize = MPI.Comm_size(mpicomm)

  # Divide all ranks halfway to determine the root
  root = div(mpisize, 2) # Integer division

  # create buffer for the communication
  buf = [mpirank]

  # Run each reduce version once before timing (iterative MST, recursive MST,
  # and naive), so that Julia's first-call compilation is not part of the timings
  mstreduce_iter!(buf, root, mpicomm)
  mstreduce!(buf, root, mpicomm)
  naivereduce!(buf, root, mpicomm)

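  # Let's time them. Note: buf is not reset between calls, so the reduced values
  # are not meaningful here; only the timings are used.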
  mst_iter_t1 = time_ns() # The time_ns() function in Julia returns the current time in nanoseconds
  mstreduce_iter!(buf, root, mpicomm)
  mst_iter_t2 = time_ns()
  mst_t1 = time_ns()
  mstreduce!(buf, root, mpicomm)
  mst_t2 = time_ns()
  nve_t1 = time_ns()
  naivereduce!(buf, root, mpicomm)
  nve_t2 = time_ns()

  mpirank == 0 && print("Elapsed time for the naive algorithm: \n")
  mpirank == 0 && @show (nve_t2 - nve_t1) * 1e-9
  mpirank == 0 && print("Elapsed time for the recursive mst algorithm: \n")
  mpirank == 0 && @show (mst_t2 - mst_t1) * 1e-9
  mpirank == 0 && print("Elapsed time for the iterative (non-recursive) mst algorithm: \n")
  mpirank == 0 && @show (mst_iter_t2 - mst_iter_t1) * 1e-9

  # shutdown MPI
  MPI.Finalize()
end


Launching the batch script from step 3, which runs `reduce_compare.jl` with different numbers of MPI ranks, produces the following output file:

```{literalinclude} ../julia_codes/hw4_sol/jred_comp.out
:language: bash
:linenos: true
```

The following post-processing code is used to produce the comparison plot:

In [None]:
using Plots
default(linewidth=4, legendfontsize=12)
using CSV, DataFrames

test_results = DataFrame(exec_num = Int[], naive_time = Float64[], mst_time = Float64[], iter_time = Float64[])

open("../julia_codes/hw4_sol/jred_comp.out", "r")  do f

    output_file = read(f, String)

    # Split the file contents into one block per execution
    blocks = split(output_file, "=====================================")

    # Loop through each block
    for block in blocks
        if occursin(r"Execution \d+ MPI rank", block)
            # Extract Execution number
            exec_match = match(r"Execution (\d+)", block)
            exec_num = parse(Int, exec_match.captures[1])

            # Extract times
            naive_match = match(r"Elapsed time for the naive algorithm:.*?=\s*(-?\d+\.?\d*(?:[eE][-+]?\d+|))"s, block)
            recursive_mst_match = match(r"Elapsed time for the recursive mst algorithm:.*?=\s*(-?\d+\.?\d*(?:[eE][-+]?\d+|))"s, block)
            iterative_match = match(r"Elapsed time for the iterative \(non-recursive\) mst algorithm:.*?=\s*(-?\d+\.?\d*(?:[eE][-+]?\d+|))"s, block)

            if naive_match !== nothing && recursive_mst_match !== nothing && iterative_match !== nothing

                # Populate if matches found

                naive_time = parse(Float64, naive_match.captures[1])
                mst_time = parse(Float64, recursive_mst_match.captures[1])
                iter_time = parse(Float64, iterative_match.captures[1])

                push!(test_results, (exec_num = exec_num, naive_time = naive_time, mst_time = mst_time, iter_time = iter_time))
            end
        end
    end
end

CSV.write("test_results.csv", test_results)

pl = plot(
    test_results.exec_num, test_results.naive_time,
    yscale = :log10,
    xlims = (0,16),
    xlabel = "MPI size",
    ylabel = "Execution time (s)",
    label = "Naive reduce",
    title = "Comparison of different parallel MPI reduce(to-one) algorithms",
    marker=:o
)

plot!(test_results.exec_num, test_results.mst_time, label = "MST (recursive) reduce", marker=:diamond)
plot!(test_results.exec_num, test_results.iter_time, label = "Iterative reduce", marker=:star5)

display(pl)
savefig(pl, "comparison_naive_mst_iter_reduce.png")
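
To see what the regular expressions in the post-processing code extract, here is a small sketch that applies two of them to a single block. The contents of `sample_block` are made up for illustration: they assume a header of the form `Execution N MPI ranks` followed by the text that `print` and `@show` would emit, and they are not taken from `jred_comp.out`.

```julia
# Hypothetical sample block; the real output file may be laid out differently.
sample_block = """
Execution 4 MPI ranks
Elapsed time for the naive algorithm:
(nve_t2 - nve_t1) * 1.0e-9 = 3.5e-5
"""

# Same patterns as in the post-processing code above
exec_match = match(r"Execution (\d+)", sample_block)
naive_match = match(r"Elapsed time for the naive algorithm:.*?=\s*(-?\d+\.?\d*(?:[eE][-+]?\d+|))"s, sample_block)

exec_num = parse(Int, exec_match.captures[1])
naive_time = parse(Float64, naive_match.captures[1])

println("ranks = ", exec_num, ", naive time (s) = ", naive_time)
```

The `s` flag lets `.` match newlines, so the lazy `.*?` can skip from the label line to the first `=` on the following `@show` line, and the capture group then picks up the number in plain or scientific notation.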
