In [1]:
import multiprocessing
multiprocessing.cpu_count()

8

In [77]:
from multiprocessing import Process, Queue, Pipe, Manager
import os, sys, time
from os import getpid
from random import randint
from time import time, sleep

# 进程

## Multiprocessing

In [50]:
def download_task(filename):
    print('启动下载进程，进程号[%d].' % getpid())
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    p1 = Process(target = download_task, args = ('Python从入门到住院.pdf', ))
    p1.start()
    p2 = Process(target=download_task, args=('Peking Hot.avi', ))
    p2.start()
    p1.join()
    p2.join()
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))

if __name__ == '__main__':
    main()

启动下载进程，进程号[78542].
启动下载进程，进程号[78543].
开始下载Python从入门到住院.pdf...
开始下载Peking Hot.avi...
Python从入门到住院.pdf下载完成! 耗费了6秒
Peking Hot.avi下载完成! 耗费了8秒
总共耗费了8.03秒.


## Fork

In [64]:
import os
print("Process (%s) start..." % os.getpid())    #获取当前进程的id
pid = os.fork()
if pid == 0:
    print("子进程为 (%s) ，主进程为 %s" % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

Process (74287) start...
I (74287) just created a child process (87621).
子进程为 (87621) ，主进程为 74287


## Pool

In [67]:
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('启动下载进程，进程号[%d].' % getpid())
    print('开始下载%s...' % name)
    start = time.time()
    time.sleep(random.random() * 2)
    end = time.time()
    print('%s 执行时长为 %0.2f seconds.' % (name, (end - start)))
    
if __name__=='__main__':
    print("主进程 %s." % os.getpid())
    p = Pool(9)    #pool默认大小是CPU核数
    for i in range(9):
        p.apply_async(long_time_task, args = ("任务" + str(i), ))
    print("等待所有子进程执行")
    p.close()
    p.join()           
    '''对Pool对象调用join()方法会等待所有子进程执行完毕，
       调用join()之前必须先调用close()，调用close()之后就不能继续添加新的Process了。'''
    print("执行完毕")

主进程 74287.
启动下载进程，进程号[87838].
启动下载进程，进程号[87839].
启动下载进程，进程号[87840].
启动下载进程，进程号[87841].
启动下载进程，进程号[87843].
启动下载进程，进程号[87842].
启动下载进程，进程号[87844].
启动下载进程，进程号[87845].
开始下载任务0...
开始下载任务3...
开始下载任务1...
启动下载进程，进程号[87846].
开始下载任务5...
开始下载任务2...
开始下载任务4...
开始下载任务6...
开始下载任务8...
开始下载任务7...
等待所有子进程执行
任务8 执行时长为 0.18 seconds.
任务6 执行时长为 0.61 seconds.
任务3 执行时长为 0.74 seconds.
任务7 执行时长为 0.81 seconds.
任务2 执行时长为 0.98 seconds.
任务5 执行时长为 1.32 seconds.
任务4 执行时长为 1.37 seconds.
任务0 执行时长为 1.38 seconds.
任务1 执行时长为 1.88 seconds.
执行完毕


## Queue

In [73]:
def fun1(q,i):
    print('子进程%s 开始put数据' %i)
    q.put('我是%s 通过Queue通信' %i)

if __name__ == '__main__':
    q = Queue()

    process_list = []
    for i in range(3):
        p = Process(target=fun1,args=(q,i,))  #注意args里面要把q对象传给我们要执行的方法，这样子进程才能和主进程用Queue来通信
        p.start()
        process_list.append(p)

    for i in process_list:
        p.join()

    print('主进程获取Queue数据')
    print(q.get())
    print(q.get())
    print(q.get())
    print('结束测试')

子进程0 开始put数据
子进程1 开始put数据
子进程2 开始put数据
主进程获取Queue数据
我是0 通过Queue通信
我是1 通过Queue通信
我是2 通过Queue通信
结束测试


## Pipe

In [76]:
def fun1(conn):
    print('子进程发送消息：')
    conn.send('你好主进程')
    print('子进程接受消息：')
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    conn1, conn2 = Pipe() #关键点，pipe实例化生成一个双向管
    p = Process(target=fun1, args=(conn2,)) #conn2传给子进程
    p.start()
    print('主进程接受消息：')
    print(conn1.recv())
    print('主进程发送消息：')
    conn1.send("你好子进程")
    p.join()
    print('结束测试')

子进程发送消息：
子进程接受消息：
你好子进程
主进程接受消息：
你好主进程
主进程发送消息：
结束测试


## Manager

In [78]:
def fun1(dic,lis,index):

    dic[index] = 'a'
    dic['2'] = 'b'    
    lis.append(index)    #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
    #print(l)

if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict()#注意字典的声明方式，不能直接通过{}来定义
        l = manager.list(range(5))#[0,1,2,3,4]

        process_list = []
        for i in range(10):
            p = Process(target=fun1, args=(dic,l,i))
            p.start()
            process_list.append(p)

        for res in process_list:
            res.join()
        print(dic)
        print(l)

{0: 'a', '2': 'b', 1: 'a', 2: 'a', 3: 'a', 4: 'a', 5: 'a', 6: 'a', 7: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


## Subprocess

In [68]:
import subprocess

print("$ nslookup www.python.org")
r = subprocess.call(['nslookup', 'www.python.org'])
print("Exit code:", r)

$ nslookup www.python.org
Exit code: 0


In [69]:
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

$ nslookup
Server:		222.200.115.251
Address:	222.200.115.251#53

Non-authoritative answer:
python.org	mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
python.org	nameserver = ns4.p11.dynect.net.
python.org	nameserver = ns1.p11.dynect.net.
python.org	nameserver = ns2.p11.dynect.net.
python.org	nameserver = ns3.p11.dynect.net.
mail.python.org	internet address = 188.166.95.178
ns1.p11.dynect.net	internet address = 208.78.70.11
ns2.p11.dynect.net	internet address = 204.13.250.11
ns3.p11.dynect.net	internet address = 208.78.71.11
ns4.p11.dynect.net	internet address = 204.13.251.11
mail.python.org	has AAAA address 2a03:b0c0:2:d0::71:1


Exit code: 0


# 线程

In [1]:
import time, threading

In [2]:
# 新线程执行的代码:
def loop():
    print("thread %s is running..." % threading.current_thread().name)
    n = 0
    while n < 5:
        n += 1
        print("thread %s >>> %s" % (threading.current_thread().name, n))
        time.sleep(1)
    print("thread %s ended." % threading.current_thread().name)
    
print("thread %s is running..." % threading.current_thread().name)
t = threading.Thread(target= loop, name = "LoopThread")
t.start()
t.join()
print("thread %s ended." % threading.current_thread().name)

thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.


In [4]:
# 假定这是你的银行存款:
balance = 0
lock = threading.Lock()

def change_it(n):
    # 先存后取，结果应该为0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        # 先要获取锁:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要释放锁:
            lock.release()

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

0


### ThreadLocal

In [2]:
import threading

'''创建全局ThreadLocal对象'''
local_school = threading.local()

def process_student():
    '''获取当前线程关联的student'''
    std = local_school.student
    print("Hello, %s (in %s)" % (std, threading.current_thread().name))
    
def process_thread(name):
    '''绑定ThreadLocal的的student'''
    local_school.student = name
    process_student()
    
t1 = threading.Thread(target = process_thread, args = ("Alice",), name = "Thread-A")
t2 = threading.Thread(target = process_thread, args = ("Bob",), name = "Thread-B")
t1.start()
t2.start()
t1.join()
t2.join()

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)


#### 全局变量`local_school`就是一个`ThreadLocal`对象，每个`Thread`对它都可以读写`student`属性，但互不影响。你可以把`local_school`看成全局变量，但每个属性如`local_school.student`都是线程的局部变量，可以任意读写而互不干扰，也不用管理锁的问题，`ThreadLocal`内部会处理。

#### 可以理解为全局变量`local_school`是一个`dict`，不但可以用`local_school.student`，还可以绑定其他变量，如`local_school.teacher`等等。

#### `ThreadLocal`最常用的地方就是为每个线程绑定一个数据库连接，`HTTP`请求，用户身份信息等，这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。

In [1]:
import _thread, time
# 定义线程函数
def print_time(threadName, delay):
    count = 0
    while count < 5:
        time.sleep(delay)
        count += 1
        # 返回当前时间的时间戳（1970纪元后经过的浮点秒数）, 并格式化输出
        print("{}: {}".format(threadName, time.ctime(time.time()) ))
try:
    _thread.start_new_thread( print_time, ("Thread-1", 2))
    _thread.start_new_thread( print_time, ("Thread-2", 4))
except:
    print("Error")

while 1:
    # 让线程有足够的时间完成
    pass

Thread-1: Sun Mar 17 14:40:29 2019
Thread-2: Sun Mar 17 14:40:31 2019
Thread-1: Sun Mar 17 14:40:31 2019
Thread-1: Sun Mar 17 14:40:33 2019
Thread-2: Sun Mar 17 14:40:35 2019
Thread-1: Sun Mar 17 14:40:35 2019
Thread-1: Sun Mar 17 14:40:37 2019
Thread-2: Sun Mar 17 14:40:39 2019
Thread-2: Sun Mar 17 14:40:43 2019
Thread-2: Sun Mar 17 14:40:47 2019


KeyboardInterrupt: 

# 协程

In [8]:
import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 异步调用asyncio.sleep(1):
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()

RuntimeError: This event loop is already running

Hello world!
Hello again!


In [6]:
@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

RuntimeError: This event loop is already running

Hello world! (<_MainThread(MainThread, started 4571387328)>)
Hello world! (<_MainThread(MainThread, started 4571387328)>)
Hello again! (<_MainThread(MainThread, started 4571387328)>)
Hello again! (<_MainThread(MainThread, started 4571387328)>)


In [7]:
@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

RuntimeError: This event loop is already running

wget www.163.com...
wget www.sohu.com...
wget www.sina.com.cn...
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html;charset=UTF-8
www.sohu.com header > Connection: close
www.sohu.com header > Server: nginx
www.sohu.com header > Date: Thu, 16 May 2019 16:53:11 GMT
www.sohu.com header > Cache-Control: max-age=60
www.sohu.com header > X-From-Sohu: X-SRC-Cached
www.sohu.com header > Content-Encoding: gzip
www.sohu.com header > FSS-Cache: HIT from 3963534.5929624.5300396
www.sohu.com header > FSS-Proxy: Powered by 2695201.3416107.4009004
www.163.com header > HTTP/1.1 302 Moved Temporarily
www.163.com header > Date: Thu, 16 May 2019 16:54:11 GMT
www.163.com header > Content-Length: 0
www.163.com header > Connection: close
www.163.com header > Server: Cdn Cache Server V2.0
www.163.com header > Location: http://www.163.com/special/0077jt/error_isp.html
www.163.com header > X-Via: 1.0 PSgdzjdx6qe66:3 (Cdn Cache Server V2.0)
www.sina.com.cn header > HTTP/1.1 302 