# 启动与停止线程

In [49]:
import time
from threading import Thread
import threading

In [3]:
def countdown(n, delay=3):
    """Print a 'T-minus' countdown from *n* down to 1.

    Args:
        n: starting value; nothing is printed when n <= 0.
        delay: seconds to pause after each tick (default 3).
    """
    while n > 0:
        print('T-minus', n)
        n -= 1
        # Pause per tick (inside the loop) so the countdown is actually paced,
        # like CountdownTask.run; a single sleep after the loop made all ticks
        # print at once.
        time.sleep(delay)

In [5]:
# Launch countdown(5) on a background thread; the cell returns immediately.
t = Thread(target=countdown, kwargs={'n': 5})
t.start()

T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


In [6]:
# Report whether the background thread is still executing.
print('Still running' if t.is_alive() else 'Completed')

Completed


In [8]:
# Block the calling thread until thread t has finished running.
t.join()

In [9]:
class CountdownTask:
    """Countdown that another thread can stop cooperatively.

    run() re-checks a boolean flag before every tick and terminate() clears it.
    The flag is only re-read after each 5-second sleep, so a stop request may
    take up to about 5 s to be noticed.
    """

    def __init__(self):
        self._running = True

    def terminate(self):
        """Ask run() to stop before its next tick."""
        self._running = False

    def run(self, n):
        """Print 'T-minus k' for k = n, n-1, ..., 1, sleeping 5 s after each tick.

        Stops early once terminate() has been called.
        """
        remaining = n
        while self._running and remaining > 0:
            print('T-minus', remaining)
            remaining -= 1
            time.sleep(5)

In [10]:
# Start a 10-step countdown in the background, then immediately request a stop.
# Timing-dependent: if the worker already entered its loop it prints one tick
# (the recorded output shows only "T-minus 10") before it sees the flag.
c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()
c.terminate()
# Waits for the worker to finish its current sleep and leave the loop.
t.join()

T-minus 10


# 判断线程是否已经启动

In [11]:
from threading import Event

In [12]:
def countdown(n, started_evt):
    """Announce start-up, signal *started_evt*, then count down from *n*.

    Setting the event first lets the launching thread wait() until this
    function is really running. Each tick sleeps 5 s.
    """
    print('countdown starting')
    started_evt.set()
    remaining = n
    while remaining > 0:
        # NOTE(review): 'T-mins' looks like a typo for 'T-minus'; kept verbatim
        # because the recorded cell output below matches it.
        print('T-mins', remaining)
        remaining -= 1
        time.sleep(5)

In [13]:
started_evt = Event()
print('Launching countdown')
t = Thread(target=countdown, args=(10, started_evt))
t.start()
# Block until countdown() has called started_evt.set(), i.e. the thread is running.
started_evt.wait()
print('countdown is running')

Launching countdown
countdown starting
T-mins 10
countdown is running
T-mins 9
T-mins 8
T-mins 7
T-mins 6
T-mins 5
T-mins 4
T-mins 3
T-mins 2
T-mins 1


In [16]:
def worker(n, sema):
    """Wait for a permit from *sema*, then report that worker *n* is running.

    The permit is intentionally never released, so each sema.release()
    lets exactly one blocked worker proceed.
    """
    sema.acquire()
    print(f'Working {n}')

In [17]:
from threading import Semaphore

In [19]:
sema = Semaphore(0)

In [20]:
# Start ten workers; each blocks in sema.acquire() because sema starts at 0,
# so nothing prints until sema.release() is called.
nworkers = 10
for n in range(nworkers):
    t = Thread(target=worker, args=(n, sema))
    t.start()

In [21]:
sema.release()

Working 0


In [22]:
sema.release()

Working 1


# 线程间通信
- 消息队列

In [1]:
from queue import Queue

In [24]:
_sentinal = object()

In [3]:
q = Queue()

In [4]:
q.put('hello')

In [5]:
q.get()

'hello'

In [14]:
q.put('hello')

In [15]:
q.put('world')

In [16]:
q.put(123)

In [19]:
q.queue

deque(['hello', 'world', 123])

In [21]:
q.qsize()

3

In [22]:
q.get()

'hello'

In [24]:
q.queue

deque(['world', 123])

In [25]:
q.get()

'world'

In [26]:
q.queue

deque([123])

In [27]:
q.get()

123

In [28]:
q.queue

deque([])

In [29]:
q.get(timeout=3)

Empty: 

In [30]:
q.task_done()

# 给临界区加锁

In [36]:
from threading import Lock

In [37]:
class ShareCounter:
    """Integer counter whose updates are serialized by a Lock.

    The read-modify-write in incr()/decr() runs while the lock is held, so
    concurrent updates from several threads are not lost.
    """

    def __init__(self, initial_value=0):
        # _value is read directly by the demos below.
        self._value = initial_value
        self._value_lock = Lock()

    def incr(self, delta=1):
        """Add *delta* to the counter under the lock."""
        with self._value_lock:
            self._value += delta

    def decr(self, delta=1):
        """Subtract *delta* from the counter under the lock."""
        with self._value_lock:
            self._value -= delta

In [38]:
sc = ShareCounter()

In [46]:
def operate(counter=None, iterations=10_000_000):
    """Hammer a counter with paired incr()/decr() calls.

    Each iteration adds one and immediately subtracts one, so a thread-safe
    counter ends where it started, while an unsynchronized one can drift.

    Args:
        counter: object with incr() and decr() methods. Defaults to the
            module-level ``sc``, looked up at call time so rebinding ``sc``
            (as the notebook does) retargets the existing threads' target.
        iterations: number of incr/decr pairs (default 10,000,000).
    """
    if counter is None:
        counter = sc
    for _ in range(iterations):
        counter.incr()
        counter.decr()

In [42]:
# Two threads each run operate(), whose incr()/decr() pairs cancel out.
# With the locked ShareCounter every update is applied, so the result is 0.
t1 = Thread(target=operate)
t2 = Thread(target=operate)
t1.start()
t2.start()
t1.join()
t2.join()
sc._value

0

In [43]:
class ShareCounter2:
    """Deliberately unsynchronized counter used to demonstrate a race.

    incr()/decr() perform an unprotected read-modify-write on _value, so
    concurrent callers can overwrite each other's updates.
    """

    def __init__(self, initial_value=0):
        self._value = initial_value

    def incr(self, delta=1):
        """Add *delta* without any locking."""
        self._value += delta

    def decr(self, delta=1):
        """Subtract *delta* without any locking."""
        self._value -= delta

In [44]:
sc = ShareCounter2()

In [47]:
# Same experiment with the unlocked ShareCounter2: `self._value += delta` is a
# read-modify-write that the two threads can interleave, so updates can be lost
# and the final value can drift from 0 (the recorded run gave -11; varies per run).
t1 = Thread(target=operate)
t2 = Thread(target=operate)
t1.start()
t2.start()
t1.join()
t2.join()
sc._value

-11

# 防止死锁的加锁机制
- 死锁：一个线程获得了第一个锁，在获取第二个锁时被阻塞；如果持有第二个锁的线程恰好也在等待第一个锁，两个线程就会相互等待、都无法继续，从而导致程序假死。常见的预防办法是让所有线程都按同一固定顺序获取锁（下面的实现按锁对象的 `id()` 排序）

In [48]:
from contextlib import contextmanager

In [52]:
# Per-thread record of locks currently held via accquire(), in acquisition order.
_local = threading.local()


@contextmanager
def accquire(*locks):
    """Acquire *locks* in a global order (by id()) to avoid lock-order deadlocks.

    Every caller takes locks sorted by id(), and a thread already holding locks
    may only take locks with a strictly larger id. Consistent ordering removes
    the circular wait that causes deadlock.

    Args:
        *locks: objects with acquire()/release() (e.g. threading.Lock).

    Raises:
        RuntimeError: if this thread already holds a lock whose id is >= the
            smallest id requested (lock order violation).
    """
    locks = sorted(locks, key=id)
    acquired = getattr(_local, 'acquired', [])
    if acquired and locks and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # Remember where this call's entries start so exactly those are removed,
    # even when no locks were requested.
    start = len(acquired)
    acquired.extend(locks)
    _local.acquired = acquired

    held = []
    try:
        for lock in locks:
            lock.acquire()
            held.append(lock)
        yield
    finally:
        # Release only what was actually acquired, newest first.
        for lock in reversed(held):
            lock.release()
        del acquired[start:]

In [51]:
# Two independent locks shared by the lock-ordering demo threads.
x_lock, y_lock = Lock(), Lock()

In [53]:
def _spin_under_locks(label):
    # Loop forever: take x_lock and y_lock via accquire() and print label.
    while True:
        with accquire(x_lock, y_lock):
            print(label)


def thread_1():
    """Forever print "Thread-1" while holding x_lock and y_lock."""
    _spin_under_locks("Thread-1")


def thread_2():
    """Forever print "Thread-2" while holding x_lock and y_lock."""
    _spin_under_locks("Thread-2")

In [None]:
# Run both lock-ordering loops concurrently. They never return and print
# continuously; daemon=True means they do not keep the interpreter alive.
t1 = Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = Thread(target=thread_2)
t2.daemon = True
t2.start()

# 保存线程的状态信息
- threading.local()

# 创建线程池
- from concurrent.futures import ThreadPoolExecutor

# 简单的并行编程

In [1]:
import glob
import codecs

In [2]:
def find_robots(filename):
    """Return the first whitespace-separated field of each line mentioning 'robots.txt'.

    Presumably an access log where that field is the client host/IP — TODO
    confirm the log format. The file is read as UTF-8 and closed on return.
    """
    with open(filename, encoding='utf-8') as f:
        return [line.split()[0] for line in f if 'robots.txt' in line]

In [3]:
def find_all_robots(logdir):
    files = glob.glob(logdir + 'data/*.txt')
    all_robots = []
    for robots in map(find_robots, files):
        all_robots.extend(robots)
    return len(all_robots)

In [4]:
%time find_all_robots('./')

Wall time: 235 ms


8640

In [5]:
# 并行编程
from concurrent import futures

In [6]:
def find_all_robots_parallel(logdir):
    """Same count as find_all_robots(), but files are scanned in a process pool.

    NOTE(review): with the 'spawn' start method (Windows/macOS), worker
    processes must be able to import find_robots; a function defined
    interactively in a notebook's __main__ may not be importable — confirm.
    """
    import os

    # os.path.join works whether or not logdir ends with a separator.
    files = glob.glob(os.path.join(logdir, 'data', '*.txt'))
    with futures.ProcessPoolExecutor() as pool:
        return sum(len(robots) for robots in pool.map(find_robots, files))

In [None]:
%%time 
if __name__ == '__main__':
    find_all_robots_parallel('./')

# 全局锁问题
- GIL只会影响严重依赖CPU的程序，如计算密集型的程序，如果只涉及I/O，如网络交互，则多线程很合适，因为它们的大部分时间都在等待
- 针对依赖CPU的程序
    - 优化底层算法比使用多线程效果要好很多
        - 比如将性能瓶颈的模块转移到C语言扩展的模块中
        - 比如计算的时候用Numpy这样的扩展
        - 甚至考虑其他可选方案，比如PyPy，其通过JIT编译器来优化执行效率
    - 线程不是专门用来优化性能的
- 解决GIL的问题
    - 在纯Python环境中：使用进程池，使用基于进程的并行编程，来发挥多核CPU的作用
    - 使用C扩展编程技术，将计算密集型任务转移给C，在工作的时候，在C代码中释放GIL

# 定义一个Actor任务
- 每个actor简单地执行发送给它的消息任务
- actor之间的通信是单向（发送者不会收到消息已被处理的通知）和异步（发送者不知道消息什么时候会被接收和处理）的

In [1]:
from queue import Queue
import threading

In [5]:
class ActorExit(Exception):
    """Raised from recv() to end an actor's run() loop.

    The class object itself is also posted to the mailbox by close() and
    serves as the shutdown sentinel.
    """


class Actor:
    """Minimal actor: a Queue mailbox drained by one daemon thread.

    Subclasses override run() and pull messages with recv().
    """

    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        """Post *msg* to the mailbox without waiting for it to be handled."""
        self._mailbox.put(msg)

    def recv(self):
        """Block until a message arrives and return it.

        Raises:
            ActorExit: when the dequeued message is the shutdown sentinel.
        """
        incoming = self._mailbox.get()
        if incoming is not ActorExit:
            return incoming
        raise ActorExit()

    def close(self):
        """Queue the shutdown sentinel; run() ends when recv() reaches it."""
        self.send(ActorExit)

    def start(self):
        """Run the actor on a new daemon thread."""
        # The completion event is created here, so join() requires start() first.
        self._terminated = threading.Event()
        worker = threading.Thread(target=self._bootstrap, daemon=True)
        worker.start()

    def _bootstrap(self):
        # Thread entry point: ActorExit means a normal shutdown, and the
        # completion event is always set so join() can return.
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        """Block until the actor's thread has finished."""
        self._terminated.wait()

    def run(self):
        """Default loop: consume and discard messages until closed.

        Meant to be overridden by subclasses.
        """
        while True:
            self.recv()


class PrintActor(Actor):
    """Actor that prints every message it receives."""

    def run(self):
        while True:
            print('Got:', self.recv())

In [6]:
# Start the actor, queue two messages and then the shutdown sentinel (FIFO order).
# join() blocks until the actor thread has handled them and exited.
p = PrintActor()
p.start()
p.send('hello')
p.send('world')
p.close()
p.join()

Got: hello
Got: world


In [134]:
def checker(func):
    """Decorator that checks call arguments against *func*'s annotations.

    Each bound argument whose parameter has an annotation must be an instance
    of it; otherwise an AssertionError is raised before *func* runs. The return
    annotation, unannotated parameters and omitted defaults are not checked.
    Annotations on *args/**kwargs are compared against the collected
    tuple/dict, not the individual items.

    Note: the check uses ``assert``, so it is skipped under ``python -O``.
    """
    # Imported locally: `inspect` is not imported anywhere else in this file.
    import inspect
    from functools import wraps

    # Loop-invariant across calls: compute once at decoration time.
    ann = func.__annotations__
    sig = inspect.signature(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            if name in ann:
                assert isinstance(value, ann[name]), f'Type Error: Expected {ann[name]}'
        return func(*args, **kwargs)
    return wrapper

In [138]:
@checker
def add(a: int, b) -> int:
    """Return the greatest common divisor of *a* and *b* via Euclid's algorithm.

    NOTE(review): despite the name, this does not add. It repeatedly replaces
    (a, b) with (b, a % b) until b is falsy. @checker enforces that `a` is an
    int. With negative inputs the result follows Python's % sign rules and can
    be negative (e.g. add(4, -6) == -2).
    """
    x, y = a, b
    while y:
        x, y = y, x % y
    return x

In [141]:
import thread

ModuleNotFoundError: No module named 'thread'