In order to run the code in parallel, we first need to add some julia processes.

In [1]:
using Distributed
nprocs() < 3 && addprocs(3)
@everywhere begin
    using Pkg
    Pkg.activate(".")
end
nprocs()

4

The `@everywhere` block activates the current project on the workers. By
default, the workers are created as any other julia session, e.g. defaulting to
the project `vx.y` with `x.y` the current version of julia, unless the
environment variable
['JULIA_PROJECT'](https://docs.julialang.org/en/v1/manual/environment-variables/index.html#JULIA_PROJECT-1)
is set.

In [2]:
@everywhere begin
    using Pkg
    Pkg.activate(pwd())
    using BlockBandedMatrices: BandedBlockBandedMatrix, _BandedBlockBandedMatrix
    using BlockBandedMatrices: BandedBlockBandedSizes
    using BlockArrays: PseudoBlockArray, Block, nblocks
    using SharedArrays: SharedArray

    const SharedPseudoBlock = 
        PseudoBlockArray{T, 2, SharedArray{T, 2}, B} where {T, B}
    const SharedBandedBlockBandedMatrix =
        BandedBlockBandedMatrix{T, SharedPseudoBlock{T, B}} where {T, B}
end

So we have a shared array at the very bottom. It holds the actual data. On it,
we layer an abstraction, the `PseudoBlockArray` that allows us to view the
shared array as an array of blocks. On top of that, the
`BandedBlockBandedMatrix` adds another layer of abstraction to the view the
block array as banded matrix of block banded matrices.

We can now create an overloaded function to initialize the array:

In [3]:
@everywhere function SharedBandedBlockBandedMatrix{T}(
        ::UndefInitializer, bs::BandedBlockBandedSizes; kwargs...) where T
    shared = SharedArray{T}(size(bs); kwargs...)
    block = PseudoBlockArray(shared, bs.data_block_sizes)
    _BandedBlockBandedMatrix(block, bs)
end

@everywhere function SharedBandedBlockBandedMatrix{T}(
        ::UndefInitializer,
        dims::NTuple{2, AbstractVector{Int}},
        lu::NTuple{2, Int},
        λμ::NTuple{2, Int};
        kwargs...
) where T
    blocksizes = BandedBlockBandedSizes(dims..., lu..., λμ...)
    SharedBandedBlockBandedMatrix{T}(undef, blocksizes; kwargs...)
end

To illustrate this, let's create a banded block banded matrix with blocks
composed of 10x10 matrices (`n` and `m`), with the band of blocks from `l` to
`u` and each block banded from `λ` to `μ`.

In [4]:
N, n = 2, 3
l, u, λ, μ = 1, 0, 0, 2
M, m = N + 1, n + 1
A = SharedBandedBlockBandedMatrix{Float64}(undef, (repeat([n], N), repeat([m], M)), (l, u), (λ, μ))

2×3-blocked 6×12 BandedBlockBandedMatrix{Float64,PseudoBlockArray{Float64,2,SharedArray{Float64,2},BlockArrays.BlockSizes{2,Tuple{Array{Int64,1},Array{Int64,1}}}}}:
 0.0  0.0  0.0   ⋅   │   ⋅    ⋅    ⋅    ⋅   │   ⋅    ⋅    ⋅    ⋅ 
  ⋅   0.0  0.0  0.0  │   ⋅    ⋅    ⋅    ⋅   │   ⋅    ⋅    ⋅    ⋅ 
  ⋅    ⋅   0.0  0.0  │   ⋅    ⋅    ⋅    ⋅   │   ⋅    ⋅    ⋅    ⋅ 
 ────────────────────┼──────────────────────┼────────────────────
 0.0  0.0  0.0   ⋅   │  0.0  0.0  0.0   ⋅   │   ⋅    ⋅    ⋅    ⋅ 
  ⋅   0.0  0.0  0.0  │   ⋅   0.0  0.0  0.0  │   ⋅    ⋅    ⋅    ⋅ 
  ⋅    ⋅   0.0  0.0  │   ⋅    ⋅   0.0  0.0  │   ⋅    ⋅    ⋅    ⋅ 

But what we really want to do is to populate the matrix in parallel. We can do
this by allocating blocks for each process to write to:

In [21]:
@everywhere begin
    """Populates the shared banded block banded matrix in parallel"""
    function populate!(
            block_populate!::Function, A::SharedBandedBlockBandedMatrix)
        @sync begin
            for (p, proc) in enumerate(procs(A.data.blocks))
                @async remotecall_wait(populate!, proc, block_populate!, A, p)
            end
        end
        A
    end

    """Populates the blocks of A that are assigned to process p."""
    function populate!(
            block_populate!::Function, A::SharedBandedBlockBandedMatrix, p)
        n = sum(
            min(i + A.u, nblocks(A, 2)) -  max(i - A.l, 1) + 1
            for i in 1:nblocks(A, 1)
        )
        m = length(procs(A.data.blocks))
        start = (n ÷ m) * (p - 1) + min((n % m), p - 1)
        stop = (n ÷ m) * p + min((n % m), p)
        k = 0
        for i in 1:nblocks(A, 1), j in max(i - A.l, 1):min(i + A.u, nblocks(A, 2))
            if k >= stop
                break
            elseif k >= start
                block_populate!(view(A, Block(i, j)), i, j)
            end
            k += 1
        end
        A
    end
end

The code proceeds in a cascade of three steps. First we have a functions to
signal workers to start working. Then we have a funtion that will run on each
worker process. It simply parcels out the list of mutable blocks in the banded
matrix and figures out which are assigned to it's process id. For each assigned
block, it then calls the third function in the cascade, to actually populate the
given block.

Performance-minded users will notice that this simple-minded implementation runs
on idle for a fair bit of the loop over blocks. But in practice, this is a small
waste compared to latency issues we will soon dive into.

The magic happens in the `@sync` for loop. In practice, each `@async` loop item
generates a task and schedules it to be sent and executed by a process.  Then,
right before exiting the `@sync` code-block, the tasks are sent over to the
processes, and the code waits until the tasks are finished running.

In practice, usage looks like this:

In [22]:
populate!(A) do block, i, j
    # block: a view of the block to populate
    # i, j: indices of the block
    for x in 1:size(block, 1), y in max(x - A.λ, 1):min(x + A.μ, size(block, 2))
        block[x, y] = 1
    end
end

2×3-blocked 6×12 BandedBlockBandedMatrix{Float64,PseudoBlockArray{Float64,2,SharedArray{Float64,2},BlockArrays.BlockSizes{2,Tuple{Array{Int64,1},Array{Int64,1}}}}}:
 1.0  1.0  1.0   ⋅   │   ⋅    ⋅    ⋅    ⋅   │   ⋅    ⋅    ⋅    ⋅ 
  ⋅   1.0  1.0  1.0  │   ⋅    ⋅    ⋅    ⋅   │   ⋅    ⋅    ⋅    ⋅ 
  ⋅    ⋅   1.0  1.0  │   ⋅    ⋅    ⋅    ⋅   │   ⋅    ⋅    ⋅    ⋅ 
 ────────────────────┼──────────────────────┼────────────────────
 1.0  1.0  1.0   ⋅   │  1.0  1.0  1.0   ⋅   │   ⋅    ⋅    ⋅    ⋅ 
  ⋅   1.0  1.0  1.0  │   ⋅   1.0  1.0  1.0  │   ⋅    ⋅    ⋅    ⋅ 
  ⋅    ⋅   1.0  1.0  │   ⋅    ⋅   1.0  1.0  │   ⋅    ⋅    ⋅    ⋅ 

For comparison we also construct the serial version of populate, as well as a
simple scheme to fill blocks with ones:

In [31]:
@everywhere function populate!(
        block_populate!::Function, A::BandedBlockBandedMatrix)
    for i in 1:nblocks(A, 1), j in max(i - A.l, 1):min(i + A.u, nblocks(A, 2))
        block_populate!(view(A, Block(i, j)), i, j)
    end
    A
end

@everywhere function simplefill!(A::BandedBlockBandedMatrix)
    populate!(A) do bk, i, j
        for x in 1:size(bk, 1), y in max(x - A.λ, 1):min(x + A.μ, size(bk, 2))
            bk[x, y] = 1
        end
    end
end

And now we can run benchmarks:

In [32]:
using BenchmarkTools

N, n = 8, 3
l, u, λ, μ = 1, 2, 0, 1
M, m = 6, n
A = SharedBandedBlockBandedMatrix{Float64}(undef, (repeat([n], N), repeat([m], M)), (l, u), (λ, μ))
@benchmark simplefill!($A)

BenchmarkTools.Trial: 
  memory estimate:  22.58 KiB
  allocs estimate:  414
  --------------
  minimum time:     715.410 μs (0.00% GC)
  median time:      846.439 μs (0.00% GC)
  mean time:        916.948 μs (3.02% GC)
  maximum time:     112.000 ms (88.85% GC)
  --------------
  samples:          5417
  evals/sample:     1

In [33]:
B = BandedBlockBandedMatrix{Float64}(undef, (repeat([n], N), repeat([m], M)), (l, u), (λ, μ))
@benchmark simplefill!($B)

BenchmarkTools.Trial: 
  memory estimate:  9.86 KiB
  allocs estimate:  127
  --------------
  minimum time:     5.593 μs (0.00% GC)
  median time:      5.980 μs (0.00% GC)
  mean time:        9.163 μs (28.97% GC)
  maximum time:     12.282 ms (99.89% GC)
  --------------
  samples:          10000
  evals/sample:     6

And that's not a win for the multi-core team... But we do not always expect a
win in every situation. Surely, the performance depends on the size of the
matrix? So let's try a few:

In [None]:
n, l, u, λ, μ = 500, 1, 2, 2, 1
serial, parallel, Ns = [], [], [10, 50, 100, 150, 200, 300, 500]
for N = Ns
    A = SharedBandedBlockBandedMatrix{Float64}(
            undef, (repeat([n], N), repeat([n], N)), (l, u), (λ, μ))
    bench = @benchmark simplefill!($A)
    push!(parallel, bench)

    B = BandedBlockBandedMatrix{Float64}(
            undef, (repeat([n], N), repeat([n], N)), (l, u), (λ, μ))
    bench = @benchmark simplefill!($B)
    push!(serial, bench)
end

In [None]:
using VegaLite, DataFrames, Statistics
DataFrame(
    mean = vcat(
        getfield.(mean.(serial), :time) ./ 1000,
        getfield.(mean.(parallel), :time) ./ 1000,
    ),
    stddev = vcat(
        std.(getfield.(serial, :times)) ./ 1000,
        std.(getfield.(parallel, :times)) ./ 1000
    ),
    n = repeat(Ns, 2),
    method=vcat(
        repeat([:serial], length(Ns)),
        repeat([:parallel], length(Ns))
    )
) |> (
    @vlplot(mark={:line, point=:true}, x=:n, y=:mean, color=:method) +
    @vlplot(
        mark={:errorband, extent=:ci},
        n=:n,
        y=:stddev
    )
)

In [14]:
std.(getfield.(serial, :times)) ./ 10000

7-element Array{Float64,1}:
  303.8889937782906
  747.7536767424269
 1339.3343172786615
  899.3954615635149
  310.9093080492296
 3391.0202258713753
 3315.8159739456196

In [15]:
mean.(getfield.(serial, :times)) ./ 10000

7-element Array{Float64,1}:
   476.9193021988528
  2775.319064640884 
  5451.698193478261 
  8101.326285483871 
 10480.640877083333 
 18175.371239285712 
 28587.91227222222  

In [16]:
std.(getfield.(parallel, :times)) ./ 10000

7-element Array{Float64,1}:
 318.24505392235704
 661.6445949526782 
 299.16077736186764
 522.0748811666896 
 776.0944252164588 
 584.9854600656942 
 952.1325470190965 

In [17]:
mean.(getfield.(parallel, :times)) ./ 10000

7-element Array{Float64,1}:
   389.83337492187496
  1482.526084272997  
  2420.3909579710144 
  3683.3924544117644 
  5146.81323877551   
  7490.143041791044  
 12107.092588095238  

In [73]:
minimum(parallel[1].times)

2.887732e6