In [None]:
# 利用yield将生成器进化成协程，使用send方法通信

def simple_coroutine():
    print('-> coroutine started')
    x = yield # yield 没有右值，产出值是None，直到有send方法调用，x才会被复制
    print('-> coroutine received:', x)
    # 生成器抛出StopIteration异常

my_coro = simple_coroutine()
print(my_coro)
next(my_coro) # 先调用next函数，生成器启动，在yield语句处暂停
my_coro.send(42) # yield会得到42，赋值给x

# 协程状态可以通过inspect.getgeneratorstate()函数确定，得到
# - GEN_CREATED
# - GEN_RUNNING
# - GEN_SUSPENDED
# - GEN_CLOSE

# 只有处在GEN_SYSPEND状态的携程，才能调用send方法


In [None]:
from inspect import getgeneratorstate

def simple_coro2(a):
    print('-> started: a=', a)
    b = yield a
    print('-> received: b=', b)
    c = yield a + b
    print('-> received: c=', c)

my_coro2 = simple_coro2(14)
print(getgeneratorstate(my_coro2))
print(next(my_coro2)) # 到yield 语句暂停
print(getgeneratorstate(my_coro2))
print(my_coro2.send(28)) # yield 返回值是a，打印结果未14
try:
    print(my_coro2.send(99)) # yield返回值是a+b，打印结果是14+28=42
finally:
    print(getgeneratorstate(my_coro2))



In [None]:

def averager():
    total = 0.0
    count = 0
    average = None
    while True:  # 无限循环
        term = yield average  # yield返回平均值
        total += term
        count += 1
        average = total/count

coro_avg = averager()
next(coro_avg) # 预激携程，进入GEN_SUSPENDED状态
print(coro_avg.send(10))
print(coro_avg.send(30))
print(coro_avg.send(5))
print('----------')
# 使用预激协程的装饰器
from functools import wraps

def coroutine(func):
    """Decorator: primes `func` by advancing to first `yield`"""
    @wraps(func)
    def primer(*args,**kwargs):  # 把被装饰的生成器函数替换为primer，返回预激后的生成器
        gen = func(*args,**kwargs)  # 得到生成器对象
        next(gen)  # 预激协程
        return gen  # 返回生成器
    return primer


@coroutine  # 把装饰器应用到averager函数上
def averager():  # 与上面相同
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total/count

coro_avg = averager()
# 不用预激协程，在装饰器中完成了预激
print(getgeneratorstate(coro_avg))
print(coro_avg.send(10))
print(coro_avg.send(30))
print(coro_avg.send(5))

# yield from会自动预激协程

In [None]:
# 协程中未处理的异常会向上冒泡，传给next函数或者send方法的调用方（即触发协程的对象）
# 可以利用这个原理，停止协程。比如调用：
# coro_avg.send(None)
# coro_avg.send(Ellipsis) 甚至是
# coro_avg.send(StopIteration)

# 从Python 2.5开始，可以在生成器对象上调用两个方法(throw，close)，显式地把异常发给协程

class DemoException(Exception): # 自定义异常
    """An exception type for the demonstration."""

def demo_exc_handling():
    print('-> coroutine started')
    while True:
        try:
            x = yield
        except DemoException:  # 触发自定义异常
            print('*** DemoException handled. Continuing...')
        else:  # 否则打印收到的值
            print('-> coroutine received: {!r}'.format(x))
    raise RuntimeError('This line should never run.')  # 不会被执行

exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.send(11)
exc_coro.throw(DemoException) # 将指定异常传入协程可以处理，其他异常会导致失败
print(getgeneratorstate(exc_coro))
exc_coro.send(22)
exc_coro.close()  # 关闭协程，不会有StopIteration异常。
print(getgeneratorstate(exc_coro))
print('----------------')

def demo_finally():
    print('-> coroutine started')
    try: # 处理各种异常
        while True:
            try:
                x = yield
            except DemoException:
                print('*** DemoException handled. Continuing...')
            else:
                print('-> coroutine received: {!r}'.format(x))
    finally:
        print('-> coroutine ending') #其它异常也不处理，冒泡出去

exc_coro = demo_finally()
next(exc_coro)
exc_coro.send(11)
exc_coro.throw(DemoException) # 将指定异常传入协程可以处理
print(getgeneratorstate(exc_coro))
exc_coro.send(22)
exc_coro.throw(ZeroDivisionError) # 将其它异常传入协程
print(getgeneratorstate(exc_coro))


In [None]:
# 协程返回值
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break  # 为了返回值，协程必须正常终止
        total += term
        count += 1
        average = total/count
    return Result(count, average)  # 返回值

coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
try:
    coro_avg.send(None)
except StopIteration as exc: # 捕获异常才能得到协程返回值
    result = exc.value
print(result)


In [None]:
# yield from 与for处理StopIteration异常的方式一样。
# 多用在嵌套的生成器。
# yield from的主要功能是打开双向通道，把最外层的调用方与最内层的子生成器连接起来，这样二者可以直接发送和产出值，还可以直接传入异常，
# 而不用在位于中间的协程中添加大量处理异常的样板代码。 用到的术语：
# - 委派生成器 包含yield from <iterable> 表达式的生成器函数
# - 子生成器 从yield from表达式中<iterable>部分获取的生成器。subgenerator
# - 调用方 调用委派生成器的客户端代码。


# the subgenerator
def averager():  # 子生成器
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield  # 接收的值绑定到term上
        if term is None:  # 必须能退出，否则yield from调用这个协程的生成器会永远阻塞
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)  # 作为yield from表达式的值


# the delegating generator
def grouper(results, key):  # 委派生成器
    while True:  # 每次迭代会新建一个averager实例，每个实例都是作为协程使用的生成器对象
        results[key] = yield from averager()  # 在此处暂停，有averager处理客户端发来的值直到None，结果保存到results中


# the client code, a.k.a. the caller
def main(data):  # 客户端代码
    results = {}
    for key, values in data.items():
        group = grouper(results, key)  # 得到的生成器对象
        next(group)  # 预激group协程
        for value in values:
            group.send(value)  # 传值给grouper，由averager处理
        group.send(None)  # important! 通知average结束，返回结果给grouper

    # print(results)  # uncomment to debug
    report(results)


# output report
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
              result.count, group, result.average, unit))


data = {
    'girls;kg':
        [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    'girls;m':
        [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    'boys;kg':
        [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    'boys;m':
        [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

main(data)


In [None]:
# 离散事件仿真 Discrete Event Simulation DES 把系统建模成一系列事件的仿真类型.

import sys
import random
import collections
import queue
import argparse

DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 80
SEARCH_DURATION = 4
TRIP_DURATION = 10
DEPARTURE_INTERVAL = 5

Event = collections.namedtuple('Event', 'time proc action')


def compute_delay(interval):
    """Compute action delay using exponential distribution"""
    return int(random.expovariate(1/interval)) + 1

# ident 出租车编号，trips 行程数量，start_time 离开车库的时间
def taxi_process(ident, trips, start_time=0):  # 每辆出租车调用一次函数，创建一个生成器，表示各辆出租车的运营过程。
    """Yield to simulator issuing event at each state change"""
    time = yield Event(start_time, ident, 'leave garage')  # 产出第一个Event是离开车库
    for i in range(trips):  # 每次行程都遍历这部分
        prowling_ends = time + compute_delay(SEARCH_DURATION)  # 计算空车时间
        time = yield Event(prowling_ends, ident, 'pick up passenger')  # 拉客事件
        trip_ends = time + compute_delay(TRIP_DURATION)  # 计算载客时间
        time = yield Event(trip_ends, ident, 'drop off passenger')  # 落客事件

    yield Event(time + 1, ident, 'going home')  # 指定行程完成后，回家
    # end of taxi process # 生成器对象抛出StopIteration异常

# 测试出租车协程
taxi = taxi_process(ident=13, trips=1, start_time=0)
print(next(taxi))     # 预激协程 产生第一个事件
print(taxi.send(7)) 
print(taxi.send(30))
print(taxi.send(35))
print(taxi.send(36))




In [None]:
class Simulator:

    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()
        self.procs = dict(procs_map)


    def run(self, end_time):  # 运行仿真，指定结束时间
        """Schedule and display events until time is up"""
        # schedule the first event for each cab
        for _, proc in sorted(self.procs.items()):  # 按键值排列
            first_event = next(proc)  # 预激各个协程，执行到第一个yield表达式
            self.events.put(first_event)  # 将各个事件添加到PriorityQueue中

        # main loop of the simulation
        time = 0
        while time < end_time:  # 判定运行事件
            if self.events.empty():  # 判定没有未完事件则退出主循环
                print('*** end of events ***')
                break

            # get and display current event
            current_event = self.events.get()  # 获取优先队列中time属性最小的Event对象为当前事件
            print('taxi:', current_event.proc,  # 显示
                  current_event.proc * '   ', current_event)

            # schedule next action for current proc
            time = current_event.time  # 用当前事件的事件更新模拟时钟
            proc = self.procs[current_event.proc]  # 获得协程
            try:
                next_event = proc.send(time)  # 给协程发送时间
            except StopIteration:
                del self.procs[current_event.proc]  # 协程结束，删除
            else:
                self.events.put(next_event)  # 新事件入队列
        else:  # <14>
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))

def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS,
         seed=None):
    """Initialize random generator, build procs and run simulation"""
    if seed is not None:
        random.seed(seed)  # get reproducible results

    taxis = {i: taxi_process(i, (i+1)*2, i*DEPARTURE_INTERVAL)
             for i in range(num_taxis)}
    sim = Simulator(taxis)
    sim.run(end_time)


main(end_time=80, num_taxis=2, seed=50)

