<h1><center>From Desk to Dawn!!</center></h1>

*****

# Julia concurrency

## Overview

* Coroutines:
    - Suspend and resume without intefacing with the operating system's scheduler
    - Communication and data synchronization through Channels
    - Sending and recieving on channels are blocking operations
    - Concurrency is not parallelism (Rob Pike)
    - We don't deal with if something happening in multi cores or at the CPU level
    - Similar to Goroutines
* Multi-threading
    - Base.Threads
    - Julia is not yet fully thread-safe.
* Multi-core processing
    - Harnessing the power of multiple CPUs
        - The speed of the CPUs
        - The speed of theri access to their memory
        - Julia provides a multiprocessing environment based on message passing
        - Distributed programming in Julia is built on two primitives:
            - remote references(Future, remote channels) and;
            - remote calls.


## A contrived example (Fibonacci)

```julia
const jobs = Channel{Int}(50);
const results = Channel{Int}(50);

function worker()
    for job_id in jobs
        put!(results, fib(job_id))
    end
end

function make_jobs(n)
    for i in 1:n
        put!(jobs, i)
    end
end

function fib(n::Int)
    if n<=1
        return n
    else
        return fib(n-1)+fib(n-2)
    end
end

n = 50;

@async make_jobs(n); # feed the jobs channel with "n" jobs

for i in 1:4 # start 4 tasks to process requests in parallel
    @async worker()
end

elapsed = @elapsed while n > 0 # print out results
    fib_num = take!(results)
    println(fib_num)
    global n = n - 1
end
println(elapsed)

```

## LightML Concurrency

We have some work to do:

- Take a set of locations
- update as many models as possible in a given window
- Save the models

### Scene 1: sequential

This is easy to think of sequentually. Essentiall this becomes a loop with some control logic to check if an hour has passed.

### Scene 2: concurrent

Some of the high level steps described above depend on each other. For example, you can't update the models until you have the locations. Similarly, you cannot save the models until they have been updated.

Each location is used to update the model. Once the model is updated, it can then be saved. There's a hint toward concurrency here, where the the stages are sequential, but we can parallalise the work done for each of the locations since there is no dependence between their associated stages.

In [2]:
import jupyter_manim
%env OMP_NUM_THREADS=4

env: OMP_NUM_THREADS=4


In [3]:
%%manim Fresh
from manimlib.imports import *

class Julia(VMobject):
    def __init__(self, code, *args, **kwargs):
        VMobject.__init__(self, *args, **kwargs)
        self.code = code
        self.set_level(0)
                
    def set_level(self, level):
        self.level = level
        for statement in self.code:
            if type(statement) == Julia:
                statement.set_level(level+1)

    def get_code(self):
        code = [] # List of TextMobjects that have relative positions set
        for statement in self.code:
            if type(statement) == Julia:
                o = VGroup(*statement.get_code())
            elif type(statement) == str:
                o = TextMobject(f"\\tt{{{statement}}}")
            else:
                raise Exception(f"Must be string or Julia, got {type(statement)}")
            if len(code) > 0:
                o.next_to(code[-1], DOWN)
                o.align_to(code[0], LEFT)
            if type(statement) == Julia:
                o.shift(RIGHT)
            code.append(o)
        return code
    
    def render(self):
        group = VGroup(*self.get_code())
        self.add(group)
        #self.scale(0.50)
        return self

class Channel(VMobject):
    def __init__(self, name, *args, **kwargs):
        VMobject.__init__(self, *args, **kwargs)
        self.empty = True
        self.original_fill_color = BLACK
        self.square = Square(color=self.get_color(), fill_color=self.original_fill_color)
        self.label = TextMobject(name, color=self.get_color())
        self.label.next_to(self.square, UP)
        self.add(self.square, self.label)
        self.set_opacity(1)
        
    def put(self, source):
        # If the slot is empty, then this is an error
        if self.empty == False:
            raise Exception("Put on non-empty channel not yet implemented, but will soon display an error")
        self.data = source
        self.square.set_fill(self.get_color())
        self.empty = False
        return ApplyMethod(self.data.move_to, self)
        
    def take(self, destination, side=LEFT):
        # If the slot is empty, then this is an error
        if self.empty == True:
            raise Exception("Take from empty channel not yet implemented, but will soon display an error")
        self.square.set_fill(self.original_fill_color)
        self.empty = True
        return ApplyMethod(self.data.next_to, destination, side)
    
    def is_empty(self):
        return self.empty
        
        
class Routine(VMobject):
    def __init__(self, title, program, *args, **kwargs):
        VMobject.__init__(self, *args, **kwargs)
        self.rect = RoundedRectangle(fill_color=BLACK)
        self.rect.set_opacity(1)
        self.code = program.render()
        self.code.move_to(self.rect)
        self.rect.surround(self.code, dim_to_match=1, buff=LARGE_BUFF) # Fix the height
        self.label = TextMobject(title)
        self.label.next_to(self.rect, UP)
        self.add(self.label)
        self.add(self.rect)
        self.add(self.code)
        
    def block(self):
        self.rect.set_color(RED)
        self.add(self.rect)
        #self.become(self.rect) # This is required if used in self.play(...)
        return self
    
class Fresh(Scene):
    ##########################
    ### Play counter setup ###
    ##########################
    def __init__(self, *args, **kwargs):
        self.play_count = 0
        self.counter = TexMobject(self.play_count)
        self.counter.to_edge(UP+LEFT)
        super().__init__(*args, **kwargs)
        
    def play(self, *args, **kwargs):
        self.play_count += 1
        new = TexMobject(self.play_count)
        new.to_edge(UP+LEFT)
        super().play(ReplacementTransform(self.counter, new), *args, *kwargs)
        self.counter = new
    ##########################
    ##########################
    
    def get_main_program(self):
        return Julia([
            "jobs = Channel(0)",
            "results = Channel(0)",
            "@async create\_jobs()",
            "for i in 1:4",
            Julia([
                "@async do\_work()",
            ]),
            "end",
            "while m = take!(result)",
            Julia([
                "@async save\_model(m)",
            ]),
            "end",
        ])
    
    def get_create_jobs_program(self):
        return Julia([
            "function create\_jobs()",
            Julia([
                "locs = get\_locations\_from\_PRISM()",
                "for loc in locs",
                Julia([
                    "put!(jobs, loc)",
                ]),
                "end",
            ]),
            "end",
        ])
    
    def get_do_work_program(self):
        return Julia([
            "function do\_work()",
            Julia([
                "while loc = take!(jobs)",
                Julia([
                    "result = update\_model(loc)",
                    "put!(results, result)"
                ]),
                "end",
            ]),
            "end",
        ])
    
    def get_save_model_program(self):
        return Julia([
            "function save\_model(model)",
            Julia([
                "write\_model\_to\_BSON(model)",
                "...",
                "...",
            ]),
            "end",
        ])
    
    def construct(self):
        SCALE = 0.4

        main = Routine("main", self.get_main_program())
    
        jobs_channel = Channel("jobs", color=BLUE)
        jobs_channel.to_edge(UP)
        jobs_channel.scale(SCALE)
        
        results_channel = Channel("results", color=GREEN)
        results_channel.to_edge(DOWN)
        results_channel.scale(SCALE)
        
        create_jobs = Routine("create\_jobs", self.get_create_jobs_program())
        create_jobs.scale(SCALE)
        create_jobs.to_edge(UP+RIGHT)
        
        do_work = []
        for i in range(0,4):
            text = ""
            if i == 0: text = "do\_work"
            worker = Routine(text, self.get_do_work_program())
            worker.scale(SCALE)
            worker.to_edge(RIGHT)
            #if i > 0:
            worker.shift(0.1*i*(DOWN+LEFT))
                
            do_work.append(worker)

        self.play(ShowCreation(main))
        self.play(ApplyMethod(main.scale, SCALE))
        self.play(ApplyMethod(main.to_edge, LEFT))
        
        self.play(ReplacementTransform(main.copy(), jobs_channel))
        self.play(ReplacementTransform(main.copy(), results_channel))
        
        self.play(ReplacementTransform(main.copy(), create_jobs))
        
        for i in range(0,4)[::-1]:
            self.play(ReplacementTransform(main.copy(), do_work[i]))       
        
        
        data = []
        BLOCK = RED
        UNBLOCK = WHITE
        self.play(ApplyMethod(main.set_stroke, BLOCK))

        # 0. Load in 4 jobs (one taken by each worker in sequence)
        for i in range(0,4):
            dot = Dot()
            dot.next_to(create_jobs, LEFT)
            data.append(dot)
            self.play(ShowCreation(data[i])) # create_jobs puts a job into the job channel
            self.play(jobs_channel.put(data[i]))
            self.play(ApplyMethod(create_jobs.set_stroke, BLOCK))
            self.play(jobs_channel.take(do_work[i])) # take a job to the worker.
#             TODO: send to i'th worker
            self.play(
                ApplyMethod(create_jobs.set_stroke, UNBLOCK),
                ApplyMethod(do_work[i].set_stroke, BLOCK)
            )
            
        # -- At this point, all 4 data dots are at the workers, and the jobs channel is empty
        # 1. Fill the jobs channel
        dot = Dot()
        dot.next_to(create_jobs, LEFT)
        data.append(dot)
        self.play(jobs_channel.put(data[-1]))
        self.wait(3)
        
        # 2. Welp, a worker just finished updating the model and is about to produce a result
        self.play(results_channel.put(data[0]), ApplyMethod(do_work[0].set_stroke, UNBLOCK))
        self.wait(3)
        
        # 3. The freed up worker from 2 can now take the pending job
        self.play(
            jobs_channel.take(do_work[0]), # take the 5th job to the worker.
            ApplyMethod(do_work[0].set_stroke, BLOCK),
            ApplyMethod(main.set_stroke, UNBLOCK),
            results_channel.take(main, RIGHT) #
        )
        
        self.wait(3)
        # 4. main took a value from the results channel, and can now spawn a save_model coroutine,
#         then return to blocking (awaiting the next take)
        save_model = Routine("save\_model", self.get_save_model_program())
        save_model.scale(SCALE)
        save_model.to_edge(DOWN+RIGHT)
        dot = Dot() 
        dot.next_to(create_jobs, LEFT)
        data.append(dot)
        #self.play(jobs_channel.put(data[-1]))
        self.play(
            ReplacementTransform(main.copy(), save_model),
            jobs_channel.put(data[-1]),
            ApplyMethod(create_jobs.set_stroke, BLOCK)
        )
        self.play(ApplyMethod(main.set_stroke, BLOCK))

## Takeaways

- Fish & Chips
- Burger Wisconsin 
- ... just kidding
