# Multiprocessing Pool
- 프로세스를 미리 할당하고, 해당 프로세스에 각각의 데이터를 병렬적으로 나눠주어 연산을 진행

In [1]:
from multiprocessing import Pool
import time
import os
import math

def f(x):
    print("값", x, "에 대한 작업 Pid = ",os.getpid())
#     time.sleep(1)
    return x*x

if __name__ == '__main__':
    p = Pool(3)
    startTime = float(time.time())
    print(p.map(f, range(0,10)))  # 함수와 인자값을 맵핑하면서 데이터를 분배한다
    endTime = float(time.time())
    print("총 작업 시간", (endTime - startTime))

값 1 에 대한 작업 Pid =  11687
값 2 에 대한 작업 Pid =  11688
값 0 에 대한 작업 Pid =  11686
값 3 에 대한 작업 Pid =  11687
값 4 에 대한 작업 Pid =  11688
값 5 에 대한 작업 Pid =  11686
값 6 에 대한 작업 Pid =  11688
값 7 에 대한 작업 Pid =  11687
값 8 에 대한 작업 Pid =  11686
값 9 에 대한 작업 Pid =  11688
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
총 작업 시간 0.012486934661865234


# Multiprocessing Process 
- 하나의 프로세스에 하나의 함수를 할당

In [2]:
import os
from multiprocessing import Process

def doubler(number):
    # A doubling function that can be used by a process
    
    result = number ** 2
    proc = os.getpid()
    print('{0} doubled to {1} by process id: {2}'.format(
        number, result, proc))

if __name__ == '__main__':
    startTime = float(time.time())
#     numbers = [5, 10, 15, 20, 25]
    numbers = range(0, 10)
    procs = []

    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()
    
    endTime = float(time.time())
    print("총 작업 시간", (endTime - startTime))

0 doubled to 0 by process id: 11689
1 doubled to 1 by process id: 11690
2 doubled to 4 by process id: 11691
3 doubled to 9 by process id: 11692
4 doubled to 16 by process id: 11693
5 doubled to 25 by process id: 11694
6 doubled to 36 by process id: 11695
7 doubled to 49 by process id: 11696
8 doubled to 64 by process id: 11697
9 doubled to 81 by process id: 11698
총 작업 시간 0.09257125854492188


# Pandas DataFrame 에 적용
- 아래의 parallelize_dataframe 함수를 이용해서 pool 방식으로 나눠준다. 
- 거기에 map 함수를 이용해 적용한다

In [3]:
import pandas as pd
import numpy as np
import seaborn as sns
from multiprocessing import Pool

num_cores = 4
iris = pd.DataFrame(sns.load_dataset('iris'))

def parallelize_dataframe(df, func):
    df_split = np.array_split(df, num_cores)
    pool = Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

def multiply_columns(data):
    data['length_of_word'] = data['species'].apply(lambda x: len(x))
    return data

print("whitout parallelize")
%time _ = multiply_columns(iris)
print("\nwith parallelize")
%time _ = parallelize_dataframe(iris, multiply_columns)

whitout parallelize
CPU times: user 778 µs, sys: 100 µs, total: 878 µs
Wall time: 814 µs

with parallelize
CPU times: user 14 ms, sys: 17.8 ms, total: 31.9 ms
Wall time: 145 ms


# Dask DataFrame 이용
- 빅데이터를 처리하기 위한 라이브러리
- pandas, numpy 등을 지원함

## apply 함수 이용

In [4]:
import dask.dataframe as dd

# npartitions 로 분할 할당
iris_dd = dd.from_pandas(iris, npartitions=8)
iris_dd['length_of_word'] = iris_dd.apply(lambda x: len(x['species']), axis=1)
# compute() 하기 이전에는 작업이 진행되지 않음
# 전체 코어를 다 쓰기 위해서는 --> scheduler='processes'
%time _ = iris_dd.compute(scheduler='processes')

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=(None, 'int64'))



CPU times: user 39.5 ms, sys: 29.5 ms, total: 69.1 ms
Wall time: 103 ms


## map partitions 함수
- partition 별로 map 함수 적용
- 병렬 처리가 아닌 메모리 이득을 위한 함수이므로 속도는 apply 함수에 비해 느릴 수밖에 없음

In [5]:
%time _ = iris_dd.map_partitions(multiply_columns).compute(scheduler='processes')

CPU times: user 52.5 ms, sys: 38.6 ms, total: 91.2 ms
Wall time: 131 ms


# 추가확인사항
- numba 라이브러리