# 线程

## sample

In [2]:
#示例
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

def show(arg):
    """Pause for one second, then print ``thread<arg>`` followed by an empty line."""
    time.sleep(1)
    label = f'thread{arg!s}\n'
    print(label)
    
# Create all ten workers up front; nothing runs until start() is called below.
threads = [threading.Thread(target=show, args=(i,)) for i in range(10)]

# Launch every worker, then block until each one has finished.
for t in threads:
    t.start()
for t in threads:
    t.join()

print('main thread stop')

thread4
thread2

thread8

thread7

thread0

thread6

thread1

thread9

thread5

thread3


main thread stop


## 守护线程

In [5]:
import time
import threading

def run(n):
    """Announce worker ``n``, sleep for two seconds, then print a completion line."""
    banner = '[%s]------running----\n' % n
    print(banner)
    time.sleep(2)
    print('--done--')

def main():
    """Start five ``run`` workers one after another from the calling thread.

    Each worker is given at most one second through ``join(1)`` before the
    next one is started, so the call does not wait for a worker to finish.
    """
    for i in range(5):
        t = threading.Thread(target=run, args=(i,))
        t.start()
        # Bounded wait: returns after one second even if the worker is still running.
        t.join(1)
        # ``Thread.name`` replaces the deprecated ``getName()`` accessor.
        print('starting thread', t.name)


# Run main() in a daemon thread. Threads started from inside it inherit the
# daemon flag, so neither it nor its workers keep the interpreter alive once
# the real main thread finishes, whether or not their work is done.
# The ``daemon=`` argument replaces the deprecated ``setDaemon()`` call.
m = threading.Thread(target=main, daemon=True)
m.start()
# Wait at most two seconds for main() before carrying on.
m.join(timeout=2)
print("---main thread done----")

[0]------running----

starting thread Thread-43
[1]------running----

---main thread done----


## 线程锁

In [2]:
import time
import threading

# Single module-level lock shared by every worker thread.
lock = threading.Lock()

def addNum():
    """Decrement the shared counter ``num`` by one while holding ``lock``.

    The current value is printed first, and the one-second sleep happens with
    the lock held, so workers execute this body one at a time.
    """
    global num  # rebinding the module-level counter, not creating a local
    with lock:
        print('--get num:',num )
        time.sleep(1)
        num = num - 1

num = 10  # shared counter that the workers count down

# One worker per unit of the starting value, each started right after creation.
thread_list = []
for _ in range(num):
    t = threading.Thread(target=addNum)
    thread_list.append(t)
    t.start()

# Block until every worker has finished before reading the result.
for t in thread_list:
    t.join()

print('final num:', num )

--get num: 10
--get num: 9
--get num: 8
--get num: 7
--get num: 6
--get num: 5
--get num: 4
--get num: 3
--get num: 2
--get num: 1
final num: 0


## 递归锁（RLock）

In [1]:
import threading,time

def run1():
    """Increment the shared counter ``num`` under ``lock`` and return the new value."""
    global num
    print("grab the first part data")
    # ``with`` releases the lock even if the update raises, and the value is
    # read back before the lock is given up.
    with lock:
        num += 1
        return num
def run2():
    """Increment the shared counter ``num2`` under ``lock`` and return the new value."""
    global num2
    print("grab the second part data")
    # ``with`` releases the lock even if the update raises, and the value is
    # read back before the lock is given up.
    with lock:
        num2 += 1
        return num2
def run3():
    """Call ``run1`` then ``run2`` as one unit under ``lock`` and print both results.

    ``run1`` and ``run2`` acquire the same ``lock`` again while it is already
    held here, so ``lock`` has to be re-entrant (an ``RLock``).
    """
    # ``with`` releases the outer hold even if one of the calls raises.
    with lock:
        res = run1()
        print('--------between run1 and run2-----')
        res2 = run2()
    print(res,res2)


if __name__ == '__main__':

    num,num2 = 0,0
    # Re-entrant lock: run3 holds it while run1/run2 acquire it again.
    lock = threading.RLock()
    workers = [threading.Thread(target=run3) for _ in range(4)]
    for t in workers:
        t.start()

    # Wait for every worker; without this the summary below could be printed
    # while threads are still running.
    for t in workers:
        t.join()

    print('----all threads done---')
    print(num,num2)

grab the first part data
--------between run1 and run2-----
grab the second part data
1 1
grab the first part data
--------between run1 and run2-----
grab the second part data
2 2
grab the first part data
--------between run1 and run2-----
grab the second part data
3 3
grab the first part data
--------between run1 and run2-----
grab the second part data
4 4
----all threads done---
4 4


## Semaphore(信号量)

互斥锁同时只允许一个线程更改数据，而 Semaphore 则同时允许一定数量的线程更改数据。比如厕所有 3 个坑位，那么最多只允许 3 个人同时使用，后面的人只能等里面有人出来了才能再进去。

In [3]:
import threading,time

def run(n):
    """Hold one semaphore slot for a second, report thread ``n``, and bump ``num``."""
    global num
    # ``with`` hands the slot back even if the body raises.
    with semaphore:
        time.sleep(1)
        print("run the thread: %s\n" %n)
        # NOTE(review): ``num += 1`` is a read-modify-write and the semaphore
        # admits several threads at once, so it does not serialise this update.
        num += 1

if __name__ == '__main__':

    num= 0
    semaphore  = threading.BoundedSemaphore(5)  # at most 5 workers hold a slot at once
    workers = []
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()
        workers.append(t)

    # Poll once per second until every worker started above has finished.
    # Checking the workers themselves avoids comparing active_count() with a
    # fixed threshold, which depends on how many unrelated threads the host runs.
    while any(w.is_alive() for w in workers):
        print(threading.active_count())
        print(f'num:{num}')
        time.sleep(1)

    print('----all threads done---')
    print(threading.active_count())
    print(num)

24
num:0
run the thread: 0
run the thread: 4

23
num:1

run the thread: 3

run the thread: 2

run the thread: 1

19
num:5
run the thread: 5

run the thread: 8

run the thread: 9

run the thread: 6

run the thread: 7

14
num:10
run the thread: 10

run the thread: 11

run the thread: 13
run the thread: 12


run the thread: 14

9
num:15
run the thread: 15

run the thread: 16

run the thread: 17
run the thread: 18


run the thread: 19

----all threads done---
4
20


## Condition类

In [3]:
# encoding: UTF-8
import threading
import time

# The shared product slot; None means "nothing to consume".
product = None
# Condition variable guarding ``product``.
con = threading.Condition()

# Producer routine
def produce():
    """Fill the ``product`` slot whenever it is empty, forever.

    Runs entirely inside ``with con``; the lock is only given up while the
    thread is blocked in ``con.wait()``. The loop has no exit.
    """
    global product

    with con:
        while True:
            if product is None:
                print ('produce...')
                product = 'anything'
                # Tell a waiting consumer that a product is available.
                con.notify()
            # Block until notified; wait() releases the lock while blocked.
            con.wait()
            # wait() returns with the lock re-acquired, so this pause runs
            # while the lock is held.
            time.sleep(2)

# Consumer routine
def consume():
    """Empty the ``product`` slot whenever it is filled, forever.

    Runs entirely inside ``with con``; the lock is only given up while the
    thread is blocked in ``con.wait()``. The loop has no exit.
    """
    global product

    with con:
        while True:
            if product is not None:
                print( 'consume...')
                product = None
                # Tell a waiting producer that the product is gone.
                con.notify()
            # Block until notified; wait() releases the lock while blocked.
            con.wait()
            # wait() returns with the lock re-acquired, so this pause runs
            # while the lock is held.
            time.sleep(2)


# Producer thread first, consumer thread second.
t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
for worker in (t1, t2):
    worker.start()
# Wait at most ten seconds for the producer. Both routines loop forever, so
# this only bounds how long the caller blocks here.
t1.join(timeout=10)


produce...
consume...
produce...
consume...
produce...
consume...
produce...


In [1]:
import threading
import time

condition = threading.Condition()
products = 0

class Producer(threading.Thread):
    """Thread that keeps adding to the shared ``products`` count, up to ten."""

    def run(self):
        """Deliver one product per round; wait on ``condition`` while ten are in stock."""
        global products
        while True:
            # ``wait()`` returns with the lock re-acquired, so the lock has to
            # be released on both branches; the ``with`` block guarantees that.
            with condition:
                if products < 10:
                    products += 1
                    print("Producer(%s):deliver one, now products:%s" %(self.name, products))
                    # notify() does not release the lock; leaving the block does.
                    condition.notify()
                else:
                    print("Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products))
                    # wait() releases the lock while this thread is blocked.
                    condition.wait()
            # Pause outside the lock so other threads can make progress.
            time.sleep(2)

class Consumer(threading.Thread):
    """Thread that keeps taking from the shared ``products`` count while more than one is left."""

    def run(self):
        """Consume one product per round; wait on ``condition`` when only one is left."""
        global products
        while True:
            # ``wait()`` returns with the lock re-acquired, so the lock has to
            # be released on both branches; the ``with`` block guarantees that.
            with condition:
                if products > 1:
                    products -= 1
                    print("Consumer(%s):consume one, now products:%s" %(self.name, products))
                    # notify() does not release the lock; leaving the block does.
                    condition.notify()
                else:
                    print("Consumer(%s):only 1, stop consume, products:%s" %(self.name, products))
                    # wait() releases the lock while this thread is blocked.
                    condition.wait()
            # Pause outside the lock so other threads can make progress.
            time.sleep(2)

if __name__ == "__main__":
    # Two producers, started as ordinary (non-daemon) threads.
    for _ in range(2):
        p = Producer()
        p.start()

    # Three consumers, marked as daemon threads before they start.
    for _ in range(3):
        c = Consumer()
        # The ``daemon`` attribute replaces the deprecated ``setDaemon()``.
        c.daemon = True
        c.start()

    # Wait up to ten seconds on the last consumer that was started.
    c.join(10)
    

Producer(Thread-6):deliver one, now products:1
Producer(Thread-7):deliver one, now products:2
Consumer(Thread-8):consume one, now products:1
Consumer(Thread-9):only 1, stop consume, products:1
Consumer(Thread-10):only 1, stop consume, products:1
Producer(Thread-6):deliver one, now products:2
Consumer(Thread-9):consume one, now products:1
Consumer(Thread-9):only 1, stop consume, products:1
Producer(Thread-7):deliver one, now products:2
Consumer(Thread-8):consume one, now products:1
Producer(Thread-6):deliver one, now products:2
Consumer(Thread-10):consume one, now products:1
Consumer(Thread-10):only 1, stop consume, products:1


In [21]:
import threading
 
alist = None
condition = threading.Condition()

def doSet():
    """Wait until ``alist`` exists, then set every element to 1, last index first."""
    # ``with`` releases the condition's lock even if the body raises.
    with condition:
        # Re-check after every wake-up: only proceed once the list is there.
        while alist is None:
            condition.wait()
        for i in reversed(range(len(alist))):
            print(f'doSet...{i}')
            alist[i] = 1
 
def doPrint():
    """Wait until ``alist`` exists, then print each of its elements in order."""
    # ``with`` releases the condition's lock even if the body raises.
    with condition:
        # Re-check after every wake-up: only proceed once the list is there.
        while alist is None:
            condition.wait()
        for i in alist:
            print(f'doPrint...{i}')
 
def doCreate():
    """Create the shared ten-element ``alist`` once and wake every waiting thread.

    Does nothing when ``alist`` already exists.
    """
    global alist
    # ``with`` releases the condition's lock even if the body raises.
    with condition:
        if alist is None:
            alist = [0] * 10
            print(alist)
            # ``notify_all`` replaces the deprecated ``notifyAll`` alias.
            condition.notify_all()
 
# The two waiting threads are started before the one that creates the list.
tset = threading.Thread(name='tset', target=doSet)
tprint = threading.Thread(name='tprint', target=doPrint)
tcreate = threading.Thread(name='tcreate', target=doCreate)
for worker in (tset, tprint, tcreate):
    worker.start()

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
doSet...9
doSet...8
doSet...7
doSet...6
doSet...5
doSet...4
doSet...3
doSet...2
doSet...1
doSet...0
doPrint...1
doPrint...1
doPrint...1
doPrint...1
doPrint...1
doPrint...1
doPrint...1
doPrint...1
doPrint...1
doPrint...1


## Event类

In [2]:
# encoding: UTF-8
import threading
import time

event = threading.Event()


def func():
    """Block the calling thread until ``event`` is set, printing a line before and after."""
    # ``current_thread().name`` replaces the deprecated
    # ``currentThread().getName()`` spelling.
    name = threading.current_thread().name

    # Wait for the event; the thread blocks here until it is set.
    print('%s wait for event...' % name)
    event.wait()

    # The event has been set, so the thread carries on.
    print('%s recv event.' % name)


t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
for waiter in (t1, t2):
    waiter.start()

# Pause for two seconds before signalling.
time.sleep(2)

# Set the event; threads blocked in event.wait() are released.
print('MainThread set event.')
event.set()

Thread-8 wait for event...
Thread-9 wait for event...
MainThread set event.
Thread-9 recv event.
Thread-8 recv event.


## Timer类

In [3]:
# encoding: UTF-8
import threading


def func():
    """Print the greeting line."""
    greeting = 'hello timer!'
    print(greeting)


# Schedule func to run once after a five-second delay, then wait for it.
timer = threading.Timer(interval=5, function=func)
timer.start()
timer.join()

hello timer!


## local类

In [4]:
# encoding: UTF-8
import threading
 
# One thread-local namespace; the value set here belongs to the main thread.
local = threading.local()
local.tname = 'main'


def func():
    """Assign ``tname`` from inside the worker thread and print it back."""
    worker_value = 'notmain'
    local.tname = worker_value
    print(local.tname)


t1 = threading.Thread(target=func)
t1.start()
t1.join()

# Printed from the main thread after the worker has finished.
print(local.tname)

notmain
main


## 线程返回结果

In [1]:

from threading import Thread

class MyThread(Thread):
    """Thread that runs ``func(*args, **kw)`` and keeps its return value.

    Call ``join()`` before ``get_result()`` to be sure the call has finished.
    """

    def __init__(self, func, *args,**kw):
        """Store the callable and the arguments it will be called with."""
        super().__init__()
        self.func = func
        self.args = args
        self.kw = kw
        # Defined up front so get_result() can return None before the call
        # has produced a value, instead of relying on a missing attribute.
        self.result = None

    def run(self):
        """Invoke the stored callable and record what it returns."""
        self.result = self.func(*self.args,**self.kw)

    def get_result(self):
        """Return the callable's return value, or ``None`` if none has been recorded."""
        return self.result

    def thread_isalive(self):
        """Return ``True`` while the thread is running."""
        return self.is_alive()

## 线程池

In [1]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
st = time.time()  # reference instant for the elapsed-time printouts

# ``times`` is how many seconds to sleep, standing in for a network request.
def get_html(times):
    """Sleep for ``times`` seconds, print the time elapsed since ``st``, and return ``times``."""
    time.sleep(times)
    elapsed = time.time() - st
    print(f"{elapsed} get page {times}s finished")
    return times

urls = [3, 2, 4]  # sleep durations in seconds, not real URLs

# The context manager shuts the pool down once the results have been consumed.
with ThreadPoolExecutor(max_workers=2) as executor:
    all_task = [executor.submit(get_html, url) for url in urls]

    # as_completed yields each future as it finishes, not in submission order.
    for future in as_completed(all_task):
        data = future.result()
        print(f"{time.time()-st} in main: get page {data}s success")

2.0017471313476562 get page 2s finished
2.002741813659668 in main: get page 2s success
3.0014777183532715 get page 3s finished
3.003087043762207 in main: get page 3s success
6.0039684772491455 get page 4s finished
6.0039684772491455 in main: get page 4s success


In [2]:
from concurrent.futures import ThreadPoolExecutor
import time
st = time.time()  # reference instant for the elapsed-time printouts
# ``times`` is how many seconds to sleep, standing in for a network request.
def get_html(times):
    """Sleep for ``times`` seconds, print the time elapsed since ``st``, and return ``times``."""
    time.sleep(times)
    elapsed = time.time() - st
    print(f"{elapsed} get page {times}s finished")
    return times

urls = [3, 2, 4]  # sleep durations in seconds, not real URLs

# The context manager shuts the pool down once the results have been consumed.
with ThreadPoolExecutor(max_workers=2) as executor:
    # map yields results in the order of ``urls``, not in completion order.
    for data in executor.map(get_html, urls):
        print(f"{time.time()-st} in main: get page {data}s success")

2.0028390884399414 get page 2s finished
3.002140522003174 get page 3s finished
3.004140853881836 in main: get page 3s success
3.004140853881836 in main: get page 2s success
6.005833387374878 get page 4s finished
6.005833387374878 in main: get page 4s success
