In [4]:
import os

# Report which process is about to fork.
print('Process (%s) start...' % os.getpid())
# os.fork() only works on Unix-like systems (Linux, macOS).
# Both processes continue from this point: the child sees 0, the parent sees
# the PID of the child it just created.
pid = os.fork()
if pid != 0:
    # Parent branch.
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
else:
    # Child branch.
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))

Process (1845) start...
I (1845) just created a child process (3807).
I am child process (3807) and my parent is 1845.


In [5]:
from multiprocessing import Process, Queue
import os, time, random

# Code run by the writer process:
def write(q):
    """Put 'A', 'B' and 'C' on ``q``, one at a time.

    Prints the PID of the current process first, announces every item
    before putting it, and sleeps for a random fraction of a second after
    each put.

    Args:
        q: object with a ``put(item)`` method (a multiprocessing Queue in
            this notebook).
    """
    print('Process to write: %s' % os.getpid())
    for letter in 'ABC':
        print('Put %s to queue...' % letter)
        q.put(letter)
        pause = random.random()
        time.sleep(pause)

# Code run by the reader process:
def read(q):
    """Print every value taken from ``q``, forever.

    Prints the PID of the current process first. The loop has no exit
    condition, so this function only ends if ``q.get`` raises or the
    process is stopped from outside.

    Args:
        q: object with a ``get(block)`` method (a multiprocessing Queue in
            this notebook); it is always called as ``q.get(True)``.
    """
    reader_pid = os.getpid()
    print('Process to read: %s' % reader_pid)
    while True:
        print('Get %s from queue.' % q.get(True))

if __name__ == '__main__':
    # The parent process creates the Queue and hands it to both children.
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # Start the writer child.
    pw.start()
    # Start the reader child.
    pr.start()
    # Wait for the writer to finish.
    pw.join()
    # read() loops forever, so it cannot be waited on; stop it forcibly.
    pr.terminate()
    # Reap the terminated reader so it does not linger as a zombie process.
    pr.join()

Process to write: 4040
Process to read: 4041
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.


In [None]:
#!/usr/bin/env python3
# -*- coding:utf8 -*-
from threading import Thread, Condition
import time
import random
import threading


def consumer(event, l):
    """Pop values from ``l`` whenever ``event`` is set, forever.

    After a successful pop the value and the current thread's name are
    printed and the event is cleared. An empty list is tolerated silently.

    Args:
        event: object with ``wait()`` and ``clear()`` (a threading.Event in
            this notebook).
        l: list shared with the producer; values are taken from its end.
    """
    me = threading.current_thread()
    while True:
        if not event.wait():
            continue
        try:
            popped = l.pop()
            print('{} popped from list by {}'.format(popped, me.name))
            event.clear()  # reset the event state
        except IndexError:
            # Tolerate an empty list, e.g. right after start-up.
            continue
def producer(event, l):
    """Append a random integer from 0-4 to ``l`` once per second, forever.

    Each appended value is printed together with the current thread's name,
    then ``event`` is set before the one-second pause.

    Args:
        event: object with a ``set()`` method (a threading.Event in this
            notebook).
        l: list shared with the consumers; values are appended to its end.
    """
    me = threading.current_thread()
    candidates = range(5)
    while True:
        picked = random.choice(candidates)
        l.append(picked)
        print('{} appended to list by {}'.format(picked, me.name))
        event.set()  # set the event
        time.sleep(1)
# Shared state: one Event for signalling and one list used as the buffer.
event = threading.Event()
l = []
threads = []

# Two consumer threads, each started as soon as it has been created.
for name in ('consumer1', 'consumer2'):
    t = threading.Thread(target=consumer, name=name, args=(event, l))
    threads.append(t)
    t.start()

# A single producer thread feeding the shared list.
p = threading.Thread(target=producer, name='producer1', args=(event, l))
threads.append(p)
p.start()

# consumer() and producer() loop forever (`while 1`), so these joins never
# return on their own.
for t in threads:
    t.join()

# # semaphore = threading.Semaphore(3)
# semaphore = threading.BoundedSemaphore(3)
#
# def fun():
#     print("Thread %s is waiting semphore\n" % threading.currentThread().getName())
#     semaphore.acquire()
#     print("Thread %s get semphore\n" % threading.currentThread().getName())
#     time.sleep(1)
#     print("Thread %s release semphore\n" % threading.currentThread().getName())
#     semaphore.release()
#
#
# if __name__ == "__main__":
#     t1 = threading.Thread(target=fun)
#     t2 = threading.Thread(target=fun)
#     t3 = threading.Thread(target=fun)
#     t4 = threading.Thread(target=fun)
#
#     t1.start()
#     t2.start()
#     t3.start()
#     t4.start()
#
#     t1.join()
#     t2.join()
#     t3.join()
#     t4.join()
#
#     semaphore.release()  # A plain Semaphore accepts this extra release without error; a BoundedSemaphore (as created above) raises ValueError


# queue = []
# MAX_NUM = 10
# condition = Condition()
#
# class ProducerThread(Thread):
#     def run(self):
#         nums = range(5)
#         global queue
#         while True:
#             condition.acquire()
#             if len(queue) == MAX_NUM:
#                 print('queue is full')
#                 condition.wait()
#                 print('got notified, space in queue')
#             num = random.choice(nums)
#             queue.append(num)
#             print('produced %s' % num)
#             condition.notify()
#             condition.release()
#             time.sleep(random.random())
#
# class ConsumerThread(Thread):
#     def run(self):
#         global queue
#         while True:
#             condition.acquire()
#             if not queue:
#                 print("Nothing in queue, consumer is waiting")
#                 condition.wait()
#                 print("Producer added something to queue and notified the consumer")
#             num = queue.pop(0)
#             print('consumed %s' % num)
#             condition.notify()
#             condition.release()
#             time.sleep(random.random())
#
#
# ProducerThread().start()
# ConsumerThread().start()


# from threading import Thread
# import time
# import random
# from Queue import Queue
#
# queue = Queue(10)
#
# class ProducerThread(Thread):
#     def run(self):
#         nums = range(5)
#         global queue
#         while True:
#             num = random.choice(nums)
#             queue.put(num)
#             print "Produced", num
#             time.sleep(random.random())
#
# class ConsumerThread(Thread):
#     def run(self):
#         global queue
#         while True:
#             num = queue.get()
#             queue.task_done()
#             print "Consumed", num
#             time.sleep(random.random())
#
# ProducerThread().start()
# ConsumerThread().start()


## threading

In [9]:
import logging
import threading

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter('[%(levelname)s] (%(threadName)-10s) %(message)s')
)
LOGGER.handlers = [handler]

# Use logging rather than print
def worker(num):
    """Log a start line, the payload ``num`` and a finish line at DEBUG level.

    The start and finish lines carry the name of the thread that runs the
    function.

    Args:
        num: value logged between the start and finish lines.
    """
    # current_thread() returns the Thread object of the caller. Read its
    # `name` attribute instead of calling getName(), which is deprecated.
    thread_name = threading.current_thread().name
    LOGGER.debug('%s started', thread_name)
    LOGGER.debug(num)
    LOGGER.debug('%s finished', thread_name)
    
for i in range(5):
    # Build the Thread object directly; positional arguments for the target
    # are passed through `args`.
    thread_name = 'Thread %s' % i
    t = threading.Thread(target=worker, name=thread_name, args=(i,))
    t.start()  # start the thread; a Thread object can only be started once
# The interleaving of the printed lines reflects the operating system's preemptive scheduling of threads.

# ---------------

# daemon vs non-daemon thread
# daemon线程作为后台线程，如果只剩下daemon类线程的话，程序不会等待它们执行结束，而是直接退出
# 所以daemon适合做heartbeat之类的事情
# The entire Python program exits when no alive non-daemon threads are left.
import time

def daemon():
    """Log 'Starting...', sleep for two seconds, then log 'Ending...'.

    Both messages go through ``logging.debug``.
    """
    nap_seconds = 2
    emit = logging.debug
    emit('Starting...')
    time.sleep(nap_seconds)
    emit('Ending...')
    
def non_daemon():
    """Log 'Starting...' immediately followed by 'Ending...' via ``logging.debug``."""
    for message in ('Starting...', 'Ending...'):
        logging.debug(message)
    
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()
# Without the joins below, the daemon thread's `Ending...` would not be
# printed, because the main thread does not wait for a daemon thread.
# NOTE: this cannot be observed in Jupyter or an interactive shell; try it
# with a script.
# join() is what makes the main thread wait:
d.join()
t.join()
# Now the daemon's `Ending...` is printed, because:
# Wait until the thread terminates.
# This blocks the calling thread until the thread whose join() method is called terminates –
# either normally or through an unhandled exception – or until the optional timeout occurs.
d.join(0.1)
# Thread.isAlive() was removed in Python 3.9; is_alive() is the supported
# spelling. The printed label is left unchanged so the output still matches
# the recorded run.
print('d.isAlive()', d.is_alive())
t.join()
# join() takes a timeout argument: an upper bound on how long to wait; once
# it expires the caller stops waiting.

# ---------------

# Iterate over every live thread with threading.enumerate():
# Return a list of all Thread objects currently alive.
# The list includes daemonic threads, dummy thread objects created by current_thread(),
# and the main thread. It excludes terminated threads and threads that have not yet been started.
main_thread = threading.main_thread()  # main_thread() returns the main thread
for t in threading.enumerate():
    # The result includes the main thread too, and joining the main thread
    # would deadlock, so only the other threads are candidates for join().
    if t is not main_thread:
        # t.join()
        pass

# ---------------

# Two ways to run code in a thread:
# 1. Construct a Thread object directly:
# class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
# 2. Subclass Thread and implement run()
class MyThread(threading.Thread):
    """Thread subclass whose ``run`` logs 'Running...' at DEBUG level."""

    def run(self):  # can be extended to take more input via args & kwargs
        message = 'Running...'
        logging.debug(message)

[DEBUG] (Thread 0  ) Thread 0 started
[DEBUG] (Thread 1  ) Thread 1 started
[DEBUG] (Thread 0  ) 0
[DEBUG] (Thread 2  ) Thread 2 started
[DEBUG] (Thread 3  ) Thread 3 started
[DEBUG] (Thread 4  ) Thread 4 started
[DEBUG] (daemon    ) Starting...
[DEBUG] (Thread 1  ) 1
[DEBUG] (non-daemon) Starting...
[DEBUG] (Thread 0  ) Thread 0 finished
[DEBUG] (Thread 2  ) 2
[DEBUG] (Thread 3  ) 3
[DEBUG] (Thread 4  ) 4
[DEBUG] (Thread 1  ) Thread 1 finished
[DEBUG] (non-daemon) Ending...
[DEBUG] (Thread 2  ) Thread 2 finished
[DEBUG] (Thread 3  ) Thread 3 finished
[DEBUG] (Thread 4  ) Thread 4 finished
[DEBUG] (daemon    ) Ending...


d.isAlive() False


## signaling between threads

In [None]:
import threading

# Lock object
lock = threading.Lock()
# A lock is either locked or unlocked; it starts out unlocked.
# By default acquire() blocks until the lock is free, then sets it to locked and returns.
# With blocking=False it does not block; with a timeout it waits at most that long before returning.
lock.acquire(blocking=True, timeout=-1)
# release() should only be called while the lock is locked; otherwise RuntimeError is raised.
lock.release()

# ---------------

# RLock object
rlock = threading.RLock()
# The same thread may acquire it several times without blocking,
# but release() must be called as many times as acquire() before the lock is actually freed.
rlock.acquire(blocking=True, timeout=-1)
rlock.release()

# ---------------

# Use `with` to avoid calling acquire() and release() explicitly
def worker_with(lock):
    """Log a DEBUG message while holding ``lock``, acquired through a ``with`` block."""
    with lock:
        logging.debug('Lock acquired via with')


def worker_no_with(lock):
    """Log a DEBUG message while holding ``lock``, using explicit acquire()/release().

    The release sits in ``finally`` so it runs even if the logging call raises.
    """
    lock.acquire()
    try:
        logging.debug('Lock acquired directly')
    finally:
        lock.release()
        
# ---------------

# Condition Variable object
# A condition variable is always associated with a lock; passing a lock in at
# construction lets several condition variables share the same lock.
# threading.Condition(lock=None) -- lock should be a Lock or an RLock.
# A condition variable also implements the context management protocol, so it
# can be used with `with`.
# The wait() method releases the lock,
# and then blocks until another thread awakens it by calling notify() or notify_all().
# Once awakened, wait() re-acquires the lock and returns. It is also possible to specify a timeout.
#
# Methods:
# acquire(*args)
# release()
# wait(timeout=None)
# wait_for(predicate, timeout=None)  -- predicate must be a callable returning a boolean
# notify(n=1)
# notify_all()

cv = threading.Condition()
available_items = []  # shared buffer; only touched while holding cv


def an_item_is_available():
    """Return True when the shared buffer holds at least one item."""
    return bool(available_items)


def get_an_available_item():
    """Remove and return the oldest buffered item. Call with cv held."""
    return available_items.pop(0)


def make_an_item_available(item='item'):
    """Append ``item`` to the shared buffer. Call with cv held."""
    available_items.append(item)


def consume_one():
    """Consume one item, re-checking the predicate in a while loop."""
    with cv:
        while not an_item_is_available():
            cv.wait()
        # while is necessary because wait() can return after an arbitrary long time,
        # and the condition which prompted the notify() call may no longer hold true.
        # In other words, wait() returning does not guarantee that an item is
        # available at that moment, so the predicate has to be checked again.
        return get_an_available_item()


def produce_one(item='item'):
    """Make one item available and wake up one waiting consumer."""
    with cv:
        make_an_item_available(item)
        # notify() must be called while holding the lock, and the lock must be
        # released afterwards; the `with` block guarantees both.
        cv.notify()


def consume_one_wait_for():
    """Consume one item, letting wait_for() replace the explicit while loop."""
    with cv:
        cv.wait_for(an_item_is_available)
        return get_an_available_item()


def _run_consumer_demo(consume, item):
    """Run ``consume`` in its own thread, produce ``item`` for it, return what it got."""
    received = []
    consumer_thread = threading.Thread(target=lambda: received.append(consume()))
    # The consumer has to run in another thread: waiting and notifying from
    # the same thread would block forever.
    consumer_thread.start()
    produce_one(item)
    consumer_thread.join()
    return received[0]


print(_run_consumer_demo(consume_one, 'item-1'))
print(_run_consumer_demo(consume_one_wait_for, 'item-2'))
    
# ---------------

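# Semaphore object
# A semaphore lets several threads hold a resource at the same time; it maintains an internal counter.
# acquire() decrements the counter and release() increments it; when the counter is 0, acquire() blocks.
# threading.Semaphore(value=1) -- value defaults to 1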

# ---------------

# Event object
e = threading.Event()
e.is_set() # the flag starts out False, so is_set() returns False here
e.set() # set the flag to True
e.clear() # set the flag back to False
e.wait(timeout=2) # blocks while the flag is False, until the timeout expires
# Events implement signalling between threads. Note the difference from a lock: after set(),
# every waiting thread receives the signal and wakes up, not just one of them; clear() unsets
# the flag, after which threads calling wait() block again.

## threading.local

In [17]:
import threading

# A threading.local instance holds attributes separately for each thread.
local_data = threading.local()
local_data.value = 1 # each thread can set/get attributes directly; threads do not affect one another

# Initialisation: subclass threading.local to give the attribute a starting value
class MyLocal(threading.local):
    """Thread-local object whose ``value`` attribute is set from the constructor argument."""
    def __init__(self, value):
        super().__init__()
        self.value = value
        
# Create an instance with a starting value of 1000 and read it back in this thread.
my_local_data = MyLocal(1000)
print(my_local_data.value)

1000


生产者/消费者模型实现