# Python concurrence and parallel

![python](https://upload.wikimedia.org/wikipedia/commons/c/c3/Python-logo-notext.svg)
 <center>--by wang liyao (leo) email: leo.y.wang@nokia-sbell.com<center> 

## agenda  ##

- why we need concurrence or parallel
- glossary explanation
- Parallel Handling
- practice
- asynchronous programming
- Q&A


## why we need concurrence or parallel

![%E5%9B%BE%E7%89%87.png](attachment:%E5%9B%BE%E7%89%87.png)


## glossary explanation

### concurrency vs parallelism
- 并发:一个处理器同时处理多个任务。
- 并行: 多个处理器或者是多核的处理器同时处理多个不同的任务.
> 前者是逻辑上的同时发生（simultaneous），而后者是物理上的同时发生．
>
> 来个比喻：并发和并行的区别就是一个人同时吃三个馒头和三个人同时吃三个馒头。
        

![%E5%9B%BE%E7%89%87.png](attachment:%E5%9B%BE%E7%89%87.png)

- 下图反映了一个包含8个操作的任务在一个有两核心的CPU中创建四个线程运行的情况。假设每个核心有两个线程.
![%E5%9B%BE%E7%89%87.png](attachment:%E5%9B%BE%E7%89%87.png)

- 那么每个CPU中两个线程会交替并发，两个CPU之间的操作会并行运算。单就一个CPU而言两个线程可以解决线程阻塞造成的不流畅问题，其本身运行效率并没有提高，多CPU的并行运算才真正解决了运行效率问题，这也正是并发和并行的区别

> concurrency: 要continue吗?　要的话就拿currency来吧. 竞价排名. 轮流调度
>
> parallelism: llel, 平行线, 追求平等.同时同等待遇. 资源足够, 共产主义时期啊 

> Python 中没有真正的并行，只有并发
>
> 无论你的机器有多少个CPU, 同一时间只有一个Python解析器执行。这也和大部分解释型语言一致， 都不支持并行。这应该是python设计的先天缺陷。
>
> javascript也是相同的道理， javascript早起的版本只支持单任务，后来通过worker来支持并发。

### multiprocessing vs thread
> 所谓进程，简单的说就是一段程序的动态执行过程，是系统进行资源分配和调度的一个基本单位。
> - 进程是程序的一次执行过程。若程序执行两次甚至多次，则需要两个甚至多个进程。
> - 进程是是正在运行程序的抽象。它代表运行的CPU，也称进程是对CPU的抽象。（虚拟技术的支持，将一个CPU变幻为多个虚拟的CPU）
> - 系统资源（如内存、文件）以进程为单位分配。
> - 操作系统为每个进程分配了独立的地址空间
> - 操作系统通过“调度”把控制权交给进程。


### multiprocessing vs thread
>　一个进程中又可以包含若干个独立的执行流，我们将这些执行流称为线程，线程是CPU调度和分配的基本单位。同一个进程的线程都有自己的专有寄存器，但内存等资源是共享的。

> - 应用的需要。比如打开一个浏览器，我想一边浏览网页，一边下载文件，一边播放音乐。如果一个浏览器是一个进程，那么这样的需求需要线程机制。
> - 开销的考虑。在进程内创建、终止线程比创建、终止进程要快。同一进程内的线程间切换比进程间的切换要快,尤其是用户级线程间的切换。线程之间相互通信无须通过内核（同一进程内的线程共享内存和文件）
> - 性能的考虑。多个线程中，任务功能不同（有的负责计算，有的负责I/O）,如果有多个处理器，一个进程就可以有很多的任务同时在执行。



- [大牛言论](http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html)

### blocking and non-blocking

#### blocking(阻塞)
> 程序未得到所需计算资源时被挂起的状态。
>
> 程序在等待某个操作完成期间，自身无法继续干别的事情，则称该程序在该操作上是阻塞的。
>
> 常见的阻塞形式有：网络I/O阻塞、磁盘I/O阻塞、用户输入阻塞等。
>
> 阻塞是无处不在的，包括CPU切换上下文时，所有的进程都无法真正干事情，它们也会被阻塞。（如果是多核CPU则正在执行上下文切换操作的核不可被利用。）

#### non-blocking
> 程序在等待某操作过程中，自身不被阻塞，可以继续运行干别的事情，则称该程序在该操作上是非阻塞的。
>
> 非阻塞并不是在任何程序级别、任何情况下都可以存在的。
>
> 仅当程序封装的级别可以囊括独立的子程序单元时，它才可能存在非阻塞状态


### Synchronous and asynchronous

#### Synchronous(同步)
> 不同程序单元为了完成某个任务，在执行过程中需靠某种通信方式以协调一致，称这些程序单元是同步执行的。
>
> 例如购物系统中更新商品库存，需要用“行锁”作为通信信号，让不同的更新请求强制排队顺序执行，那更新库存的操作是同步的。
>
> 简言之，同步意味着有序

#### asynchronous(异步)
> 为完成某个任务，不同程序单元之间过程中无需通信协调，也能完成任务的方式。
>
> 不相关的程序单元之间可以是异步的。
> 例如，爬虫下载网页。调度程序调用下载程序后，即可调度其他任务，而无需与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的，也无需相互通知协调。这些异步操作的完成时刻并不确定。
>
> 简言之，异步意味着无序。


### glossary summary

- 并行是为了利用多核加速多任务完成的进度
- 并发是为了让独立的子任务都有机会被尽快执行，但不一定能加速整体进度
- 非阻塞是为了提高程序整体执行效率
- 异步是高效地组织非阻塞任务的方式
- 要支持并发，必须拆分为多任务，不同任务相对而言才有阻塞/非阻塞、同步/异步。所以，并发、异步、非阻塞三个词总是如影随形

## Parallel Handling


### non-parallell

In [None]:
# fetch content size from a series of web sites
import urllib
import time

urls = ['http://tdlte-report-server.china.nsn-net.net',
        'http://10.140.161.16/ta_doc/',
        'http://pypi.ute.inside.nsn.com']
begin = time.time()
for url in urls:
    print len(urllib.urlopen(url, proxies={}).read())
end = time.time()
print 'used time:', end-begin

### Thread

In [None]:
# introduce thread
from threading import Thread
import urllib
import time

urls = ['http://tdlte-report-server.china.nsn-net.net',
        'http://10.140.161.16/ta_doc/',
        'http://pypi.ute.inside.nsn.com']

class UrlFetchThread(Thread):
    def __init__(self, url, *args):
        super(UrlFetchThread, self).__init__(*args)
        self._url = url
        
    def run(self):
        print len(urllib.urlopen(self._url).read())

begin = time.time()
threads = map(UrlFetchThread, urls)
for t in threads:
    t.start()
    t.join()
end = time.time()
print 'used time:', end-begin

### multiprocess

In [None]:
# introduce multi process
from multiprocessing import Process
import urllib
import time

urls = ['http://tdlte-report-server.china.nsn-net.net',
        'http://10.140.161.16/ta_doc/',
        'http://pypi.ute.inside.nsn.com']

class UrlFetchProcess(Process):
    def __init__(self, url, *args):
        super(UrlFetchProcess, self).__init__(*args)
        self._url = url
        
    def run(self):
        print len(urllib.urlopen(self._url).read())
begin = time.time()        
processes = map(UrlFetchProcess, urls)
for p in processes:
    p.start()
    p.join()
end = time.time()
print 'used time:', end - begin

### pool

In [None]:
# use Pool
import time
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

urls = ['http://tdlte-report-server.china.nsn-net.net',
        'http://10.140.161.16/ta_doc/',
        'http://pypi.ute.inside.nsn.com']

def fetch_content(url):
    print len(urllib.urlopen(url).read())

print 'begin test!'
procss_start_time = time.time()
pool = Pool()
pool.map(fetch_content, urls)
pool.close()
pool.join()
procss_end_time = time.time()
print 'proces use time:', procss_end_time - procss_start_time

# -----------------------------------------
thread_start_time = time.time()
thread_pool = ThreadPool()
thread_pool.map(fetch_content, urls)
thread_pool.close()
thread_pool.join()
thread_end_time = time.time()
print 'thread use time:', thread_end_time - thread_start_time

### Queue

In [None]:
# Queue
from multiprocessing import Process, Queue
import time

def f(q, num):
    q.put([num, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    begin  = time.time()
    p1 = Process(target=f, args=(q, 12))
    p2 = Process(target=f, args=(q, 24))
    p1.start()
    p2.start()
    print q.get()
    print q.get()
    p1.join()
    p2.join()
    end = time.time()
    print 'used time:', end - begin

### gevent

In [None]:
# introduce gevent
import gevent
from gevent import monkey
import time
monkey.patch_all()

urls = ['http://tdlte-report-server.china.nsn-net.net',
        'http://10.140.161.16/ta_doc/',
        'http://pypi.ute.inside.nsn.com']

def fetch_content(url):
    print len(urllib.urlopen(url).read())
begin = time.time()    
[gevent.spawn(fetch_content, url) for url in urls]

gevent.wait()
end = time.time()
print 'used time:', end - begin

### Practice


#### 完成btslog 抓syslog的功能
- 需求, 提供一个命令行终端输入模式, 如: ./syslog -l syslog_1.txt, 执行命令, 终端提示:输入stop, 停止抓log 

## Fibonacci program

> 斐波那契数列（意大利语： Successione di Fibonacci)，又称黄金分割数列、费波那西数列、费波拿契数、费氏数列，指的是这样一个数列：0、1、1、2、3、5、8、13、21、……在数学上，斐波纳契数列以如下被以递归的方法定义：F0=0，F1=1，Fn=Fn-1+Fn-2（n>=2，n∈N*），用文字来说，就是斐波那契数列列由 0 和 1 开始，之后的斐波那契数列系数就由之前的两数相加
>
> 要求:提供 fabnacci 函数module, 函数输入值是:序列第n位, 输出为 第n位的值，　如：f(0) = 0, f(1) = 1, f(2) =1, f(3) = 2, f(4) =3
>
>提供一个 socket server, 为client提供 fabnacci计算,返回给client正确的值

## asynchronous programming 异步I/O进化之路

《约束理论与企业优化》中指出：“除了瓶颈之外，任何改进都是幻觉。”

CPU告诉我们，它自己很快，而上下文切换慢、内存读数据慢、磁盘寻址与取数据慢、网络传输慢……
总之，离开CPU 后的一切，除了一级高速缓存，都很慢。我们观察计算机的组成可以知道，主要由运算器、控制器、存储器、输入设备、输出设备五部分组成。运算器和控制器主要集成在CPU中，除此之外全是I/O，包括读写内存、读写磁盘、读写网卡全都是I/O。I/O成了最大的瓶颈

### The Traditional Approach

In [None]:
import socket
import time
def blocking_way():
    sock = socket.socket()
    sock.connect(('baidu.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)
    return response

def sync_way():
    result = []
    for i in range(10):
        result.append(blocking_way())
    return result
start = time.time()
result = sync_way()
end = time.time()
print end - start

### The Process Approach

In [None]:
import socket
import time
from concurrent import futures
def blocking_way():
    sock = socket.socket()
    sock.connect(('baidu.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)
    return response

def process_way():
    works = 10
    with futures.ProcessPoolExecutor(works) as executor:
        futs = [executor.submit(blocking_way) for i in range(works)]
    result = [fut.result for fut in futs]
    return result
start = time.time()
result = process_way()
end = time.time()
print end - start

### The Thread Approach

In [None]:
import socket
import time
from concurrent import futures
def blocking_way():
    sock = socket.socket()
    sock.connect(('baidu.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)
    return response

def thread_way():
    works = 10
    with futures.ThreadPoolExecutor(works) as executor:
        futs = [executor.submit(blocking_way) for i in range(works)]
    result = [fut.result for fut in futs]
    return result
start = time.time()
result = thread_way()
end = time.time()
print end - start

### non-blocking way

In [None]:
import socket
#from socket import BlockingIOError
import time
def non_blocking_way():
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('baidu.com', 80))
    except:
        pass
        print 'blocking IO error!'
    request = 'GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n'
    send_data = request.encode('ascii')
    response = b''
    while True:
        try:
            sock.send(send_data)
            break
        except:
            pass
            print 'bocking send data'
    while True:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
            break
        except:
            pass
            print 'blocking receive data'
    return response

def sync_way():
    result = []
    for i in range(10):
        result.append(non_blocking_way())
    return result
start = time.time()
result = sync_way()
end = time.time()
print end - start

### non-blocking improve - select

判断非阻塞调用是否就绪如果 OS 能做，是不是应用程序就可以不用自己去等待和判断了，就可以利用这个空闲去做其他事情以提高效率。

所以OS将I/O状态的变化都封装成了事件，如可读事件、可写事件。并且提供了专门的系统模块让应用程序可以接收事件通知。这个模块就是select。让应用程序可以通过select注册文件描述符和回调函数。当文件描述符的状态发生变化时，select 就调用事先注册的回调函数。

select因其算法效率比较低，后来改进成了poll，再后来又有进一步改进，BSD内核改进成了kqueue模块，而Linux内核改进成了epoll模块。这四个模块的作用都相同，暴露给程序员使用的API也几乎一致，区别在于kqueue 和 epoll 在处理大量文件描述符时效率更高。

鉴于 Linux 服务器的普遍性，以及为了追求更高效率，所以我们常常听闻被探讨的模块都是 epoll 。

In [None]:
import socket
import select
#from socket import BlockingIOError
import time
def create_non_blocking_socket():
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('baidu.com', 80))
    except:
        pass
        print 'blocking IO error!'
    return sock

def send_data(con):
    request = 'GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n'
    send_data = request.encode('ascii')
    print '++++send data', send_data 
    con.send(send_data)

def receive_data(con):
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)
        break
    print '-----', response
    return response

def sync_way():
    inputs = []
    outputs = []
    result = []
    for i in range(10):
        inputs.append(create_non_blocking_socket())
    outputs = inputs[:]
    r_list, w_list, e_list = select.select(inputs, outputs, inputs, 1)
    count = 0
    while True:
        for r_conn in r_list:
            data_bytes = receive_data(r_conn)
            count += 1
            r_list.remove(r_conn)
            result.append(data_bytes)
        for w_conn in w_list:
            send_data(w_conn)
            count +=1
            w_list.remove(w_conn)
        if count == 20:
            break
    return result

start = time.time()
result = sync_way()
end = time.time()
print end - start, result

In [None]:
# -*- coding: utf-8 -*-
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()

stopped = False


class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

class Crawler(object):
    counter = 0
    def __init__(self):
        self.url = '10.56.78.25'
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('10.56.78.25', 80))
        except:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = 'GET / HTTP/1.0\r\nHOST: 10.56.78.25\r\n\r\n.'
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            #urls_todo.remove(self.url)
            Crawler.counter += 1
            #print("response:",self.response)
            if Crawler.counter == 10:
                stopped = True


def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)

if __name__ == '__main__':
    import time
    start = time.time()
    for i in range(10):
        crawler = Crawler()
        crawler.fetch()
    loop()
    print(time.time() - start)

### Co-routine(协程)
- 协程(Co-routine)，Coroutine其实是corporate routine的缩写, 即是协作式的例程。

在linux系统中，线程就是轻量级的进程，而我们通常也把协程称为轻量级的线程即微线程。

它是非抢占式的多任务子例程的概括，可以允许有多个入口点在例程中确定的位置来控制程序的暂停与恢复执行


所以子程序调用是通过栈实现的，一个线程就是执行一个子程序。

子程序调用总是一个入口，一次返回，调用顺序是明确的。而协程的调用和子程序不同。

协程看上去也是子程序，但执行过程中，在子程序内部可中断，然后转而执行别的子程序，在适当的时候再返回来接着执行。

注意，在一个子程序中中断，去执行其他子程序，不是函数调用，有点类似CPU的中断。比如子程序A、B：
```python
def A():
    print('1')
    print('2')
    print('3')

def B():
    print('x')
    print('y')
    print('z')
```
在执行A的过程中，可以随时中断，去执行B，B也可能在执行过程中中断再去执行A，结果可能是：
```
1
2
x
y
3
z
```


#### example
- Python对协程的支持是通过generator实现的。

In [None]:
def simple_coroutine():
    print('-> start')
    x = yield
    print('-> recived', x)

sc = simple_coroutine()

next(sc)
sc.send('test!')

- consumer and producer

In [None]:
def consumer():
    r = ''
    while True:
        n = yield r
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

#### 基于生成器的协程
- asyncio是Python 3.4版本引入的标准库，直接内置了对异步IO的支持。

- asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用，然后把需要执行的协程扔到EventLoop中执行，就实现了异步IO。

#### asyncio "hello world" example

```python
import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 异步调用asyncio.sleep(1):
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()
```
@asyncio.coroutine把一个generator标记为coroutine类型，然后，我们就把这个coroutine扔到EventLoop中执行。

hello()会首先打印出Hello world!，然后，yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine，所以线程不会等待asyncio.sleep()，而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时，线程就可以从yield from拿到返回值（此处是None），然后接着执行下一行语句。

把asyncio.sleep(1)看成是一个耗时1秒的IO操作，在此期间，主线程并未等待，而是去执行EventLoop中其他可以执行的coroutine了，因此可以实现并发执行。

#### asyncio add task example

In [None]:
import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

#### refactor the before example code

In [None]:
import asyncio
import time

@asyncio.coroutine
def wget(host):
    #print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        #print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [wget(host='10.56.78.25') for i in range(10)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print("use time:", time.time() - start_time)

## Reference

* https://wiki.python.org/moin/Generators
* https://docs.python.org/2/library/threading.html
* https://docs.python.org/2/library/multiprocessing.html
* https://www.cnblogs.com/zhaof/p/7536569.html
* https://blog.csdn.net/drdairen/article/details/69487643
* https://blog.csdn.net/u011019726/article/details/78191193?locationNum=6&fps=1
* https://www.reddit.com/r/Python/comments/1s7eae/asyncio_vs_gevent/
* http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
* https://blog.csdn.net/andybegin/article/details/77884645
* https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001432090954004980bd351f2cd4cc18c9e6c06d855c498000
* https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001432090171191d05dae6e129940518d1d6cf6eeaaa969000

## Q & A