# Introduction to Julia

In [9]:
Base.banner()

               [1m[32m_[0m
   [1m[34m_[0m       [0m_[0m [1m[31m_[1m[32m(_)[1m[35m_[0m     |  Documentation: https://docs.julialang.org
  [1m[34m(_)[0m     | [1m[31m(_)[0m [1m[35m(_)[0m    |
   [0m_ _   _| |_  __ _[0m   |  Type "?" for help, "]?" for Pkg help.
  [0m| | | | | | |/ _` |[0m  |
  [0m| | |_| | | | (_| |[0m  |  Version 1.9.0 (2023-05-14)
 [0m_/ |\__'_|_|_|\__'_|[0m  |  backports-release-1.9/222a9272bd (fork: 328 commits, 217 days)
[0m|__/[0m                   |



### Julia is a dynamic language...

In [9]:
function simple_sum(N, inc::T) where T
    x = zero(T) # Creates a 0 value of type T
    for i in 1:N
        x += inc
    end
    return x
end

@show simple_sum(1000, 1)::Int64;
@show simple_sum(1000, 1.0)::Float64;
@show simple_sum(1000, 1.0 + 1im)::ComplexF64;

simple_sum(1000, 1)::Int64 = 1000
simple_sum(1000, 1.0)::Float64 = 1000.0
simple_sum(1000, 1.0 + 1im)::ComplexF64 = 1000.0 + 1000.0im


### ...but it's also a compiled language

In [19]:
@code_native debuginfo=:none simple_sum(1000, 1)

	.text
	.file	"simple_sum"
	.globl	julia_simple_sum_1237           # -- Begin function julia_simple_sum_1237
	.p2align	4, 0x90
	.type	julia_simple_sum_1237,@function
julia_simple_sum_1237:                  # @julia_simple_sum_1237
	.cfi_startproc
# %bb.0:                                # %top
	pushq	%rbp
	.cfi_def_cfa_offset 16
	.cfi_offset %rbp, -16
	movq	%rsp, %rbp
	.cfi_def_cfa_register %rbp
	imulq	%rdi, %rsi
	xorl	%eax, %eax
	testq	%rdi, %rdi
	cmovgq	%rsi, %rax
	popq	%rbp
	.cfi_def_cfa %rsp, 8
	retq
.Lfunc_end0:
	.size	julia_simple_sum_1237, .Lfunc_end0-julia_simple_sum

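The native code for the integer call contains no loop: the compiler reduced the whole sum to a single multiplication (`imulq`), guarded by a check that `N` is positive (`testq` and `cmovgq`). As a minimal sketch, assuming the same `simple_sum` definition as above, the following checks that closed form on a few inputs. The helper name `closed_form` is made up for this illustration.

```julia
# Same definition as in the notebook cell above.
function simple_sum(N, inc::T) where T
    x = zero(T)
    for i in 1:N
        x += inc
    end
    return x
end

# Hypothetical helper: what the optimized integer code computes.
# For N <= 0 the range 1:N is empty, so the result is zero.
closed_form(N, inc) = N > 0 ? N * inc : zero(inc)

for (N, inc) in [(1000, 1), (0, 5), (-3, 7), (12, -4)]
    @assert simple_sum(N, inc) == closed_form(N, inc)
end
```

For floating-point `inc` the compiler generally cannot apply this rewrite, because floating-point addition is not associative and the loop's rounding behavior would change.
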
### Julia has powerful abstractions for multitasking...

In [3]:
@sync for i in 1:10
    Threads.@spawn println("Hello from $i")
end
wait(Threads.@spawn println("All hello's have been said!"))

Hello from 3
Hello from 8
Hello from 9
Hello from 6
Hello from 10
Hello from 1
Hello from 7
Hello from 4
Hello from 5
Hello from 2
All hello's have been said!


### ...which automatically run across threads

In [2]:
@sync for i in 1:6
    Threads.@spawn println("Hello from thread $(Threads.threadid())")
end

Hello from thread 3
Hello from thread 5
Hello from thread 1
Hello from thread 6
Hello from thread 2
Hello from thread 4


### Julia has support for multiprocessing (distributed computing) too

In [4]:
using Distributed
ENV["JULIA_PROJECT"] = pwd() # Annoying but sometimes required
w2, w3 = addprocs(2)

2-element Vector{Int64}:
 2
 3

In [7]:
@spawnat w2 println("Hello from worker $(myid())");

      From worker 2:	Hello from worker 2


In [8]:
@spawnat w3 println("Hello from worker $(myid())");

      From worker 3:	Hello from worker 3


### But how do we tie these capabilities together?

Unfortunately, Julia's multitasking system has no built-in mechanism to span multiple servers: a task spawned on one server stays on that server. Making multithreading and multiprocessing cooperate is cumbersome, and requires implementing all kinds of data structures, task runtimes, resource management, error reporting, and so on.
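
To make that concrete, here is a rough sketch of the manual approach using only the standard library. `run_on` is a hypothetical helper, not part of any package. The caller still has to choose the process by hand, and nothing here handles load balancing, data locality, or failures.

```julia
using Distributed

# Hypothetical helper: run `f` on process `pid`, inside a task that may be
# scheduled on any thread of that process, and return the result to the caller.
function run_on(f, pid)
    remotecall_fetch(pid) do
        fetch(Threads.@spawn f())
    end
end

# Process 1 (the current one) always exists; after `addprocs`, worker ids
# such as 2 or 3 could be passed instead.
result = run_on(() -> (myid(), Threads.threadid()), myid())
println("Ran on worker $(result[1]), thread $(result[2])")
```
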

Is there a solution to be had? Yes!

## Intro to Dagger

Dagger.jl is a Julia library which implements all of those annoying details, and makes it possible to use multitasking across multiple servers. At its core, Dagger uses a Directed Acyclic Graph (DAG) to model the user's computations and data dependencies, and has a set of cooperating schedulers to assign, execute, and manage tasks and their associated data.
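
As a small sketch of how such a graph arises (assuming Dagger is installed), each `Dagger.@spawn` call creates a node, and passing one task as an argument to another creates an edge between them. The variable names `a`, `b`, and `c` are only for illustration.

```julia
using Dagger

a = Dagger.@spawn 1 + 2      # no dependencies
b = Dagger.@spawn 10 * 2     # no dependencies, so it may run in parallel with `a`
c = Dagger.@spawn a + b      # depends on `a` and `b`, so it runs after both finish

println(fetch(c))            # 23
```

Dagger replaces the task arguments `a` and `b` with their results before calling `+`, so the code reads like ordinary sequential Julia while the scheduler remains free to place each node on any available resource.
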

Dagger has a number of very useful features that make parallel programming productive. Most of these features rely on Julia's built-in features or on other Julia libraries, with Dagger acting as a wrapper around them. This might sound like unnecessary work, but it makes Dagger a one-stop shop for parallelism of all sorts.

For example, let's see how multithreading and multiprocessing can be used in Dagger. Here's some simple code that automatically runs tasks on any CPU thread of any server in our Julia cluster:

In [11]:
using Dagger

f = ()->println("Hello world from worker $(myid()), thread $(Threads.threadid())")

@sync for i in 1:10
    Dagger.@spawn f()
end

Hello world from worker 1, thread 2
      From worker 2:	Hello world from worker 2, thread 1
      From worker 2:	Hello world from worker 2, thread 3
      From worker 2:	Hello world from worker 2, thread 4
      From worker 2:	Hello world from worker 2, thread 2
      From worker 2:	Hello world from worker 2, thread 5
      From worker 2:	Hello world from worker 2, thread 3
      From worker 3:	Hello world from worker 3, thread 2
      From worker 3:	Hello world from worker 3, thread 3
      From worker 3:	Hello world from worker 3, thread 5


Conveniently, we didn't have to tell Dagger where to run our tasks; it just ran each one on the first available processing resource that it had access to. We can keep throwing tasks onto Dagger, and it'll do its best to spread the load evenly.

Ok, so Dagger will load balance, but what happens if we need to move data between our tasks? Wouldn't load balancing cause lots of wasted data movement? Not to fear, Dagger is smart!

In [20]:
g = (i) -> begin
    println("Generating for $i on worker $(myid()), thread $(Threads.threadid())")
    return (myid(), rand(200, 200))
end
h = (i, (id, X)) -> begin
    println("Summing for $i on worker $(myid()) (was $id), thread $(Threads.threadid())")
    sum(X)
end

@sync for i in 1:10
    t = Dagger.@spawn g(i)
    Dagger.@spawn h(i, t)
end

Generating for 1 on worker 1, thread 4
Generating for 2 on worker 1, thread 6
Generating for 8 on worker 1, thread 2
      From worker 2:	Generating for 9 on worker 2, thread 6
      From worker 2:	Generating for 5 on worker 2, thread 3
      From worker 2:	Generating for 7 on worker 2, thread 4
      From worker 3:	Generating for 4 on worker 3, thread 5
      From worker 3:	Generating for 10 on worker 3, thread 6
      From worker 3:	Generating for 6 on worker 3, thread 1
      From worker 3:	Generating for 3 on worker 3, thread 4
Summing for 1 on worker 1 (was 1), thread 4
Summing for 8 on worker 1 (was 1), thread 3
Summing for 2 on worker 1 (was 1), thread 5
      From worker 2:	Summing for 9 on worker 2 (was 2), thread 2
      From worker 2:	Summing for 7 on worker 2 (was 2), thread 5
      From worker 2:	Summing for 5 on worker 2 (was 2), thread 4
      From worker 3:	Summing for 6 on worker 3 (was 3), thread 3
      From worker 3:	Summing for 4 on worker 3 (was 3), thread 2
     

Dagger models the costs of data movement when assigning tasks to compute resources, which makes data-intensive computations run efficiently while still being load balanced (and as we can see, Dagger still freely picks different threads on the same worker, because there's no data movement involved).

We can also see above how data typically moves within Dagger: data generated by one task can be trivially consumed as the argument to another task. Dagger will move the data between servers as necessary to ensure that tasks have all the data they need.
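
To bring results back to the caller, `fetch` can be applied to the final tasks. Below is a minimal sketch of that pattern, assuming Dagger is installed; `make_matrix` and `total` are hypothetical stand-ins for the generating and summing steps above.

```julia
using Dagger

# Hypothetical producer and consumer. With several worker processes, named
# functions like these would need to be defined with `@everywhere`.
make_matrix(n) = ones(n, n)
total(X) = sum(X)

tasks = map(1:4) do n
    m = Dagger.@spawn make_matrix(n)   # produces data somewhere in the cluster
    Dagger.@spawn total(m)             # consumes it; Dagger moves `m` only if needed
end

results = fetch.(tasks)                # [1.0, 4.0, 9.0, 16.0]
println(results)
```
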

What if we have some data that we can't or don't want to move? Maybe the data has some internal pointers, or it's a mutable object that can't be serialized between processes. Easy enough, `Dagger.@mutable` to the rescue!

In [19]:
# Tell Dagger that our data should be treated as mutable data, local to this server
mydata = Dagger.@mutable (;tsk=current_task(), ref=Threads.Atomic{Int}(0))
mydata::Dagger.Chunk

j = data -> begin
    @assert myid() == 1 # We're locked to worker 1 (this server) because of `mydata`
    @assert !istaskdone(data.tsk) # Tasks can't be serialized
    Threads.atomic_add!(data.ref, 1) # Serializing this would be undefined behavior
end

@sync for i in 1:10
    Dagger.@spawn j(mydata)
end
println(fetch(mydata).ref[])

10


So we can lock data to a given server, cool enough. But that just gives us *one* piece of data; what if we want to have some data on *all* of our servers, like a cache or buffer? Not a problem; `Dagger.@shard` was made for exactly this purpose.

In [29]:
mycache = Dagger.@shard Threads.Atomic{Int}(0)

k = (cache, i) -> begin
    println("Adding $i on worker $(myid()), thread $(Threads.threadid())")
    Threads.atomic_add!(cache, i)
end

@sync for i in 1:10
    Dagger.@spawn k(mycache, i)
end

println(sum(map(fetch, map(c->c[], mycache))))
println(sum(1:10))

Adding 2 on worker 1, thread 5
Adding 1 on worker 1, thread 3
Adding 3 on worker 1, thread 6
Adding 4 on worker 1, thread 4
      From worker 2:	Adding 7 on worker 2, thread 3
      From worker 3:	Adding 6 on worker 3, thread 2
      From worker 2:	Adding 8 on worker 2, thread 4
      From worker 3:	Adding 9 on worker 3, thread 1
      From worker 3:	Adding 10 on worker 3, thread 4
      From worker 3:	Adding 5 on worker 3, thread 6
55
55


#### Future Work: Streaming

If Dagger is to become a useful component of EdgeRF, there are still missing features that we'll need to have. The first feature that we'll definitely need is streaming data support. Right now, Dagger tasks have a limited lifetime - they run just once, and then are done forever. This is a super inefficient model for processing an arbitrary or infinite amount of data coming in from sensors, the network, or storage - we'd be generating tons of little tasks that exist only to execute a little bit of code!

There's an obvious solution here: just pass a `Channel` or some other asynchronous buffer to a task and either `take!` from it or `put!` to it in a loop! And this is exactly what I think we should do, but with more explicit support from Dagger to ensure that this works seamlessly across the network and in the face of network or device failures (resets, packet loss, etc.).
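
As a rough sketch of that pattern in plain Base Julia (no Dagger involved, and not a proposed Dagger API), a single long-lived task can consume a stream of values from a `Channel` instead of one task being spawned per value. The channel names and the doubling step are placeholders.

```julia
# A buffered channel stands in for a sensor feed; another carries results.
input = Channel{Int}(32)
output = Channel{Int}(32)

# One long-lived task processes every value that arrives.
consumer = Threads.@spawn begin
    for x in input            # iterating a Channel takes values until it is closed
        put!(output, 2x)      # "process" each value and pass it downstream
    end
    close(output)
end

for x in 1:5
    put!(input, x)
end
close(input)                  # lets the consumer loop finish

results = collect(output)     # [2, 4, 6, 8, 10]
wait(consumer)
println(results)
```

This works only within one process. Making the same loop survive across the network and through device failures is the part that would need explicit support.
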

#### Future Work: Decentralization

Similarly, running Dagger at the edge implies that there will need to be some amount of decentralized control. Sure, we might want a central planner to coordinate things, but to build a truly fault-tolerant system, individual servers will need to be able to make some decisions for themselves. They should be able to keep running local streaming tasks, write sensor data to disk, and communicate with any other reachable servers as necessary.

Dagger currently expects to have a single central scheduler that spawns and manages all other servers. It should be possible to relax this requirement by letting each server launch and manage its own tasks. Especially in the context of streaming tasks, it should be possible to allow Dagger to gracefully and temporarily pause only tasks which require communication across the network.