[Python parallel processing (multiprocessing and Joblib)](https://qiita.com/yukiB/items/203a6248c2d466b80d38)

- multiprocessing official documentation: https://docs.python.org/ja/3/library/multiprocessing.html
- [Parallel processing in Python (for beginners)](http://iatlex.com/python/parallel_first)

In [1]:
from joblib import Parallel, delayed
from multiprocessing import Pool
import multiprocessing as multi

def process(i):
    # CPU-bound toy workload: returns 100 dicts, each holding a sum over range(i*j)
    return [{'id': j, 'sum': sum(range(i*j))} for j in range(100)]

In [9]:
%%time
x = [process(i) for i in range(1000)]

CPU times: user 34.2 s, sys: 84.6 ms, total: 34.3 s
Wall time: 34.4 s


In [10]:
%%time
# joblib: n_jobs=-1 uses all available CPU cores
result = Parallel(n_jobs=-1)([delayed(process)(n) for n in range(1000)])

CPU times: user 122 ms, sys: 19 ms, total: 141 ms
Wall time: 8.6 s


In [11]:
%%time
# multiprocessing
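p = Pool(multi.cpu_count())  # create a Pool; the argument is the number of worker processes (multi.cpu_count() returns the machine's CPU count)
result = p.map(process, list(range(1000)))  # run in parallel: pass the function and the list of arguments to distribute among the workers
p.close()  # stop accepting new tasks (call p.join() to wait for the workers to exit)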
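The cells above run inside Jupyter. If the same code is saved as a script, `multiprocessing` requires pool creation to sit under an `if __name__ == "__main__":` guard whenever workers are started with the spawn or forkserver start methods. Spawn is the default on Windows and, since Python 3.8, on macOS. The sketch below also uses `Pool` as a context manager so the workers are cleaned up automatically. The helper name `run_parallel` is hypothetical, and the demo uses 100 tasks instead of 1000 to keep it quick.

```python
import multiprocessing as multi
from multiprocessing import Pool


def process(i):
    # Same toy CPU-bound workload as in the notebook cells above
    return [{'id': j, 'sum': sum(range(i * j))} for j in range(100)]


def run_parallel(n_tasks):
    # Hypothetical helper. The with-block calls terminate() on exit, which is
    # safe here because map() has already returned every result by then.
    with Pool(multi.cpu_count()) as p:
        return p.map(process, range(n_tasks))


if __name__ == "__main__":
    # The guard stops child processes started with "spawn"/"forkserver"
    # from re-running this block when they import the module.
    result = run_parallel(100)
    print(len(result), result[1][2])
```

Under this layout, workers that import the module only see the function definitions, and the parallel work runs only in the parent process.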

CPU times: user 69.8 ms, sys: 54.9 ms, total: 125 ms
Wall time: 8.26 s


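When the function to parallelize takes several arguments: `Pool.map` passes only one argument per call, so use a wrapper that takes a single argument (a list or tuple) and unpacks it into the real function's arguments.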

In [22]:
%%time
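def kakezan(a, b):  # kakezan = "multiplication"
    return a*b

tutumimono = [[i, 3] for i in range(1000000)]  # tutumimono ("wrapped items"): list of [a, b] argument pairs
x = [kakezan(i[0], i[1]) for i in tutumimono]  # serial baseline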

CPU times: user 647 ms, sys: 75.5 ms, total: 723 ms
Wall time: 722 ms


In [21]:
%%time
def kakezan(a, b):
    return a*b

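def wrapper_kakezan(args):
    return kakezan(*args)  # unpack the [a, b] pair into two arguments

cpu_count = multi.cpu_count()
tutumimono = [[i, 3] for i in range(1000000)]
p = Pool(processes=cpu_count)
x = p.map(wrapper_kakezan, tutumimono)  # each [a, b] pair reaches the wrapper as a single argument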
p.close()

CPU times: user 749 ms, sys: 191 ms, total: 941 ms
Wall time: 1.36 s
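An alternative to the wrapper is `Pool.starmap` (available since Python 3.3), which unpacks each argument tuple itself. Note that in the timings above the Pool version (1.36 s wall) was slower than the plain list comprehension (722 ms). The likely reason is that each `kakezan` call is so cheap that pickling the arguments and results and passing them between processes costs more than the multiplication itself. `starmap` removes the wrapper but not that overhead. The helper name `parallel_kakezan` below is hypothetical.

```python
from multiprocessing import Pool


def kakezan(a, b):
    # kakezan = "multiplication"
    return a * b


def parallel_kakezan(pairs, processes=None):
    # Hypothetical helper: starmap calls kakezan(*pair) for each pair,
    # so no wrapper function is needed. processes=None means os.cpu_count().
    with Pool(processes=processes) as p:
        return p.starmap(kakezan, pairs)


if __name__ == "__main__":
    tutumimono = [(i, 3) for i in range(1000000)]
    x = parallel_kakezan(tutumimono)
    print(x[:5])  # [0, 3, 6, 9, 12]
```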


Using multiprocessing for data processing with Pandas

In [24]:
# Kaggle Malware Competition
# https://www.kaggle.com/c/microsoft-malware-prediction/data

import pandas as pd
from multiprocessing import Pool
import multiprocessing as multi

train = pd.read_csv('input/train.csv')  # 4.38GB
print(train.shape)
train.head(3)


(8921483, 83)


,MachineIdentifier,ProductName,EngineVersion,AppVersion,AvSigVersion,IsBeta,RtpStateBitfield,IsSxsPassiveMode,DefaultBrowsersIdentifier,AVProductStatesIdentifier,...,Census_FirmwareVersionIdentifier,Census_IsSecureBootEnabled,Census_IsWIMBootEnabled,Census_IsVirtualDevice,Census_IsTouchEnabled,Census_IsPenCapable,Census_IsAlwaysOnAlwaysConnectedCapable,Wdft_IsGamer,Wdft_RegionIdentifier,HasDetections
0,0000028988387b115f69f31a3bf04f09,win8defender,1.1.15100.1,4.18.1807.18075,1.273.1735.0,0,7.0,0,,53447.0,...,36144.0,0,,0.0,0,0,0.0,0.0,10.0,0
1,000007535c3f730efa9ea0b7ef1bd645,win8defender,1.1.14600.4,4.13.17134.1,1.263.48.0,0,7.0,0,,53447.0,...,57858.0,0,,0.0,0,0,0.0,0.0,8.0,0
2,000007905a28d863f6d0d597892cd692,win8defender,1.1.15100.1,4.18.1807.18075,1.273.1341.0,0,7.0,0,,53447.0,...,52682.0,0,,0.0,0,0,0.0,0.0,3.0,0


In [43]:
def extract_app_ver(AppVersion):
    # Return (major, minor) from a dotted version string, e.g. '4.18.1807.18075' -> ('4', '18')
    AppVersion = str(AppVersion)
    first_period = AppVersion.find('.')  # index of the first '.'
    second_period = (first_period+1) + AppVersion[(first_period+1):].find('.')  # index of the second '.'
    major_version = AppVersion[:first_period]
    minor_version = AppVersion[(first_period+1):second_period]
    return major_version, minor_version
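`extract_app_ver` finds the first two periods with `str.find` and slices around them. A shorter variant based on `str.split` with `maxsplit=2` is sketched below. The name `extract_app_ver_split` is hypothetical, not from the original notebook. For well-formed values such as `'4.18.1807.18075'` it returns the same `('4', '18')`. For values with fewer than two periods, for example a missing value that `str()` turns into `'nan'`, the two functions give different results.

```python
def extract_app_ver_split(app_version):
    """Hypothetical alternative to extract_app_ver based on str.split.

    '4.18.1807.18075' -> ('4', '18'). Values with fewer than two periods
    (e.g. 'nan') are handled differently from extract_app_ver's slicing.
    """
    parts = str(app_version).split('.', 2)  # at most 3 pieces: major, minor, rest
    major = parts[0]
    minor = parts[1] if len(parts) > 1 else ''
    return major, minor


print(extract_app_ver_split('4.18.1807.18075'))  # ('4', '18')
print(extract_app_ver_split('4.13.17134.1'))     # ('4', '13')
```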

In [39]:
%%time
major_minor = train['AppVersion'].map(extract_app_ver)

CPU times: user 6.58 s, sys: 217 ms, total: 6.79 s
Wall time: 6.79 s


In [45]:
%%time
p = Pool(multi.cpu_count())
major_minor = p.map(extract_app_ver, train['AppVersion'])
p.close()

CPU times: user 2.82 s, sys: 1.3 s, total: 4.12 s
Wall time: 4.35 s


('4', '18')
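`p.map` returns a plain list of `(major, minor)` tuples in the same order as the input rows. To store the pairs back into the DataFrame, one option is to transpose the list with `zip(*...)`. The sketch below works on a small hypothetical sample instead of the real `train` data. The column names in the commented-out lines are also hypothetical.

```python
# Hypothetical sample of what p.map(extract_app_ver, ...) returns:
# a list of (major, minor) tuples, in input order.
major_minor = [('4', '18'), ('4', '13'), ('4', '18')]

# zip(*...) transposes the list of pairs into two sequences.
majors, minors = (list(col) for col in zip(*major_minor))

print(majors)  # ['4', '4', '4']
print(minors)  # ['18', '13', '18']

# With the train DataFrame from above (not run here), the columns could be
# assigned directly because Pool.map preserves the row order:
# train['AppVersion_major'] = majors
# train['AppVersion_minor'] = minors
```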