# Scaling Computations using Parallel Computing

### By Julian Samaroo and Przemyslaw Szufel

In [1]:
# Add some workers with threads
using Distributed; addprocs(2; exeflags=["--project=$(joinpath(pwd(), ".."))", "--threads=2"])
@everywhere using Dagger
@everywhere using BenchmarkTools

I'll make a brazen assertion: Everyone wants their code to go fast, and everyone would love if their code could work on any kind of hardware efficiently: multiple CPUs, multiple servers, GPUs, other accelerators, etc. You can just imagine running your code and suddenly all your hardware spins up, doing your bidding at full tilt.

Unfortunately, unless you're very knowledgeable about computer hardware, locking and atomics, latency hiding, distributed scheduling, GPU programming, etc., this is very difficult to accomplish by rolling your own solution.

# What is Dagger?

![](https://github.com/JuliaParallel/Dagger.jl/raw/master/docs/logo.jpg)

## Overview of Dagger.jl

- Unified task interface for multithreading, distributed, and GPU programming
- Distributed high-performance scheduler using machine learning
- APIs for programming with arrays, tables, and graphs
- Checkpointing and out-of-core enables solving large problems

### What difficult problems does Dagger solve?

- Cross-task dependency management
- Abstracting computation across servers and threads
- Dynamic workload balancing of heterogeneous tasks
- Automating data transfer while hiding latency
- Automatic GPU utilization and data conversion

## How to program with tasks

In Dagger, all tasks take any number of arguments, a function to call with those arguments, and produce a result:

In [2]:
t = Dagger.@spawn 1+2
@show t
fetch(t)

t = EagerThunk (running)


3

That `Dagger.@spawn` operation (a Julia macro) wraps the `+` function and the arguments `1` and `2` in a Dagger task, and executes it asynchronously.  (You'll notice that it's an `EagerThunk` object - but you really don't need to worry about the weird name, just know that it means "Dagger task").

Another thing to notice is that just like other "async/await"-style implementations, we need to call the `fetch` function to wait for the task to finish and get its result. If you don't call `fetch`, that's ok - it'll still run in the background, and you can `fetch` it whenever you're ready to see the result.

Like you might expect when working with functions, one task can use the result of another task as its input:

In [3]:
a = Dagger.@spawn 1+2
fetch(Dagger.@spawn 2*a)

6

Dagger will always execute your tasks in the order specified by its arguments (in the form of a "Directed Acyclic Graph", or "DAG", hence the name "Dagger"!)

It's easy to construct your program from a bunch of inter-dependent tasks:

In [4]:
a = Dagger.@spawn 1 + 1
b = Dagger.@spawn a * 2
c = Dagger.@spawn a / 2
d = Dagger.@spawn b - c
fetch(d)

3.0

In the above code, Dagger can execute `b` and `c` in parallel, since they don't depend on each other:

In [5]:
# `a` ---> `b` --->
#  |              |
#  v              v
#  ------> `c` ->`d`

Conveniently, you don't have to tell Dagger to execute `b` and `c` in parallel - it's automatic! Dagger will automatically use all Julia threads to run computations, and will automatically balance each thread's workload to keep them busy. We'll show this in more depth momentarily.

And it's not just multithreading, Dagger also supports distributed computing with multiprocessing. Let's use the workers that I sneakily added at the beginning of this notebook and prove that Dagger will use them:

In [6]:
# We use @everywhere here to make sure all workers know about our `who_am_i` function
@everywhere function who_am_i()
    id = Distributed.myid() # Which worker are we? Starts from 1
    println("Hello from worker $id")
end

wait(Dagger.@spawn scope=Dagger.scope(worker=1) who_am_i()); # This will definitely run locally (on worker 1)
wait(Dagger.@spawn scope=Dagger.scope(worker=2) who_am_i()); # This will definitely run on worker 2
wait(Dagger.@spawn scope=Dagger.scope(worker=3) who_am_i()); # This will definitely run on worker 3
wait(Dagger.@spawn who_am_i()); # This might run locally or on any worker; try re-running this!

Hello from worker 1


      From worker 2:	Hello from worker 2


      From worker 3:	Hello from worker 3


      From worker 3:	Hello from worker 3


So wait, why did we need the `scope=...` thing to make sure that Dagger runs our code on workers 2 and 3? Unlike other libraries, Dagger can run your code on worker and any thread, whenever it wants! `scope=...` forces Dagger to constrain itself to executing your code only where you request.

Hold up, isn't this a source of madness? How can I ever write reliable code with Dagger? Do I just need to litter `scope=...` everywhere? How is this any better than Julia's `remotecall_fetch`, `@spawnat`, and friends, which are slightly less verbose?

Dagger takes a different approach to high performance computing: instead of architecting your code around a specific workload/data distribution (which takes expertise to implement and maintain, and adds a lot of complexity to otherwise simple code), Dagger gives you the freedom to write code which is free from hardware-specific minutia.

Instead of needing to decide where to run your code, and then needing to also make sure your data gets there first, Dagger just asks you for three things: your functions, their arguments, and how they depend on each other. Dagger's scheduler handles the rest - it moves data around, schedules your functions to run close to that data, and balances execution of your functions so that they exploit all the computing resources you have available.

Forget learning about multithreading, atomics, parallel data movement, even GPU computing - this is the future! Just write your code, and let Dagger handle those pesky details for you.

Let's extend our `who_am_i` function to see how Dagger really shines:

In [7]:
# We use @everywhere here to make sure all workers know about our `who_am_i_extended` function
@everywhere function who_am_i_extended()
    id = Distributed.myid()
    tid = Threads.threadid()
    println("Hello from worker $id's thread $tid")
end

wait.([Dagger.@spawn who_am_i_extended() for i in 1:20]); # These can each run on any worker and any thread

      From worker 2:	Hello from worker 2's thread 1
      From worker 3:	Hello from worker 3's thread 1
Hello from worker 1's thread 2
Hello from worker 1's thread 1
Hello from worker 1's thread 1
Hello from worker 1's thread 2
Hello from worker 1's thread 1
Hello from worker 1's thread 2
      From worker 3:	Hello from worker 3's thread 2
      From worker 3:	Hello from worker 3's thread 1
      From worker 2:	Hello from worker 2's thread 2
      From worker 2:	Hello from worker 2's thread 1
      From worker 2:	Hello from worker 2's thread 2
      From worker 2:	Hello from worker 2's thread 1
      From worker 3:	Hello from worker 3's thread 1
      From worker 2:	Hello from worker 2's thread 1
      From worker 2:	Hello from worker 2's thread 2
      From worker 3:	Hello from worker 3's thread 2
      From worker 3:	Hello from worker 3's thread 1
      From worker 3:	Hello from worker 3's thread 1


Congratualations, you are now a high-performance computing practitioner! You've just parallelized the execution of a bunch of functions without doing much of anything, which is something that would be quite difficult to accomplish with other libraries.

Of course, this example is quite simple and doesn't do much. Let's show off Dagger's performance with a problem which does some heavy computations:

In [8]:
# A function which does something expensive with arrays
@everywhere function some_function(i, arrs...)
    @nospecialize arrs
    total = 0.0
    for arr in arrs
        total += i * sum(inv(arr))
    end
    return rand(0.0:abs(total), 100, 100)
end

# A function which generates a large DAG of expensive computations
function create_dag(f, Ns...; use_dagger=false)
    last_nodes = []
    current_nodes = []
    
    push!(last_nodes, rand(100, 100))

    for N in Ns
        println("Generating $N nodes")
        for i in 1:N
            if use_dagger
                push!(current_nodes, Dagger.@spawn f(i, last_nodes...))
            else
                push!(current_nodes, f(i, last_nodes...))
            end
        end
        empty!(last_nodes)
        append!(last_nodes, current_nodes)
        empty!(current_nodes)
    end
    
    return fetch.(last_nodes)
end

# Disable BLAS multithreading - while BLAS multithreading is very efficient,
# many workloads do not automatically parallelize themselves :)
using LinearAlgebra; BLAS.set_num_threads(1)

In [9]:
# Let's try just running this code in serial:
@time create_dag(some_function, 100, 50; use_dagger=false);

Generating 100 nodes


Generating 50 nodes


  1.503183 seconds (596.34 k allocations: 692.465 MiB, 1.88% gc time, 28.31% compilation time)


In [10]:
# Let's go faster with Dagger:
@time create_dag(some_function, 100, 50; use_dagger=true);

Generating 100 nodes


Generating 50 nodes


  3.077079 seconds (1.57 M allocations: 319.618 MiB, 7.25% gc time, 60.92% compilation time)


Of course, Dagger's performance is dependent on the problem. In the above case, we don't get linear scaling because linear algebra operations are so homogeneous (thus bottlenecking within the CPU processor) and use so much memory (thus bottlenecking the CPU caches).

Ok, now let's take a look at a problem which, while being still homoegeneous in nature, utilizes less memory per function:

In [11]:
using Random, Statistics

# This function allocates a lot of memory, but doesn't need to transfer it
@everywhere function delay(job_size)
    z = Vector{Float64}(undef, 1_000_000)
    sum(std(rand!(z)) for i in 1:round(Int, 660*job_size))
end

In [12]:
# Calling it like this generates one long-running task, and a bunch of shorter-running tasks
@time [delay(i <= 1 ? 1.5 + i : round((i % 3)/5 .+ 0.1; digits=2)) for i in 1:15];

 10.510233 seconds (225.56 k allocations: 129.867 MiB, 1.00% gc time, 1.51% compilation time)


In [13]:
# Let's watch Dagger smash this problem:
@time wait.([(Dagger.@spawn delay(i <= 1 ? 1.5 + i : round((i % 3)/5 .+ 0.1; digits=2))) for i in 1:15]);

  2.802479 seconds (1.03 M allocations: 110.260 MiB, 0.54% gc time, 50.10% compilation time)


This amazing performance looks like:

![image.png](attachment:image.png)

## Data Movement

Tasks can run anywhere, which also means that their data must move with them. Dagger automatically moves data from where it is, to where it needs to be, as necessary:

In [14]:
function time_data_xfer()
	# Make some data on worker 2
	A = Dagger.with_options(scope=Dagger.scope(worker=2)) do
		Dagger.@spawn rand(10240,1024)
	end
	wait(A)

	# Use the data on the same worker
	t_local = @timed(Dagger.with_options(scope=Dagger.scope(worker=2)) do
		fetch(Dagger.@spawn sum(A))
	end).time

	# Use the data on a different worker
	t_remote = @timed(Dagger.with_options(scope=Dagger.scope(worker=3)) do
		fetch(Dagger.@spawn sum(A))
	end).time
	return t_remote / t_local
end

time_data_xfer (generic function with 1 method)

In [15]:
println("Speedup when data isn't moved around:")
time_data_xfer()

Speedup when data isn't moved around:


1.076997621309409

Of course, it would be quite silly for Dagger to always move data around when that data gets really big - sometimes it's better for the data not to move at all. Thankfully, Dagger knows this, and will schedule tasks such that if it truly is faster to execute a task where the data is, it will do so. We can see this pretty reliably with a simple example:

In [16]:
@everywhere function where_am_i_data(arr, name)
	println("Accessing $name (sum $(sum(arr)))")
end
function time_data_xfer_no_move()
	# Make some data on worker 2
	A = Dagger.with_options(scope=Dagger.scope(worker=2)) do
		Dagger.@spawn rand(1024,1024)
	end
	# Make some data on worker 3
	B = Dagger.with_options(scope=Dagger.scope(worker=3)) do
		Dagger.@spawn rand(1024,1024)
	end
	wait(A)
	wait(B)

	# Let Dagger choose where to execute
	for i in 1:5
		wait(Dagger.@spawn where_am_i_data(A, :A))
	end
	for i in 1:5
		wait(Dagger.@spawn where_am_i_data(B, :B))
	end
end

time_data_xfer_no_move (generic function with 1 method)

In [17]:
time_data_xfer_no_move()

      From worker 2:	Accessing A (sum 524055.5343318614)


      From worker 2:	Accessing A (sum 524055.5343318614)
      From worker 2:	Accessing A (sum 524055.5343318614)
      From worker 2:	Accessing A (sum 524055.5343318614)
      From worker 2:	Accessing A (sum 524055.5343318614)


      From worker 3:	Accessing B (sum 524560.1945853664)
      From worker 3:	Accessing B (sum 524560.1945853664)
      From worker 3:	Accessing B (sum 524560.1945853664)
      From worker 3:	Accessing B (sum 524560.1945853664)
      From worker 3:	Accessing B (sum 524560.1945853664)


We can see that Dagger's scheduler (the logic that chooses where tasks should execute) has decided to schedule tasks on the same worker that holds the data that they're accessing. This is the optimal choice to make, but it's also pretty obvious.

What happens if Dagger has conflicting priorities (say, a task accesses data spread across multiple workers)? Let's check:

In [18]:
@everywhere function where_am_i_data_split(a_arr, a_name, b_arr, b_name)
	println("Accessing $a_name (sum $(sum(a_arr))) and $b_name (sum $(sum(b_arr)))")
end
function time_data_xfer_maybe_move()
	# Make some data on worker 2
	A = Dagger.with_options(scope=Dagger.scope(worker=2)) do
		Dagger.@spawn rand(1024,1024)
	end
	# Make some data on worker 3
	B = Dagger.with_options(scope=Dagger.scope(worker=3)) do
		Dagger.@spawn rand(1024,1024)
	end
	wait(A)
	wait(B)

	# Let Dagger choose where to execute
	for i in 1:10
		wait(Dagger.@spawn where_am_i_data_split(A, :A, B, :B))
	end
end

time_data_xfer_maybe_move (generic function with 1 method)

In [19]:
time_data_xfer_maybe_move()

      From worker 3:	Accessing A (sum 524502.0286563789) and B (sum 524503.2006047677)


      From worker 3:	Accessing A (sum 524502.0286563789) and B (sum 524503.2006047677)
      From worker 3:	Accessing A (sum 524502.0286563789) and B (sum 524503.2006047677)
      From worker 3:	Accessing A (sum 524502.0286563789) and B (sum 524503.2006047677)
      From worker 3:	Accessing A (sum 524502.0286563789) and B (sum 524503.2006047677)
      From worker 3:	Accessing A (sum 524502.0286563789) and B (sum 524503.2006047677)
      From worker 2:	Accessing A (sum 524502.0286563789) and B (sum 524503.2006047677)
      From worker 2:	Accessing A (sum 524502.0286563789) and B (sum 524503.2006047677)
      From worker 2:	Accessing A (sum 524502.0286563789) and B (sum 524503.2006047677)


      From worker 2:	Accessing A (sum 524502.0286563789) and B (sum 524503.2006047677)


Interesting! Dagger chose to schedule these tasks almost evenly across workers 2 and 3. This makes sense, if you think about it - there's no benefit to picking worker 2 vs worker 3, so we might as well spread the work between them.

This is the sort of stuff that feels obvious to us as humans, but is quite a lot of work to implement everytime you want to solve a parallel computing problem (especially considering that Julia's Distributed library doesn't do this for you). It can be invaluable to have a smart scheduler on your side when writing parallel code!

Now that we've seen how Dagger's task system works, and the benefits it provides, let's take a look at the user-friendly APIs that build on top of it - arrays and tables.

## Array Programming with Dagger

Arrays are super useful! Julia uses them all throughout its codebase, and nearly every Julia package uses them heavily. They are simple, storage-efficient, and have a huge range of available functionality. You can build anything with the right set of arrays and algorithms that use them.

All of that said, parallel array operations is not really first-class in Julia - operations like `sum`, `reduce`, broadcasting, and much more, don't use multithreading at all, even if you have a bunch of CPUs and have started Julia with multiple threads. While this may eventually change in future versions of Julia, it will probably only happen for a handful of array operations at a time, and won't just magically apply to the operations that your code uses.

Of course, Julia *does* have first-class multithreading support, so we can always roll our own multithreaded array operations:

In [20]:
function mt_sum(A::Array{T}) where T
    nchunks = Threads.nthreads()
    chunksize = cld(length(A), nchunks)
    reds = zeros(T, nchunks)
    @sync for (idx, chunk) in enumerate(Iterators.partition(A, chunksize))
        Threads.@spawn begin
            reds[idx] = sum(chunk)
        end
    end
    return sum(reds)
end

mt_sum (generic function with 1 method)

In [21]:
A = rand(10240, 10240)
@assert sum(A) ≈ mt_sum(A)
println("Single-threaded:")
@btime sum($A)
println("Multi-threaded:")
@btime mt_sum($A)

Single-threaded:


  43.957 ms (0 allocations: 0 bytes)
Multi-threaded:


  31.067 ms (30 allocations: 2.22 KiB)


5.243345009842046e7

So it can definitely be done, but it requires some work, and you have to redo that work for every operation you want multithreaded.

Also, the above code doesn't support distributed execution. If you want that, there's a different way to write the sum:

In [22]:
function dist_sum(A::Array{T}) where T
    nchunks = nprocs()
    chunksize = cld(length(A), nchunks)
    return sum(pmap(sum, Iterators.partition(A, chunksize)))
end

dist_sum (generic function with 1 method)

In [23]:
A = rand(10240)
@assert sum(A) ≈ dist_sum(A)
println("Single-worker:")
@btime sum($A)
println("Multi-worker:")
@btime dist_sum($A)

Single-worker:


  793.032 ns (0 allocations: 0 bytes)
Multi-worker:


  215.460 μs (234 allocations: 275.62 KiB)


5108.65573377436

Not bad, but it blows up if we use really big arrays, and it would be nice if we could use both multithreading and distributed. Let's try rolling our own by combining the two above implementations:

In [24]:
# We need to support slices of arrays, so redefine this for AbstractArray
@everywhere function mt_sum(A::AbstractArray{T}) where T
    nchunks = Threads.nthreads()
    chunksize = cld(length(A), nchunks)
    reds = zeros(T, nchunks)
    @sync for (idx, chunk) in enumerate(Iterators.partition(A, chunksize))
        Threads.@spawn begin
            reds[idx] = sum(chunk)
        end
    end
    return sum(reds)
end
function mt_dist_sum(A::Array{T}) where T
    nchunks = nprocs()
    chunksize = cld(length(A), nchunks)
    return sum(pmap(mt_sum, Iterators.partition(A, chunksize)))
end

mt_dist_sum (generic function with 1 method)

In [25]:
A = rand(10240, 10240)
@assert sum(A) ≈ mt_dist_sum(A)
println("Single-worker, single-threaded:")
@btime sum($A)
println("Multi-worker, multi-threaded:")
@btime mt_dist_sum($A)

Single-worker, single-threaded:


  39.764 ms (0 allocations: 0 bytes)
Multi-worker, multi-threaded:


  531.157 ms (249 allocations: 800.01 MiB)


5.242705427442513e7

Ok, it's workable, but there's something that's rather annoying about this implementation - we need to allocate the array on one worker, then send portions of it to all the other workers, only to copy the results back to the original worker. What if we could construct the chunks of the array on the other workers to start with, and then just let the workers compute their local sums?

Enter the `DArray`:

In [26]:
# Create a random distributed array with chunks of size 1024 x 1024
DA = fetch(rand(Blocks(1024, 1024), 10240, 10240))

# Do a distributed and multithreaded sum
@btime sum($DA)

# Make sure results are correct
@assert sum(collect(DA)) ≈ sum(DA)

  204.933 ms (129198 allocations: 14.82 MiB)


Not bad! Performance isn't perfect, but it's not too bad for an automatically distributed and multithreaded operation (which is generically implemented, and will have similar performance for all kinds of reductions). This performance will improve over time as Dagger itself improves.

You can see that just like Dagger tasks, Dagger arrays (the `DArray`) is lazy. `fetch` is used to ensure that operations are completed, and `collect` is used to convert a `DArray` into a regular `Array`.

The `DArray` supports a variety of array operations just like Julia's `Array`s:

In [27]:
# Create a local array (not distributeed)
A = rand(64, 64)

# Let's distribute A across our workers by partitioning in 16 x 16 blocks
DA = distribute(A, Blocks(16, 16))

# Do a map operation on it:
DB = map(x->x*2, DA)

# And get the result back as an Array:
collect(DB)

64×64 Matrix{Float64}:
 0.901096  0.936028    0.98404   …  0.244343   0.923511    1.55203
 1.68035   0.00911117  1.1324       1.31244    0.793101    0.260467
 1.9033    1.60656     0.304407     0.248602   1.4813      1.44645
 1.02934   0.123294    0.333614     1.15623    0.359544    1.92108
 1.38282   0.953327    1.54404      0.224876   1.04766     0.0654717
 0.503089  0.237317    0.738415  …  1.88981    0.785976    0.265817
 0.77943   0.940832    1.08712      1.61909    1.67164     0.0268055
 1.26979   1.14929     0.502468     1.4354     0.838045    0.66643
 0.535601  1.58683     1.04563      0.0708674  0.94892     1.59977
 1.89287   1.96128     0.137881     0.316871   1.30119     0.665576
 ⋮                               ⋱                         
 1.94155   1.8341      1.42129   …  0.843813   0.915071    0.292815
 1.49684   0.722937    1.3549       1.56335    0.00381643  1.62092
 1.37029   0.158569    1.36387      1.73274    0.047664    1.84886
 1.01971   0.274718    0.685214     1.

Complex operations like broadcasting work too:

In [28]:
collect(DA .* 2 .+ DB)

64×64 Matrix{Float64}:
 1.80219   1.87206    1.96808   3.85757    …  0.488687  1.84702     3.10405
 3.36071   0.0182223  2.26479   2.32175       2.62488   1.5862      0.520934
 3.8066    3.21311    0.608814  1.62834       0.497205  2.96261     2.89289
 2.05869   0.246588   0.667228  0.0416518     2.31246   0.719088    3.84215
 2.76563   1.90665    3.08808   0.255756      0.449752  2.09532     0.130943
 1.00618   0.474634   1.47683   2.77223    …  3.77962   1.57195     0.531633
 1.55886   1.88166    2.17425   3.38939       3.23818   3.34328     0.053611
 2.53957   2.29859    1.00494   3.50963       2.8708    1.67609     1.33286
 1.0712    3.17366    2.09127   0.885672      0.141735  1.89784     3.19954
 3.78574   3.92256    0.275762  2.49054       0.633742  2.60237     1.33115
 ⋮                                         ⋱                        
 3.88311   3.66821    2.84257   2.4796     …  1.68763   1.83014     0.585629
 2.99367   1.44587    2.7098    1.07904       3.1267    0.00763286 

Ok, but where's the proof that this is actually distributed or multithreaded? Well, we can easily see which workers and threads it's operating on:

In [29]:
DC = rand(Blocks(4, 4), 16, 16)

# Which worker is running our tasks?
collect(map(x->Distributed.myid(), DC))

16×16 Matrix{Int64}:
 2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2
 2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2
 2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2
 2  2  2  2  2  2  2  2  2  2  2  2  2  2  2  2
 1  1  1  1  2  2  2  2  3  3  3  3  2  2  2  2
 1  1  1  1  2  2  2  2  3  3  3  3  2  2  2  2
 1  1  1  1  2  2  2  2  3  3  3  3  2  2  2  2
 1  1  1  1  2  2  2  2  3  3  3  3  2  2  2  2
 3  3  3  3  2  2  2  2  3  3  3  3  2  2  2  2
 3  3  3  3  2  2  2  2  3  3  3  3  2  2  2  2
 3  3  3  3  2  2  2  2  3  3  3  3  2  2  2  2
 3  3  3  3  2  2  2  2  3  3  3  3  2  2  2  2
 3  3  3  3  3  3  3  3  2  2  2  2  3  3  3  3
 3  3  3  3  3  3  3  3  2  2  2  2  3  3  3  3
 3  3  3  3  3  3  3  3  2  2  2  2  3  3  3  3
 3  3  3  3  3  3  3  3  2  2  2  2  3  3  3  3

In [30]:
# And which threads are running our tasks?
collect(map(x->Threads.threadid(), DC))

16×16 Matrix{Int64}:
 2  2  2  2  1  1  1  1  2  2  2  2  2  2  2  2
 2  2  2  2  1  1  1  1  2  2  2  2  2  2  2  2
 2  2  2  2  1  1  1  1  2  2  2  2  2  2  2  2
 2  2  2  2  1  1  1  1  2  2  2  2  2  2  2  2
 1  1  1  1  2  2  2  2  2  2  2  2  2  2  2  2
 1  1  1  1  2  2  2  2  2  2  2  2  2  2  2  2
 1  1  1  1  2  2  2  2  2  2  2  2  2  2  2  2
 1  1  1  1  2  2  2  2  2  2  2  2  2  2  2  2
 2  2  2  2  2  2  2  2  1  1  1  1  2  2  2  2
 2  2  2  2  2  2  2  2  1  1  1  1  2  2  2  2
 2  2  2  2  2  2  2  2  1  1  1  1  2  2  2  2
 2  2  2  2  2  2  2  2  1  1  1  1  2  2  2  2
 1  1  1  1  1  1  1  1  2  2  2  2  1  1  1  1
 1  1  1  1  1  1  1  1  2  2  2  2  1  1  1  1
 1  1  1  1  1  1  1  1  2  2  2  2  1  1  1  1
 1  1  1  1  1  1  1  1  2  2  2  2  1  1  1  1

You might be wondering, how does the `DArray` work? Is it a magical built-in? Well, it's really just a generic container around Dagger tasks, which is easy enough to see for yourself:

In [31]:
DC.chunks

4×4 Matrix{Any}:
 EagerThunk (finished)  EagerThunk (finished)  …  EagerThunk (finished)
 EagerThunk (finished)  EagerThunk (finished)     EagerThunk (finished)
 EagerThunk (finished)  EagerThunk (finished)     EagerThunk (finished)
 EagerThunk (finished)  EagerThunk (finished)     EagerThunk (finished)

Since this array is 16 x 16 total elements and partitioned into chunks of 4 x 4, we thus have 4 x 4 Dagger tasks (`EagerThunk` == "Dagger task", if you recall), one for each chunk of the partitioned array. You can check the value of any of these chunks by `fetch`ing them, as you'd expect:

In [32]:
fetch(DC.chunks[3,4])

4×4 Matrix{Float64}:
 0.483508  0.996795  0.287642  0.730436
 0.413356  0.194679  0.110177  0.783232
 0.903374  0.887638  0.871194  0.0981869
 0.218791  0.207176  0.925929  0.987586

Operations on the `DArray` simply take these tasks as input, passing them into other Dagger tasks, and putting the resulting tasks into a new `DArray` that's returned to you. All of the computations and data movement happen in Dagger in the background.

Feel free to try out other useful operations:

In [33]:
# Reshape and matrix-matrix multiply
collect(reshape(DC, 8, 32) * reshape(DC, 32, 8))

8×8 Matrix{Float64}:
 10.3146   9.92081  7.92013  8.64311  8.99091  10.0831   10.7142   9.60595
  7.21018  6.87454  6.85152  6.5248   6.79444   6.31211   8.49605  7.44065
  9.12997  8.47182  7.77737  7.81695  8.6721    8.66513   8.91281  8.96582
 10.2062   9.31685  8.36177  8.05261  7.88796   8.55122  10.0873   8.59102
  8.86075  9.3405   7.72462  7.09214  8.47612   8.25761   9.03776  8.81714
  8.54905  8.73592  7.59796  7.6096   8.54079   8.18595   8.99388  7.89931
  9.05033  9.73915  7.69769  7.56248  8.51061   8.33451   9.78208  8.20258
  8.67064  9.6854   8.43777  7.28484  8.39054   8.85947   9.19034  9.54042

In [34]:
# Sorting
DD = rand(Blocks(16), 64)
collect(sort(DD))

64-element Vector{Float64}:
 0.005851403848905257
 0.020575258567507415
 0.03927518266092589
 0.04515232012819803
 0.04992007784522756
 0.08406162129142714
 0.11963313105506135
 0.12351621047658945
 0.13064638138600704
 0.1349076097573878
 ⋮
 0.8876272269551913
 0.8962516331143147
 0.9590325303039045
 0.9591999722790696
 0.9600691571010103
 0.9700797690270988
 0.9721303718862124
 0.9831141013923244
 0.9983702881758747

## Tabular Programming in Dagger

Arrays are cool and super useful, but for certain use cases (like tabular data analysis, ETL operations, database interfacing, etc.), they just don't really cut it. In Julia, users can use DataFrames.jl to gain access to a very capable, high-performance table implementation. DataFrames is very well-tuned and even automatically multithreads many operations. If your data is small enough to fit on a single worker and in-memory, the DataFrame is a great option!

In [35]:
using DataFrames

In [36]:
df1 = DataFrame(a=rand('a':'z', 1000), b=rand(1000))
df2 = DataFrame(a=rand('a':'z', 1000), c=rand(1:10, 1000))
df3 = innerjoin(df1, df2, on=:a)

Row,a,b,c
Unnamed: 0_level_1,Char,Float64,Int64
1,l,0.443905,8
2,l,0.171912,8
3,l,0.85424,8
4,l,0.254782,8
5,l,0.063185,8
6,l,0.412543,8
7,l,0.551376,8
8,l,0.45702,8
9,l,0.880832,8
10,l,0.893324,8



However, there are three main areas where the DataFrame won't help you very much:
- Your data is too big to fit in RAM
- You have multiple servers and want to distribute your table operations across them
- DataFrames doesn't have a multithreaded implementation of the operation that's slowing you down

For these cases and more, Dagger provides the `DTable`, which, much like the `DArray`, is a larger table created by combining a set of partitioned chunks of smaller tables. More specifically, the `DTable` row-wise concatenates multiple tables of your choice (`DataFrame`s, CSV files, Arrow files, etc.) into a single larger distributed table. It provides a number of basic operations, as well as a number of more complex DataFrames-compatible operations.

Let's see what it looks like to create a `DTable` from an existing `DataFrame`:

In [37]:
# The DTable is provided by an extra Julia package
using DTables

In [38]:
dt3 = DTable(df3, 1000)

DTable with 39 partitions
Tabletype: DataFrame

It's that easy! We can immediately notice some oddities, however:
- There is an additional parameter that we needed to provide - this is the `chunksize`, which specifies how big each chunk of the table should be, in terms of row count
- There is no "preview" of the underlying table contents - since the table is asynchronously being built or generated in the background, the contents of the table may not yet be ready, so by default we don't see a preview (otherwise we might have to wait for a while)
- There is some "tabletype" thing - this is the type of the underlying table chunks, and can be all kinds of Julia table types, but is a `DataFrame` for this `DTable`

Just like a `DataFrame`, we can do the usual set of expected operations:

In [39]:
fetch(select(dt3, [:a, :c]))[1:10,:]

Row,a,c
Unnamed: 0_level_1,Char,Int64
1,l,8
2,l,8
3,l,8
4,l,8
5,l,8
6,l,8
7,l,8
8,l,8
9,l,8
10,l,8


In [40]:
first(DataFrame(fetch(reduce(+, groupby(dt3, :a); cols=[:b]))), 10)

Row,a,result_b
Unnamed: 0_level_1,Char,Float64
1,n,840.585
2,f,835.259
3,w,704.913
4,d,528.96
5,e,640.146
6,o,1081.7
7,h,782.246
8,s,725.98
9,j,951.152
10,t,543.426


In [41]:
dt4 = leftjoin(dt3, DataFrame(a=rand('a':'z', 100), d=rand(100)), on=:a)
first(fetch(dt4), 10)

Row,a,b,c,d
Unnamed: 0_level_1,Char,Float64,Int64,Float64?
1,l,0.443905,8,0.782203
2,l,0.443905,8,0.732828
3,l,0.443905,8,0.67292
4,l,0.443905,8,0.057573
5,l,0.171912,8,0.782203
6,l,0.171912,8,0.732828
7,l,0.171912,8,0.67292
8,l,0.171912,8,0.057573
9,l,0.85424,8,0.782203
10,l,0.85424,8,0.732828


You'll notice that you need to call `fetch` to get a `DataFrame` or other non-`DTable` object back - this is quite similar to the `DArray` and the same idea as Dagger tasks. Additionally, we can also look at the `DTable`'s internals to confirm that, like the `DArray`, it's basically just a container around a bunch of Dagger tasks:

In [42]:
dt4.chunks

39-element Vector{Union{Dagger.EagerThunk, Dagger.Chunk}}:
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 ⋮
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)
 EagerThunk (finished)

In [43]:
fetch(dt4.chunks[4])

Row,a,b,c,d
Unnamed: 0_level_1,Char,Float64,Int64,Float64?
1,n,0.853733,8,0.632694
2,n,0.853733,8,0.455454
3,n,0.853733,8,0.833754
4,n,0.853733,8,0.151643
5,n,0.808339,8,0.632694
6,n,0.808339,8,0.455454
7,n,0.808339,8,0.833754
8,n,0.808339,8,0.151643
9,n,0.932381,8,0.632694
10,n,0.932381,8,0.455454


## Addendum

Dagger is a powerful toolkit for parallel programming, with a variety of useful APIs for doing useful computations and analyses, and which integrate nicely with Julia's large and growing ecosystem. However, there are definitely some areas that are a work in progress that I wanted to mention before we wrap up:
- The `DArray` and `DTable` are missing some useful operators, especially linear algebra (for the `DArray`) and sorting and aggregation functions (for the `DTable`)
- Dagger typically uses more memory than hand-rolled solutions
- Dagger is missing support for in-place (imperative) operations

All of these are on the Todo list for the next few months, and should be significantly improved as more contributions to Dagger and friends are made by maintainers and contributors.