### FUTURES 

A future, or promise, is an object that represents a pending operation; the call that creates it returns straight away. One can then query its state of completion, or register callbacks to be called on successful completion or on error.
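
As a minimal sketch of both styles of use, the standard library's `concurrent.futures.Future` can be polled and can also be given a callback. The `slow_square` helper and its 0.1 second delay are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    """Stand-in for a slow operation (hypothetical helper)."""
    time.sleep(0.1)
    return x * x

callback_results = []

with ThreadPoolExecutor(max_workers=1) as executor:
    # submit() returns immediately with a Future for the pending call.
    fut = executor.submit(slow_square, 7)
    # Register a callback; it is called with the finished future.
    fut.add_done_callback(lambda f: callback_results.append(f.result()))
    # result() blocks until the operation has completed.
    value = fut.result()

print(fut.done(), value, callback_results)
```

Leaving the `with` block waits for the worker thread, so by the time of the `print` both the result and the callback's side effect are available.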

Examples adapted from Fluent Python.

In [None]:
import time, uuid, functools
def get_thing_maker(secs, item):
    time.sleep(secs)
    return str(uuid.uuid4())+str(item)
get_thing = functools.partial(get_thing_maker, 1)
def get_many(lot):
    counter=0
    for t in lot:
        thing = get_thing(t)
        counter += 1
    return counter
def serial_main(it):
    t0 = time.time()
    count = get_many(it)
    elapsed = time.time() - t0
    msg = '\n{} things got in {:.2f}s' 
    print(msg.format(count, elapsed))

#### Serial sleeping

In [None]:
serial_main(range(20))

#### concurrent sleeping using threads

In [None]:
from concurrent import futures
def get_many_threaded1(it):
    workers = 10
    with futures.ThreadPoolExecutor(max_workers=workers) as executor:
        res = executor.map(get_thing, it)
    return len(list(res))
def threaded_main1(it):
    t0 = time.time()
    count = get_many_threaded1(it)
    elapsed = time.time() - t0
    msg = '\n{} things got in {:.2f}s' 
    print(msg.format(count, elapsed))

In [None]:
threaded_main1(range(20))

One might think that the concurrent I/O (or sleeping) case is limited by the GIL, but a thread releases the GIL whenever it blocks on I/O or sleeps, so the other threads can proceed. Thus there is no waiting around for the GIL.

The GIL is harmless when threads spend their time in Python's standard library I/O or in properly written C extensions such as numpy, which release it. The `time.sleep()` function also releases the GIL. Python threads are perfectly usable in I/O-bound applications.
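
A small sketch of that claim, using only `threading` and `time`: five threads each sleep for 0.2 seconds, and because `time.sleep()` releases the GIL the sleeps overlap. The worker name and the durations are arbitrary choices for this illustration.

```python
import threading
import time

def sleepy_worker(seconds):
    # time.sleep releases the GIL, so other threads can run meanwhile.
    time.sleep(seconds)

start = time.perf_counter()
threads = [threading.Thread(target=sleepy_worker, args=(0.2,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded_elapsed = time.perf_counter() - start

print("5 sleeps of 0.2s took {:.2f}s in threads".format(threaded_elapsed))
```

Run serially, the same five sleeps would take about one second.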

### Threads

Threads vs processes

On Linux:

- processes created by fork()
- have a primary thread
- thread is the unit of execution
- process is a container, can have more threads
- can be scheduled across different cores/cpus

```c
pid_t pid;
int status = 0;
/* fork returns the child's pid to the parent, 0 to the child, -1 on error */
pid = fork();
if (pid > 0) {
    /* parent code */
    pid = wait(&status);
    /* wait returns the child's pid and stores its exit status */
} else if (pid == 0) {
    /* child code */
    exit(status);
}
```

- threads in a process share same address space (share it entirely)
- thread abstraction decouples resource allocation from control
- defines a single sequential execution stream with PC, stack, register values
- process handles: address space, global variables, open files, child processes, pending alarms, signals and signal handlers, accounting info
- thread handles program counter, registers, stack, and state
- user vs kernel threads
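
The point that threads share one address space can be sketched in Python: four threads update the same dictionary, and a lock serializes the read-modify-write. The names `shared`, `add_many` and the iteration counts are assumptions of this sketch.

```python
import threading

shared = {"count": 0}        # one object, visible to every thread
lock = threading.Lock()

def add_many(state, guard, n):
    for _ in range(n):
        with guard:          # serialize the read-modify-write
            state["count"] += 1

threads = [
    threading.Thread(target=add_many, args=(shared, lock, 10000))
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(shared["count"])
```

Separate processes would each get their own copy of `shared`, so the same trick would need explicit inter-process communication.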

In [None]:
def fib(n):
    return fib(n - 1) + fib(n - 2) if n > 1 else n

In [None]:
from threading import Thread
from time import sleep
from time import time


def sleepy(): #like io
    i=0
    while i < 10:
        print("{} -- {} Sleepy!".format(i, int(time())), flush=True)
        sleep(3)
        i += 1


def cpuy():
    for i in range(35):
        val = fib(i)
        print("fib({}) is {}".format(i, val))

def cpuy2():
    for i in range(35):
        val = fib(i)
        print("cpuy2 fib({}) is {}".format(i, val))
        
def main():
    # Run both CPU-bound functions one after the other as a baseline.
    start = time()
    cpuy()
    cpuy2()
    print("serial elapsed:", time() - start)
    start = time()
    #t = Thread(target=sleepy)
    #t.start()
    # Second thread runs cpuy2 while the main thread runs cpuy.
    t2 = Thread(target=cpuy2)
    t2.start()
    cpuy()
    # Wait for the second thread so the timing covers both computations.
    t2.join()
    print("thread elapsed:", time() - start)
if __name__ == '__main__':
    main()

### Processes with concurrent futures.

CPU-bound processing won't release the GIL, and is thus best done in a separate process. For illustration, we show what this looks like.

In [None]:
import time
def get_many_process(it, workers=None):
    # max_workers=None lets the executor pick a default based on the CPU count.
    with futures.ProcessPoolExecutor(max_workers=workers) as executor:
        res = executor.map(get_thing, it)
    return len(list(res))

def process_main(it, workers=None):
    t0 = time.time()
    count = get_many_process(it, workers)
    elapsed = time.time() - t0
    msg = '\n{} things got in {:.2f}s' 
    print(msg.format(count, elapsed))

In [None]:
process_main(range(20))

In [None]:
process_main(range(20), workers=10)

In [None]:

print(__name__)

In [None]:
import multiprocessing
start = time.time()
p=multiprocessing.Process(target=cpuy2)
p.start()
cpuy()
p.join()
print("mp elapsed:", time.time() - start)

In [None]:
input('>')

### sockets

- distinction between "client socket" and "server socket"
- default `socket.socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)`
- server socket sits and creates client sockets
- non-blocking sockets and the `select` system call
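
The server/client distinction can be sketched on the loopback interface in a single thread. The address `127.0.0.1`, the OS-chosen port (port 0) and the `ping`/`pong` payloads are choices made for this illustration only.

```python
import socket

# Server socket: bound to a loopback port chosen by the OS (port 0).
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
port = server.getsockname()[1]

# Client socket: connects to the server's address.
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", port))

# accept() hands back a NEW socket dedicated to this one client.
conn, addr = server.accept()

client.sendall(b"ping")
received = conn.recv(4)
conn.sendall(b"pong")
reply = client.recv(4)

for s in (conn, client, server):
    s.close()

print(received, reply)
```

The server socket itself never carries data; it only produces connected sockets such as `conn`.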

Read: https://docs.python.org/3.5/howto/sockets.html

### Writing a web page fetcher

We'll eventually use the asyncio module to play with web page fetching and crawling, but let's build up to that by writing a simple fetcher. We'll start with blocking, then move to non-blocking with callbacks, then to co-routines, and finally to `yield from` based co-routines.

Adapted from http://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html

#### Blocking fetch

In [None]:
import socket
def fetch(host, url):
    sock = socket.socket()
    sock.connect((host, 80))
    request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(url, host)
    print(request)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        #print(chunk)
        response += chunk
        chunk = sock.recv(4096)
    return response

In [None]:
from IPython.display import HTML, IFrame
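# fetch returns bytes; decode them rather than passing the bytes repr to HTML
HTML(fetch("www.example.com","/").decode('utf-8', errors='replace'))
#bs4.BeautifulSoup(fetch("www.example.com","/"))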

#### Basic non-blocking

In [None]:
host="www.example.com"
url="/"
request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(url, host)
encoded = request.encode('ascii')
sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect((host, 80))
except BlockingIOError:
    pass
while True:
    try:
        sock.send(encoded)
        break  # Done.
    except OSError as e:
        pass

print('sent')

This has only been implemented partially. Notice how the `sock.send` spins in a loop.

This eats cycles. The solution is a readiness-notification call: `select` for a small number of connections, or `kqueue`/`epoll` as the number grows large. The basic idea behind `select` is to wait for an event to occur on a set of non-blocking sockets.

We'll use Python's `DefaultSelector`, an addition from Python 3.4 that automatically chooses the "best" select-like implementation on your system.
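
Before using it on a real connection, here is a minimal offline sketch of what "waiting for an event" means. It uses `socket.socketpair()` as a stand-in for a network connection; the variable names and the `"right end"` label are assumptions of this sketch.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
left, right = socket.socketpair()      # two connected in-process sockets
right.setblocking(False)
sel.register(right, selectors.EVENT_READ, data="right end")

# Nothing has been written yet: select returns an empty list after the timeout.
idle_events = sel.select(timeout=0.05)

left.send(b"hello")
# Now the socket is readable, so select reports it.
ready_events = sel.select(timeout=1)
key, mask = ready_events[0]
message = key.fileobj.recv(1024)

sel.unregister(right)
sel.close()
left.close()
right.close()

print(idle_events, key.data, message)
```

Instead of spinning, the program sleeps inside `select` until the operating system reports that the socket is ready.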


In [None]:
from selectors import DefaultSelector, EVENT_WRITE
from time import time
selector = DefaultSelector()
host="www.example.com"
sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect((host, 80))
except BlockingIOError:
    pass

def connected():
    selector.unregister(sock.fileno())
    print('connected!', flush=True)

selector.register(sock.fileno(), EVENT_WRITE, connected)

`connected` is the **callback** run when the connection happens.

In [None]:

def loop():
    start = time()
    while True:
        if time() - start > 10:
            break
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

Such a loop is called an "event loop". An async framework has two parts: (a) such an event loop and (b) non-blocking sockets. It all runs on one thread, which makes it a natural fit for I/O-bound problems.

What have we demonstrated already? We showed how to begin an operation and execute a callback when the operation is ready. An async framework builds on the two features we have shown—non-blocking sockets and the event loop—to run concurrent operations on a single thread.
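
A sketch of those two features working together on more than one connection, kept offline by using `socket.socketpair()` in place of real servers. The helper `watch` and the names `received` and `still_open` are hypothetical.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
received = {}            # connection name -> bytes read so far
still_open = set()       # names of connections that have not finished

def watch(name, sock):
    """Register a callback that reads from `sock` whenever it is readable."""
    def on_readable():
        chunk = sock.recv(1024)
        if chunk:
            received[name] = received.get(name, b"") + chunk
        else:                          # empty read: the peer closed
            sel.unregister(sock)
            sock.close()
            still_open.discard(name)
    sock.setblocking(False)
    still_open.add(name)
    sel.register(sock, selectors.EVENT_READ, on_readable)

# Two in-process "connections"; the writer ends play the remote servers.
writer_a, reader_a = socket.socketpair()
writer_b, reader_b = socket.socketpair()
watch("a", reader_a)
watch("b", reader_b)

writer_a.send(b"first")
writer_b.send(b"second")
writer_a.close()
writer_b.close()

# The event loop: wait for readiness, then dispatch the stored callback.
while still_open:
    for key, mask in sel.select(timeout=1):
        key.data()

print(received)
```

Both connections are serviced by the same thread; the loop simply runs whichever callback has work to do.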

Guido:
>We have achieved "concurrency" here, but not what is traditionally called "parallelism". What asynchronous I/O is right for, is applications with many slow or sleepy connections with infrequent events.

In [None]:
loop() #loop will destruct after 10 secs

#### async with response reading

## Lab

Implement a URL fetcher using Beautiful Soup in the callback version. We will implement a similar one using coroutines on Wednesday.

The implementation will extend the `read_response` method by parsing for URLs using `bs4`. Start by creating globals:
```
urls_todo = set(['/'])
seen_urls = set(['/'])
```

then:

```
links = self.parse_links()#write this
```
(using self.response)

Then use the set `difference` method  to add new links to `urls_todo` and recursively set up a `Fetcher` instance.

Now update the `seen_urls` and `urls_todo` thus:
```
seen_urls.update(links)
urls_todo.remove(self.url)
if not urls_todo:
    stopped = True
```
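
The set bookkeeping on its own can be tried without any networking. This is only a sketch of the update steps: `record_links` is a hypothetical helper, and the link names are invented.

```python
urls_todo = {"/"}
seen_urls = {"/"}

def record_links(current_url, links):
    """Update the bookkeeping sets; return the links that still need fetching."""
    new_links = links.difference(seen_urls)   # links not encountered before
    urls_todo.update(new_links)
    seen_urls.update(new_links)
    urls_todo.remove(current_url)             # this page is done
    return new_links

new = record_links("/", {"/", "/about", "/archive"})
finished = not urls_todo

print(sorted(new), sorted(urls_todo), finished)
```

In the lab, each entry of `new` would get its own `Fetcher`, and `finished` would drive the `stopped` flag.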

In [1]:
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from bs4 import BeautifulSoup
import socket
import urllib
from urllib.parse import urlparse
urls_todo = set()
seen_urls = set()

selector = DefaultSelector()
stopped = False

In [79]:
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
class Fetcher:
    def __init__(self, host, url, level=0):
        self.response = b''  # Empty array of bytes.
        self.host = host
        self.url = url
        self.sock = None
        
    # Method on Fetcher class.
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect((self.host, 80))
        except BlockingIOError:
            pass

        # Register next callback.
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

    def connected(self, key, mask):
        print('connected!', flush=True)
        selector.unregister(key.fd)
        request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.url, self.host)
        self.sock.send(request.encode('ascii'))

        # Register the next callback.
        selector.register(key.fd,
                          EVENT_READ,
                          self.read_response)
        
    def read_response(self, key, mask):
        global stopped
        
        chunk = self.sock.recv(128)  # USUALLY 4k chunk size, here small
        if chunk:
            print("read chunk", flush=True)
            self.response += chunk
        else:
            print("all read", flush=True)
            selector.unregister(key.fd)  # Done reading.
            stopped=True
            
stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)
        #do fibonacci

In [80]:
fetcher = Fetcher('xkcd.com', '/353/',1)
fetcher.fetch()
urls_todo.add((fetcher.host,fetcher.url))
loop()

connected!
read chunk
read chunk
... (64 "read chunk" lines in total)
all read


You can see how the control flow is chained together: the `connected` callback registers `read_response`, which does the reading. Beyond a 2-3 step ladder, this gets confusing and onerous (see some node.js code). In a blocking program, the continuation of the program is stored and addressed via the instruction pointer in a sequential fashion; here the continuation is stored by registering the callbacks.

Since the current frame is popped off the stack before the callback runs, exceptions have a hard time identifying their origin. This is called stack ripping.
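
A small sketch of the effect, with a plain list standing in for the selector's registered callbacks. The names `start_request`, `on_done` and `run_loop` are hypothetical.

```python
import traceback

callbacks = []          # stand-in for the selector's registered callbacks

def start_request():
    """Begin an 'operation' and return at once, as a callback-style fetch does."""
    def on_done():
        raise ValueError("bad response")
    callbacks.append(on_done)

def run_loop():
    """Call each callback and record the frames in any traceback."""
    frames = []
    for cb in callbacks:
        try:
            cb()
        except ValueError as exc:
            frames = [f.name for f in traceback.extract_tb(exc.__traceback__)]
    return frames

start_request()
frame_names = run_loop()
print(frame_names)
```

The traceback mentions the loop and the callback, but not `start_request`, the function that actually began the operation.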

So, even apart from the long debate about the relative efficiencies of multithreading and async, there is this other debate regarding which is more error-prone: threads are susceptible to data races if you make a mistake synchronizing them, but callbacks are stubborn to debug due to stack ripping. And within a bit, we get callback soup.

https://thesynchronousblog.wordpress.com/tag/stack-ripping/

Threads seem to offer a more natural way of programming, as the programmer keeps all state on the thread's single stack.


So why not use them? As we said last time: synchronization and overhead.

But we can do better with Coroutines!

Guido:
>We entice you with a promise. It is possible to write asynchronous code that combines the efficiency of callbacks with the classic good looks of multithreaded programming. This combination is achieved with a pattern called "coroutines". Using Python 3.4's standard asyncio library, and a package called "aiohttp", fetching a URL in a coroutine is very direct:

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()
        
In 3.5 it's even clearer:

    async def fetch(self, url):
        response = await self.session.get(url)
        body = await response.read()
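
The snippets above need `aiohttp` and a session object. A runnable sketch of the same shape using only the standard library might look like this, where `fake_fetch` is a made-up coroutine and `asyncio.sleep` stands in for network I/O.

```python
import asyncio
import time

async def fake_fetch(url, delay):
    """Pretend to fetch `url`; asyncio.sleep stands in for network I/O."""
    await asyncio.sleep(delay)     # yields control to the event loop
    return "body of " + url

async def main():
    # Both coroutines wait concurrently on one thread.
    return await asyncio.gather(fake_fetch("/a", 0.3), fake_fetch("/b", 0.3))

start = time.perf_counter()
bodies = asyncio.run(main())
elapsed = time.perf_counter() - start

print(bodies, "{:.2f}s".format(elapsed))
```

`asyncio.run` requires Python 3.7 or later. Inside a running Jupyter kernel, where an event loop is already active, `await main()` takes the place of `asyncio.run(main())`.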

### Back to the Future with co-routines

In [None]:
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
import socket
selector = DefaultSelector()

The future, as you might expect, is something with callbacks...

In [None]:
class MyFuture:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

We need a "main" to yield to.

In [None]:
class Fetcher:
    
    def __init__(self, url, host):
        self.url = url
        self.host = host
        self.response = b''  # Empty array of bytes.

        
    def fetch(self):
        global stopped
        sock = socket.socket()

        sock.setblocking(False)
        try:
            sock.connect((self.host, 80))
        except BlockingIOError:
            pass

        f = MyFuture()

        #resolves the future by setting a result on it
        def on_connected():
            print('on connected cb ran', flush=True)
            f.set_result(None)
        
        
        
        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        print("about to yield connection future", flush=True)
        yield f  # this makes it look like fetch has returned the "future"
        # but we have not lost the state (nor had to carry it in an object);
        # a send() into the generator will continue us here
        print('we were connected! now back in gen', flush=True)
        selector.unregister(sock.fileno())
        request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.url, self.host)
        sock.send(request.encode('ascii'))
        while True:
            print("in loop")
            #now create a new future for the data-receiving call
            f = MyFuture()
            def on_response():
                chunky = sock.recv(4096)  # 4k chunk size.
                f.set_result(chunky)
            selector.register(sock.fileno(),
                              EVENT_READ,
                              on_response)
            #to restart the gen, the main (our Task) will
            #send the data right back in
            print('=========')
            chunk = yield f
            
            selector.unregister(sock.fileno())
            if chunk:
                print("len(chunk)",len(chunk))
                self.response += chunk
            else:
                print("all read")
                stopped= True
                break


        
    

In [None]:
#But when the future resolves, what resumes the generator? We need a coroutine driver. Let us call it "task":
#(this is our main)
class Task:
    def __init__(self, coro):
        self.coro = coro
        f = MyFuture()
        print(">>sending none to initial future",f)
        f.set_result(None)
        print("...stepping")
        self.step(f)
        print(">>>after priming")

    def step(self, future):
        try:
            print("sending", type(future.result))
            next_future = self.coro.send(future.result)
            print('got next future', next_future)

        except StopIteration:
            print("si")
            return None
        next_future.add_done_callback(self.step)

In [None]:
stopped=False
def loop():
    while not stopped:
        events = selector.select()
        print('got events\n')
        for event_key, event_mask in events:
            print('calling back\n')
            callback = event_key.data
            print('callback name\n',callback.__name__)
            callback()

In [None]:
fetcher = Fetcher('/353/', 'xkcd.com')
Task(fetcher.fetch())
stopped=False
loop()

#### Refactoring using generators

In [None]:
def read(sock):
    f = MyFuture()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield f  # Read one chunk.
    selector.unregister(sock.fileno())
    return chunk

In [None]:
def read_all(sock):
    global stopped
    response = []
    # Read whole response.
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    stopped=True
    return b''.join(response)

>If you squint and make the yield from statements disappear it looks like  conventional functions doing blocking I/O. But in fact, read and read_all are coroutines. Yielding from read pauses read_all until the I/O completes. While read_all is paused, asyncio's event loop does other work and awaits other I/O events; read_all is resumed with the result of read on the next loop tick once its event is ready.

In [None]:
class Fetcher:
    
    def __init__(self, url, host):
        self.url = url
        self.host = host
        self.response = b''  # Empty array of bytes.

        
    def fetch(self):
        global stopped
        sock = socket.socket()

        sock.setblocking(False)
        try:
            sock.connect((self.host, 80))
        except BlockingIOError:
            pass

        f = MyFuture()

        def on_connected():
            print('on connected cb ran')
            f.set_result(None)
        
        
        
        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        print("about to yield connection future")
        yield f
        print('connected!')
        selector.unregister(sock.fileno())
        request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.url, self.host)
        sock.send(request.encode('ascii'))
        # keep the joined response that read_all returns
        self.response = yield from read_all(sock)

In [None]:
fetcher = Fetcher('/353/', 'xkcd.com')
Task(fetcher.fetch())
stopped = False
loop()

![](http://aosabook.org/en/500L/crawler-images/yield-from.png)
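
The delegation itself can be sketched without sockets, with the driver sending chunks in by hand. The generators `read_one` and `read_everything` and the `"waiting"` marker are hypothetical stand-ins for `read`, `read_all` and the yielded future.

```python
def read_one():
    """Inner coroutine: pause once, then return whatever is sent in."""
    chunk = yield "waiting"
    return chunk

def read_everything():
    """Outer coroutine: delegate to read_one until an empty chunk arrives."""
    parts = []
    chunk = yield from read_one()
    while chunk:
        parts.append(chunk)
        chunk = yield from read_one()
    return b"".join(parts)

gen = read_everything()
signals = [next(gen)]              # run to the first pause inside read_one
signals.append(gen.send(b"ab"))    # the sent value becomes the result of `yield`
signals.append(gen.send(b"cd"))
try:
    gen.send(b"")                  # an empty chunk ends the loop
except StopIteration as stop:
    result = stop.value            # the outer generator's return value

print(signals, result)
```

Values sent to the outer generator pass straight through to the inner one, and the inner generator's `return` value becomes the value of the `yield from` expression.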

There is one `yield` left amongst the `yield from`s. For consistency, this can be fixed; doing so also lets us change implementations under the hood.
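
A socket-free sketch of the idea: giving the future an `__iter__` method makes `yield from future` work, and a small driver resumes the coroutine when each future resolves. `TinyFuture`, `TinyTask`, `worker` and the `pending` list are hypothetical, reduced versions written for this illustration.

```python
class TinyFuture:
    """Reduced future: a result slot plus callbacks."""
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self             # hand the future to whoever drives the coroutine
        return self.result     # becomes the value of `yield from future`

class TinyTask:
    """Drives a coroutine: resumes it each time its current future resolves."""
    def __init__(self, coro):
        self.coro = coro
        self.finished = False
        self.step(None)

    def step(self, value):
        try:
            next_future = self.coro.send(value)
        except StopIteration:
            self.finished = True
            return
        next_future.add_done_callback(lambda fut: self.step(fut.result))

pending = []      # futures waiting to be resolved by the "outside world"
log = []

def worker():
    for _ in range(2):
        f = TinyFuture()
        pending.append(f)
        value = yield from f
        log.append(value)

task = TinyTask(worker())
pending.pop(0).set_result("one")   # in the notebook, the event loop does this
pending.pop(0).set_result("two")

print(log, task.finished)
```

Here the futures are resolved by hand; in the fetcher they are resolved by selector callbacks when the socket becomes ready.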

In [None]:
def read(sock):
    f = MyFuture()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f  # Read one chunk.
    selector.unregister(sock.fileno())
    return chunk

In [None]:
class MyFuture:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        print("cblist", self._callbacks)
        for fn in self._callbacks:
            fn(self)
            
    def __iter__(self):
        yield self
        return self.result

In [None]:
class Fetcher:
    
    def __init__(self, url, host):
        self.url = url
        self.host = host
        self.response = b''  # Empty array of bytes.

        
    def fetch(self):
        global stopped
        sock = socket.socket()

        sock.setblocking(False)
        try:
            sock.connect((self.host, 80))
        except BlockingIOError:
            pass

        f = MyFuture()

        def on_connected():
            print('on connected cb ran')
            f.set_result(None)
        
        
        
        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        print("about to yield connection future")
        yield from f
        print('connected!')
        selector.unregister(sock.fileno())
        request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.url, self.host)
        sock.send(request.encode('ascii'))
        # keep the joined response that read_all returns
        self.response = yield from read_all(sock)

In [None]:
fetcher = Fetcher('/353/','xkcd.com')
Task(fetcher.fetch())
stopped = False
loop()

In [None]:
import bs4

In [81]:
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from bs4 import BeautifulSoup
import socket
from urllib.parse import urlparse
import urllib
urls_todo = set() 
seen_urls = set()

selector = DefaultSelector()
stopped = False


class Fetcher:
    def __init__(self, host, url, level=0):
        self.response = b''  # Empty array of bytes.
        self.host = host
        self.url = url
        self.sock = None
        self.level = level
    # Method on Fetcher class.
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            #print(self.host)
            self.sock.connect((self.host, 80))
        except BlockingIOError:
            #print(self.host,'========')
            pass

        # Register next callback.
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)
    def parse_links(self):
        """Return the set of (host, path) tuples linked from the response."""
        links = set()
        # Name the parser explicitly; bs4 warns when it has to guess one.
        s = BeautifulSoup(self.response, "lxml")

        for link in s.find_all('a', href=True):
            url = urllib.parse.urlparse(link['href'])
            # Relative links have no netloc: they stay on the current host.
            host = url.netloc or self.host
            # An empty path means the site root.
            links.add((host, url.path or '/'))

        return links
    

    def connected(self, key, mask):
        #print('connected!', flush=True)
        selector.unregister(key.fd)
        request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(self.url, self.host)
        self.sock.send(request.encode('ascii'))

        # Register the next callback.
        selector.register(key.fd,
                          EVENT_READ,
                          self.read_response)

    def read_response(self, key, mask):
        global stopped

        chunk = self.sock.recv(4096)  # 4k chunk size.
        if chunk:
            print("read chunk", flush=True)
            self.response += chunk
        else:
            selector.unregister(key.fd)  # Done reading.
            if self.level == 1:
                # Set of (host, url) tuples found on this page.
                links = self.parse_links()
                links = links.difference(seen_urls)
                seen_urls.update(links)  # update the global set of seen links
                for h in links:
                    # Linked pages get the default level 0, so they are
                    # fetched but their own links are not followed.
                    fetcher = Fetcher(h[0], h[1])
                    fetcher.fetch()
                    urls_todo.add((h[0], h[1]))
            urls_todo.remove((self.host, self.url))
            if not urls_todo:
                stopped = True
                print('done')


        
        
#stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)
        #do fibonacci

In [83]:
a = [1,2,3]
b = a[-1]
del a[-1]
b

3

In [82]:
fetcher = Fetcher('xkcd.com', '/353/',1)
fetcher.fetch()
urls_todo.add((fetcher.host,fetcher.url))
loop()

read chunk
read chunk
... (91 "read chunk" lines in total)



