# 并发和并行
并发是多个事件在同一时间段内发生  
并行是多个事件在同一时刻发生  
并行是多核，并发则不一定是多核，单核也能够实现并发。  

进程 线程 协程 异步
并发编程（不是并行）目前有四种方式：多进程、多线程、协程、异步。

多进程编程在python中有类似C的os.fork,更高层封装的有multiprocessing标准库  
多线程编程python中有thread（Python 3中更名为_thread）和threading  
异步在linux下主要有三种实现：select，poll，epoll  
协程在python中通常会说到yield，关于协程的库主要有greenlet,stackless,gevent,eventlet等。

Golang中的协程可以利用多核，Python中的协程无法利用多核，就像线程一样，只不过它比线程轻量很多。

gevent是Python协程库。
* 基于libev的快速事件循环（Linux上epoll，FreeBSD上kqueue）。
* 基于greenlet的轻量级执行单元。
* API的概念和Python标准库一致(如事件，队列)。
* 可以配合socket，ssl模块使用。
* 能够使用标准库和第三方模块创建标准的阻塞套接字(gevent.monkey)。
* 默认通过线程池进行DNS查询，也可通过c-ares（通过GEVENT_RESOLVER=ares环境变量开启）。
* TCP/UDP/HTTP服务器
* 子进程支持（通过gevent.subprocess）


Greenlet的状态
greenlet的状态通常是一个依赖于时间的参数：

* started – Boolean, 指示此Greenlet是否已经启动
* ready() – Boolean, 指示此Greenlet是否已经停止
* successful() – Boolean, 指示此Greenlet是否已经停止而且没抛异常
* value – 任意值, 此Greenlet代码返回的值，使用get()方法可以获取value
* exception – 异常, 此Greenlet内抛出的未捕获异常


**Python是一门胶水语言，它把各种网络请求得到的数据粘合在一起**

In [1]:
from gevent import monkey

# Patch the standard socket module so that blocking socket calls cooperate with gevent.
# NOTE(review): this patches sockets only, not every blocking call; monkey.patch_all()
# (shown in a later cell) covers more modules - confirm which one is intended here.
monkey.patch_socket()

In [2]:
import gevent
"""
使用join可以发现这个进程是同步执行的
"""

def f(n):
    """Print the currently running greenlet next to each counter value in range(n)."""
    for counter in range(n):
        me = gevent.getcurrent()
        print(me, counter)


# Spawn three greenlets running f(5), then wait for each one in creation order.
g1, g2, g3 = (gevent.spawn(f, 5) for _ in range(3))
for worker in (g1, g2, g3):
    worker.join()

<Greenlet at 0x7fba10495170: f(5)> 0
<Greenlet at 0x7fba10495170: f(5)> 1
<Greenlet at 0x7fba10495170: f(5)> 2
<Greenlet at 0x7fba10495170: f(5)> 3
<Greenlet at 0x7fba10495170: f(5)> 4
<Greenlet at 0x7fb9f05f3710: f(5)> 0
<Greenlet at 0x7fb9f05f3710: f(5)> 1
<Greenlet at 0x7fb9f05f3710: f(5)> 2
<Greenlet at 0x7fb9f05f3710: f(5)> 3
<Greenlet at 0x7fb9f05f3710: f(5)> 4
<Greenlet at 0x7fb9f05f35f0: f(5)> 0
<Greenlet at 0x7fb9f05f35f0: f(5)> 1
<Greenlet at 0x7fb9f05f35f0: f(5)> 2
<Greenlet at 0x7fb9f05f35f0: f(5)> 3
<Greenlet at 0x7fb9f05f35f0: f(5)> 4


In [1]:
import gevent
import requests

"""
使用joinall等待全部协程结束
使用这种方式写爬虫，速度非常快
"""
res = []


def f(url):
    """Download url with requests, report the body size, and store the body text in res."""
    print('GET: %s' % url)
    data = requests.get(url).text
    size = len(data)
    print('%d bytes received from %s.' % (size, url))
    res.append(data)

# Fetch the three pages in separate greenlets and wait for all of them.
# NOTE(review): requests only yields to other greenlets if the socket/ssl modules were
# monkey-patched beforehand; the recorded output of this cell looks sequential - confirm.
urls = [
    'https://www.baidu.com/',
    'https://www.zhihu.com/',
    'https://weiyinfu.cn/',
]
gevent.joinall([gevent.spawn(f, url) for url in urls])
print('total length', sum(map(len, res)))

GET: https://www.baidu.com/
2443 bytes received from https://www.baidu.com/.
GET: https://www.zhihu.com/
170 bytes received from https://www.zhihu.com/.
GET: https://weiyinfu.cn/
688 bytes received from https://weiyinfu.cn/.
total length 3301


In [29]:
from gevent import socket
# Resolve both hostnames in separate greenlets. joinall accepts a timeout (here 3)
# and hands back the greenlets it collected.
lookups = [
    gevent.spawn(socket.gethostbyname, hostname)
    for hostname in ('www.baidu.com', 'weiyinfu.cn')
]
jobs = gevent.joinall(lookups, timeout=3)
# For these finished lookups, .value and .get() print the same results.
print([job.value for job in jobs])
print([job.get() for job in jobs])

['123.206.21.187', '61.135.185.32']
['123.206.21.187', '61.135.185.32']


In [3]:
import gevent
import requests
import time
"""
使用joinall等待全部协程结束
使用time.sleep并不能够利用多核
"""
res=[]
def f(name):
    """Print name, block in time.sleep for 3 seconds, then append name to res.

    In the recorded run, three greenlets running this function took about
    9 seconds in total, i.e. the sleeps did not overlap.
    """
    print(name)
    time.sleep(3)
    res.append(name)
# Time how long three greenlets running f take when joined together.
begin_time = time.time()
gevent.joinall([gevent.spawn(f, label) for label in ('one', 'two', 'three')])
end_time = time.time()
print(res)
print(end_time - begin_time)

one
two
three
['one', 'two', 'three']
9.007352113723755


In [5]:
import gevent
import requests
import time

# joinall waits for every greenlet to finish.
# Unlike the time.sleep variant, gevent.sleep lets the other greenlets run while this
# one waits, so the three 3-second sleeps overlap (about 3 seconds in the recorded
# run, instead of about 9).
res = []


def f(name):
    """Print name, sleep cooperatively for 3 seconds, then append name to res.

    Args:
        name: Label printed on entry and recorded in res on completion.
    """
    print(name)
    gevent.sleep(3)
    res.append(name)
# Time how long three greenlets running f take when joined together.
begin_time = time.time()
gevent.joinall([gevent.spawn(f, label) for label in ('one', 'two', 'three')])
end_time = time.time()
print(res)
print(end_time - begin_time)

one
two
three
['one', 'two', 'three']
3.0049569606781006


打补丁

In [None]:
from gevent import monkey

# Apply the socket-only, the catch-all, and the select-only patches in turn.
monkey.patch_socket()
monkey.patch_all()
monkey.patch_select()

In [25]:
# The greenlet is only scheduled here and never joined: the main greenlet reaches
# print(res) before f has run, so the list is still empty when it is printed.
res = []


def f(name):
    """Print name, sleep cooperatively for 13 seconds, then append name to res."""
    print(name)
    gevent.sleep(13)
    res.append(name)


gevent.spawn(f, 'one')
print(res)

[]


# 锁和信号量
信号量是一个允许greenlet相互合作、限制并发访问或运行的低层次同步原语。信号量有两个方法：acquire和release。信号量被acquire与被release的次数之差，称为此信号量的范围(the bound of the semaphore)。如果一个信号量的范围已经降低到0，它会阻塞acquire操作，直到另一个已经获得信号量的greenlet将其释放。

In [33]:
from gevent import sleep
from gevent.pool import Pool
from gevent import lock
sem = lock.BoundedSemaphore(2)


def worker1(n):
    """Hold the shared semaphore around one cooperative yield, using explicit calls.

    Args:
        n: Worker number, interpolated into the progress messages.
    """
    sem.acquire()
    try:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    finally:
        # Release even if the body raises; otherwise the permit would be lost for
        # every other greenlet waiting on the semaphore.
        sem.release()
    print('Worker %i released semaphore' % n)
def worker2(n):
    """Same as worker1, but let the with-statement acquire and release the semaphore."""
    acquired_msg = 'Worker %i acquired semaphore' % n
    released_msg = 'Worker %i released semaphore' % n
    with sem:
        print(acquired_msg)
        sleep(0)
    print(released_msg)
# Run worker1 for n = 0 and n = 1 through the pool; map returns the list of results.
pool = Pool()
pool.map(worker1, range(2))

Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore


[None, None]

In [31]:
# Inspect which names the gevent package exposes.
dir(gevent)

['Greenlet',
 'GreenletExit',
 'Timeout',
 '__abstract_linkable',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__greenlet_primitives',
 '__hub_local',
 '__hub_primitives',
 '__ident',
 '__imap',
 '__loader__',
 '__name__',
 '__package__',
 '__path__',
 '__semaphore',
 '__spec__',
 '__version__',
 '__waiter',
 '_abstract_linkable',
 '_compat',
 '_config',
 '_event',
 '_greenlet',
 '_greenlet_primitives',
 '_hub_local',
 '_hub_primitives',
 '_ident',
 '_imap',
 '_queue',
 '_semaphore',
 '_signal_class',
 '_signal_metaclass',
 '_signal_module',
 '_socket3',
 '_socketcommon',
 '_tblib',
 '_threading',
 '_util',
 '_version_info',
 '_waiter',
 'absolute_import',
 'config',
 'event',
 'exceptions',
 'fork',
 'get_hub',
 'getcurrent',
 'getswitchinterval',
 'greenlet',
 'hub',
 'idle',
 'iwait',
 'joinall',
 'kill',
 'killall',
 'libev',
 'lock',
 'monkey',
 'namedtuple',
 'os',
 'pool',
 'queue',
 'reinit',
 'resolver',
 'setswitchinterval',
 'signal',
 'signal_handl

# 协程局部变量

In [34]:
import gevent
from gevent.local import local
# Greenlet-local storage: the attribute f1 sets here is not visible inside f2.
stash = local()
def f1():
    """Set stash.x to 1 and print it."""
    stash.x = 1
    print(stash.x)
def f2():
    """Set stash.y, print it, and report when stash.x is not visible in this greenlet."""
    stash.y = 2
    print(stash.y)
    # hasattr is False exactly when reading stash.x would raise AttributeError.
    if not hasattr(stash, 'x'):
        print("x is not local to f2")
# Run f1 and f2 in separate greenlets and wait for both.
g1, g2 = gevent.spawn(f1), gevent.spawn(f2)
gevent.joinall([g1, g2])

1
2
x is not local to f2


[<Greenlet at 0x7f86489c4cb0: _run>, <Greenlet at 0x7f86489c4ef0: _run>]

# 子进程

In [35]:
import gevent
from gevent.subprocess import Popen, PIPE
def cron():
    """Print "cron" and sleep cooperatively for 0.2 seconds, in an endless loop."""
    interval = 0.2
    while True:
        print("cron")
        gevent.sleep(interval)
# Run cron in a background greenlet while the child process is awaited.
g = gevent.spawn(cron)
# With shell=True the single list element is the shell command line:
# sleep one second, then print the OS name.
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
# Wait for the child and collect its output; in the recorded run cron kept printing meanwhile.
out, err = sub.communicate()
# Stop the otherwise endless cron loop.
g.kill()
print(out.rstrip())

cron
cron
cron
cron
cron
cron
b'Darwin'


# 简单实现server

In [36]:
from gevent.server import StreamServer
def handle(socket, address):
    """Greet a connected client, send the digits 0-4 one per line, then close.

    Args:
        socket: Connected socket for this client; must provide sendall() and close().
        address: Peer address supplied by the server (unused).
    """
    try:
        # Sockets carry bytes on Python 3; passing str raises TypeError.
        # sendall keeps writing until the whole payload has been sent.
        socket.sendall(b"Hello from a telnet!\n")
        for i in range(5):
            socket.sendall(("%d\n" % i).encode("ascii"))
    finally:
        # Close the connection even if the client disconnects mid-send.
        socket.close()
# Create a StreamServer for 127.0.0.1 port 5000, passing handle as the connection handler.
server = StreamServer(('127.0.0.1', 5000), handle)
# Serve until interrupted; the recorded run was stopped with KeyboardInterrupt.
server.serve_forever()

KeyboardInterrupt
2020-11-12T11:22:36Z


KeyboardInterrupt: 

# Actor模型
actor模型是一个由于Erlang变得普及的更高层的并发模型。 简单的说它的主要思想就是许多个独立的Actor，每个Actor有一个可以从 其它Actor接收消息的收件箱。Actor内部的主循环遍历它收到的消息，并根据它期望的行为来采取行动。

Gevent没有原生的Actor类型，但在一个子类化的Greenlet内使用队列， 我们可以定义一个非常简单的。


In [None]:
import gevent
from gevent.queue import Queue
from gevent import Greenlet
class Actor(gevent.Greenlet):
    """Greenlet with an inbox queue whose run loop feeds each message to receive()."""

    def __init__(self):
        # Create the inbox before initialising the Greenlet base class.
        self.inbox = Queue()
        super().__init__()

    def receive(self, message):
        """Handle one message taken from the inbox; subclasses must override this.

        Args:
            message: The object another actor put into this actor's inbox.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        # NotImplemented is a comparison sentinel, not an exception: calling it fails
        # with TypeError. NotImplementedError is the exception for abstract methods.
        raise NotImplementedError()

    def _run(self):
        """Take messages from the inbox and dispatch them while self.running is true."""
        self.running = True
        while self.running:
            message = self.inbox.get()
            self.receive(message)
class Pinger(Actor):
    """Actor that prints each received message and puts 'ping' into pong's inbox."""

    def receive(self, message):
        print(message)
        reply_to = pong.inbox
        reply_to.put('ping')
        # Yield so the other actor gets a chance to run.
        gevent.sleep(0)
class Ponger(Actor):
    """Actor that prints each received message and puts 'pong' into ping's inbox."""

    def receive(self, message):
        print(message)
        reply_to = ping.inbox
        reply_to.put('pong')
        # Yield so the other actor gets a chance to run.
        gevent.sleep(0)
# Create and start both actors, kick off the exchange with one message, then wait on
# them. Nothing in the actors clears `running`, so the loops do not stop by themselves.
ping, pong = Pinger(), Ponger()
for actor in (ping, pong):
    actor.start()
ping.inbox.put('start')
gevent.joinall([ping, pong])

In [1]:
import gevent

In [11]:
import time
def haha():
    """Block for three seconds with time.sleep, then print 'haha'."""
    time.sleep(3)
    print('haha')
# Create a greenlet that will run haha and keep a handle to it in x.
x=gevent.spawn(haha)
# NOTE(review): gevent.spawn normally already schedules the greenlet, so this
# explicit start() looks redundant - confirm against the gevent documentation.
x.start()

In [12]:
# Wait for the greenlet's result; in the recorded run 'haha' is printed at this point.
x.get()

haha
