# 第一章

## The parallel computing memory architecture
* Single instruction, single data (SISD)
![](./figs/SISD.png)
* Single instruction, multiple data (SIMD)

* Multiple instruction, single data (MISD)
![](./figs/MISD.png)
* Multiple instruction, multiple data (MIMD)
![](./figs/MIMD.png)

## Memory organization
![](figs/memory_MIMD.png)

## Shared memory
![](./figs/share_memory.png)

## Distributed memory
![](./figs/Distributed_memory.png)

## Parallel programming models
* The shared memory model
* The multithread model
* The distributed memory/message passing model
* The data parallel model

### The shared memory model
在此模型中，任务共享一个共享内存区域，其中对共享资源的访问（读取和写入数据）是异步的。 有一些机制允许程序员控制对共享内存的访问，例如，锁或信号灯。 这种模型的优点是程序员不必明确任务之间的通信。 在性能方面的一个重要缺点是，它变得更加难以理解和管理数据局部性。 将数据保留在使用该处理器的处理器本地，可以节省内存访问，高速缓存刷新以及在多个处理器使用同一数据时发生的总线流量。

### The multithread model

### The message passing model
消息传递模型通常适用于每个处理器都有自己的内存（分布式内存系统）的情况。 更多任务可以驻留在同一台物理计算机上或任意数量的计算机上。 程序员负责确定通过消息进行的并行性和数据交换。 此并行编程模型的实现需要使用要使用的（临时）软件库
在代码中。 线程/进程通信对应着消息传递模式.

### The data parallel model
![](./figs/data_parallel.png)

## How to evaluate the performance of a parallel program


### Speedup

$S = \frac{T_{S}}{T_{p}}$, p identical processing elements

* S = p is linear or ideal speedup
* S < p is real speedup
* S > p is superlinear speedup

## Start working with processes in Python

In [1]:
# import os
# import sys
# ##this is the code to execute
# program = "python"
# print("Process calling")
# arguments = ["called_Process.py"]
# ##we call the called_Process.py script
# os.execvp(program, (program,) + tuple(arguments))
# print("Good Bye!!")

## Start working with threads in Python

* 基于线程的并行性是编写并行程序的标准方法。 但是，Python解释器不是完全线程安全的。 为了支持多线程Python程序，使用了称为全局解释器锁（GIL）的全局锁。 这意味着只有一个线程可以同时执行Python代码。 短时间或某个线程执行可能需要一段时间的操作后，Python会自动切换到下一个线程。
* 使用线程时要注意的一个关键点是，必须始终确保永远不要让任何线程在后台运行。

In [2]:
from threading import Thread
##Also we use the sleep function to make the thread "sleep"
from time import sleep
## To create a thread in Python you'll want to make your class work as a thread.
## For this, you should subclass your class from the Thread class
class CookBook(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.message = "Hello Parallel Python CookBook!!\n"
    ##this method prints only the message
    def print_message(self):
        print (self.message)
        ##The run method prints ten times the message
    def run(self):
        print ("Thread Starting\n")
        x=0
        while (x < 10):
            self.print_message()
            sleep(2)
            x += 1
        print ("Thread Ended\n")
#start the main process
print ("Process Started")

# create an instance of the HelloWorld class
hello_Python = CookBook()
# print the message...starting the thread
hello_Python.start()
#end the main process
print ("Process Ended")

Process Started
Thread Starting

Hello Parallel Python CookBook!!

Process Ended


## 并发例子

In [3]:
import multiprocessing

def make_data(queue, num, work_nums):
    for i in range(num):
        queue.put(i)
    for i in range(work_nums):
        queue.put(None)

def handle_data(queue, share_value, lock):
    while True:
        data = queue.get()
        if data is None:
            break
        lock.acquire()
        share_value.value = share_value.value + data
        lock.release()

if __name__ == "__main__":
    queue = multiprocessing.Queue()  # 进程间通信所用
    share_value = multiprocessing.Value("i", 0)  # 进程间共享所用
    lock = multiprocessing.Lock()  # 进程间共享内存时，采用锁同步机制
    num = 10000  #
    work_nums = 5  # work进程个数
    sub_process = []  # 处理数据进程集合

    master_process = multiprocessing.Process(target=make_data, args=(queue, num, work_nums, ))  # 生成数据进程
    for i in range(work_nums):
        sub_process1 = multiprocessing.Process(target=handle_data, args=(queue, share_value, lock,))
        sub_process.append(sub_process1)

    master_process.start()
    for p in sub_process:
        p.start()

    master_process.join()
    for p in sub_process:
        p.join()

    # 结果对比
    result = 0
    for i in range(num):
        result = result + i
    print("result should be " + str(result))
    print("fact is " + str(share_value.value))

result should be 49995000
fact is 49995000


## 并发(concurrency) vs 并行(parallellism)
* Concurrency is not Parallelism
* Concurrency enables parallelism & makes parallelism (and scaling and everything else) easy

* 当有多个线程在操作时，如果系统只有一个 CPU，则它根本不可能真正同时进行一个以上的线程，它只能把 CPU 运行时间划分成若干个时间段，再将时间段分配给各个线程执行，在一个时间段的线程代码运行时,其它线程处于挂起状态.这种方式我们称之为并发（Concurrent）。

* 当系统有一个以上 CPU 时，则线程的操作有可能非并发。当一个 CPU 执行一个线程时，另一个 CPU 可以执行另一个线程，两个线程互不抢占 CPU 资源，可以同时进行，这种方式我们称之为并行（Parallel）

![](./figs/concurrent_parallel.png)

## 阻塞与非阻塞:

* 阻塞是指调用线程或者进程被操作系统挂起。
* 非阻塞是指调用线程或者进程不会被操作系统挂起。

## 同步与异步:
* 同步是阻塞模式: 同步就是指一个进程在执行某个请求的时候，若该请求需要一段时间才能返回信息，那么这个进程将会一直等待下去，知道收到返回信息才继续执行下去；

* 异步是非阻塞模式:异步是指进程不需要一直等下去，而是继续执行下面的操作，不管其他进程的状态。当有消息返回式系统会通知进程进行处理，这样可以提高执行的效率。


## 协程
* 协程，又称微线程，纤程。英文名Coroutine。一句话说明什么是线程：协程是一种用户态的轻量级线程。协程不是被操作系统内核所管理，而完全是由程序所控制

协程的好处：
* 无需线程上下文切换的开销
* 无需原子操作锁定及同步的开销
* 方便切换控制流，简化编程模型

缺点：
* 无法利用多核资源：协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要，除非是cpu密集型应用。
* 进行阻塞（Blocking）操作（如IO时）会阻塞掉整个程序


In [4]:
# # python3.7 需要安装pip install tornado==4.5.3
# import asyncio,time

# @asyncio.coroutine #设为异步函数
# def func1(num):
#     print(num,'before---func1----')
#     yield from asyncio.sleep(5)
#     print(num,'after---func1----')

# task = [func1(1),func1(2)]

# if __name__ == "__main__":
#     begin = time.time()
#     loop = asyncio.get_event_loop() #进入事件循环
#     loop.run_until_complete(asyncio.gather(*task)) #将协同程序注册到事件循环中
#     loop.close()
#     end = time.time()
#     print(end-begin)

In [5]:
from greenlet import greenlet

def foo():
    print("foo")
    bar()

def bar():
    a = 3 + 1
    print(a)
    gr2.switch()
    print("end bar")


gr1 = greenlet(bar)
gr2 = greenlet(foo)
gr1.switch()

4
foo
4
end bar


# 第二章 Thread-based Parallelism

当前，用于软件应用程序并发管理的最广泛使用的编程范例是基于多线程的。 通常，应用程序是由单个进程创建的，该进程分为多个独立的线程，这些线程代表并行运行并相互竞争的不同类型的活动。

## Using the Python threading module
* The thread object
* The Lock object # 锁定对象
* The RLock object #  RLock对象
* The semaphore object # 信号量对象
* The condition object # 条件对象
* The event object # 事件对象

## How to define a thread

In [6]:
import threading
def function(i):
    print ("function called by thread %d\n" % i)
    return 

threads = []
for i in range(5):
    t = threading.Thread(target=function , args=(i,))
    threads.append(t)
    t.start()
    t.join()

function called by thread 0

function called by thread 1

function called by thread 2

function called by thread 3

function called by thread 4



## How to determine the current thread

In [7]:
import threading
import time
def first_function():
    print (threading.currentThread().getName()+\
    str(' is Starting \n'))
    time.sleep(2)
    print (threading.currentThread().getName()+\
    str( ' is Exiting \n'))
    return
def second_function():
    print (threading.currentThread().getName()+\
    str(' is Starting \n'))
    time.sleep(2)
    print (threading.currentThread().getName()+\
    str( ' is Exiting \n'))
    return
def third_function():
    print (threading.currentThread().getName()+\
    str(' is Starting \n'))
    time.sleep(2)
    print (threading.currentThread().getName()+\
    str( ' is Exiting \n'))
    return

t1 = threading.Thread(name='first_function', target=first_function)
t2 = threading.Thread(name='second_function', target=second_function)
t3 = threading.Thread(name='third_function',target=third_function)
t1.start()
t2.start()
t3.start()

first_function is Starting 

second_function is Starting 
third_function is Starting 




## How to use a thread in a subclass

In [8]:
import threading
import time


class myThread (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        print ("Starting " + self.name)
        print_time(self.name, self.counter, 5)
        print ("Exiting " + self.name)
def print_time(threadName, delay, counter):
    while counter:
        time.sleep(delay)
        print ("%s: %s" %(threadName, time.ctime(time.time())))
        counter -= 1
# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# Start new Threads
thread1.start()
thread2.start()
print ("Exiting Main Thread")

Starting Thread-1
Starting Thread-2
Exiting Main Thread


## Thread synchronization with Lock and RLock
![](./figs/Deadlock.png)

In [9]:
import threading
shared_resource_with_lock = 0
shared_resource_with_no_lock = 0
COUNT = 100000
shared_resource_lock = threading.Lock()
####LOCK MANAGEMENT##
def increment_with_lock():
    global shared_resource_with_lock
    for i in range(COUNT):
        shared_resource_lock.acquire()
        shared_resource_with_lock +=1
        shared_resource_lock.release()
def decrement_with_lock():
    global shared_resource_with_lock
    for i in range(COUNT):
        shared_resource_lock.acquire()
        shared_resource_with_lock -=1
        shared_resource_lock.release()
####NO LOCK MANAGEMENT ##
def increment_without_lock():
    global shared_resource_with_no_lock
    for i in range(COUNT):
        shared_resource_with_no_lock +=1
def decrement_without_lock():
    global shared_resource_with_no_lock
    for i in range(COUNT):
        shared_resource_with_no_lock -=1

        
####the Main program
t1 = threading.Thread(target = increment_with_lock)
t2 = threading.Thread(target = decrement_with_lock)
t3 = threading.Thread(target = increment_without_lock)
t4 = threading.Thread(target = decrement_without_lock)
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
print ("the value of shared variable with lock management is %s"%shared_resource_with_lock)
print ("the value of shared variable with race condition is %s"%shared_resource_with_no_lock)

the value of shared variable with lock management is 0
the value of shared variable with race condition is 0


## Thread synchronization with RLock

* 如果我们只希望获得锁的线程将其释放，则必须使用RLock（）对象。
* 与Lock（）对象类似，RLock（）对象有两种方法：acquire（）和release（） 。
* 当您想从外部访问线程安全时，RLock（）很有用。并在类内部使用相同的方法。

In [11]:
import threading
import time
class Box(object):
    lock = threading.RLock()
    def __init__(self):
        self.total_items = 0
    def execute(self,n):
        Box.lock.acquire()
        self.total_items += n
        Box.lock.release()
    def add(self):
        Box.lock.acquire()
        self.execute(1)
        Box.lock.release()
    def remove(self):
        Box.lock.acquire()
        self.execute(-1)
        Box.lock.release()
## These two functions run n in separate
## threads and call the Box's methods
def adder(box,items):
    while items > 0:
        print ("adding 1 item in the box\n")
        box.add()
        time.sleep(2)
        items -= 1
def remover(box,items):
    while items > 0:
        print ("removing 1 item in the box")
        box.remove()
        time.sleep(2)
        items -= 1

## the main program build some
## threads and make sure it works
items = 5
print ("putting %s items in the box " % items)
box = Box()
t1 = threading.Thread(target=adder,args=(box,items))
t2 = threading.Thread(target=remover,args=(box,items))
t1.start()
t2.start()
t1.join()
t2.join()
print ("%s items still remain in the box " % box.total_items)

putting 5 items in the box 
adding 1 item in the box

removing 1 item in the box
adding 1 item in the box

removing 1 item in the box
adding 1 item in the box

removing 1 item in the box
removing 1 item in the box
adding 1 item in the box

removing 1 item in the box
adding 1 item in the box

0 items still remain in the box 


## Thread synchronization with semaphores
![](./figs/Semaphores.png)

In [14]:
###Using a Semaphore to synchronize threads
import threading
import time
import random
##The optional argument gives the initial value for the internal
##counter;
##it defaults to 1.
##If the value given is less than 0, ValueError is raised.
semaphore = threading.Semaphore(0)
def consumer():
    print ("consumer is waiting.")
    ##Acquire a semaphore
    semaphore.acquire()
    ##The consumer have access to the shared resource
    print ("Consumer notify : consumed item number %s " %item)
def producer():
    global item
    time.sleep(2)
    ##create a random item
    item = random.randint(0,1000)
    print ("producer notify : produced item number %s" %item)
    ##Release a semaphore, incrementing the internal counter by one.
    ##When it is zero on entry and another thread is waiting for it
    ##to become larger than zero again, wake up that thread.
    semaphore.release()
    
    
#Main program
for i in range (0,5) :
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
print ("program terminated")

consumer is waiting.
producer notify : produced item number 387
Consumer notify : consumed item number 387 
consumer is waiting.
producer notify : produced item number 313
Consumer notify : consumed item number 313 
consumer is waiting.
producer notify : produced item number 845
Consumer notify : consumed item number 845 
consumer is waiting.
producer notify : produced item number 557
Consumer notify : consumed item number 557 
producer notify : produced item number 2
consumer is waiting.
Consumer notify : consumed item number 2 
producer notify : produced item number 721
program terminated


## Thread synchronization with a condition

In [17]:
from threading import Thread, Condition
import time
items = []
condition = Condition()
class consumer(Thread):
    def __init__(self):
        Thread.__init__(self)
    def consume(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 0:
            condition.wait()
            print("Consumer notify : no item to consume")
            items.pop()
            print("Consumer notify : consumed 1 item")
            print("Consumer notify : items to consume are "\
            + str(len(items)))
            condition.notify()
        condition.release()
    def run(self):
        for i in range(0,20):
            time.sleep(1)
            self.consume()
class producer(Thread):
    def __init__(self):
        Thread.__init__(self)
    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 10:
            condition.wait()
            print("Producer notify : items producted are "\
            + str(len(items)))
            print("Producer notify : stop the production!!")
            items.append(1)
            print("Producer notify : total items producted "\
            + str(len(items)))
            condition.notify()
        condition.release()
    def run(self):
        for i in range(0,20):
            time.sleep(1)
            self.produce()


producer = producer()
consumer = consumer()
producer.start()
consumer.start()
producer.join()
consumer.join()

KeyboardInterrupt: 

## Thread synchronization with an event

In [None]:
import time
from threading import Thread, Event
import random
items = []
event = Event()
class consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event
    def run(self):
        while True:
            time.sleep(2)
            self.event.wait()
            item = self.items.pop()
            print ('Consumer notify : %d popped from list by %s'\
            %(item, self.name))
class producer(Thread):
    def __init__(self, integers, event):
        Thread.__init__(self)
        self.items = items
        self.event = event
    def run(self):
        global item
        for i in range(100):
            time.sleep(2)
            item = random.randint(0, 256)
            self.items.append(item)
            print ('Producer notify : item N° %d appended \
            to list by %s'\
            % (item, self.name))
            print ('Producer notify : event set by %s'\
            % self.name)
            self.event.set()
            print ('Produce notify : event cleared by %s \n'\
            % self.name)
            self.event.clear()

t1 = producer(items, event)
t2 = consumer(items, event)
t1.start()
t2.start()
t1.join()
t2.join()

## Thread communication using a queue

In [None]:
from threading import Thread, Event
from queue import Queue
import time
import random
class producer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
    def run(self) :
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print ('Producer notify: item N°%d appended to queue by %s \n'% (item, self.name))
            time.sleep(1)
class consumer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
    def run(self):
        while True:
            item = self.queue.get()
            print ('Consumer notify : %d popped from queue by %s'\
            % (item, self.name))
            self.queue.task_done()
            
queue = Queue()
t1 = producer(queue)
t2 = consumer(queue)
t3 = consumer(queue)
t4 = consumer(queue)
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()

## Evaluating the performance of multithread applications
我们将验证GIL的影响，评估多线程应用程序的性能。如上一章所述，GIL是CPython解释器引入的锁。 GIL阻止在解释器中并行执行多个线程。在执行之前，每个线程必须等待GIL释放正在运行的线程。实际上，解释器会强制执行线程在GIL访问解释器本身上的任何内容（如Python对象的堆栈和实例）之前获取GIL。这正是GIL的目的-阻止并发访问来自不同线程的Python对象。然后，GIL保护解释器的内存，并使垃圾以正确的方式工作。事实是，GIL阻止程序员通过并行执行线程来提高性能。如果我们从CPython解释器中删除GIL，则线程将并行执行。 GIL不会阻止进程在其他处理器上运行，它只是一次只允许一个线程进入解释器内部。

In [2]:
from threading import Thread

class threads_object(Thread):
    def run(self):
        function_to_run()

class nothreads_object(object):
    def run(self):
        function_to_run()
        
def non_threaded(num_iter):
    funcs = []
    for i in range(int(num_iter)):
        funcs.append(nothreads_object())
        for i in funcs:
            i.run()
def threaded(num_threads):
    funcs = []
    for i in range(int(num_threads)):
        funcs.append(threads_object())
        for i in funcs:
            i.start()
        for i in funcs:
            i.join()
def function_to_run():
    pass

def show_results(func_name, results):
    print ("%-23s %4.6f seconds" % (func_name, results))

    
import sys
from timeit import Timer
repeat = 100
number = 1
num_threads = [ 1, 2, 4, 8]
print ('Starting tests')
for i in num_threads:
    t = Timer("non_threaded(%s)"% i, "from __main__ import non_threaded")
    best_result =min(t.repeat(repeat=repeat, number=number))
    show_results("non_threaded (%s iters)" % i, best_result)
    t = Timer("threaded(%s)" % i, "from __main__ import threaded")
    best_result = min(t.repeat(repeat=repeat, number=number))
    show_results("threaded (%s threads)"% i, best_result)
print ('Iterations complete')

Starting tests
non_threaded (1 iters)  0.000001 seconds
threaded (1 threads)    0.000051 seconds
non_threaded (2 iters)  0.000003 seconds


RuntimeError: threads can only be started once