https://zhuanlan.zhihu.com/p/64702600

https://www.cnblogs.com/654321cc/p/7683238.html

https://zhuanlan.zhihu.com/p/46798399

In [1]:
#right way to read data not to cause memory to overflow

#read the data in line by line so that only a single line is stored in the RAM at any given time
import os
root = '/srv/spark/shared-data-export/liuy/datasets/video_level_classification/Video_game_extraction/Access_wikidata'

with open(os.path.join(root, 'wikiListOfArticles_nongame.txt')) as f:
    for line in f:
        process(line)

### 使用Process类

***函数 multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)***

group为预留参数。

target为可调用对象（函数对象），为子进程对应的活动；相当于multiprocessing.Process子类化中重写的run()方法。

name为线程的名称，默认（None）为"Process-N"。

args、kwargs为进程活动（target）的非关键字参数、关键字参数。

deamon为bool值，表示是否为守护进程。

In [2]:
# main part

"""
def f(a, b = value):
    pass

p = multiprocessing.Process(target = f, args = (a,), kwargs = {b : value}) 
p.start()
p.join()

"""

from multiprocessing import  Process

def fun1(name):
    print('测试%s多进程' %name)

if __name__ == '__main__':
    process_list = []
    for i in range(5):  #开启5个子进程执行fun1函数
        p = Process(target=fun1,args=('Python',)) #实例化进程对象 #创建子进程
        p.start() #启动进程活动, p为进程实例
        process_list.append(p)

    for i in process_list:
        p.join() #使主调进程（包含XXX.join()语句的进程）阻塞，直至被调用进程p运行结束或超时（如指定timeout)

    print('结束测试')

测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
测试Python多进程
结束测试


In [3]:
from multiprocessing import Process
import os
def foo():
    print('%s from foo'%os.getpid())
def bar():
    print('%s from bar' % os.getpid())
if __name__ == '__main__':
    p1=Process(target=foo)
    p2=Process(target=bar)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('%s over'%os.getpid())

1589 from foo
1592 from bar
1482 over


### 使用Pool类：

***如果要启动大量的子进程，可以用进程池的方式批量创建子进程：Pool类***


将输入映射到不同的CPU, 并收集所有CPU的输出。等待所有任务完成后，然后返回输出。

输出为：以列表或者数组的形式返回输出

执行中的进程存储在存储器中，而其他非执行进程则存储在存储器之外。


***multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)***

processes ：使用的工作进程的数量，如果processes是None那么使用 os.cpu_count()返回的数量

In [4]:
#main part

"""
def f(a, b = value):
    pass

pool = multiprocessing.Pool() 
pool.apply_async(f, args = (a,), kwds = {b : value})
pool.close()
pool.join()

"""


from  multiprocessing import Process,Pool
import os, time, random

def fun1(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    pool = Pool(8) #创建一个6个进程的进程池, 即6个cores

    for i in range(10): ##开启10个子进程执行fun1函数
        pool.apply_async(func=fun1, args=(i,)) 
        #apply_async对应的每个子进程是异步执行的（即并行）。异步执行指的是一批子进程并行执行，且子进程完成一个，就新开始一个，而不必等待同一批其他进程完成

    pool.close() #关闭进程池，关闭后不能往pool中增加新的子进程，然后可以调用join()函数等待已有子进程执行完毕。
    pool.join() ##等待进程池中的子进程执行完毕。需在close()函数后调用。
    print('结束测试')

Run task 3 (1598)...
Run task 1 (1596)...
Run task 4 (1599)...
Run task 2 (1597)...
Run task 5 (1600)...
Run task 6 (1601)...
Run task 7 (1602)...
Task 4 runs 0.01 seconds.
Run task 8 (1599)...
Run task 0 (1595)...
Task 0 runs 0.10 seconds.
Run task 9 (1595)...
Task 6 runs 0.23 seconds.
Task 2 runs 0.83 seconds.
Task 1 runs 1.02 seconds.
Task 5 runs 1.12 seconds.
Task 3 runs 1.56 seconds.
Task 8 runs 1.65 seconds.
Task 9 runs 1.88 seconds.
Task 7 runs 2.31 seconds.
结束测试


对Pool对象调用join()方法会等待所有子进程执行完毕，调用join()之前必须先调用close()，调用close()之后就不能继续添加新的Process了。

In [5]:
from multiprocessing import Pool
import os,time
def foo():
    time.sleep(1)
    print('%s from foo'%os.getpid())
    return 'foo'
def bar():
    time.sleep(2)
    print('%s from bar' % os.getpid())
    return 'bar'
if __name__ == '__main__':
    p=Pool(8)
    t1=time.time()
    res1=p.apply_async(foo) #给出foo()函数结果
    res2=p.apply_async(bar) #给出bar()函数结果
    p.close()
    p.join()
    print(res1)
    print(time.time()-t1)          ##多出来的0.15秒是开启进程所花费的时间
    t2=time.clock()
    print(res1.get())
    print(res2.get())
    print(time.clock()-t2)

#先运行子进程res1 and res2

1646 from foo
1647 from bar
<multiprocessing.pool.ApplyResult object at 0x7fa0b4d2d9e8>
2.0118601322174072
foo
bar
0.00010599999999993948




In [None]:
from multiprocessing import Pool
import time
def test(p):
       print(p)
       time.sleep(3)
if __name__=="__main__":
    pool = Pool(processes=10)
    for i  in range(500):
        '''
        ('\n'
         '	（1）遍历500个可迭代对象，往进程池放一个子进程\n'
         '	（2）执行这个子进程，等子进程执行完毕，再往进程池放一个子进程，再执行。（同时只执行一个子进程）\n'
         '	 for循环执行完毕，再执行print函数。\n'
         '	')
        '''
        pool.apply(test, args=(i,))   #维持执行的进程总数为10，当一个进程执行完后启动一个新进程.
    print('test')
    pool.close()
    pool.join()

### Pool的 map 方法

如果子进程有返回值，且返回值需要集中处理，则建议采用map方式（子进程活动只允许1个参数）

***map在爬虫的领域里也可以使用，比如多个URL的内容爬取，可以把URL放入元祖里，然后传给执行函数。***

例如 https://medium.com/@Sean_Hsu/concurrency-parallelism-in-python-ebdc040e0881

#### 如何使用Pool Map：

如果要执行一百万个任务，则可以创建一个池，其中包含与CPU内核一样多的进程数，然后将百万个任务列表传递给pool.map，池将这些任务分发给工作进程 (通常与可用内核的数量相同），并以列表的形式收集返回值并将其传递给父进程。


***对map的理解：***

pool.map() will loop over the file and deliver lines to the worker function. 

Map blocks and returns the entire result when its done.

the pool.map() is going to read your entire file into memory all at once before dishing out work. 

In [6]:
#map()

"""
XXX.map(func, iterable, chunksize=None) 
#将iterable的每个元素作为参数，应用func函数，返回函数结果组成的list，阻塞版本。func(iterable[i])为子进程对应的活动。XXX为进程池实例。
#chunksize

map(func, iterable[, chunksize])

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. 
The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

I presume a process picks up the next chunk from a queue when done with previous chunk.

The default chunksize depends on the length of iterable and is chosen 
so that the number of chunks is approximately four times the number of processes.

def f(a): #map方法只允许1个参数
    pass

pool = multiprocessing.Pool() 
result = pool.map_async(f, (a0, a1, ...)).get() #XXX.map()的异步（并行）版本，返回MapResult实例（其具有get()方法，获取结果组成的list）
pool.close()
pool.join()

"""

import time
from multiprocessing import Pool

def foo(x):
    time.sleep(1)
    return x*x

if __name__ == '__main__':
    l = [1,2,3,4,5,6,7,8,9,10]
    t1 = time.time()
    p = Pool(6)
    print(time.time()-t1)
    res = p.map(foo,l) # 这行代码表示所有的进行都已经执行完了，并且每个进程的结果都拿到，放在了res中 #l是foo()的参数
    print(time.time()-t1)   
    print(res,type(res)) #res 存储结果
    p.close()
    p.join()
    print(time.time()-t1)

0.07900619506835938
2.0829520225524902
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100] <class 'list'>
2.1894869804382324


### 使用concurrent.futures 库内的ProcessPoolExecutor 来创建多进程

concurrent.futures為python集合thread和process的高階API，能夠讓使用者更容易的使用這進（線）程，而且能夠方便的得到函式的回傳值，先前的thread及multiprocessing要取得return值稍稍麻煩了一點。但使用concurrent的executor提供的map及submit方法能夠快速地取得個函式的回傳值。以下會提供map及submit的使用方式，兩者在thread及process的情況下階相同。


executor=ProcessPoolExecutor()：生成一个ProcessPoolExecutor对象；

future=executor.submit():提交任务，返回一个Future对象。

executor.shutdown()。相当于Pool类中的close()和join()

future.result()：从Future对象中获取其返回值。

In [7]:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
from threading import current_thread
def foo():
    time.sleep(1)
    print('%s  %s from foo'%(os.getpid(),current_thread().getName()))
    return 'foo'
def bar():
    time.sleep(2)
    print('%s  %s from bar' % (os.getpid(),current_thread().getName()))
    return 'bar'
if __name__ == '__main__':
    t1=time.time()
    executor=ProcessPoolExecutor()
    print('executor',executor)
    future_to_url = {}
    future1=executor.submit(foo) #foo函数不用传入参数
    future2=executor.submit(bar)
    print(future1,future2)
    executor.shutdown()
    print(future1.result())
    print(future2.result())
    print(time.time()-t1)

executor <concurrent.futures.process.ProcessPoolExecutor object at 0x7fa0d497d358>
<Future at 0x7fa0d4987ef0 state=running> <Future at 0x7fa0d49062e8 state=running>
1670  MainThread from foo
1671  MainThread from bar
foo
bar
2.0878539085388184


In [8]:
import multiprocessing
p = multiprocessing.Pool(multiprocessing.cpu_count())
# result = p.map(eval_formula, expression_list)
# p.close()
# p.join()

# def eval_formula(formula):
#     #evaluates the expression

p

<multiprocessing.pool.Pool at 0x7fa0d496c240>

In [9]:
multiprocessing.cpu_count()

8

### 使用python的多进程详细步骤

In [10]:
import os
from tqdm import trange                  # 显示进度条
from multiprocessing import cpu_count    # 查看cpu核心数
from multiprocessing import Pool         # 并行处理必备，进程池 
 
# import cv2                                
from PIL import Image, ImageOps
import numpy as np                        
# from contrast import ImageContraster     # 自定义图像处理库，忽略


***2、定义单线程的工作流函数，换句话说就是单线程需要完成的任务***

In [11]:
# 单线程所进行的自定义任务，每个任务可能不同，所以以下代码没有太大的参考价值
def single_worker(List_imgs, src_path, dest_path):
    icter = ImageContraster()
        
    # 注意：
    # 这里使用的是 trange 而不是 range 原因是为了美观
    # trange 会输出一个进度条
    # 当然，你也可以换成 range 函数
    for i in trange(len(List_imgs)):
        if not List_imgs[i].endswith('.jpg'):
            continue
        file = List_imgs[i]
        filepath = os.path.join(src_path, file)
        img = cv2.imread(filepath)
        he_eq_img = icter.enhance_contrast(img, method="HE")
        he_eq_img = np.array(he_eq_img)
        save_he = os.path.join(dest_path, file)
        cv2.imwrite(save_he, he_eq_img)

***3、将总数据集进行分割，分割成子数据集 (分割成chunks）***

根据CPU核数来划分数据

In [14]:
# 忽略
SourceImgs = './data/source/'
HE_path = "./data/he_trainval_jpg/"

# if not os.path.exists(HE_path):
#     os.mkdir(HE_path)

In [15]:
num_cores = cpu_count()
num_cores

8

In [None]:
#以下重点
#将数据集下的图片名加载到List_imgs中
List_imgs = os.listdir(SourceImgs)
Len_imgs = len(List_imgs) # 数据集长度
num_cores = cpu_count()   # cpu核心数

 # 双核，将所有数据集分成两个子数据集
if num_cores == 2:
    subset1 = List_imgs[:Len_imgs // 2]
    subset2 = List_imgs[Len_imgs // 2:]
 
    List_subsets = [subset1, subset2]

    # 四核，将所有数据集分成四个子数据集

elif num_cores == 4:  
    subset1 = List_imgs[:Len_imgs // 4]
    subset2 = List_imgs[Len_imgs // 4: Len_imgs // 2]
    subset3 = List_imgs[Len_imgs // 2: (Len_imgs * 3) // 4]
    subset4 = List_imgs[(Len_imgs * 3) // 4:]
 
    List_subsets = [subset1, subset2, subset3, subset4]
      
    # 八核以上，将所有数据集分成八个子数据集
elif num_cores >= 8:     
    num_cores = 8
    subset1 = List_imgs[:Len_imgs // 8]
    subset2 = List_imgs[Len_imgs // 8: Len_imgs // 4]
    subset3 = List_imgs[Len_imgs // 4: (Len_imgs * 3) // 8]
    subset4 = List_imgs[(Len_imgs * 3) // 8: Len_imgs // 2]
    subset5 = List_imgs[Len_imgs // 2: (Len_imgs * 5) // 8]
    subset6 = List_imgs[(Len_imgs * 5) // 8: (Len_imgs * 6) // 8]
    subset7 = List_imgs[(Len_imgs * 6) // 8: (Len_imgs * 7) // 8]
    subset8 = List_imgs[(Len_imgs * 7) // 8: ]
 
    List_subsets = [subset1,subset2,subset3,subset4,
                    subset5,subset6,subset7,subset8]

***4、开启多线程处理数据***

In [None]:
# 开辟进程池，不需要改动
# num_cores为cpu核心数，也就是开启的进程数
p = Pool(num_cores)   
 
#
# 对每个进程分配工作
for i in range(num_cores):
    #
    # 格式：p.apply_async(task, args=(...))
    # task：当前进程需要进行的任务/函数，只需要填写函数名
    # args：task函数中所需要传入的参数
    # 注意看 List_subsets[i] 就是传入不同的数据子集
    p.apply_async(single_worker, args=(List_subsets[i], SourceImgs, HE_path))
    res = p.map(single_worker, args=(List_subsets[i], SourceImgs, HE_path))
 # 当进程完成时，关闭进程池
# 以下两行代码不需要改动
p.close()
p.join()


https://www.blopig.com/blog/2016/08/processing-large-files-using-python/

In [1]:
#重新写分块的function

#reading the data in line by line, so that only a single line is stored in the RAM at any given time.
#main part
#process： 是执行函数，需要自己写的
#这里是process的line
# generate a pool of workers, ideally one for each core, 即8个工人，one for processing each line
# this still will run into memory problems

import multiprocessing as mp

#init objects
pool = mp.Pool(8) #generate 
jobs = []

#create jobs
with open("input.txt") as f:
    for line in f:
        jobs.append( pool.apply_async(process,(line)) )

#wait for all jobs to finish
for job in jobs:
    job.get()

#clean up
pool.close()

FileNotFoundError: [Errno 2] No such file or directory: 'wikiListOfArticles_nonredirects.txt'

In [None]:
#上面可以改进为
#change the function to include opening the file, locating the specified line, reading it into memory, and then processing it
#lineID is line number, thereby preventing the memory overflow
#overhead 的问题：
#the overhead involved in having to locate the line by reading iteratively through the file for each job is untenable,
#getting progressively more time consuming as you get further into the file. 
def process_wrapper(lineID):
    with open("input.txt") as f:
        for i,line in enumerate(f):
            if i != lineID:
                continue
            else:
                process(line)
                break

#init objects
pool = mp.Pool(8)
jobs = []

#create jobs
with open("input.txt") as f:
    for ID,line in enumerate(f):
        jobs.append(pool.apply_async(process_wrapper,(ID)))

#wait for all jobs to finish
for job in jobs:
    job.get()

#clean up
pool.close()

In [None]:
#上面可以改进为：
# 解决overhead的问题：
#seek function of file: use the seek function of file objects which skips you to a particular location within a file. 
#tell function: Combining with the tell function, which returns the current location within a file
#no need to iterate through the file line by line

import multiprocessing as mp

def process_wrapper(lineByte):
    with open("input.txt") as f:
        f.seek(lineByte)
        line = f.readline()
        process(line)

#init objects
pool = mp.Pool(cores)
jobs = []

#create jobs
with open("input.txt") as f:
    nextLineByte = f.tell()
    for line in f:
        jobs.append(pool.apply_async(process_wrapper,(nextLineByte)) )
        nextLineByte = f.tell() #returns the current location within a file

#wait for all jobs to finish
for job in jobs:
    job.get()

#clean up
pool.close()

Using seek we can move directly to the correct part of the file, whereupon we read a line into the memory and process it. We have to be careful to correctly handle the first and last lines, but otherwise this does exactly what we set out, namely using all the cores to process a given file while not overflowing the memory.

***process multiple lines of the file at a time as a chunk***

In [1]:
import multiprocessing as mp,os

def process_wrapper(chunkStart, chunkSize):
    with open("wikiListOfArticles_nonredirects.txt") as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            process(line)

def chunkify(fname,size=1024*1024):
    fileEnd = os.path.getsize(fname)
    with open(fname,'r') as f:
        chunkEnd = f.tell()
    while True:
        chunkStart = chunkEnd
        f.seek(size,1)
        f.readline()
        chunkEnd = f.tell()
        yield chunkStart, chunkEnd - chunkStart
        if chunkEnd > fileEnd:
            break

#init objects
pool = mp.Pool(8)
jobs = []

#create jobs
for chunkStart,chunkSize in chunkify("wikiListOfArticles_nonredirects.txt"):
    jobs.append(pool.apply_async(process_wrapper,(chunkStart,chunkSize)) )

#wait for all jobs to finish
for job in jobs:
    job.get()

#clean up
pool.close()


FileNotFoundError: [Errno 2] No such file or directory: 'wikiListOfArticles_nonredirects.txt'

In [1]:
import requests
import json
from lxml import html
from pyspark.sql import Row
import pyspark.sql.functions as sparkfn
from pyspark.sql.types import *
from  multiprocessing import Process,Pool, cpu_count
import os, time, random
import itertools
from functools import reduce 
from pyspark.sql import DataFrame


In [2]:
def crawler(title_list):

    lst_categories = []
    lst_page_id = []
    lst_links = []
    lst_externallinks = []
    lst_sections = []
    lst_redirects = []
    lst_texts = []
    lst_page_titles = []


    URL = "https://en.wikipedia.org/w/api.php"
    for title in title_list:
        PARAMS = {
        "action": "parse",
        "page": title,
        "format": "json",
        "redirects": True
    
    }   
        json_return = requests.get(url=URL, params=PARAMS)
        json_data = json_return.json()

        if 'parse' in json_data.keys():
            #page_id
            page_id = json_data['parse']['pageid']
    
            #categories
            categories = []
            for item in json_data['parse']['categories']:
                if 'hidden' not in item.keys():
                    label = item["*"]
                    categories.append(label)  
    
            #links
            links = [item['*'] for item in json_data['parse']['links']]
    
            #external links
            external_links = json_data['parse']['externallinks']
    
            #sections
            sections = []
    
            for item in json_data['parse']['sections']:
                section_name = item['line']
                sections.append(section_name)
        
            lst_page_id.append(page_id)
            lst_categories.append(categories)
            lst_links.append(links)
            lst_externallinks.append(external_links)
            lst_sections.append(sections) 
            
            #redirects
            if len(json_data['parse']['redirects']) > 0:
                lst_redirects.append(json_data['parse']['redirects'][0]['to'])
            else:
                lst_redirects.append(None)
            
            #texts
            raw_html = json_data['parse']['text']['*']
            document = html.document_fromstring(raw_html)
            # redirect pages
            #if len(para) > 0 and para[0].text_content().startswith("Redirect to") is False:
            text = ""    
            for idx in range(len(document.xpath('//p'))):
                text = text + " " + str(document.xpath('//p')[idx].text_content())
                text = text.replace("\n", ".")
            
            lst_texts.append(text)
            lst_page_titles.append(title)

    mySchema = StructType([StructField("page_id", StringType(), True)\
                       ,StructField("page_title", StringType(), True)\
                       ,StructField("page_text", StringType(), True)\
                       ,StructField("category", ArrayType(StringType()), True)\
                       ,StructField("links", ArrayType(StringType()), True)\
                       ,StructField("external_links", ArrayType(StringType()), True)\
                       ,StructField("sections", ArrayType(StringType()), True)\
                       ,StructField("redirects_page", StringType(), True)])

   
    data = [{'page_id':lst_page_id,'page_title':lst_page_titles, 'page_text': lst_texts, 'category': lst_categories, 'links': lst_links, 
             'external_links': lst_externallinks, 'sections': lst_sections, 'redirects_page': lst_redirects} 
            for lst_page_id,lst_page_titles,lst_texts,lst_categories, lst_links, lst_externallinks, lst_sections, lst_redirects
            in zip(lst_page_id,lst_page_titles,lst_texts,lst_categories, lst_links, lst_externallinks, lst_sections, lst_redirects)]

    #df = spark.createDataFrame(Row(**x) for x in data, schema = mySchema)
    df = spark.createDataFrame(data, schema = mySchema)
    df = df.where(~sparkfn.array_contains(df.category, 'Disambiguation_pages'))
    df = df.where(~sparkfn.array_contains(df.category, 'Disambiguation pages'))
    return df

In [3]:
root = '/srv/spark/shared-data-export/liuy/datasets/video_level_classification/Video_game_extraction/Access_wikidata'
with open(os.path.join(root, 'wikiListOfArticles_samp.txt')) as f:
    content = f.readlines()
    
samp_titles = [x.split('; ')[1].strip() for x in content]
samp_titles = samp_titles[:1000]
len(samp_titles)

def split(a, n):
    k, m = divmod(len(a), n)
    return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

def unionAll(dfs):
    return reduce(DataFrame.unionAll, dfs)

In [None]:
if __name__ == '__main__':
    
    Len_titles = len(samp_titles) # 数据集长度
    num_cores = cpu_count()   # cpu核心数
    results = []
    pool = Pool(num_cores)
    
    chunks = list(split(samp_titles, 10))
    stime = time.time()
    print(time.time()-stime)
    #p.apply_async(single_worker, args=(List_subsets[i], SourceImgs, HE_path))
    result = pool.map(crawler,chunks) # 这行代码表示所有的进行都已经执行完了，并且每个进程的结果都拿到，放在了res中 #l是foo()的参数
    print(time.time()-stime) 
    results.extend(result)
    #print(res,type(res)) #res 存储结果
    pool.close()
    pool.join()
    print(time.time()-stime)

8.58306884765625e-06


In [41]:
if __name__ == '__main__':
    
    num_cores = cpu_count()   # cpu核心数
    #init objects
    pool = Pool(num_cores)
    jobs = []
    
    chunks = list(split(samp_titles, 10))
    print(len(chunks))
    stime = time.time()
    print(time.time()-stime)

    for mini_chunk in chunks:
    #
    # 格式：p.apply_async(task, args=(...))
    # task：当前进程需要进行的任务/函数，只需要填写函数名
    # args：task函数中所需要传入的参数
    # 注意看 List_subsets[i] 就是传入不同的数据子集
        jobs.append(pool.apply_async(crawler, args=(mini_chunk,)))
    print(time.time()-stime) 
    
    #wait for all jobs to finish
#     for job in jobs:
#         job.get()
    pool.close()
    pool.join()
    print(time.time()-stime)

10
1.0967254638671875e-05
0.00019431114196777344
62.00026869773865


In [42]:
type(jobs)

list

In [43]:
len(jobs)

10

#理解map() and apply_asyn()

pool.apply(f, args): f is only executed in ONE of the workers of the pool. So ONE of the processes in the pool will run f(args).
only one of the workers execute the function, you have to call apply multiple times to have the rest of the workers perform a task.

apply() can only be called for one job, func 仅在池中的一个工作程序中执行。
***********

pool.map(f, iterable): This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. So you take advantage of all the processes in the pool.

map() block the main process until all the processes complete and return the result.
map is called for a list of jobs in one time

when using map(), put 13 arguments in one iterable and run it with 12 core. 12 jobs were running simultaneously and the rest one works after the first bunch is done.

此方法将iterable内的每一个对象作为单独的任务提交给进程池。可以通过将chunksize设置为正整数来指定这些块的（近似）大小。
****************

global interpreter lock (GIL).This means that only one thread can execute at a time, since the interpreter blocks access for all other threads.

In [None]:
def my_func(x):
    s0 = time()
    res = 0
    for _ in range(x*1000000):
        res += 1
    print(mp.current_process(),'run time:%.3f s, result:%.1f'%(time()-s0,res))



Multiprocessing.Pool可以提供指定数量的进程供用户调用，当有新的请求提交到pool中时，如果池还没有满，那么就会创建一个新的进程用来执行该请求；但如果池中的进程数已经达到规定最大值，那么该请求就会等待，直到池中有进程结束，才会创建新的进程来执行它。