# 并发

# 线程

In [20]:
import threading, time

def working():
    count = 0
    while True:
        time.sleep(1)
        print('i am working', count)
        count += 1
        if count > 5:
            break

t = threading.Thread(target=working)
t.start()

print('==end==')

==end==
i am working 0
i am working 1
i am working 2
i am working 3
i am working 4
i am working 5


In [2]:
import threading, time

def threadingifo():
    print('current thread = {}'.format(threading.current_thread()))
    print('main thread = {}'.format(threading.main_thread()))
    print('active count = {}'.format(threading.active_count()))

def working():
    count = 0
    threadingifo()
    while True:
        time.sleep(1)
        print('i am working', count)
        count += 1
        if count > 5:
            break

t = threading.Thread(target=working, name='work1')
threadingifo()
t.start()

print('===end===')

current thread = <_MainThread(MainThread, started 42688)>
main thread = <_MainThread(MainThread, started 42688)>
active count = 6
current thread = <Thread(work1, started 45108)>
main thread = <_MainThread(MainThread, started 42688)>
active count = 7
===end===


# threading.local()

In [21]:
import threading, time

global_data = threading.local()

def work():
    global_data.x = 0 # 动态为global_data类增加一个x属性，并赋值为0

    for _ in range(100):
        time.sleep(0.001)
        global_data.x += 1
    print(threading.current_thread(), global_data.x)

for i in range(5):
    t = threading.Thread(target=work, name='w-{}'.format(i))
    t.start()

<Thread(w-0, started 42004)><Thread(w-3, started 46000)> 100
 100<Thread(w-4, started 42976)> <Thread(w-1, started 2352)> 100
100
<Thread(w-2, started 40132)> 100



In [31]:
import threading, time

X = 'abc'
ctx = threading.local()
ctx.x = 123

print(ctx, type(ctx), ctx.x)

def worker():
    print(X)
    print(ctx)
    print(ctx.x)# 变量读取错误
    print('working')

worker()
print('-'*30)
# threading.Thread(target=worker).start()  #报错

<_thread._local object at 0x00000000055DCBA0> <class '_thread._local'> 123
abc
<_thread._local object at 0x00000000055DCBA0>
123
working
------------------------------


# 定时器，延时执行

In [32]:
from threading import Thread, Timer

def add(x, y):
    print(x + y)

t = Timer(5, add, (3, 6))
# t.cancel() # 延时取消
t.start()
# t.cancel() # 延时取消
print('======================')

9


# 线程同步--Event

In [7]:
from threading import Thread, Event
import time

cups = []
e = Event()

def worker(event, count=10):
    while True:
        time.sleep(0.5)
        cups.append(1)
        print('i am working', cups)
        if len(cups) > count:
            event.set()
            break

def boss(event):
    print('i am waiting')
    event.wait() # 线程阻塞
    print('good job')

b = Thread(target=boss, args=(e,))
w = Thread(target=worker, args=(e,))

b.start()
w.start()

i am waiting
i am working [1]
i am working [1, 1]
i am working [1, 1, 1]
i am working [1, 1, 1, 1]
i am working [1, 1, 1, 1, 1]
i am working [1, 1, 1, 1, 1, 1]
i am working [1, 1, 1, 1, 1, 1, 1]
i am working [1, 1, 1, 1, 1, 1, 1, 1]
i am working [1, 1, 1, 1, 1, 1, 1, 1, 1]
i am working [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
i am working [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
good job


In [8]:
# 实现一个timer功能，不能继承thread
from threading import Thread, Event, Timer

class ttimer:
    def __init__(self, interval, fanc, args=(), kwargs={}):
        self.interval = interval
        self.fanc = fanc
        self.args = args
        self.kwargs = kwargs
        self.event = Event()

    def start(self):
        Thread(target=self._run).start()

    def cancel(self):
        print('cancel ~~~~~~~~~~~~~~~~')
        self.event.set()

    def _run(self):
        self.event.wait(self.interval)
        if not self.event.is_set():
            self.fanc(*self.args, **self.kwargs)
        self.event.set()

def add(x, y):
    print(x + y)

t = ttimer(3, add, (3, 5))
t.start()
# t.cancel()

8


# 锁

In [10]:
from threading import Lock
lock = Lock()
lock.acquire(blocking=True, timeout=-1) # 获得锁，谁获得锁，谁的任务完成后必须释放锁
lock.release()

In [16]:
from threading import Thread, Event, Timer, Lock
import time, logging

cups = []
lock = Lock()

def worker(count):
    while True:
        lock.acquire()
        if len(cups) >= count:
            lock.release()
            break
        time.sleep(0.001)
        cups.append(1)
        lock.release()
    print('{}=='.format(len(cups)), end='')

for i in range(10):
    t = Thread(target=worker, name='w-{}'.format(i), args=(1000,))
    t.start()

1000==1000==1000==1000==1000==1000==1000==1000==1000==1000==

# 可重入锁--Rlock

# condition

In [33]:
from threading import Thread, Event, Condition
import random, time, logging

FORMAT = '%(asctime)s %(threadName)s %(thread)s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

class Dispatcher:
    def __init__(self):
        self.data = None
        self.cond = Condition()

    def pruducter(self, total):
        for _ in range(total):
            self.data = random.randint(0, 100)
            with self.cond:
                logging.info(self.data)
                # self.cond.notify_all()
                self.cond.notify(2)
            time.sleep(1)

    def consumer(self):
        while True:
            with self.cond:
                self.cond.wait()
                logging.info(self.data)
                # self.data = None
            time.sleep(0.5)

d = Dispatcher()
p = Thread(target=d.pruducter, args=(10,), name='producter')

for i in range(3):
    c = Thread(target=d.consumer, name='consumer')
    c.start()

p.start()

print('='* 40)



# 线程同步--Barrier

In [36]:
from threading import Barrier, Thread

barrier = Barrier(3)

def worker(bar:Barrier):
    print('i am working ', bar.n_waiting)
    bar.wait()
    print('finish my job ', bar.n_waiting)

ts = []
for i in range(3):
    t = Thread(target=worker, args=(barrier,))
    t.start()
    ts.append(t)

i am working  0
i am working i am working  1
 1
finish my job  0
finish my job  0
finish my job  0


# 信号量---semaphore

In [44]:
from threading import Semaphore
sema = Semaphore(3)
print(sema.acquire())
print(sema.acquire())
print(sema._value)
print(sema.acquire())
print(sema._value)
print(sema.acquire(timeout=3))
print(sema._value)

True
True
1
True
0
False
0


# GIL全局解释器锁

In [None]:
GIL：
    CPython在解释器进程级别有一般锁，叫做GIL全局解释器锁
    它保证CPython进程中，只有一个线程执行字节码。甚至在多核CPU的情况下，也是只能允许一个CPU上的一个线程在运行
    
IO密集型：由于线程阻塞，可以调用其他线程
    比如文件读取，加载，初始化的情况下，可以用多线程，barrier

CPU密集型：大量的数据计算
    由于有GIL的存在，多线程在这种情况下，就没有任何优势，和单线程执行效率相当