2021-06-15

个人作业：
复现讲义“2  异步编程概要”其中编码。
作业要求：新建一个jupyter notebook，只保留第2部分，可以删除大量文字讲解，但要保留核心标题（如2.2和2.3和2.4小节的标题），重新在自己的计算机中运行这部分相关的代码，比较运行结果，最终将整体耗时整理成表格。

小组作业：
讨论并复现讲义“3.3  示例：爬虫程序”。
作业要求：类比上面的要求。
截止时间：6月16日周三晚上10点前

# 2  异步编程概要

## 2.2 异步I/O进化之路

### 2.2.1  同步阻塞方式

In [5]:
import socket

import time

def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('baidu.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(blocking_way())
    return len(res)

if __name__ == "__main__": 
    start = time.time()
    print(sync_way())
    print(time.time() - start)

10
1.3751330375671387


### 2.2.2  改进方式：多进程

In [8]:
import socket
from concurrent import futures

import time

def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('baidu.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response



# def process_way():
#     workers = 10
#     with futures.ProcessPoolExecutor(workers) as executor:
#         futs = {executor.submit(blocking_way) for i in range(10)}
#     return len([fut.result() for fut in futs])


if __name__ == "__main__": 
    import time
    start = time.time()
    workers = 10
    with futures.ProcessPoolExecutor(workers) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
        
    print(futs)
    print(time.time() - start)

{<Future at 0x285ab83ce10 state=finished raised BrokenProcessPool>, <Future at 0x285ab83c080 state=finished raised BrokenProcessPool>, <Future at 0x285ab83c898 state=finished raised BrokenProcessPool>, <Future at 0x285ab8412b0 state=finished raised BrokenProcessPool>, <Future at 0x285ab83ecc0 state=finished raised BrokenProcessPool>, <Future at 0x285ab83ccf8 state=finished raised BrokenProcessPool>, <Future at 0x285ab83cef0 state=finished raised BrokenProcessPool>, <Future at 0x285ab83c710 state=finished raised BrokenProcessPool>, <Future at 0x285ab83ef28 state=finished raised BrokenProcessPool>, <Future at 0x285ab83c5c0 state=finished raised BrokenProcessPool>}
0.40366101264953613


Exception in thread Thread-8:
Traceback (most recent call last):
  File "D:\Miniconda\envs\GPU\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "D:\Miniconda\envs\GPU\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "D:\Miniconda\envs\GPU\lib\concurrent\futures\process.py", line 295, in _queue_management_worker
    shutdown_worker()
  File "D:\Miniconda\envs\GPU\lib\concurrent\futures\process.py", line 253, in shutdown_worker
    call_queue.put_nowait(None)
  File "D:\Miniconda\envs\GPU\lib\multiprocessing\queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "D:\Miniconda\envs\GPU\lib\multiprocessing\queues.py", line 83, in put
    raise Full
queue.Full



In [9]:
!python process_way.py

10
0.8138792514801025


### 2.2.3  继续改进：多线程

In [10]:
import socket
from concurrent import futures
import time

def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('baidu.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def thread_way():
    workers = 10
    with futures.ThreadPoolExecutor(workers) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
    return len([fut.result() for fut in futs])

if __name__ == "__main__": 
    start = time.time()
    print(thread_way())
    print(time.time() - start)

10
0.24358892440795898


### 2.2.4  非阻塞方式

In [13]:
import socket
import time

def nonblocking_way():
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('baidu.com', 80))
    except BlockingIOError:
        # 非阻塞连接过程中也会抛出异常
        pass
    request = 'GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n'
    data = request.encode('ascii')
    # 不知道socket何时就绪，所以不断尝试发送
    while True:
        try:
            sock.send(data)
            # 直到send不抛异常，则发送完成
            break
        except OSError:
            pass

    response = b''
    while True:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
            break
        except OSError:
            pass

    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(nonblocking_way())
    return len(res)

if __name__ == "__main__": 
    start = time.time()
    print(sync_way())
    print(time.time() - start)

10
1.1244540214538574


### 2.2.5  非阻塞改进

#### 2.2.5.2  回调(Callback)

In [14]:
import time

import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}


class Crawler:
    def __init__(self, url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('baidu.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = 'GET {0} HTTP/1.0\r\nHost: baidu.com\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        # 如果响应大于4KB，下一次循环会继续读
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

#### 2.2.5.3  事件循环（Event Loop）

In [15]:
def loop():
    while not stopped:
        # 阻塞, 直到一个事件发生
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)

In [16]:
start = time.time()
for url in urls_todo:
    crawler = Crawler(url)
    crawler.fetch()
loop()
print(time.time() - start)

0.25475072860717773


## 2.3  Python 对异步I/O的优化之路

## 2.3.2  协程

### 2.3.2.1  基于生成器的协程

In [28]:
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)


In [29]:
class Crawler:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('baidu.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f
        selector.unregister(sock.fileno())
        get = 'GET {0} HTTP/1.0\r\nHost: baidu.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))

        global stopped
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                break

In [30]:
class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send会进入到coro执行, 即fetch, 直到下次yield
            # next_future 为yield返回的对象
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)

In [31]:
def loop():
    while not stopped:
        # 阻塞, 直到一个事件发生
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()


if __name__ == '__main__':
    import time
    start = time.time()
    for url in urls_todo:
        crawler = Crawler(url)
        Task(crawler.fetch())
    loop()
    print(time.time() - start)

0.18496417999267578


### 2.3.2.2  用 yield from 改进生成器协程

In [38]:
def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())

In [39]:
def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)

In [40]:
class Crawler:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ('baidu.com', 80))
        get = 'GET {0} HTTP/1.0\r\nHost: baidu.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True

In [41]:
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self
        return self.result

In [42]:
class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send会进入到coro执行, 即fetch, 直到下次yield
            # next_future 为yield返回的对象
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


def loop():
    while not stopped:
        # 阻塞, 直到一个事件发生
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()



if __name__ == '__main__':
    import time
    start = time.time()
    for url in urls_todo:
        crawler = Crawler(url)
        Task(crawler.fetch())
    loop()
    print(time.time() - start)

0.1895143985748291


## 2.3.4  asyncio和原生协程初体验

In [None]:
!python asyncio_way.py

|程序|耗时（单位：秒）|
|:---|:---|
|同步阻塞方式|1.3751330375671387|
|改进方式：多进程|0.8138792514801025|
|继续改进：多线程|0.24358892440795898|
|非阻塞方式|1.1244540214538574|
|非阻塞改进|0.25475072860717773|
|基于生成器的协程|0.18496417999267578|
|用 yield from 改进生成器协程|0.1895143985748291|
|asyncio和原生协程初体验||