##### map
`map(self, fn, *iterables, **kwargs)`

`map`方法的实例我们上面已经实现过，值得注意的是，返回的results列表是有序的，顺序和`*iterables`迭代器的顺序一致。
这里我们使用`with`操作符，使得当任务执行完成之后，自动执行`shutdown`函数，而无需编写相关释放代码。

In [2]:
def gcd(*pair):
    a, b = pair
    if max(a,b)%min(a,b) == 0:
        return min(a,b)
    div, mod = divmod(a,b)
    return gcd(b, mod)
gcd(*[2,6])

2

In [None]:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor
start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(gcd, numbers))
print 'results: %s' % results
end = time.time()

##### submit

`submit(self, fn, *args, **kwargs)`
`submit`方法用于提交一个可并行的方法，`submit`方法同时返回一个`future`实例。
`future`对象标识这个线程/进程异步进行，并在未来的某个时间执行完成。`future`实例表示线程/进程状态的回调。

In [None]:
start = time.time()
futures = list()
with ProcessPoolExecutor(max_workers=2) as pool:
    for pair in numbers:
        future = pool.submit(gcd, pair)
        futures.append(future)
print('results: %s' % [future.result() for future in futures])
end = time.time()

##### future
submit函数返回future对象，future提供了跟踪任务执行状态的方法。比如判断任务是否执行中future.running()，判断任务是否执行完成future.done()等等。

as_completed方法传入futures迭代器和timeout两个参数

默认timeout=None，阻塞等待任务执行完成，并返回执行完成的future对象迭代器，迭代器是通过yield实现的。 

timeout>0，等待timeout时间，如果timeout时间到仍有任务未能完成，不再执行并抛出异常TimeoutError

In [None]:
start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [ pool.submit(gcd, pair) for pair in numbers]
    for future in futures:
        print '执行中:%s, 已完成:%s' % (future.running(), future.done())
    print '#### 分界线 ####'
    for future in as_completed(futures, timeout=2):
        print '执行中:%s, 已完成:%s' % (future.running(), future.done())
end = time.time()

##### wait
wait方法接会返回一个tuple(元组)，tuple中包含两个set(集合)，一个是completed(已完成的)另外一个是uncompleted(未完成的)。

使用wait方法的一个优势就是获得更大的自由度，它接收三个参数FIRST_COMPLETED, FIRST_EXCEPTION和ALL_COMPLETE，默认设置为ALL_COMPLETED。

In [None]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, 
Executor, as_completed, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION

start = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [ pool.submit(gcd, pair) for pair in numbers]
    for future in futures:
        print '执行中:%s, 已完成:%s' % (future.running(), future.done())
    print '#### 分界线 ####'
    done, unfinished = wait(futures, timeout=2, return_when=ALL_COMPLETED)
    for d in done:
        print '执行中:%s, 已完成:%s' % (d.running(), d.done())
        print d.result()
end = time.time()
'''如果我们将配置改为FIRST_COMPLETED，wait会等待直到第一个任务执行完成，
返回当时所有执行成功的任务。这里并没有做并发控制。
'''

In [3]:
data = {'girls;kg':[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 
                    38.0, 40.6,44.5],
'girls;m':[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],}

from collections import namedtuple
Result = namedtuple('Result', 'count, average')
def averager():
    total = 0.
    count = 0
    average = None
    while True:
        term = yield # main 函数中的客户代码发送的各个值绑定到这里的 term 变量上
        if term is None:
#至关重要的终止条件。如果不这么做，使用 yield from 调用这个协程的生成器会永远阻塞。
            break
        total += term
        count += 1
        average = total/ count
    # 返回的 Result 会成为 grouper 函数中 yield from 表达式的值。
    return Result(count, average)
#grouper 是委派生成器。
def grouper(results, key):
    # 这个循环每次迭代时会新建一个 averager 实例；每个实例都是作为协程使用的生成器对象。
    while True:
'''grouper 发送的每个值都会经由 yield from 处理，通过管道传给 averager 实
例。grouper 会在 yield from 表达式处暂停，等待 averager 实例处理客户端发来的
值。averager实例运行完毕后，返回的值绑定到 results[key] 上。while 循环会不断
创建 averager实例，处理更多的值。'''
        results[key] = yield from averager()
# main 函数是客户端代码。这是驱动一切的函数。
def main(data):
        results = {}
        for key, values in data.items():
'''group 是调用 grouper 函数得到的生成器对象，传给 grouper 函数的第一个参数是
results，用于收集结果；第二个参数是某个键。group 作为协程使用。'''
            group = grouper(results, key)
            next(group)
    #其中next(sfib)相当于sfib.send(None)，可以使得sfib运行至第一个yield处返回。
            for value in values:
    '''把各个value传给grouper。传入的值最终到达 averager 函数中 
    term = yield那一行；grouper 永远不知道传入的值是什么。'''
                group.send(value)
'''把 None 传入 grouper，导致当前的 averager 实例终止，也让 grouper 继续运行，
再创建一个 averager 实例，处理下一组值。'''
            group.send(None)
'''外层for循环每次迭代会新建一个grouper实例，赋值给group变量；group是委派生成器。
调用next(group)，预激委派生成器grouper，此时进入while True循环，
调用子生成器averager后， yield from表达式处暂停。内层for循环调用group.send(value)，
直接把值传给子生成器averager。同时，当前的grouper实例group在yield from表达式
处暂停。内层循环结束后，group实例依旧在yield from表达式处暂停，因此，
grouper函数定义体中为results[key]赋值的语句还没有执行。
如果外层for循环的末尾没有group.send(None)，那么averager子生成器永远不会终止，
委派生成器group永远不会再次激活，因此永远不会为results[key]赋值。
外层for循环重新迭代时会新建一个grouper实例，然后绑定到group变量上。
前一个grouper实例以及它创建的尚未终止的averager子生成器实例被垃圾回收程序回收。'''
        report(results)

def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2}{:5}averaging{:.2f}{}'.format(
            result.count, group, result.average, unit))

In [None]:
import queue
import random
SEARCH_DURATION = 5
TRIP_DURATION = 20

Event = namedtuple('Event', 'time, proc, action')
def taxi_process(ident, trips, start_time=0):
    time = yield Event(start_time, ident, 'leave garage')
    for i in range(trips): 
        time = yield Event(time, ident, 'pick up passenger') 
        time = yield Event(time, ident, 'drop off passenger') 
    yield Event(time, ident, 'going home')
class Simulator:
    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()
        self.procs = dict(procs_map)
    def run(self, end_time): 
        """排定并显示事件，直到时间结束"""
        # 排定各辆出租车的第一个事件
        for _, proc in sorted(self.procs.items()): 
            first_event = next(proc) 
            self.events.put(first_event) 
        # 这个仿真系统的主循环
        sim_time = 0 
        while sim_time < end_time: 
            if self.events.empty(): 
                print('*** end of events ***')
                break
            current_event = self.events.get() 
            sim_time, proc_id, previous_action = current_event
            print('taxi:', proc_id, proc_id * ' ', current_event)
            active_proc = self.procs[proc_id] 
            next_time = sim_time + compute_duration(previous_action) 
            try:
                next_event = active_proc.send(next_time) 
            except StopIteration:
                del self.procs[proc_id] 
            else:
                self.events.put(next_event) 
        else: 
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))
            
def compute_duration(previous_action):
    """Compute action duration using exponential distribution"""
    if previous_action in ['leave garage', 'drop off passenger']:
        # new state is prowling
        interval = SEARCH_DURATION
    elif previous_action == 'pick up passenger':
        # new state is trip
        interval = TRIP_DURATION
    elif previous_action == 'going home':
        interval = 1
    else:
        raise ValueError('Unknown previous_action: %s' % previous_action)
    return int(random.expovariate(1/interval)) + 1

`@asyncio.coroutine`把一个generator标记为coroutine类型，然后，我们就把这个coroutine扔到EventLoop中执行。

`hello()`会首先打印出Hello world!，然后，yield from语法可以让我们方便地调用另一个generator。由于`asyncio.sleep()`也是一个coroutine，所以线程不会等待`asyncio.sleep()`，而是直接中断并执行下一个消息循环。当`asyncio.sleep()`返回时，线程就可以从yield from拿到返回值（此处是None），然后接着执行下一行语句。

把`asyncio.sleep(1)`看成是一个耗时1秒的IO操作，在此期间，主线程并未等待，而是去执行EventLoop中其他可以执行的coroutine了，因此可以实现并发执行。

In [None]:
import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())
'''
把@asyncio.coroutine替换为async；
把yield from替换为await
'''
async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")
    
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

In [None]:
import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()