In [1]:
import dask
from dask.distributed import Client
from dask_kubernetes import KubeCluster, make_pod_spec

# Container image shared by the scheduler and worker pods.
DASK_IMAGE = 'holdenk/dask:latest'

# Expose the scheduler via a Kubernetes LoadBalancer service
# (presumably so this notebook can reach it from outside the cluster — confirm for your setup).
dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer"})

# Workers get 8G of memory, the scheduler 4G; both are pinned to a single CPU.
worker_template = make_pod_spec(image=DASK_IMAGE,
                                memory_limit='8G', memory_request='8G',
                                cpu_limit=1, cpu_request=1)
scheduler_template = make_pod_spec(image=DASK_IMAGE,
                                   memory_limit='4G', memory_request='4G',
                                   cpu_limit=1, cpu_request=1)
cluster = KubeCluster(pod_template=worker_template, scheduler_pod_template=scheduler_template)

# Create and destroy workers dynamically based on workload.
# With no arguments the bounds are minimum=0, maximum=inf (see the log output below).
cluster.adapt()

# `Client` is already imported at the top of this cell; no need to re-import it.
client = Client(cluster)

Creating scheduler pod on cluster. This may take some time.


2022-06-01 18:16:56,656 - distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=0 maximum=inf


In [2]:
# Replace the unbounded adaptive policy with one that keeps between 1 and 10 workers;
# the log below shows the previous adaptive loop stopping and the new one starting.
cluster.adapt(minimum=1, maximum=10)

2022-06-01 18:22:04,327 - distributed.deploy.adaptive_core - INFO - Adaptive stop
2022-06-01 18:22:04,328 - distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=1 maximum=10
2022-06-01 18:22:04,328 - distributed.deploy.adaptive_core - INFO - Adaptive stop


<distributed.deploy.adaptive.Adaptive at 0x7effaca4faf0>

In [3]:
# Show what the client and scheduler negotiated on connect (compression, Python version,
# pickle protocol) — presumably to check that the notebook and cluster environments match.
client.scheduler_comm.comm.handshake_info()

{'compression': 'lz4', 'python': (3, 8, 6), 'pickle-protocol': 5}

In [4]:
import dask.array as da

In [5]:
# Create a large (1000 x 1000 x 1000) array of ones and compute its mean on the cluster
array = da.ones((1000, 1000, 1000))
mean = array.mean().compute()
# Sanity check: the mean of an all-ones array must be exactly 1.0
# (the sum of 1e9 ones is exactly representable as a float64).
assert mean == 1.0, f"unexpected mean {mean}; the cluster may be misbehaving"
print(mean)  # Should print 1.0

1.0


The mean came back as 1.0, so now we know the cluster is working :)

In [6]:
class Counter:
    """ Actor state: an integer tally that can be bumped by one or by an arbitrary amount """
    n = 0  # class-level default; each instance sets its own value in __init__

    def __init__(self):
        self.n = 0

    def increment(self):
        """Bump the tally by one and return the new total."""
        return self.add(1)

    def add(self, x):
        """Increase the tally by ``x`` and return the new total."""
        self.n = self.n + x
        return self.n

    def value(self):
        """Return the current total without changing it."""
        return self.n


# actor=True: the Counter instance stays on one worker, and `counter` below is a
# proxy whose method calls return futures (see the <ActorFuture> output further down).
future = client.submit(Counter, actor=True)  # Create a Counter on a worker
counter = future.result()     

In [7]:
# Displays the Actor proxy handle, not the count itself.
counter

<Actor: Counter, key=Counter-cba9a5b0-ab0c-4f5a-be42-bf493967b24a>

In [8]:
# Actor calls return an ActorFuture immediately. The increment still runs even though
# we never call .result() here (the next cell reads back 1).
counter.increment()

<ActorFuture>

In [9]:
# .result() blocks until the actor replies with the current total.
counter.value().result()

1

In [10]:
import dask.bag as db
# A small Dask bag over the integers 0..9.
# NOTE(review): `b` is not used by any of the following cells shown here.
b = db.from_sequence(range(10))

In [11]:
#tag::result_future_not_ser[]
def inc(x):
    """Sleep ``x`` seconds, add ``x`` to the shared ``counter`` actor, and return the counter's new total."""
    import time
    time.sleep(x)
    f = counter.add(x)
    # Note: the actor (in this case `counter`) is serializable, however the future we get back from it is not.
    # This is likely because the future contains a network connection to the actor, so we need to get its
    # concrete value here. If you don't need the value, you can avoid blocking and the call will still execute.
    return f.result()
#end::result_future_not_ser[]

In [12]:
c = client
# Submit one `inc` task per input. Because inc sleeps x seconds, the tasks finish in a staggered order.
futures = [c.submit(inc, x) for x in range(10)]

In [13]:
# Inspect task states. inc sleeps for x seconds, so when this ran only the x=0 task had finished.
futures

[<Future: finished, type: int, key: inc-3ff4b7331b0a49792933bc22d8633ed1>,
 <Future: pending, key: inc-e125592e8f81119a8312af9e6c0be8e6>,
 <Future: pending, key: inc-1254cb1c2334d37f8c4249a2837abfb8>,
 <Future: pending, key: inc-7cbbb37e68d1b3fb8beebb315cfb47d3>,
 <Future: pending, key: inc-d9d47846dce3864f143865f9681aa9fe>,
 <Future: pending, key: inc-5b92e0da31d406193dfba147cc22321f>,
 <Future: pending, key: inc-f6780821931ffb3d0921ec037a84a5e7>,
 <Future: pending, key: inc-364cd258c135891bd9362e554423563f>,
 <Future: pending, key: inc-2fe52468e41b3fb1cde763b5be5c980f>,
 <Future: pending, key: inc-b99ba4d3aaab3ff7a055b7001d3a37d8>]

In [14]:
# Still 1 at this point: the finished x=0 task added 0, and (presumably) the other
# tasks were still sleeping before their counter.add(x) call.
counter.value().result()

1

In [15]:
# Blocks until inc(5) finishes. It returns the counter's running total right after adding 5,
# not 5 itself, so the value depends on which other tasks had already added to the counter.
futures[5].result()

2022-06-01 18:22:49,341 - distributed.deploy.adaptive - INFO - Retiring workers [4, 5]


33

In [16]:
# Reads the current total; any inc tasks still running will keep adding to it.
counter.value().result()

33

In [17]:
#tag::make_account[]
class BankAccount:
    """ A bank account actor (similar to counter but with + and -)

    Invalid operations raise ``ValueError`` (a subclass of ``Exception``, so
    existing ``except Exception`` handlers keep working).
    """

    # 42 is a good start
    def __init__(self, balance=42.0):
        self._balance = balance

    def deposit(self, amount):
        """Add ``amount`` to the balance and return the new balance.

        Raises ValueError if ``amount`` is negative.
        """
        if amount < 0:
            raise ValueError("Can not deposit negative amount")
        self._balance += amount
        return self._balance

    def withdrawl(self, amount):
        """Remove ``amount`` from the balance and return the new balance.

        Raises ValueError if ``amount`` is negative (which would otherwise act as
        a deposit that bypasses deposit's own check) or exceeds the current balance.
        """
        if amount < 0:
            raise ValueError("Can not withdraw negative amount")
        if amount > self._balance:
            raise ValueError("Please deposit more money first.")
        self._balance -= amount
        return self._balance

    # Correctly spelled alias; ``withdrawl`` is kept for existing callers.
    withdraw = withdrawl

    def balance(self):
        """Return the current balance."""
        return self._balance


# actor=True keeps the account's state on one worker; `account` is a proxy whose
# method calls return futures.
account_future = client.submit(BankAccount, actor=True)  # Create a BankAccount on a worker
account = account_future.result()
#end::make_account[]

In [18]:
#tag::use_account[]
# Non-blocking: calling an actor method returns a future right away
balance_future = account.balance()
# Blocks until the actor sends back the balance
balance = balance_future.result()
try:
    f = account.withdrawl(100)
    f.result() # throws an exception: 100 is more than the default 42.0 balance
except Exception as e:
    # The exception raised inside the actor surfaces here via .result()
    print(e)
#end::use_account[]

Please deposit more money first.


In [19]:
# Withdraw 1 from the starting balance; this only returns a future (not yet awaited).
f = account.withdrawl(1)

In [20]:
# Block for the new balance after the withdrawal.
f.result()

41.0

2022-06-01 18:22:51,340 - distributed.deploy.adaptive - INFO - Retiring workers [0, 1]
2022-06-01 18:23:15,340 - distributed.deploy.adaptive - INFO - Retiring workers [2]
2022-06-01 18:23:16,341 - distributed.deploy.adaptive - INFO - Retiring workers [3]
2022-06-08 12:32:34,145 - distributed.deploy.adaptive_core - ERROR - Adaptive stopping due to error
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/opt/conda/lib/python3.8/site-packages/tornado/iostream.py", line 1140, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
ConnectionResetError: [Errno 104] Connection reset by peer

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 863, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  Fi