# INTRO

## Who am I?

<img src="./assets/my_background.png" width="800" height="400">

---
## All about Accure

<img src="./assets/USP.png" width="800" height="400">


- Accure uses battery expertise and cloud-powered software development to make batteries safer, longer-lasting, and cheaper
- Spin-off from the [RWTH Aachen ISEA Institute](https://www.isea.rwth-aachen.de/cms/ISEA/Forschung/Einrichtungen/~peqw/Labor-fuer-Batteriealterung/?lidx=1) in mid-2020
- Now employs nearly 60 battery experts, software engineers, sales reps, etc.
- [We're Hiring!](https://www.accure.net/careers)


<img src="./assets/method.png" width="800" height="400">

---
## Where Dask / Coiled fit in

* Dask is great (but not perfect)
* Coiled gives us Dask, so that's great
* But we don't use Coiled for all Dask use-cases
* And we don't exclusively use Dask

<img src="./assets/stack.png" width="800" height="400">

### Accure Utilities

* **Accure-IO**: Input/output to data storages
* **Accure-Log**: Cross-platform logging to ELK
* **Accure-Events**: Rapid event-driven service development
* **Accure-Distributed**: Distributed-computing framework
    * "On top" of Dask
    * Adds work avoidance
    * Adds artifact management
    * Adds failover contingencies
    * Simplifies cross-platform execution

---
# Task Generalization

In [1]:
from typing import Callable, Dict, List
import time
import pandas as pd
import numpy as np
import dask
from distributed import Client, LocalCluster
from coiled import Cluster


In [2]:
# We define a standard task framework
class ScaledTask:
    def __init__(
        self, 
        failover: Callable,
        # loader: Callable # Work Avoidance!
    ):
        self.failover = failover
    
    def __call__(self, func):
        def wrapper(*args, task_name="no_task_name", task_id='no_task_id', runtime=None, **kwargs):
            ###
            # Pre-set-up stuff in the "Pipeline Worker". For example...
            #   - Entry logging
            #   - Checking for duplicate Task-names / Task-IDs
            #   - Waiting for Dask workers, if Dasky
            #   - Checking for common inputs, scattering if needed
            #   - Setup task-runtime
            runtime = runtime or {}
            quiet = runtime.get("quiet", False)
            
            task_runtime = dict(
                task_name=task_name,
                task_id=task_id,
                run_id = runtime.get("run_id", 'no-run-id'),
                product = runtime.get("product", 'no-product'),
                allow_failure = runtime.get("allow_failure", False),
                quiet=quiet
            )
            
            if not quiet:
                print(f"{task_name}-{task_id}: Submitting")
            
            ###
            # Submit Task, and handle error
            all_task_kwargs = dict(func_args=args, func_kwargs=kwargs, runtime=task_runtime)
            
            client = runtime.get('client', None)
            if isinstance(client, Client):
                # Execute in cluster
                output = client.submit(_remote_execution, wrapped_func=func, failover=self.failover, **all_task_kwargs)
            else:
                # Execute locally
                output = _remote_execution(wrapped_func=func, failover=self.failover, **all_task_kwargs)
              
            ###
            # Do task clean up. For example...
            #   - Writing artifacts to AWS S3 / RDS
            #   - Exit logging
            if not quiet:
                print(f"{task_name}-{task_id}: Cleaning up")
            
            # All done!
            return output
        return wrapper
    
def _remote_execution(wrapped_func, failover, func_args, func_kwargs, runtime):
    # Pre-set-up stuff in the "Remote Worker". For example...
    #   - Checking for artifacts on local / S3 (work avoidance!)
    #   - Determine recomputation needs
    #   - Open a tempdir to place local side-effects
    #   - Entry logging
    #   - Checking for common inputs, scattering if needed
    quiet = runtime.get("quiet", False)
    if not quiet:
        print(f"{runtime['task_name']}-{runtime['task_id']}: Executing")
    
    ###
    # Invoke function
    try:
        output = wrapped_func(*func_args, runtime=runtime, **func_kwargs)
    except Exception as exc:
        # Do standard logging!
        if not quiet:
            print(f"{runtime['task_name']}-{runtime['task_id']}: Failure")

        if not runtime["allow_failure"]:
            raise
        else:
            output = failover(*func_args, runtime=runtime, **func_kwargs)
    else:
        if not quiet:
            print(f"{runtime['task_name']}-{runtime['task_id']}: Success")
    
    ###
    # Do Remote cleanup
    #  - Delete any local side effects (tempdir)
    #  - Writing artifacts to S3
    #  - Exit logging
    if not quiet:
        print(f"{runtime['task_name']}-{runtime['task_id']}: Finished")
    
    ###
    # Done!
    return output
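
The commented-out `loader` argument and the "work avoidance" notes in `_remote_execution` hint at reusing stored artifacts instead of recomputing them. Below is a minimal sketch of that idea, not Accure's actual implementation: the decorator name `with_work_avoidance`, the pickle-on-local-disk storage, and the `<task_name>-<task_id>.pkl` file naming are all assumptions made for illustration.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Callable


def with_work_avoidance(artifact_dir: Path) -> Callable:
    """Hypothetical decorator: reuse a stored artifact instead of recomputing."""

    def decorator(func: Callable) -> Callable:
        def wrapper(
            *args: Any,
            task_name: str = "no_task_name",
            task_id: Any = "no_task_id",
            **kwargs: Any,
        ) -> Any:
            # Assumed naming scheme: one pickle file per task name / task id
            artifact = artifact_dir / f"{task_name}-{task_id}.pkl"

            # Work avoidance: an earlier run already produced this artifact
            if artifact.exists():
                with artifact.open("rb") as fh:
                    return pickle.load(fh)

            # No artifact yet: compute, then store the result for next time
            output = func(*args, **kwargs)
            with artifact.open("wb") as fh:
                pickle.dump(output, fh)
            return output

        return wrapper

    return decorator


calls = []  # records every real execution of the wrapped function

with tempfile.TemporaryDirectory() as tmp:

    @with_work_avoidance(Path(tmp))
    def square(x: int) -> int:
        calls.append(x)
        return x * x

    first = square(4, task_name="square", task_id=4)   # computed
    second = square(4, task_name="square", task_id=4)  # loaded from the artifact
```

Both calls return the same value, but `calls` only records one real execution. A production version would presumably also need an invalidation rule (for example, keyed on the inputs or the code version), which this sketch leaves out.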


In [3]:
def do_a_thing_failover(arg:int, runtime:Dict)->pd.DataFrame:
    """Something which runs when the 'real' task fails, and returns some meaningful response which fits"""
    return pd.DataFrame({"a":[np.nan], "b":["so sad :("]})

@ScaledTask(failover=do_a_thing_failover)
def do_a_thing(arg:int, runtime:Dict)->pd.DataFrame:
    # pretend to work reeealy hard ;)
    time.sleep(arg%10+1)
    
    # Simulate a failure
    if (arg)%7==0:
        raise RuntimeError("Everything is terrible")
    
    # Formulate an output
    data = pd.DataFrame({"a":[arg], "b":["hi" if arg%2==0 else "boo"]})
    
    # Return like normal
    return data


In [4]:
# Simple execution
do_a_thing(5)

no_task_name-no_task_id: Submitting
no_task_name-no_task_id: Executing
no_task_name-no_task_id: Success
no_task_name-no_task_id: Finished
no_task_name-no_task_id: Cleaning up


|   | a | b   |
|---|---|-----|
| 0 | 5 | boo |


In [5]:
# Failing execution
do_a_thing(7)

no_task_name-no_task_id: Submitting
no_task_name-no_task_id: Executing
no_task_name-no_task_id: Failure


RuntimeError: Everything is terrible

In [30]:
# Failing execution, but with failover
do_a_thing(7, runtime={"allow_failure":True})

no_task_name-no_task_id: Submitting
no_task_name-no_task_id: Executing
no_task_name-no_task_id: Failure
no_task_name-no_task_id: Finished
no_task_name-no_task_id: Cleaning up


|   | a   | b         |
|---|-----|-----------|
| 0 | NaN | so sad :( |


In [5]:
# We define a standard pipeline framework
class ScaledPipeline:
    def __init__(self, product):
        self.product=product
    
    def __call__(self, func):
        def wrapper(*args, cluster=None, runtime:Dict=None, **kwargs):
            # Pre-set-up stuff. For example...
            #   - Setting defaults
            #   - Initializing (or connecting to) dask clusters
            #   - Adapting to run-context
            #   - Setup logging
            run_id = "random_badger_14"
            runtime = dict(runtime or {})  # copy, so the caller's dict is not mutated
            runtime.update(dict(
                client = Client(cluster) if cluster else None,
                run_id = run_id,
                product = self.product
            ))
            
            # Execute Pipeline
            output = func(*args, runtime=runtime, **kwargs)
            
            # Do pipeline clean up. For example...
            #   - Ensuring worker shutdown
            #   - Writing artifacts to AWS S3 / RDS
            print("Cleaning up...")
            
            # All done!
            return output
        return wrapper

In [8]:
def aggregate_futures(futures:List[pd.DataFrame], runtime:Dict=None, gather_chunking=50)->pd.DataFrame:
    """Aggregates many futures into one. In case there are a lot, use a cascading algorithm"""
    runtime = runtime or {}
    client = runtime.get("client", None)
    if client is None:
        # No Dask, just concat like normal
        return pd.concat(futures, ignore_index=True)
        
    if len(futures) <= gather_chunking:
        # Yes Dask, but few futures. Concat in a job
        return client.submit(pd.concat, futures, ignore_index=True)
    
    # Yes Dask, and lots of futures. Gather cascade!
    n_chunks = len(futures) // gather_chunking
    if len(futures) % gather_chunking > 0:
        n_chunks+=1
    
    grouped_future_set = [
        futures[
            i*gather_chunking: (i+1)*gather_chunking
        ] for i in range(n_chunks)
    ]

    recursive_futures = [
        client.submit(
            pd.concat, 
            grouped_futures,
            ignore_index=True
        ) for grouped_futures in grouped_future_set
    ]

    return aggregate_futures(recursive_futures, runtime, gather_chunking)

def gather(future, runtime:Dict=None):
    """Calls client.gather, if one is available, otherwise returns `future` directly"""
    runtime = runtime or {}
    client = runtime.get("client", None)
    if client is None:
        return future
    
    return client.gather(future)
    


@ScaledPipeline(product= "nice-product")
def do_a_lot_of_things(n_ids:int=10, runtime:Dict=None):
    print("Pipeline: Beginning")
    runtime = runtime or {}    
    
    # Iterate over a list of things and store multiple "futures"
    print("Pipeline: Creating futures")
    futures = [
        do_a_thing(
            arg=i,                    
            task_name="do_a_thing",                
            task_id=i,
            runtime=runtime
        ) for i in range(n_ids)
    ]
    
    
    # Pass all of those futures into a single task, and deal with them 
    #  - This step runs only once per pipeline, so no `task_name` or `task_id` is passed
    print("Pipeline: Combining futures")
    combined_future = aggregate_futures(futures, runtime)

    # Call gather to collect the results of the calculations
    print("Pipeline: Gathering final future")
    data = gather(combined_future, runtime)
    
    # Return the overall result
    print("Pipeline: Finshed")
    return data
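
The cascading aggregation in `aggregate_futures` can be easier to follow without Dask in the picture. The sketch below reproduces the same chunk-then-combine rounds on plain Python lists; `chunk` and `cascade_concat` are hypothetical names, and list concatenation stands in for `pd.concat` running in a Dask task.

```python
from typing import List, Tuple


def chunk(items: List, size: int) -> List[List]:
    """Split `items` into consecutive groups of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def cascade_concat(parts: List[List], gather_chunking: int = 50) -> Tuple[List, int]:
    """Concatenate lists in rounds; returns (combined, number_of_rounds)."""
    if gather_chunking < 2:
        # Groups of one element never shrink the list, so the cascade would not end
        raise ValueError("gather_chunking must be at least 2")

    rounds = 0
    while len(parts) > gather_chunking:
        # One cascade round: every group is concatenated independently
        parts = [sum(group, []) for group in chunk(parts, gather_chunking)]
        rounds += 1

    # Final round: few enough parts to combine in a single step
    return sum(parts, []), rounds + 1


parts = [[i] for i in range(200)]
combined, rounds = cascade_concat(parts, gather_chunking=50)
```

With 200 single-element parts and `gather_chunking=50`, the first round produces 4 intermediate lists and the second round combines them, so `rounds` is 2. In the Dask version, each group in a round becomes its own task, so no single task has to receive all 200 inputs at once.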
    

In [9]:
do_a_lot_of_things(5, runtime={"allow_failure":True} )

Pipeline: Beginning
Pipeline: Creating futures
do_a_thing-0: Submitting
do_a_thing-0: Executing
do_a_thing-0: Failure
do_a_thing-0: Finished
do_a_thing-0: Cleaning up
do_a_thing-1: Submitting
do_a_thing-1: Executing
do_a_thing-1: Success
do_a_thing-1: Finished
do_a_thing-1: Cleaning up
do_a_thing-2: Submitting
do_a_thing-2: Executing
do_a_thing-2: Success
do_a_thing-2: Finished
do_a_thing-2: Cleaning up
do_a_thing-3: Submitting
do_a_thing-3: Executing
do_a_thing-3: Success
do_a_thing-3: Finished
do_a_thing-3: Cleaning up
do_a_thing-4: Submitting
do_a_thing-4: Executing
do_a_thing-4: Success
do_a_thing-4: Finished
do_a_thing-4: Cleaning up
Pipeline: Combining futures
Pipeline: Gathering final future
Pipeline: Finshed
Cleaning up...


|   | a   | b         |
|---|-----|-----------|
| 0 | NaN | so sad :( |
| 1 | 1.0 | boo       |
| 2 | 2.0 | hi        |
| 3 | 3.0 | boo       |
| 4 | 4.0 | hi        |


In [10]:
cluster = LocalCluster(n_workers=4, threads_per_worker=10)
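
A rough back-of-the-envelope estimate of what this cluster size buys for a 200-task run of `do_a_lot_of_things`. This is an idealized lower bound only: it assumes perfect task packing and ignores scheduling, serialization, and aggregation overhead. Threads suit this demo because `time.sleep` releases the GIL.

```python
n_tasks = 200
n_workers = 4
threads_per_worker = 10

# Each task sleeps arg % 10 + 1 seconds (see do_a_thing); failing tasks sleep too,
# because the simulated failure is raised after the sleep
total_sleep_s = sum(i % 10 + 1 for i in range(n_tasks))

# Number of tasks that can run at the same time across the whole cluster
slots = n_workers * threads_per_worker

# Idealized lower bound on wall-clock time
ideal_runtime_s = total_sleep_s / slots

print(total_sleep_s, slots, ideal_runtime_s)  # 1100 40 27.5
```

Run sequentially, the same tasks would sleep for 1100 seconds in total, so the 40 thread slots bring the ideal wall-clock time down to roughly half a minute.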

In [12]:
do_a_lot_of_things(200, runtime={"allow_failure":True, "quiet":True}, cluster=cluster )

Pipeline: Beginning
Pipeline: Creating futures
Pipeline: Combining futures
Pipeline: Gathering final future
Pipeline: Finshed
Cleaning up...


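|     | a     | b         |
|-----|-------|-----------|
| 0   | NaN   | so sad :( |
| 1   | 1.0   | boo       |
| 2   | 2.0   | hi        |
| 3   | 3.0   | boo       |
| 4   | 4.0   | hi        |
| ... | ...   | ...       |
| 195 | 195.0 | boo       |
| 196 | NaN   | so sad :( |
| 197 | 197.0 | boo       |
| 198 | 198.0 | hi        |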


In [13]:
cluster.close()

In [None]:
coiled_cluster = Cluster(
    name="sevberg-webinar",
    software="coiled/default-py38",
    account="accure",
    n_workers=5,
    worker_vm_types=["m5.xlarge"],
    worker_options={"n_workers":4, "threads_per_worker":10},
)


In [15]:
do_a_lot_of_things(200, runtime={"allow_failure":True, "quiet":True}, cluster=coiled_cluster )


+-------------+---------------+----------------+----------------+
| Package     | client        | scheduler      | workers        |
+-------------+---------------+----------------+----------------+
| dask        | 2022.6.1      | 2022.6.0       | 2022.6.0       |
| distributed | 2022.6.1      | 2022.6.0       | 2022.6.0       |
| lz4         | None          | 4.0.0          | 4.0.0          |
| msgpack     | 1.0.4         | 1.0.3          | 1.0.3          |
| numpy       | 1.22.4        | 1.21.6         | 1.21.6         |
| pandas      | 1.3.5         | 1.4.2          | 1.4.2          |
| python      | 3.8.8.final.0 | 3.8.13.final.0 | 3.8.13.final.0 |
| tornado     | 6.2           | 6.1            | 6.1            |
+-------------+---------------+----------------+----------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6


Pipeline: Beginning
Pipeline: Creating futures
Pipeline: Combining futures
Pipeline: Gathering final future


2022-08-09 13:38:08,335 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/opt/homebrew/anaconda3/envs/development/lib/python3.8/site-packages/distributed/protocol/core.py", line 158, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/opt/homebrew/anaconda3/envs/development/lib/python3.8/site-packages/distributed/protocol/core.py", line 138, in _decode_default
    return merge_and_deserialize(
  File "/opt/homebrew/anaconda3/envs/development/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 497, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/opt/homebrew/anaconda3/envs/development/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 426, in deserialize
    return loads(header, frames)
  File "/opt/homebrew/anaconda3/envs/development/lib/python3.8/site-packages/distributed/

AttributeError: Can't get attribute '_unpickle_block' on <module 'pandas._libs.internals' from '/opt/homebrew/anaconda3/envs/development/lib/python3.8/site-packages/pandas/_libs/internals.cpython-38-darwin.so'>
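
One plausible reading of this traceback, given the version table above: the workers pickle DataFrames with pandas 1.4.2, and the client's pandas 1.3.5 apparently lacks the `_unpickle_block` helper that those pickles reference. The sketch below flags such mismatches before any work is submitted. The helper names and the dict layout are assumptions for illustration; the values are copied from the table, not fetched from a cluster. (`distributed.Client.get_versions` can collect this information from a live cluster.)

```python
from typing import Dict, List, Optional, Tuple


def major_minor(version: Optional[str]) -> Optional[Tuple[str, ...]]:
    """Reduce '1.4.2' to ('1', '4'); a missing package stays None."""
    if version is None:
        return None
    return tuple(version.split(".")[:2])


def find_minor_mismatches(versions: Dict[str, Dict[str, Optional[str]]]) -> List[str]:
    """Return packages whose major.minor version differs between roles."""
    return [
        package
        for package, by_role in versions.items()
        if len({major_minor(v) for v in by_role.values()}) > 1
    ]


# Values copied from the version table above; the dict layout is an assumption
reported = {
    "dask": {"client": "2022.6.1", "scheduler": "2022.6.0", "workers": "2022.6.0"},
    "numpy": {"client": "1.22.4", "scheduler": "1.21.6", "workers": "1.21.6"},
    "pandas": {"client": "1.3.5", "scheduler": "1.4.2", "workers": "1.4.2"},
    "python": {
        "client": "3.8.8.final.0",
        "scheduler": "3.8.13.final.0",
        "workers": "3.8.13.final.0",
    },
}

mismatched = find_minor_mismatches(reported)
print(mismatched)  # ['numpy', 'pandas']
```

Patch-level differences (such as dask 2022.6.1 vs 2022.6.0) are deliberately ignored here. Whether a minor-version gap actually breaks serialization depends on the package, so the result is a list of candidates to align, not a verdict.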