In [None]:
from dask.distributed import Client, Scheduler, Worker

In [None]:
# Close the previous scheduler and all associated workers if present
try:
    client.shutdown()
    client.close()
except:
    print('Could not shut down old client, was there any to begin with?')

# Start a client in local cluster mode and expose the underlying scheduler
client = Client()
scheduler = client.cluster.scheduler
print('Dashboard available at', client.dashboard_link)

In [None]:
import logging

no_value = "--no-value-sentinel--"
logger = logging.getLogger(__name__)

# Create an accelerated worker class based on the original worker class
class AcceleratedWorker(Worker):
    pass
    # def add_task(
    #     self,
    #     key,
    #     function=None,
    #     args=None,
    #     kwargs=None,
    #     task=no_value,
    #     who_has=None,
    #     nbytes=None,
    #     priority=None,
    #     duration=None,
    #     resource_restrictions=None,
    #     actor=False,
    #     **kwargs2,
    # ):
    #     logger.info('TTTTESTSETSTESTSTETS')
    #     super(AcceleratedWorker, self).add_task(
    #         key,
    #         function,
    #         args,
    #         kwargs,
    #         task,
    #         who_has,
    #         nbytes,
    #         priority,
    #         duration,
    #         resource_restrictions,
    #         actor,
    #         **kwargs2,
    #     )


In [None]:
def install_signal_handlers(loop=None, cleanup=None):
    """
    Install global signal handlers to halt the Tornado IOLoop in case of
    a SIGINT or SIGTERM.  *cleanup* is an optional callback called,
    before the loop stops, with a single signal number argument.
    """
    import signal

    loop = loop or IOLoop.current()

    old_handlers = {}

    def handle_signal(sig, frame):
        async def cleanup_and_stop():
            try:
                if cleanup is not None:
                    await cleanup(sig)
            finally:
                loop.stop()

        loop.add_callback_from_signal(cleanup_and_stop)
        # Restore old signal handler to allow for a quicker exit
        # if the user sends the signal again.
        signal.signal(sig, old_handlers[sig])

    for sig in [signal.SIGINT, signal.SIGTERM]:
        old_handlers[sig] = signal.signal(sig, handle_signal)

In [None]:
# Start a new worker based on the AcceleratedWorker class
# This worker automatically connects to the scheduler and gets added to the worker pool
# accelerated_worker = await Worker(
#     scheduler.address,
#     validate=True,
#     nthreads=1,
#     memory_limit=4e9,
#     dashboard=True,
#     name='accelerated'
# )

from tornado.ioloop import IOLoop
import asyncio
import sys
import signal

kwargs = {
    'preload': (),
    'memory_limit': '0',
    'preload_argv': (),
    'interface': None,
    'protocol': None,
    'reconnect': True,
    'local_directory': None,
    'death_timeout': None,
    'lifetime': None,
    'lifetime_stagger': '0 seconds',
    'lifetime_restart': False
}

loop = IOLoop.current()

accelerated_worker = await Worker(
    scheduler.address,
    scheduler_file=None,
    nthreads=1,
    loop=loop,
    resources=None,
    security={},
    contact_address=None,
    host=None,
    port=None,
    dashboard=True,
    dashboard_address=':0',
    name='accelerated',
    **kwargs
)

nannies = [accelerated_worker]

nanny = True

async def close_all():
    # Unregister all workers from scheduler
    if nanny:
        await asyncio.gather(*[n.close(timeout=2) for n in nannies])

signal_fired = False

def on_signal(signum):
    nonlocal signal_fired
    signal_fired = True
    if signum != signal.SIGINT:
        logger.info("Exiting on signal %d", signum)
    return asyncio.ensure_future(close_all())

async def run():
    await asyncio.gather(*nannies)
    await asyncio.gather(*[n.finished() for n in nannies])

install_signal_handlers(loop, cleanup=on_signal)

try:
    loop.run_sync(run)
except TimeoutError:
    # We already log the exception in nanny / worker. Don't do it again.
    if not signal_fired:
        logger.info("Timed out starting worker")
    sys.exit(1)
except KeyboardInterrupt:
    pass
finally:
    logger.info("End worker")

In [None]:
workers = scheduler.workers
for worker in workers:
    print(workers[worker].nanny)

In [None]:
# Remove the non-accelerated workers
workers = scheduler.workers
for worker in workers:
    if worker != accelerated_worker.address:
        await scheduler.remove_worker(address=worker)

In [None]:
# Define a simple function and
# submit a future on the client

# Increment integer values by 1
def inc(x):
    return x + 1

# Y holds the future to the result
y = client.submit(inc, 1211)

In [None]:
# Print the result of the future when it is ready
y.result()