# 30. Multi-threading and Multi-processing

## Multi-threading vs. Multi-processing

<div style="display:flex;">
<div><img src="multithread.png" width="200" /></div>
<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</div>
<div><img src="multi-processing.png" width="500" /></div>
</div>


In [15]:
import time
import threading

def calc_square(num_list):
    """Print a header, then the square of every value in *num_list*.

    Each value is preceded by a 0.2 s sleep, so the serial, threaded and
    multi-process timings in the following cells differ visibly.
    """
    print('제곱 계산')
    for value in num_list:
        time.sleep(0.2)
        squared = value * value
        print('sqaure: ', squared)
        
def calc_cube(num_list):
    """Print a header, then the cube of every value in *num_list*.

    A 0.2 s sleep precedes each value so the concurrency demos below have
    measurable work to overlap.
    """
    print('세제곱 계산')
    for value in num_list:
        time.sleep(0.2)
        cubed = value * value * value
        print('cube: ', cubed)
        
arr = list(range(5, 25, 5))  # [5, 10, 15, 20]: inputs shared by the timing demos

### Serial processing

In [16]:
s = time.time()

# Run both tasks one after another on the main thread.
for task in (calc_square, calc_cube):
    task(arr)

print()
print("작업 시간 ", time.time() - s)

제곱 계산
sqaure:  25
sqaure:  100
sqaure:  225
sqaure:  400
세제곱 계산
cube:  125
cube:  1000
cube:  3375
cube:  8000

작업 시간  1.6328392028808594


### Multi-threading

In [17]:
# One thread per task; both receive the same input list.
t1, t2 = (threading.Thread(target=job, args=(arr,))
          for job in (calc_square, calc_cube))

s = time.time()

# Start both threads first, then wait for each to finish.
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()

print()
print("작업 시간 ", time.time() - s)

제곱 계산
세제곱 계산
sqaure: cube:  125
 25
cube: sqaure:  1000
 100
cube: sqaure:  3375
 225
cube: sqaure:  8000
 400

작업 시간  0.8138248920440674


### Multi-processing

In [18]:
import multiprocessing

# One child process per task; each receives a copy of the input list.
p1, p2 = (multiprocessing.Process(target=job, args=(arr,))
          for job in (calc_square, calc_cube))

s = time.time()

# Launch both children, then wait for each to exit.
for proc in (p1, p2):
    proc.start()
for proc in (p1, p2):
    proc.join()

print()
print("작업 시간 ", time.time() - s)

제곱 계산
세제곱 계산
sqaure:  25
cube:  125
sqaure:  100cube: 
 1000
sqaure:  cube:  2253375

cube: sqaure:   4008000


작업 시간  0.8403067588806152


## Process의 주소 공간 (Address Space)

<img src="addressSpace.png" width="400">  

- 주소 공간은 process마다 별도로 생성되므로, global variable 영역도 process마다 따로 존재한다.  

- 따라서 전역 변수(global variable)로는 프로세스 간 상태를 공유할 수 없다. 자식 프로세스가 전역 변수를 수정해도 부모 프로세스의 값은 바뀌지 않는다 (아래 예제 참고).

In [29]:
# Module-level list that calc_square appends to. A child process works on
# its own copy of it, so the parent's list is left untouched.
global_result = []

def calc_square(numbers):
    """Append the square of each number to ``global_result`` and print the list."""
    global_result.extend(n * n for n in numbers)

    print("Process 내부 결과 :", global_result)
        
arr = [2, 3, 4, 5, 6]

# Run calc_square in a child process and wait for it to exit.
child = multiprocessing.Process(target=calc_square, args=(arr,))
child.start()
child.join()

# The child appended to its own copy of global_result, so this list is unchanged.
print("Process 외부 결과 :", global_result)

Process 내부 결과 : [4, 9, 16, 25, 36]
Process 외부 결과 : []


## Shared Memory를 이용한 프로세스 간 상태 공유

<img src="sharedMemory.png" width="300">  


### Value 또는 Array를 이용한 공유 메모리 맵 사용  
- multiprocessing.Array (고정 길이 공유 배열), multiprocessing.Value (단일 공유 값) — 첫 번째 인자로 자료형 코드(typecode)를 지정한다  
    - 'd': double precision float
    - 'i': integer

In [35]:
def calc_square(numbers, result, v):
    """Write the square of each number into *result* and set ``v.value``.

    Args:
        numbers: the input numbers.
        result: index-assignable buffer with at least ``len(numbers)`` slots.
            Here it is a ``multiprocessing.Array('i', ...)``, so writes made in
            the child process are visible to the parent.
        v: object with a ``value`` attribute (here a
            ``multiprocessing.Value('d', ...)``). It is set to 3.45.
    """
    v.value = 3.45

    for position, number in enumerate(numbers):
        result[position] = number * number

    print("Process 내부 결과 :", result[:])
        
arr = [2, 3, 4, 5, 6]

# Shared-memory objects: an int array with one slot per input, and a double.
result = multiprocessing.Array('i', len(arr))
v = multiprocessing.Value('d', 0.0)

child = multiprocessing.Process(target=calc_square, args=(arr, result, v))
child.start()
child.join()

# result lives in shared memory, so the child's writes show up here.
print("Process 외부 결과 :", result[:])

Process 내부 결과 : [4, 9, 16, 25, 36]
Process 외부 결과 : [4, 9, 16, 25, 36]


### Queue를 이용한 Process 간의 Data 공유

<img src="multiprocessing-q.png" width="300">

In [37]:
def calc_square(numbers, q):
    """Put the square of each number onto *q*, one item per input, in order.

    Args:
        numbers: iterable of numbers to square.
        q: queue-like object with a ``put`` method; in this notebook a
            ``multiprocessing.Queue`` that the parent process reads from.
    """
    for n in numbers:
        q.put(n * n)

arr = [2, 3, 4, 5, 6]
q = multiprocessing.Queue()
p = multiprocessing.Process(target=calc_square, args=(arr, q))

p.start()

# Drain the queue *before* joining the producer. The multiprocessing docs
# warn that a process which has put items on a queue will not terminate
# until those items have been flushed to the underlying pipe, so calling
# join() first can deadlock once the data is large enough.
# Reading exactly one result per input also avoids Queue.empty(), which the
# docs describe as unreliable. Note that get() blocks by default.
for _ in range(len(arr)):
    print(q.get())

p.join()

4
9
16
25
36


## Multiprocessing Pool (Map Reduce) 

<img src='mapreduce1.png'>
<img src='mapreduce2.png'>

In [2]:
import time

def f(n):
    """Return n * n after a 0.2 s sleep, so the serial vs. Pool timings differ visibly."""
    time.sleep(0.2)
    squared = n * n
    return squared

array = list(range(1, 11))

s = time.time()
result = map(f, array)  # lazy: f runs only when the map object is consumed

squares = list(result)
print(squares)
print(time.time() - s)

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
2.0375301837921143


In [24]:
from multiprocessing import Pool
import time

def f(n):
    """Sleep 0.2 s, then return the square of *n* (the work item for Pool.map)."""
    time.sleep(0.2)
    product = n * n
    return product

array = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

s = time.time()
# Using the pool as a context manager guarantees its worker processes are
# shut down even if map() raises. Pool.__exit__ calls terminate(), which is
# safe here because map() blocks until every result is ready.
with Pool() as p:
    result = p.map(f, array)

print(result)
print(time.time() - s)

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
0.26096010208129883


## Lock을 이용한 동기화(Synchronization)

<img src="lock.png" width="500">

- 여러 개의 thread가 공유하는 resource를 lock으로 보호하지 않으면 race condition이 발생하여 결과를 예측할 수 없다 (아래 예제처럼 0이 아닌 값이 나올 수 있다)

In [18]:
import threading

# Number of +1 / -1 updates each thread performs.
n = 10 ** 6

# Shared counter; it ends at 0 only if no update is lost.
balance = 0

def increase(n):
    """Add 1 to the global ``balance`` n times, without any locking.

    The lock is omitted on purpose: run concurrently with ``decrease``,
    these read-modify-write updates can be lost.
    """
    global balance
    for _ in range(n):
        balance += 1

def decrease(n):
    """Subtract 1 from the global ``balance`` n times, without any locking.

    Paired with ``increase`` in separate threads to show lost updates.
    """
    global balance
    for _ in range(n):
        balance -= 1
        
def main_task():
    """Run increase(n) and decrease(n) in two threads and wait for both.

    Neither thread takes a lock, so the final ``balance`` is not reliably 0.
    """
    workers = [threading.Thread(target=fn, args=(n,))
               for fn in (increase, decrease)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

main_task()
print(balance)

564032


- threading.Lock()을 이용하여 문제 해결 (공유 변수를 변경하는 구간을 lock으로 감싼다)  
    - lock.acquire() 
    - lock.release()

In [21]:
# Iteration count for each thread.
n = 1000 * 1000

# Counter reset to 0 before the locked run.
balance = 0

def increase(n, lock):
    """Add 1 to the global ``balance`` n times, holding *lock* for each update.

    ``with lock:`` calls ``lock.acquire()`` on entry and ``lock.release()`` on
    exit, including when the body raises, so the lock is never left held.
    """
    global balance
    for _ in range(n):
        with lock:
            balance += 1

def decrease(n, lock):
    """Subtract 1 from the global ``balance`` n times, holding *lock* for each update.

    ``with lock:`` pairs every ``acquire()`` with a ``release()`` even if the
    body raises, so the other thread can never be blocked forever.
    """
    global balance
    for _ in range(n):
        with lock:
            balance -= 1
        
def main_task():
    """Run increase and decrease in two threads that share one Lock.

    Because every update to ``balance`` is made while holding the shared
    lock, the two threads' updates do not overwrite each other.
    """
    lock = threading.Lock()
    jobs = (increase, decrease)
    threads = [threading.Thread(target=job, args=(n, lock)) for job in jobs]
    for th in threads:
        th.start()
    for th in threads:
        th.join()

main_task()
print(balance)

0


## multiprocessing에서 lambda 대신 일반 함수 사용하기

In [26]:
import multiprocessing

def calc_square(n):
    """Return n squared.

    Defined with ``def`` at module top level so multiprocessing.Pool can
    pickle it by name and send it to worker processes.
    """
    squared = n * n
    return squared

arr = list(range(1, 6))  # [1, 2, 3, 4, 5]

# The with-block shuts the pool down once map() has returned all results.
with multiprocessing.Pool() as pool:
    result = pool.map(calc_square, arr)

print(result)

[1, 4, 9, 16, 25]


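- multiprocessing에서는 lambda를 사용할 수 없다. Pool은 작업 함수를 pickle로 직렬화해 worker process에 전달하는데, pickle은 함수를 모듈 안의 이름으로 찾아 참조하므로 이름이 `<lambda>`인 익명 함수는 직렬화할 수 없다 (아래 오류 메시지의 "attribute lookup <lambda> on __main__ failed"). 대신 모듈 최상위(top-level)에 `def`로 정의한 함수를 사용한다.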

In [29]:
# Binding the lambda to a name does not help: its __qualname__ is still
# '<lambda>', and pickle cannot find that name in the module.
square_lambda = lambda x: x*x

with multiprocessing.Pool() as p:
    try:
        result = p.map(square_lambda, arr)
    except Exception as err:
        # Pool has to pickle the function for its workers; for a lambda that fails.
        print(err)

Can't pickle <function <lambda> at 0x7ff55f17e950>: attribute lookup <lambda> on __main__ failed
