# Twisted up in a distributed tornado - a beginner's guide to async frameworks in Python

Andrew Stretton

Software Engineer @ https://zegami.com


* astretton@zegami.com
* https://github.com/strets123


## Setup:

Install MongoDB and a few packages

https://www.mongodb.com/download-center#community

conda install distributed luigi

pip install aiohttp  timeout-decorator motor

Slides are here:

https://github.com/strets123/beginners-async-python



# What are we going to cover???



## Use all of your processors when running a script

We will demonstrate how to max out your processors and network by running an image processing algorithm over this dataset:

https://demo.zegami.com/ashmolean%20paintings

## Consume social media data as it is created

We will demonstrate how to read the Wikipedia changes feed

https://stream.wikimedia.org/v2/stream/recentchange

## Write to a database and move on to the next task without waiting for confirmation

We will save the events to MongoDB

## Create a super simple task scheduler without an ugly while True loop

We will create a simple clock on the terminal and perhaps go on to make it work on a web page

## Call a function and throw an exception if it takes too long

We will demonstrate how tricky this is and go through all the options

## Understand how Jupyter notebooks work

We will look at the Jupyter architecture and then how we can reproduce parts of it with websockets and Tornado

## First a little theory... definitions

### Concurrency / Parallelism

Running more than one piece of code at one time




### Asynchronous

* Used to present the impression of concurrent or parallel tasking 

* For a process that needs to do work away from the current application 

* Where we don't want to wait and block our application awaiting the response.




### Futures
 
Traditional thread and process handling requires a "join" step to bring the processes or threads back together. Arguably more elegant is to use the concept of `Futures`.

Called Promises in JavaScript, Futures are an IOU for the result of a function.

![](http://sharedstory.org/sites/default/files/stories/user-154/story_profile_pic.jpg)


### Executors

An executor manages a pool of threads or processes that exists separately from the tasks being run.

You can think of an executor as being like a road. 

![](https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcSgvO4FY_kEwF6-BmNH7BoZsordtULkTa-SIkznhhnXD14uGUd8Pg)



## It can carry all kinds of vehicles (tasks) to where they need to go.


## It has a number of lanes, which determines its total capacity.


## It does not itself change state or move around; it is still there when the cars have all driven through.



## Two types of Executor in the standard library 

* `ThreadPoolExecutor` which manages threads and `ProcessPoolExecutor` which manages processes.



Here we use a ThreadPoolExecutor to open some threads and then we run a single function on one of those threads.

In [1]:
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def return_after_5_secs(message):
    sleep(5)
    return message

pool = ThreadPoolExecutor(3)  #The ThreadPoolExecutor manages the 3 open threads

future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())

False


In [3]:
print(future.done())
print(future.result())

True
hello


After waiting for a bit the task is now done. We have not had to do anything in our business logic to say that processing is complete, `future.done()` is handled for us.
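
Instead of polling `future.done()`, a callback can be attached to the future. The following is a minimal sketch, assuming a hypothetical `slow_echo` function and `on_done` callback (neither is part of the tutorial code); `add_done_callback` itself is part of `concurrent.futures`.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

results = []  # filled in by the callback below

def slow_echo(message):
    sleep(0.1)  # stand-in for slow work
    return message

def on_done(future):
    # Called once the future holds a result (or an exception)
    results.append(future.result())

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_echo, "hello")
    future.add_done_callback(on_done)
# Leaving the with block waits for the submitted work to finish

print(results)
```

The callback runs in the worker thread, so it should stay short.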

### Executor.map for processing lists of data

`Executor.map` allows us to spread a list of data points over a process pool and run a function over each of them.

In [4]:
import concurrent.futures
import math
 
PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
    ]
 
def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True  # 2 is the only even prime
    if n % 2 == 0:
        return False
 
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
 
def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
main()

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False


### as_completed

The `as_completed` function allows us to iterate over a list of futures in the order in which they complete.

![](https://encrypted-tbn2.gstatic.com/images?q=tbn:ANd9GcRXO5DbttPr5NFffK9KQnNzOaxCKlbz0EVd0-2qG0hV9cQafkgB)

In [5]:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
 
def return_after_5_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
 
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))
 
for x in as_completed(futures):
    print(x.result())

Return of 0
Return of 1
Return of 4
Return of 3
Return of 2


### Exercise 1: t-SNE Analysis of an art gallery dataset

[Zegami](https://demo.zegami.com) is used to tag and analyse numerous image-centric data sets. 

In this example we use Zegami to help museums annotate and digitise paintings. One option for clustering images is the t-SNE (t-distributed stochastic neighbor embedding) algorithm as implemented in scikit-learn. More about the algorithm here: https://www.youtube.com/watch?v=RJVL80Gg3lA. The steps involved in the data processing are:

1. Download and parse a CSV file with the image IDs
1. Download the images 
1. Resize the images and convert into linear arrays of pixels
1. Perform t-SNE on the list of arrays to give an X and Y coordinate for each image

At http://localhost:8888/edit/ashmolean.py is some single-threaded example code. In this tutorial we are going to first rewrite the code to run in a `ProcessPoolExecutor`. We will later optimise the HTTP code using async Python.

In [28]:
TTF_URL = "http://tutorial.zegami.net.s3.eu-west-2.amazonaws.com/ashmolean/data.tab"

IMAGE_URL = "http://tutorial.zegami.net.s3.eu-west-2.amazonaws.com/ashmolean/{}.jpg"

# TSNE on images
import os
import io
import requests
import time
from PIL import Image
import numpy as np
import pandas as pd
from sklearn.manifold import TSNE
from distributed.client import Client, wait
from distributed import worker_client, LocalCluster
from concurrent.futures import ThreadPoolExecutor, as_completed
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
output_file = "zegami_tsne.tab"

# setup a standard image size; this will distort some images but will get everything into the same shape
STANDARD_SIZE = (100, 100)


def img_data_to_matrix(image_data, verbose=False):
    """
    takes a jpeg or other image bytes object and turns it into a numpy array of RGB pixels
    """

    bio = io.BytesIO(image_data)
    bio.seek(0)
    img = Image.open(bio)
    img = img.resize(STANDARD_SIZE)
    img = list(img.getdata())
    img = np.array(img)

    return img


def flatten_image(img):
    """
    takes in an (m, n) numpy array and flattens it
    into an array of shape (1, m * n)
    """
    s = img.shape[0] * img.shape[1]
    img_wide = img.reshape(1, s)
    return img_wide[0]


def get_data():
    """
    Requests the CSV data over the web and returns it as a cleaned pandas dataframe
    """
    resp = requests.get(TTF_URL, stream=True)
    data = pd.read_csv(io.StringIO(resp.content.decode("latin-1")),
                       sep="\t",
                       header=0)
    return data.dropna(subset=["id"])


def get_pca(data):
    """
    Runs the TSNE algorithm over a set of image data
    """
    pca = TSNE()

    X = pca.fit_transform(data)
    results = pd.DataFrame({"x": X[:, 0], "y": X[:, 1]})
    return results


def image_data_from_id(image_id):
    """
    Take the image id and return the bytes content of the image via an http request
    """
    if str(image_id) == "nan":
        raise Exception("ID should not be nan")
    image_url = IMAGE_URL.format(image_id)
    image_data = requests.get(image_url, timeout=60).content
    return image_data


def process_image_bytes(image_bytes):
    """
    Reshape the image pixels into a flat list of tuples of pixel values. The list should be of equal size for all images
    """
    matrix = img_data_to_matrix(image_bytes)
    return flatten_image(matrix)





def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]


def get_tsne(df, data):
    """
    Given a flat list of tuples of pixel values write out the TSNE CSV output file
    """
    results = get_pca(data)
    df['x'] = results['x']
    df['y'] = results['y']
    print("==========================")
    # print str(df)
    print("Appended x and y coordinates and created " + output_file + ".")
    df.to_csv(output_file, sep="\t", index=False)
    print("==========================")

def run():
    """
    main script runner for the synchronous version of this ashmolean data processing script
    """
    df = get_data()[:10]
    images = df["id"]
    data = []
    labels = []
    bytes_data = [image_data_from_id(image_id) for image_id in images]
    data = [process_image_bytes(image_bytes) for image_bytes in bytes_data]

    #pca = TSNE()
    # or can do PCA
    get_tsne(df, data)

run()

Appended x and y coordinates and created zegami_tsne.tab.


### Review

* We are able to run data processing across multiple processes (and so multiple CPU cores)
* `concurrent.futures` allows us to do this with little need to change the code
* We are able to open a large number of threads in a ThreadPoolExecutor to retrieve all of the images
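
The combination described in this review can be sketched as a two-stage pipeline. This is only an illustration: `fetch`, `process` and `run_pipeline` are hypothetical stand-ins (no network access, trivial "processing"), and both pools are thread pools here to keep the sketch self-contained. For real CPU-bound work a `ProcessPoolExecutor` could be passed as `cpu_pool` instead.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(image_id):
    # Hypothetical stand-in for image_data_from_id: pretend to download bytes
    return image_id, bytes([image_id % 256]) * 4

def process(image_bytes):
    # Hypothetical stand-in for process_image_bytes: work on the bytes
    return sum(image_bytes)

def run_pipeline(image_ids, io_pool, cpu_pool):
    # Stage 1: submit every download to the I/O pool
    downloads = [io_pool.submit(fetch, image_id) for image_id in image_ids]
    # Stage 2: hand each download to the CPU pool as soon as it arrives
    processing = {}
    for done in as_completed(downloads):
        image_id, image_bytes = done.result()
        processing[image_id] = cpu_pool.submit(process, image_bytes)
    # Collect the results in the original order of the ids
    return [processing[image_id].result() for image_id in image_ids]

with ThreadPoolExecutor(8) as io_pool, ThreadPoolExecutor(2) as cpu_pool:
    print(run_pipeline([1, 2, 3], io_pool, cpu_pool))
```

Passing the executors in as arguments keeps the pipeline independent of whether threads or processes do the work.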


### What is asynchronous Python???

Asynchronous python represents a change to the flow control of a program. As this can be somewhat tricky to understand let's try an analogy...

#### Frying eggs and making toast for all the family

Let's break down the tasks... 
 
1. Toast bread
1. Butter toast
1. Heat up frying pan
1. Add oil
1. Break eggs into pan
1. Wait until the right moment
1. Flip the eggs carefully for a few seconds
1. Put on more toast
1. Make sure everyone is at the table
1. Serve the eggs on toast

## What are the possible ways we could do these tasks?

### 1) Procedural

Let's say my toaster is one of these:

![](https://www.picclickimg.com/00/s/MTA2NlgxNjAw/z/RGEAAOSwNSxVXQQw/$/Antique-Westinghouse-Flip-Up-Chrome-Toaster-WORKS-ORIG-CORD-_1.jpg)

And my kitchen is really badly laid out - in fact there are no sockets in my kitchen and so the toaster is in the dining room.

How am I going to get everyone's toast cooked as well as their eggs?

One way to avoid burning anything would be:

Half cook toast on each side then leave in the toaster to keep warm.

Cook eggs

Finish cooking toast

Combine and serve



### 2) Multi process

Another way of getting round these constraints is just to ask for help. In this case two of us can get everything done in a coordinated way, but it relies on two people being happy to cook the breakfast.





### 3) Multi threaded

In this case I have my toaster next to my frying pan and I can keep my hands and eyes on both at once. In this way I get everything cooked and I can slow down or speed up the eggs to make sure the eggs and the toast are all just right. I use my multiple arms to achieve this. My brain and eyes switch between the two tasks to keep an eye on them.




### 4) Asynchrony

In this case I decide to throw away my old toaster and get a new one which pops up and also detects when the bread is just right. 

[http://news.bbc.co.uk/1/hi/england/cambridgeshire/4291816.stm]

I also get myself the amazing "Heston Blumenthal BEG800SIL One Degree Precision Poacher"

[https://www.go-electrical.co.uk/sage-heston-blumenthal-beg800sil-the-one-degree-precision-poacher.html?gclid=CJqwia-nk9MCFY8Q0wodLqcL4Q]

Now I can simply set off my toast and eggs at the appropriate time and also have time to do some other things, perhaps get out my autonomous vacuum cleaner and send it to trip someone up.

## Cooking Breakfast Programmatically

So what is the programmatic equivalent of turning on the toaster then leaving it and waiting for the sound of it popping up?

1. Send task to be run which requires no processing on my machine
2. Get back an acknowledgement and wait for task completion (we wait using a Future object)
3. Do some other stuff in the meantime
4. Hear an event and go back to the task to do whatever needs finishing off.
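
The four steps above can be sketched with `asyncio`. This is a minimal illustration, assuming Python 3.7+ (for `asyncio.run` and `asyncio.create_task`); `toast_bread` and `make_breakfast` are hypothetical names, and the sleeps stand in for real waiting.

```python
import asyncio

async def toast_bread(log):
    await asyncio.sleep(0.2)      # the toaster works without us watching
    log.append("toast popped")

async def make_breakfast():
    log = []
    task = asyncio.create_task(toast_bread(log))  # 1. send the task off
    log.append("toaster started")                 # 2. we now hold a Task (a kind of Future)
    await asyncio.sleep(0.05)                     # 3. do some other stuff in the meantime
    log.append("table laid")
    await task                                    # 4. come back when the task completes
    log.append("toast buttered")
    return log

print(asyncio.run(make_breakfast()))
```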

### As discussed above `concurrent.futures` allows me to wait on tasks 

### Asyncio and Tornado take this one step further by doing things in a single thread

## asyncio is the async library in the Python standard library

### Example: Sleep for 8 hours in 10 seconds using coroutine decorator:

* Async programming has a different execution model
* I can "sleep" without the CPU waiting
* I can hand back control of the single thread while sleeping by using `yield from`

In [None]:
import asyncio
import time

t = time.time()

@asyncio.coroutine  # deprecated in Python 3.8 and removed in 3.11; see the async/await version below
def bar():

    yield from asyncio.sleep(8)   # Note that the sleep function must be asynchronous

ioloop = asyncio.get_event_loop()
tasks = [ioloop.create_task(bar()) for i in range(0,3600)]
wait_tasks = asyncio.wait(tasks)
ioloop.run_until_complete(wait_tasks)
ioloop.close()

print(time.time()-t)

### Example: Sleep for 8 hours in 10 seconds using the new `async` keyword:

In [None]:
import asyncio
import time

t = time.time()


async def bar():

    await asyncio.sleep(8)




ioloop = asyncio.get_event_loop()
tasks = [ioloop.create_task(bar()) for i in range(0,3600)]
wait_tasks = asyncio.wait(tasks)
ioloop.run_until_complete(wait_tasks)
ioloop.close()

print(time.time()-t)

### What are the key features of these functions?

* The function we execute is an asynchronous coroutine
* It is wrapped in a "Task" which is a type of "Future" - this is a placeholder for the result of the function
* The sleep function is also an asynchronous coroutine (time.sleep() would not work here)
* The tasks are submitted to an event loop
* `yield from` or `await` are used to send back control to the event loop so it can do other things.

These features will crop up again when we use other asynchronous libraries
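
To see why the sleep itself has to be asynchronous, here is a small sketch (assuming Python 3.7+ for `asyncio.run`; the function names are made up for illustration). Three naps run together when they `await asyncio.sleep`, but one after another when they call the blocking `time.sleep`.

```python
import asyncio
import time

async def async_nap(seconds):
    await asyncio.sleep(seconds)   # hands control back to the event loop

async def blocking_nap(seconds):
    time.sleep(seconds)            # holds the single thread; nothing else can run

async def timed(nap):
    start = time.perf_counter()
    await asyncio.gather(nap(0.1), nap(0.1), nap(0.1))
    return time.perf_counter() - start

fast = asyncio.run(timed(async_nap))
slow = asyncio.run(timed(blocking_nap))
print("async: {:.1f}s, blocking: {:.1f}s".format(fast, slow))
```

The blocking version takes roughly three times as long because each nap must finish before the next one can start.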

### Exercise 2:

Implement the sleep for 8 hours function using Tornado and Python 3


In [None]:
import time
from tornado.ioloop import IOLoop
from tornado import gen

@gen.coroutine
def task():
    yield gen.sleep(8)


@gen.coroutine
def main():
    yield [task() for i in range(0,3600)]

if __name__ == "__main__":
    loop = IOLoop.instance()
    loop.run_sync(main)

### Exercise 3:

Do the same thing with the ThreadPoolExecutor and concurrent.futures

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def return_after_8_secs(message):
    sleep(8)
    return message

t = time()
pool = ThreadPoolExecutor(3600)  # Allow up to 3600 worker threads


futures = [pool.submit(return_after_8_secs, i) for i in range(0, 500)]  # only 500 tasks are submitted here

for fut in as_completed(futures):
    print(fut.result())

print(time() - t)  # elapsed seconds

# Review

* In Python 2, generators and exceptions were the main ways to break the normal flow of control; there was no `yield from`, `asyncio` or `async`/`await`.
* Python 3 added these language features (`yield from` in 3.3, `asyncio` in 3.4, `async`/`await` in 3.5), which give us more options.
* On Linux, creating threads can be faster than using an async function
* There is not a large performance difference between waiting for many things in many threads and waiting in one thread asynchronously

# Modern CPUs are really good at context switching between threads!


## Lots more tutorials from Yeray Diaz Diaz here:

https://github.com/yeraydiazdiaz/asyncio-ftwpd

# We keep using asyncio's `event_loop` and Tornado's `IOLoop`...

What is an event loop???

"In computer science, the event loop, message dispatcher, message loop, message pump, or run loop is a programming construct that waits for and dispatches events or messages in a program."

As we have seen, Python 3 added `asyncio` and its event loop to the standard library; on Python 2, frameworks such as Tornado and Twisted supply their own loop.

If we first consider a loop that is waiting for only one kind of event, that is simpler to conceptualise... 
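
A loop that waits for only one kind of event can be sketched in a few lines. `TinyLoop` is a hypothetical toy, not how asyncio or Tornado are implemented: real loops block waiting for I/O rather than stopping when the queue is empty.

```python
from collections import deque

class TinyLoop:
    """A toy event loop that handles a single kind of event: a text message."""

    def __init__(self):
        self.queue = deque()
        self.handlers = []

    def add_handler(self, handler):
        self.handlers.append(handler)

    def post(self, message):
        self.queue.append(message)

    def run_until_empty(self):
        # Wait-and-dispatch reduced to its core: take the next event
        # and pass it to every registered handler
        while self.queue:
            message = self.queue.popleft()
            for handler in self.handlers:
                handler(message)

seen = []
loop = TinyLoop()
loop.add_handler(seen.append)
loop.post("edit 1")
loop.post("edit 2")
loop.run_until_empty()
print(seen)
```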
 


## Reading Social Media data as it is created

From the following URL I can open a connection and then follow every change to Wikipedia.
 
https://stream.wikimedia.org/v2/stream/recentchange

We can read data synchronously from the URL like this:

In [None]:
import requests
url = 'https://stream.wikimedia.org/v2/stream/recentchange'
import json
r = requests.get(url, stream=True)
for line in r.iter_lines(decode_unicode=True):
    if line:
        if line.startswith("data:"):
            datapoint = line[5:]
            data = json.loads(datapoint)
            print(data)

### Question... 

Is this compatible with a Tornado/asyncio event loop? Can we use it to send events and have them processed?

### Answer:

No! The generator returned by `iter_lines` is blocking. Therefore we need to read the HTTP response in a non-blocking way OR use another process/thread.
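
The "another thread" option can be sketched as follows, assuming Python 3.7+. `blocking_lines` is a hypothetical stand-in for the blocking `iter_lines` generator, so no network access is needed; the worker thread hands each line to the event loop through `call_soon_threadsafe`.

```python
import asyncio
import threading
import time

def blocking_lines():
    # Hypothetical stand-in for r.iter_lines(): each line arrives after a delay
    for i in range(3):
        time.sleep(0.05)
        yield "data: {}".format(i)

async def consume():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the stream

    def producer():
        # Runs in a worker thread, so blocking here does not stall the loop
        for line in blocking_lines():
            loop.call_soon_threadsafe(queue.put_nowait, line)
        loop.call_soon_threadsafe(queue.put_nowait, done)

    threading.Thread(target=producer, daemon=True).start()
    received = []
    while True:
        item = await queue.get()   # the loop is free to do other work here
        if item is done:
            return received
        received.append(item)

print(asyncio.run(consume()))
```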

### Non-blocking read with `tornado`


In [None]:
from tornado import ioloop, gen, httpclient
    
@gen.coroutine
def main():
    http_client = httpclient.AsyncHTTPClient()

    def streaming_callback(data):
        print(data)

    yield http_client.fetch('https://stream.wikimedia.org/v2/stream/recentchange', 
                            streaming_callback=streaming_callback, 
                            request_timeout=3600)


if __name__ == '__main__':
    ioloop.IOLoop.instance().run_sync(main)

OK, so that works, but now we are receiving packets that may contain partial data points. Therefore we need to store state between packets and collect them up.

In [None]:
from tornado import ioloop, gen, httpclient
import json

class DataReciever(object):
    def __init__(self):
        self.partial_data = None

    # def data_from_line(self, line):

    def streaming_callback(self, input):
        data = input.decode("utf-8")
        lines = data.splitlines()
        for line in lines:
            if self.partial_data is not None:
                line = self.partial_data + line
                self.partial_data = None
            if len(line) > 0  and len(line) < 100 and "data:".startswith(line[:4]):
                self.partial_data = line
            if line.startswith("data:"):
                try:
                    data = json.loads(line[5:])
                    print(data)
                except (ValueError, IndexError):
                    self.partial_data = line






@gen.coroutine
def main():
    http_client = httpclient.AsyncHTTPClient()
    dr = DataReciever()
    yield http_client.fetch(
        'https://stream.wikimedia.org/v2/stream/recentchange',
        streaming_callback=dr.streaming_callback,
        request_timeout=3600
    )


if __name__ == '__main__':
    ioloop.IOLoop.instance().run_sync(main)
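
The buffering idea can also be tried out without any networking. The sketch below is an alternative take, not the class used above: `LineBuffer` and `parse_events` are hypothetical helpers that keep whatever follows the last newline until the next chunk arrives. It assumes the chunks are already decoded text.

```python
import json

class LineBuffer:
    """Collects raw chunks and returns only complete lines."""

    def __init__(self):
        self.pending = ""

    def feed(self, chunk):
        self.pending += chunk
        # Everything before the last newline is complete; the rest waits
        *complete, self.pending = self.pending.split("\n")
        return complete

def parse_events(lines):
    # Keep only the "data:" lines of the event stream and decode their JSON
    return [json.loads(line[5:]) for line in lines if line.startswith("data:")]

buf = LineBuffer()
events = []
for chunk in ['data: {"ti', 'tle": "A"}\nda', 'ta: {"title": "B"}\n']:
    events.extend(parse_events(buf.feed(chunk)))
print(events)
```

A `feed` method like this could be called from a `streaming_callback` after decoding each chunk.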

Great! So now we are receiving JSON data in a Tornado coroutine.

How do we save these in a database?

In [None]:
from tornado import ioloop, gen, httpclient
import json
import motor

class DataReciever(object):
    def __init__(self):
        self.partial_data = None
        self.partial_chars = None
        self.client = motor.motor_tornado.MotorClient()
        self.db = self.client['test_database']

    # def data_from_line(self, line):
    def my_callback(self, result, error):
        print('result %s' % repr(result.inserted_id))

    def streaming_callback(self, input):

        data = input.decode("utf-8")

        lines = data.splitlines()
        for line in lines:
            if self.partial_data is not None:
                line = self.partial_data + line
                self.partial_data = None
            if len(line) > 0  and len(line) < 100 and "data:".startswith(line[:4]):
                self.partial_data = line
            if line.startswith("data:"):
                try:
                    data = json.loads(line[5:])
                    self.db.test_collection.insert_one(data, callback=self.my_callback)
                except (ValueError, IndexError):
                    self.partial_data = line






@gen.coroutine
def main():
    http_client = httpclient.AsyncHTTPClient()
    dr = DataReciever()
    yield http_client.fetch(
        'https://stream.wikimedia.org/v2/stream/recentchange',
        streaming_callback=dr.streaming_callback,
        request_timeout=3600
    )


if __name__ == '__main__':
    ioloop.IOLoop.instance().run_sync(main)

This is all very well but more complex than the requests example.
           
There are also two callbacks! We were supposed to be writing cleaner code which avoids callbacks. We are also having to handle the raw http data.

Additionally, it is harder to debug errors as they will not have a full stack trace, for example:

In [None]:
#tornado example with an error

from tornado import ioloop, gen, httpclient
import json
import motor

class DataReciever(object):
    def __init__(self):
        self.partial_data = None
        self.client = motor.motor_tornado.MotorClient()
        self.db = self.client['test_database']

    # def data_from_line(self, line):
    def my_callback(self, result, error):
        print('result %s' % repr(result.inserted_id))

    def streaming_callback(self, input):
        data = input.decode("ascii")
        lines = data.splitlines()
        for line in lines:
            if self.partial_data is not None:
                line = self.partial_data + line
                self.partial_data = None
            if len(line) > 0  and len(line) < 100 and "data:".startswith(line[:4]):
                self.partial_data = line
            if line.startswith("data:"):
                try:
                    data = json.loads(line[5:])
                    self.db.test_collection.insert_one(data, callback=self.my_callback)
                except (ValueError, IndexError):
                    self.partial_data = line


@gen.coroutine
def main():
    http_client = httpclient.AsyncHTTPClient()
    dr = DataReciever()
    yield http_client.fetch(
        'https://stream.wikimedia.org/v2/stream/recentchange',
        streaming_callback=dr.streaming_callback,
        request_timeout=3600
    )


if __name__ == '__main__':
    ioloop.IOLoop.instance().run_sync(main)

### Enter aiohttp...

aiohttp is an HTTP library built on asyncio. We need to install it with pip, and its docs are not as good as Tornado's, but it lets us make use of some interesting new language features.

In [None]:
import asyncio
import aiohttp
ioloop = asyncio.get_event_loop()
async def getdata():
     async with aiohttp.ClientSession() as session:
         async with session.get('https://stream.wikimedia.org/v2/stream/recentchange') as resp:
             async for line in resp.content:
                 print(line)
ioloop.run_until_complete(getdata())

### Exercise 4: Use [the tornado asyncio bridge](http://www.tornadoweb.org/en/stable/asyncio.html#tornado.platform.asyncio.AsyncIOMainLoop) to write the data to MongoDB using motor.


### Async HTTP review

We have learned that:

* Asyncio and Tornado work well for external HTTP requests and other things where an event is waited for
* A wrapper Task class is used to wrap the result of a function in a future
* Where blocking code needs to be run, a ProcessPoolExecutor can be used



### Exercise 5: Take this terminal clock example and reimplement it using asyncio or tornado to provide the timer

http://www.tornadoweb.org/en/stable/ioloop.html#tornado.ioloop.PeriodicCallback


In [None]:
import time

def overprint(stringdata):
    print(stringdata , end="\r")


def get_time():
    return time.strftime("%d-%m-%Y %H:%M:%S")


while True:
    start = time.time()
    overprint(get_time())
    sleep_for = 1 - (time.time() - start)  # one second minus the time spent printing
    time.sleep(sleep_for)
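
One possible shape for an asyncio timer is sketched below, assuming Python 3.7+. `every` is a hypothetical helper, and the number of ticks is bounded only so that the example terminates; a real clock would keep going.

```python
import asyncio
import time

async def every(interval, callback, ticks):
    # Call `callback` once per interval, `ticks` times, without blocking the loop
    for _ in range(ticks):
        started = time.monotonic()
        callback()
        # Subtract the time the callback took so the period stays steady
        await asyncio.sleep(max(0.0, interval - (time.monotonic() - started)))

stamps = []
asyncio.run(every(0.05, lambda: stamps.append(time.strftime("%H:%M:%S")), ticks=3))
print(stamps)
```

While `every` is sleeping, the event loop is free to run other coroutines, which the `while True` version with `time.sleep` cannot offer.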

### Running a task and timing out if it takes too long

This seemingly simple question is in fact pretty difficult in Python. HTTP libraries like aiohttp and Tornado implement timeouts for requests, but actually timing out a piece of blocking code is harder...

### With ProcessPoolExecutor and asyncio

The first port of call is to combine the `ProcessPoolExecutor` and the asyncio event loop, then cancel the future. This fails to work correctly: the future is cancelled but the task is left running.

In [None]:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def block_then_print():
    time.sleep(10)
    print("done")

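async def run_executor_in_loop(loop, ex):
    # The await can be interrupted, but the function already running in the
    # worker process cannot: it carries on and still prints "done".
    await loop.run_in_executor(ex, block_then_print)

ex = ProcessPoolExecutor(2)
loop = asyncio.get_event_loop()
# wait_for raises a TimeoutError after 1 second
loop.run_until_complete(asyncio.wait_for(run_executor_in_loop(loop, ex), 1))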
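
The "task is left running" behaviour can be observed directly. The sketch below makes two changes for the sake of a quick, self-contained demonstration: it uses a `ThreadPoolExecutor` instead of a process pool, and much shorter times. It assumes Python 3.7+; `block_briefly` is a made-up function.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

finished = []

def block_briefly():
    time.sleep(0.3)
    finished.append("done")  # still runs after the caller has given up

async def main(pool):
    loop = asyncio.get_running_loop()
    try:
        await asyncio.wait_for(loop.run_in_executor(pool, block_briefly), 0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "completed"

pool = ThreadPoolExecutor(1)
outcome = asyncio.run(main(pool))
pool.shutdown(wait=True)  # wait for the abandoned call to finish
print(outcome, finished)
```

The caller sees a timeout, yet the worker still records "done": the timeout only abandoned the wait and did not stop the work.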

### Timeout tasks with signals (Unix)


Using raw processes it is possible to timeout a task by using signals, as demonstrated by this decorator:


In [None]:
import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print("Start")
    for i in range(1,10):
        time.sleep(1)
        print("%d seconds have passed" % i)

if __name__ == '__main__':
    mytest()
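
The signal mechanism can be sketched with the standard `signal` module. This illustrates the general idea only and is not the decorator's actual implementation; `Timeout` and `run_with_alarm` are hypothetical names. It works on Unix only, and only in the main thread.

```python
import signal
import time

class Timeout(Exception):
    pass

def run_with_alarm(func, seconds):
    def on_alarm(signum, frame):
        raise Timeout("took longer than {} s".format(seconds))

    previous = signal.signal(signal.SIGALRM, on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)  # deliver SIGALRM after `seconds`
    try:
        return func()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)    # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)    # restore the old handler

try:
    run_with_alarm(lambda: time.sleep(2), 0.1)
    outcome = "finished"
except Timeout:
    outcome = "timed out"
print(outcome)
```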

### Timeout tasks with `subprocess`

Equally, we can add a timeout when we shell out to a subprocess: `check_output` (used here) and `subprocess.run` (new in Python 3.5) both accept a `timeout` argument.


In [None]:
from subprocess import STDOUT, check_output, TimeoutExpired
try:
    output = check_output(["ping", "google.com"], stderr=STDOUT, timeout=2)
except TimeoutExpired as e:
    print(e.output.decode("utf-8"))

# Review

This would work fine for small scripts, but signals, shelling out and raw processes are not recommended in larger projects due to:

* Platform differences
* Possibility of zombie processes
* Lack of control over how many processes are open


So how should we proceed???

A few of the Python task frameworks suggest that they support task cancellation, but upon closer inspection `Celery` and `Dask/Distributed` only support cancellation before execution starts, not during execution.

Often people want to check whether the task needs cancelling during execution. Dask/Distributed will check between subtasks, but a timeout means killing the task dead!
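
Checking for cancellation between subtasks can be sketched with a `threading.Event`. This is a generic illustration of cooperative cancellation, not how Dask/Distributed or Celery implement it; `long_task` and `cancel_requested` are hypothetical names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def long_task(cancel_requested, steps=50):
    completed = 0
    for _ in range(steps):
        if cancel_requested.is_set():   # check between subtasks
            break
        time.sleep(0.01)                # one subtask
        completed += 1
    return completed

cancel_requested = threading.Event()
with ThreadPoolExecutor(1) as pool:
    future = pool.submit(long_task, cancel_requested)
    time.sleep(0.05)
    cancel_requested.set()              # ask the task to stop
    completed = future.result()
print(completed < 50)
```

This only works when the task cooperates; a subtask that blocks forever never reaches the next check, which is why a hard timeout needs a process that can be killed.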

## Luigi to the rescue!

Luigi also uses some tornado and event loops for communication but also helps keep track of subprocesses within workers. This gives all the flexibility of the subprocess example above along with the stability of a proper data processing framework.

    import luigi
    from time import sleep
    class MyTask(luigi.Task):
        worker_timeout = 5
        def run(self):
            sleep(10)
            return luigi.LocalTarget("result", "done")  
            
The task can be run like this:

    PYTHONPATH=$(pwd) luigi --module luigi_timeout MyTask --local-scheduler --workers=


## Back to our image processing task

We now seem to have the building blocks that we need to request our image data asynchronously and process it synchronously:

1. Get the CSV file as a dataframe
1. Start a tornado event loop
1. Send HTTP requests to get the image data using tornado
1. As the futures are completed, submit the data to a ProcessPoolExecutor

BUT... we don't really want Tornado to be driving the whole app, which is quite fiddly to do. Perhaps there is a framework that gives us access to the IOLoop from a separate thread...

# Learnings so far

### Good things:

* All of our attempts at running the image processing algorithm were faster than doing it synchronously
* The simple combination of `ThreadPoolExecutor` for HTTP requests and `ProcessPoolExecutor` for data processing worked fine
* Aiohttp or Requests can both be used successfully for streaming endpoints
* Asyncio and Tornado provide easy ways to create timers in python
* Tornado can clearly be helpful for inter-process communication in projects like Luigi



### Less good things

* ProcessPoolExecutor cannot scale beyond a single machine
    * We need a way to use Tornado to communicate between processes or computers
* The as_completed function blocks when we run it
    * We want to be able to just declare how the functions should be called and then go for a cup of weak herbal tea while it runs on the cluster
* Running everything from a central IOLoop can be fiddly...

### Dask/Distributed: a framework built on `Tornado` and `ThreadPoolExecutor`

* Works similarly to the `ThreadPoolExecutor` 
* It is used to access either a set of local subprocesses or a cluster of worker machines
* Access to a running IOLoop

https://github.com/dask/distributed



### "Run later" features of Dask/Distributed

* Futures can be used as arguments to `distributed.Client().submit`
* `worker_client` can be used to process data remotely on the worker
* Data structures like [Dask.delayed, array, bag and DataFrame](http://dask.pydata.org/en/latest/index.html) help build up computations
* [Queues] can be passed to `distributed.Client().map`

## How can we make this run on a cluster?

Dask/Distributed allows us to run a workflow like this across multiple machines. The steps involved are:

1. Start the Dask/Distributed cluster running locally
1. Get the CSV file as a dataframe
1. Send the image ids in batches to each of the distributed workers
1. Use the IOLoop instance on the worker_client of distributed to pull down the images
1. As the futures are completed process the images
1. Return a list of pixels and run the TSNE plot

see http://localhost:8888/edit/distrunner.py
and http://localhost:8888/edit/distrunner_tornado.py

## How Jupyter notebooks work

Jupyter notebooks have a similar executor to the dask/distributed cluster. The flow of events looks something like this:
    
1. User gets HTML page
1. Websockets opened for communication with browser
1. When code is added and run a message is sent by websockets
1. Tornado handles this message and passes it to the executor (the kernel) over the 0mq protocol
1. When the task is done a message is sent back to Tornado and then in turn back to the browser



![](http://jupyter.readthedocs.io/en/latest/_images/notebook_components.png)

### Why Tornado for systems like this?

The single threaded nature of Tornado makes it ideal for keeping multiple browser tabs up to date - it is impossible to append to a stream with multiple threads - we must use a single threaded server like Tornado or Node.js.


## A Web-based vidiprinter



See tornado_vidiprinter.py

    from tornado import gen, ioloop, web

    @web.stream_request_body
    class Handler(web.RequestHandler):
        @gen.coroutine
        def get(self):
            self.write('start<br/>')
            yield self.flush()
            yield gen.sleep(5)
            self.write('finish\n')
            yield self.flush()


    application = web.Application([(r'/', Handler)])
    application.listen(8889)
    ioloop.IOLoop.current().start()

We can start a streaming response, but the real power comes from using websockets with Tornado.

### Exercise 6: Websockets task

Use websockets to create an ASCII art clock in the browser or a live visualisation of the countries of the most recent Wikipedia contributors

If doing the visualisation, you can use the answer to Exercise 

The repo https://github.com/albertobeta/UberSimpleWebsockets is a really great starting point.

# Summary

In this tutorial we have learned:

* How to use `ProcessPoolExecutor` and `ThreadPoolExecutor` to run tasks whose status we can watch with `Futures`
* How to make HTTP requests asynchronously
* How to follow a streaming HTTP request asynchronously and save it to a database
* How to timeout the running of a function
* How to combine asynchronous execution of HTTP requests with synchronous concurrent data processing and to distribute that across multiple machines.

## Thanks

* Zegami (Talk to me if you want to use it, work for us etc!!!)
* Matthew Rocklin - developer of dask/distributed
* Everyone for listening


## Andrew Stretton

* astretton@zegami.com
* https://github.com/strets123

