# Twisted

1）延迟项 和 延迟链

延迟项：是 Twisted 写出 **异步代码** 的最重要机制。Twisted APIs 使用延迟项让我们 --> 定义事件发生时，产生动作的顺序。

## Experiment 1

In [1]:
from twisted.internet import defer
d = defer.Deferred()  # d 这个实例对象就称为: 延迟项（Deferred）。

print(type(defer.Deferred))
print(d)

d.called  # 此时还不可调用；即 d.called == False 时，d.result 会报错。

<class 'type'>
<Deferred at 0x104b73390>


False

In [2]:
d.callback(3)
d.called

True

In [3]:
d.result  # 当我们触发 d 时（即，调用 d 的 callback方法时），延迟项的 called 状态变为True，result 属性变为调用的值。

3

In [4]:
d.callback(6)  # 触发后再添加就会报错

AlreadyCalledError: 

In [None]:
d

---

# Experiment 2
延迟项的最强大之处是：当值确定时，可以在延迟链上添加新的项。

In [None]:
def foo(v):
    print("foo called")
    return v+1

d = defer.Deferred()
d.addCallback(foo)  # addCallback 添加回调函数：使用 foo()函数 作为 延迟项d 的回调。

In [None]:
d.called

In [None]:
d.callback(3)  # 3 作为 刚刚添加的 回调函数foo() 的参数，函数foo()被调用并打印出信息。返回值作为 d 的最后结果。延迟项本质代表一个值。

In [None]:
d.called

In [None]:
d.result

In [None]:
d

---


# Experiment 3


In [None]:
def status(*defers):
    """打印延迟项的状态: 延迟项的值（有触发才有值，延迟项才有 result 属性） 和 回调个数；
       
       d.callback 不同于 d.callbacks，前者触发回调，后者返回所有回调函数。
    """
    
    return [(getattr(d, 'result', "N/A"), len(d.callbacks)) for d in defers]

def b_callback(arg):
    print("b_callback called with arg =", arg)
    return b  # 返回 接下来定义的 b 延迟项

def on_done(arg):
    print("on_done called with arg =", arg)
    return arg

a = defer.Deferred()
b = defer.Deferred()
a.addCallback(b_callback).addCallback(on_done)  # 为延迟项a，添加两个回调


In [None]:
status(a, b)  # 两个延迟项都没有被触发，第一个有两个调回，第二个没有调回。

---

# Experiment 3.a


In [None]:
a.callback(3) 

In [None]:
status(a, b)  # 延迟项 a 第一个回调 b_callback() 执行后，回调函数的结果返回了 延迟项 b; 
              # 此时可以认为，延迟项 b 就接管了 延迟项 a 的第二个未被执行的 回调 on_done()

In [None]:
b.callback(4)  # 延迟项 b 的 on_done() 回调被触发

In [None]:
status(a, b)  # 为啥 b.result == None ??????? 
              # 整个过程：可以理解为 a 的 第一个回调 执行后，返回 b，由于此时的 延迟项 b 尚未触发（即，b 无 result 属性），就没有值，
              #          就无法 触发 a 的第二个回调。直到延迟项 b 被触发了，b.result 有值了，且该值作为参数传递给 a 的第二个回调。
              # b.result 被用掉之后，值变为了 None。

In [None]:
a

In [None]:
b

---


# Experiment 3.b

在设 a 为 3 之前就先触发 b。


In [None]:
a = defer.Deferred()
b = defer.Deferred()
a.addCallback(b_callback).addCallback(on_done)

b.callback(4)  # 先触发b
status(a, b)  # b 先被触发（b 没有回调，就直接返回传入的值：4）

In [None]:
a.callback(3)  # 触发 b
               # 第一个回调 b_callback() 执行，返回值为：b -> <Deferred at 0x10df72560 current result: 4>
               # 第二个回调 on_done() 执行，传入的参数为：b 延迟项（注意！！：延迟项本质代表一个值，即参数为 b.result）

In [None]:
status(a, b)  # 是否 b 延迟项作为 a 的第二个回调的参数后，b 的值就置 None ???????????????????

---

## 总结

无论触发的顺序，结果都是一样的。
两者的区别是：
   >在第一种情况中，b的值被延迟更久，因为它是后触发的。
   >
   >而在第二种情况中，先触发b，然后它的值立即被使用。

### 我的理解：
   >1）当一个延迟项 a 添加了多个回调函数，触发该延迟项时，回调是按顺序依次执行的;
   >
   >2）每个回调函数的返回值，作为下一个回调函数的参数;
   >
   >3）若上一个回调的返回值 为另外一个 延迟项 b ，那么会有2种情况：
   1. 延迟项 b 未被触发（没有值）：那么当前回调只能等到 b 被触发后才能被执行。
   2. 延迟项 b 已经被触发（有值）：b.result 作为参数，且执行当前回调后 b.result 被置为 None。
   >
   >4）任何延迟项，触发后（执行 callback() ）就不能再继续触发，不能再继续添加回调，只能等待被另外一个延迟项调用（即，可以被另外的延迟项的回调函数所引用 or 作为参数）；
   >
   >5）延迟项作为另外一个延迟项的回调参数后（延迟项被调用），该延迟项将不再拥有值（延迟项.result == None）

---


# Experiment 4

延迟链：其触发取决于它的所有延迟项是否已经都触发。


In [None]:
def on_done(arg):
    print("on_done called with arg =", arg)
    return arg

deferreds = [defer.Deferred() for i in range(5)]  # 含5个延迟项的list
join = defer.DeferredList(deferreds)  # join -> <DeferredList at 0x10ded0b00> 延迟链
join.addCallback(on_done)


# Experiment 4.a

先触发部分延迟项


In [None]:
for i in range(4):
    deferreds[i].callback(i)  # 触发前4个延迟项，这几个延迟项都没有添加回调函数，触发时就直接把触发参数作为结果。
    print(deferreds[i])

print(deferreds[4])

In [None]:
deferreds[4].callback(4)  # 触发第五个延迟项，此时 join 的值中不再包含未被触发的延迟项。
                    # 当延迟链里 没有未触发的延迟项时，延迟链的回调函数就会被执行，而第一个回调函数的回调参数就为 延迟项组成的list。
                    # 回调参数即为：join.result => [(deferred_1.caller, deferred_1.result), (...), ...]

In [None]:
join.result

---

# Experiment 4.b

一次性触发全部延迟项。

In [None]:
deferreds = [defer.Deferred() for i in range(5)]
join = defer.DeferredList(deferreds)
join.addCallback(on_done)

for i in range(5):
    deferreds[i].callback(i)  # 全部触发
    if i == 4:
        print("最后一个延迟项也已经触发，上述打印了 join 对象的回调函数的执行结果")

    print(deferreds[i])

---
# twisted 的实际应用

##  1）一个不好的办法

In [None]:
# ~*~ Twisted - A Python tale ~*~
from time import sleep
import datetime

def install_wordpress(customer):
    """为一个人安装需要 3s"""

    print("Start installation for", customer)
    sleep(3)
    print("All done for", customer)

def developer_day(customers):
    """为所有人安装"""
    
    start_time = datetime.datetime.now()
    for customer in customers:
        install_wordpress(customer)

    end_time = datetime.datetime.now()
    print("cost = " + str(end_time - start_time))

developer_day(["Bill", "Elon", "Steve", "Mark"])  # 结果是顺序执行的。4名顾客，每人3秒，总和就是12秒。

## 2）另一个不怎么友好的办法

In [None]:
# 增加工人 ~ 添加线程

import threading

def developers_day(customers):
    lock = threading.Lock()
    start_time = datetime.datetime.now()

    def dev_day(id):
        print("Goodmorning from developer", id)
        lock.acquire()

        while customers:
            customer = customers.pop(0)  # 选一个顾客
            lock.release()
            install_wordpress(customer)
            lock.acquire()

        lock.release()
        print("Bye from developer", id)

    devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)]  # 5个工人
    [dev.start() for dev in devs]
    [dev.join() for dev in devs]
    end_time = datetime.datetime.now()
    print("cost = " + str(end_time - start_time))

developers_day(["Customer %d" % i for i in range(15)])  # 安排15个顾客


你用5名工人线程并行执行。
15名顾客，每人3秒，单人处理要45秒，但是有5名工人的话，9秒就够了。  
但是代码有些复杂。不再关注于算法和逻辑，它只考虑并发。另外，输出结果变得混乱且可读性变差。

---

## 3）把简单的多线程代码写的好看也十分困难，现在我们 Twisted怎么来做


In [None]:
import datetime
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task

def schedule_install(customer):
    """返回一个延迟项：task.deferLater 的妙用"""

    def schedule_install_wordpress():
        def on_done():
            print("Callback: Finished installation for", customer)

        print("Scheduling: Installation for", customer)
        return task.deferLater(reactor, 3, on_done)  # 3秒后执行 on_done() 函数。返回值是一个延迟项。

    def all_done(_):
        print("All done for", customer)

    d = schedule_install_wordpress()  # d 是一个延迟项。
    d.addCallback(all_done)  # 为延迟项 d 添加一个回调函数（该函数仅起到提示这个人的工作已经完成的作用）
    return d  # 返回一个延迟项

def twisted_developer_day(customers):
    print("Goodmorning from Twisted developer")
    
    work = [schedule_install(customer) for customer in customers]  # 为所有的顾客创建延迟项（多线程了吧）
    join = defer.DeferredList(work)  # 用这些 延迟项 组成的list，创建 延迟链。
    join.addCallback(lambda _: reactor.stop())  # 为 延迟链 添加回调函数 （匿名函数：lambda _: reactor.stop() ）  
                                                # 当 延迟链 中的所有延迟项都触发后，就会执行回调函数。
    
    print("Bye from Twisted developer!")


In [None]:
twisted_developer_day(["Customer %d" % i for i in range(15)])  # 15个顾客作为参数
start_time = datetime.datetime.now()
reactor.run()  # 监控事件并触发回调
end_time = datetime.datetime.now()
print("cost = " + str(end_time - start_time))

我们没用线程就得到了十分漂亮的结果。  
我们并行处理了15名顾客，45秒的工作在3秒内完成。（开了15个线程）  
我们的方法是让阻塞的调用进行 `sleep()`，而采用 `task.deferLater()` 和调用函数。  
在其它地方进行处理时，我们可以轻松送出应付15名顾客。

#### 笔记：
   >上述提到 “在其它地方进行处理” ，是指：计算仍在CPU中进行。  
   >与磁盘和网络操作比起来，如今的CPU运算非常快。  
   >CPU接收发送数据或存储才是最花时间的。  
   >通过使用非阻塞I/O操作，我们为CPUs节省了这个时间。  
   >与 `task.deferLater()` 相似，当数据传输完毕时，触发再进行调用。  
  
 ---


另一个重点是 `"Goodmorning from Twisted developer"` 和 `"Bye from Twisted developer!"` 消息。  
当运行代码时，它们立即就被打印出来。如果代码到达此处这么早，应用什么时候真正运行起来的呢？  

>答案是 Twisted 应用全部都是在 `reactor.run()` 中运行的。

当你调用某个方法时，你必须有每个可能要用到的延迟项（相当于前面的故事里，在CRM系统中设定步骤和过程）。  
你的 reactor.run() 监控事件并触发回调。


#### 笔记：
   >反应器（reactor）的最主要规则是：只要是非阻塞操作就可以执行。

虽然没有线程了，调回函数还是有点不好看。看下面的例子：



# 证明 deferLater 的返回值是一个延迟项

In [None]:
a = task.deferLater(reactor, 3, lambda : 66666) 
a

---

# 4）最佳实践

@defer.inlineCallbacks 装饰器，让你的代码更具可读性

In [None]:
import datetime
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task

@defer.inlineCallbacks
def inline_install(customer):
    """模拟为一个顾客服务3秒"""
    
    print("Scheduling: Installation for", customer)
    
    yield task.deferLater(reactor, 3, lambda: None)  # yield 一个延迟项, 其添加了一个回调函数（模拟延迟3秒的功能）
    
    print("Callback: Finished installation for", customer)
    print("All done for", customer)


def twisted_developer_day(customers):

    print("Goodmorning from Twisted developer")
    
    work = [inline_install(customer) for customer in customers]  # 延迟项 list
    join = defer.DeferredList(work)  # 创建延迟链
    join.addCallback(lambda _: reactor.stop())  # 添加延迟链的回调函数
    
    print("Bye from Twisted developer!")

    
twisted_developer_day(["Customer %d" % i for i in range(15)])
start_time = datetime.datetime.now()
reactor.run()  # 这个东西到底是怎么触发延迟链里的延迟项的 ？？？？？？？？？
end_time = datetime.datetime.now()
print("cost = " + str(end_time - start_time))


这段代码的功能和之前的一样，但是好看很多。  
`inlineCallbacks` 装饰器用 Python 的机制暂停和继续 `inline_install()` 中的代码。  
`inline_install()` 变成了一个 ***延迟项***，而后对每名顾客并行执行。  
每次 `yield` 时，暂停当前的 `inline_install()`，被触发时再继续。

唯一的问题是：
   >当我们不是有15名顾客，而是10000名时，这段代码会同时发起10000个 进程（可以是HTTP请求、写入数据库等等）。
这可能可以运行，或者会产生严重的问题。在大并发应用中，我们通常会限制并发数。

---


在下面例子中。Scrapy使用了相似的机制，在 CONCURRENT_ITEMS 设置中限制并发数：



我们现在看到，一共有五个顾客的处理窗口。
只有存在空窗口时，才能服务新顾客。
因为处理每名顾客都是3秒，每批次可以处理5名顾客。
最终，我们只用一个线程就达到了相同的性能，而且代码很简单。




In [5]:
import datetime
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task

@defer.inlineCallbacks
def inline_install(customer):
    
    print("Scheduling: Installation for", customer)
    
    yield task.deferLater(reactor, 3, lambda: None)
    
    print("Callback: Finished installation for", customer)
    print("All done for", customer)

# 同上例

def twisted_developer_day(customers):
    
    print("Goodmorning from Twisted developer")
    work = [inline_install(customer) for customer in customers]
    work = iter(work)  # 不加的话 list 会不可迭代 ？
    
    coop = task.Cooperator()
    join = defer.DeferredList([coop.coiterate(work) for i in range(5)])  # 利用 task.Cooperator().coiterate 来限制并发
    
    join.addCallback(lambda _: reactor.stop())
    
    print("Bye from Twisted developer!")

twisted_developer_day(["Customer %d" % i for i in range(15)])
start_time = datetime.datetime.now()
reactor.run()
end_time = datetime.datetime.now()
print("cost = " + str(end_time - start_time))

Goodmorning from Twisted developer
Scheduling: Installation for Customer 0
Scheduling: Installation for Customer 1
Scheduling: Installation for Customer 2
Scheduling: Installation for Customer 3
Scheduling: Installation for Customer 4
Scheduling: Installation for Customer 5
Scheduling: Installation for Customer 6
Scheduling: Installation for Customer 7
Scheduling: Installation for Customer 8
Scheduling: Installation for Customer 9
Scheduling: Installation for Customer 10
Scheduling: Installation for Customer 11
Scheduling: Installation for Customer 12
Scheduling: Installation for Customer 13
Scheduling: Installation for Customer 14
Bye from Twisted developer!
Callback: Finished installation for Customer 0
All done for Customer 0
Callback: Finished installation for Customer 1
All done for Customer 1
Callback: Finished installation for Customer 2
All done for Customer 2
Callback: Finished installation for Customer 3
All done for Customer 3
Callback: Finished installation for Customer 4
A

我们现在看到，一共有五个顾客的处理窗口。  
只有存在空窗口时，才能服务新顾客。  
因为处理每名顾客都是3秒，每批次可以处理5名顾客。  
最终，我们只用一个线程就达到了相同的性能，而且代码很简单。  

