`'Window 환경에서의 multiprocessing은 모두 실행하려는 함수를 py 파일로 만들고 진행해야 한다.`

-----

# multiprocessing

- pool.close() : 더 이상 Pool에 추가 작업이 들어가지 않는다는 것을 알려준다. 지금 수행 중인 작업이 모두 끝나면 Pool의 프로세스들을 종료한다.
    - close() 대신 terminate()를 사용하면, 현재 진행 중인 작업이 있떠라도 즉시 Pool의 프로세스들을 종료한다. 
  
- pool.join() : .join()은 Pool의 모든 프로세스들의 종료가 완료되기를 기다린다.

In [None]:
"""아래와 같이 데이터프레임이 들어왔을 때 처리되는 apply 함수를 짜놓고,
데이터를 분할해서 함수를 적용한다. 이후 concat"""
import random
import pandas as pd
import numpy as np
from multiprocessing import  Pool

def add_features(df):
    df['question_text'] = df['question_text'].apply(lambda x:str(x))
    df["lower_question_text"] = df["question_text"].apply(lambda x: x.lower())
    df['total_length'] = df['question_text'].apply(len)
    df['capitals'] = df['question_text'].apply(lambda comment: sum(1 for c in comment if c.isupper()))
    df['caps_vs_length'] = df.apply(lambda row: float(row['capitals'])/float(row['total_length']),
                                axis=1)
    df['num_words'] = df.question_text.str.count('\S+')
    df['num_unique_words'] = df['question_text'].apply(lambda comment: len(set(w for w in comment.split())))
    df['words_vs_unique'] = df['num_unique_words'] / df['num_words'] 
    df['num_exclamation_marks'] = df['question_text'].apply(lambda comment: comment.count('!'))
    df['num_question_marks'] = df['question_text'].apply(lambda comment: comment.count('?'))
    df['num_punctuation'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in '.,;:'))
    df['num_symbols'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in '*&$%'))
    df['num_smilies'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in (':-)', ':)', ';-)', ';)')))
    df['num_sad'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in (':-<', ':()', ';-()', ';(')))
    df["mean_word_len"] = df["question_text"].apply(lambda x: np.mean([len(w) for w in str(x).split()]))
    return df

In [None]:
"""multiprocessing"""
def parallelize_dataframe(df, func, n_cores = 8): 
    df_split = np.array_split(n_cores) # core의 개수만큼 df를 나눔
    pool = Pool(n_cores) # pool을 core개수만큼 생성 
    df = pd.concat(pool.map(func, df_split)) # 나누어진 df를 func을 적용해서 수행 및 concat
    pool.close() pool.join() # 모두 완료될 때까지 대기 
    return df


------

# parmap

In [23]:
import multiprocessing
import parmap
import numpy as np

"""multiprocessing으로 처리하는 함수를 담은 모듈"""
import test_func as tf

"""py 파일 reload할 때 사용하는 라이브러리"""
import importlib
importlib.reload(tf)

<module 'test_func' from 'C:\\Users\\RohSeungChan\\Desktop\\sps. Lab\\### SIT 프로젝트 ###\\[[코드]]\\test_func.py'>

In [None]:
"""test_function 안의 함수들"""
def square(input_list):
    return [x*x for x in input_list]

def sss(input_list):
    return [x+100 for x in input_list]

In [27]:
## 데이터 생성
data = list(range(1, 25))

"""데이터를 core의 개수만큼 나눠준다."""
num_cores = multiprocessing.cpu_count() # 8
print(data)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]


In [28]:
"""데이터를 사용하는 core의 개수만큼 나눠준다."""
splited_data =  np.array_split(data, num_cores)
splited_data = [x.tolist() for x in splited_data]
splited_data

### 결과물 예시
# split_input = [[1,2,3,4,5], [6,7,8,9,10], ... ]
"""아래와 같이 각 데이터가 리스트로 되어서 들어있는 것을 알 수 있다."""

[[1, 2, 3],
 [4, 5, 6],
 [7, 8, 9],
 [10, 11, 12],
 [13, 14, 15],
 [16, 17, 18],
 [19, 20, 21],
 [22, 23, 24]]

In [29]:
"""모듈 내의 함수 확인"""
tf.sss(splited_data[0])

[101, 102, 103]

## parmap.map()
- parmap.map(	
-  ex_func,      # 실행 함수 
-  split_input,  # input data (core 수 대로 나눈 데이터)
-  a, b,         # 실행하려는 함수의 파라미터
-  pm_pbar=True, # 진행상황 bar  
-  pm_processes=NUM_CORES, # core 갯수
-    )

In [30]:
result = parmap.map(tf.square, splited_data, pm_pbar=True, pm_processes=num_cores)
result

  0%|          | 0/8 [00:00<?, ?it/s]

[[1, 4, 9],
 [16, 25, 36],
 [49, 64, 81],
 [100, 121, 144],
 [169, 196, 225],
 [256, 289, 324],
 [361, 400, 441],
 [484, 529, 576]]