# Concurrency

## 학습목표
 1. 동시성, 병렬성의 이해
 3. multithreading 이해 및 실습
 4. multiprocessing 이해 및 실습

## Threading module
 1. thread는 프로그램의 기본 실행 단위
 2. Process내의 thread끼리는 코드와 데이터를 공유하여 경량 process라고도 함
 3. 동시성 프로그램의 기본이자, 고급 프로그래머로 가는 길목 중 하나
 4. python의 기본 구현체인 CPython의 경우, GIL(Global Interpreter Lock)이 존재하여 인터프리터가 한번에 하나의 thread만 실행하도록 제한함
 5. 대부분의 GUI application에서는 멀티 쓰레드 프로그램을 작성하여 반응성을 높임
 
 https://docs.python.org/2/library/threading.html

## Thread 생성 및 실행 방법
 1. threading.Thread 객체에 함수 전달하여 사용
 2. Thread class를 상속받아서 run method override하여 사용

* Thread 생성 및 실행

In [1]:
# Thread 객체에 함수를 전달하여 thread를 생성하는 방법
import threading

def work():
    print 'I am working..'

# 세개의 thread를 생성
for i in range(3): 
    # 쓰레드 생성
    t = threading.Thread(target=work)
    
    # 쓰레드 실행
    t.start()

I am working..
I am working..
I am working..


* Thread 실행 시, argument 전달

In [8]:
import threading

def work(num):
    print '{} : I am working..'.format(num)

for i in range(3): 
    # args parameter로 전달, 이때 tuple로 전달함
    t = threading.Thread(target=work, args=(i,))
    
    # 쓰레드 실행
    t.start()

0 : I am working..
1 : I am working..
2 : I am working..


* Thread naming

In [None]:
import threading

def work():
    print '{} : I am working..'.format(threading.currentThread().getName())

t1 = threading.Thread(target=work, name='test')
t2 = threading.Thread(target=work, name='work')
t3 = threading.Thread(target=work)
    
t1.start()
t2.start()
t3.start()

* Daemon Thread
 - background에서 실행되는 쓰레드를 의미함
 - python process는 종료되지 않은 thread가 daemon일 경우에만 종료된다.
 - 즉, 메인 process의 종료에 상관없이 background에서 작업을 수행하는 thread를 말함

In [None]:
import threading
import time

def work():
    i = 0
    while i < 10:
        print 'I am working..'
        time.sleep(0.5)
        i += 1

t = threading.Thread(target=work)
# Daemon 설정
#t.setDaemon(True) 
t.daemon = True # 혹인 이렇게도 가능
t.start()

print 'main thread finished'

# daemon으로 동작하게 했을 때와 그렇지 않을 때의 비교!

* thread 대기
 - join 함수를 이용하여 waiting
 - join 함수를 호출한 곳은 block됨

In [None]:
import threading
import time

def work():
    i = 0
    while i < 10:
        print 'I am working..'
        time.sleep(0.5)
        i += 1
        

def sleep():
    i = 0
    while i < 10:
        print 'I am sleeping..'
        time.sleep(0.3)
        i += 1

t = threading.Thread(target=work)
t.start()

ts = threading.Thread(target=sleep)
ts.start()

# blocking됨
t.join()
ts.join()

print 'main thread finished'

# daemon으로 동작하게 했을 때와 그렇지 않을 때의 비교!

* Thread class subclassing
 - Thread class를 상속을 받아 thread 생성 가능
 - run method overriding하여 구현
 - thread의 시작은 start method 호출

In [None]:
import threading

class MyThread(threading.Thread):
    # run method 재정의
    def run(self):
        print 'This is a thread'
        return

for i in range(5):
    t = MyThread()
    t.start()


In [None]:
import threading

class MyThread(threading.Thread):
    # run method 재정의
    def run(self):
        print 'This is a thread'
        return

for i in range(5):
    t = MyThread()
    t.run() # 위의 코드와 차이점은?

* Thread-subclass parameter 전달

In [None]:
import threading

class MyThread(threading.Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.args = args
        self.kwargs = kwargs
        
    def run(self):
        print '{} {}: This is a thread'.format(*self.args)
        return


t = MyThread(args=(50, 30))
t.start()

## Thread 메시지 전달
 - thread의 주 사용 목적은 concurrent한 실행이나, 때로는 서로 다른 두 thread 사이에 실행이 
  동기화(synchronized)되어야 할 필요가 있음
 
 - 이때 가장 간단히 thread 사이에서 message를 주고 받는 방법을 제공하는 객체 : Event
 
## Event 객체
 - 내부적으로 set, clear함수를 제공하여 flag를 관리
 - 다른 thread는 wait 함수를 호출하여 blocking 대기
 - https://docs.python.org/2/library/threading.html#event-objects

In [None]:
import threading
import time
                    
def work(e):
    print 'wait for the event'
    event_set = e.wait()
    print 'event set : {}'.format(event_set)

# event 객체 생성
e = threading.Event()
t1 = threading.Thread(name='', 
                      target=work,
                      args=(e,))
t1.start()

print 'main thread before calling Event.set()'
time.sleep(5)

# set event
e.set()

print 'event is just set'

In [None]:
import threading
import time
                    
def work(e):
    i = 0
    while i < 10:
        event_set = e.wait()
        print i
        time.sleep(1)
        i += 1

# event 객체 생성
e = threading.Event()
t1 = threading.Thread(name='', 
                      target=work,
                      args=(e,))
t1.start()

print 'main thread before calling Event.set()'
time.sleep(5)

# set event
e.set()

time.sleep(3)

# clear event - wait blocks
e.clear()

time.sleep(5)

# set event -  block 되었던 thread 깨움!
e.set()


## Shared Resource
 - multithreaded 환경에서 복수의 thread에 접근되는 객체를 shared resource 라고 함
 - shared resource는 그 상태가 의도한대로 변경되지 않을 수 있음
 - 이유는 파이썬의 연산은 실제로 원자적이지 않은 경우가 많기 때문임
 - 원자적이지 않은 연산 중간에 다른 thread가 실행되면 shared resource 상태가 변할 수 있음
 - 따라서 thread간의 동기화 기법이 중요
 
## Race Condition (경쟁 상태)
  
 - 복수개의 thread가 하나의 데이터에 연산을 수행하는 상태
 - thread switching에 따라 결과가 달라질 수 있음
 - 따라서 thread간의 동기화가 중요!

* race condition simulation
 - 아래 코드를 수행한 결과는?

In [None]:
from threading import Thread

class Counter(object):
    def __init__(self, n=0):
        self.n = 0
        
    def increment(self):
        self.n += 1
        
counter = Counter()

def work(counter):
    for i in xrange(100000):
        counter.increment()

    
t1 = Thread(target=work, args=(counter,))
t2 = Thread(target=work, args=(counter,))

# thread 시작
t1.start()
t2.start()

# thread 대기
t1.join()
t2.join()

# 두 쓰레드가 모두 종료되고 나면 counter.n의 값은 200,000이 되어야 한다.
# 결과는?
print 'expected counter is {} and real counter is {}'.format(200000, counter.n)

* Thread 동기화
 - thread 사이의 race condition을 방지하기 위해 동기화 필요
 - 한 객체에 복수개의 thread가 접근하는 것을 방지
 - threading.Lock객체 사용 가능
   - python의 가장 기본적인 동기화 도구
   - 기본적으로 locked, unlocked 두개의 상태를 갖는다
   - acquire, release 함수 제공
   - acuire
     - unlock 상태일 때 : 상태를 lock으로 변경
     - lock 상태일 때 : 다른 thread에서 release를 호출할때까지 block
   - release
     - lock 상태일 때 : 상태를 unlock으로 변경
     - unlock 상태일 때 : ThreadError 발생

In [None]:
from threading import Thread
from threading import Lock

class Counter(object):
    def __init__(self, n=0):
        self.lock = Lock()
        self.n = 0
        
    def increment(self):
        self.lock.acquire()
        try:
            self.n += 1
        finally:
            self.lock.release()
        
counter = Counter()

def work(counter):
    for i in xrange(100000):
        counter.increment()
    
t1 = Thread(target=work, args=(counter,))
t2 = Thread(target=work, args=(counter,))

# thread 시작
t1.start()
t2.start()

# thread 대기
t1.join()
t2.join()

# 두 쓰레드가 모두 종료되고 나면 counter.n의 값은 200,000이 되어야 한다.
# 결과는?
print 'expected counter is {} and real counter is {}'.format(200000, counter.n)

* Lock with 'with' keyword

In [None]:
from threading import Thread
from threading import Lock

class Counter(object):
    def __init__(self, n=0):
        self.lock = Lock()
        self.n = 0
        
    def increment(self):
        with self.lock:
            self.n += 1
        
counter = Counter()

def work(counter):
    for i in xrange(100000):
        counter.increment()
    
t1 = Thread(target=work, args=(counter,))
t2 = Thread(target=work, args=(counter,))

# thread 시작
t1.start()
t2.start()

# thread 대기
t1.join()
t2.join()

# 두 쓰레드가 모두 종료되고 나면 counter.n의 값은 200,000이 되어야 한다.
# 결과는?
print 'expected counter is {} and real counter is {}'.format(200000, counter.n)

 ## MultiThreadedCrawler 실습
  - crawler를 multithread로 다시 구현해봅시다
  - 어느 부분이 병목 현상인가요?
  - 성능의 향상이 있나요?
  - 있다면 왜 있을까요? 없다면 왜 없을까요?
  - 다른 상황에서도 성능의 효과가 동일할까요?

* Sequential CPU Intensive(Bound) work

In [None]:
def work(number):
    result = [1]
    
    for i in xrange(2, number+1):
        if number % i == 0:
            result.append(i)
    return result

* 연습문제) 위의 work 함수는 어떤 기능을 제공하는지 말하시오.

In [None]:
def run_cpu_intensive():
    numbers = [2400, 2650, 5500, 9000, 7897, 12000, 14000, 20000, 105443]
    for num in numbers:
        work(num)
        
%timeit run_cpu_intensive()

* Concurrent CPU intensive(Bound) work

In [None]:
import threading
def run_cpu_intensive_concurrent():
    numbers = [2400, 2650, 5500, 9000, 7897, 12000, 14000, 20000, 105443]
    
    threads = []    
    for num in numbers:
        t = threading.Thread(target=work, args=(num,))
        threads.append(t)
        t.start()
        
    for t in threads:
        t.join()
        
%timeit run_cpu_intensive_concurrent()
        
    

* Sequential I/O intensive(Bound) work

In [None]:
import requests

In [None]:
def get_html(url):
    res = requests.get(url)
    return res.text

In [None]:
print get_html('http://www.naver.com')

In [None]:
urls = [
    'http://www.naver.com',
    'http://www.naver.com',
    'http://www.naver.com',
    'http://www.daum.com',
    'https://www.google.com',
    'http://www.nba.com'
]

def run_io_intensive():
    for url in urls:
        get_html(url)
        
%timeit run_io_intensive()

* Concurrent I/O Intensive(Bound) work

In [None]:
import threading
def run_io_intensive_concurrent():
    threads = []
    for url in urls:
        t = threading.Thread(target=get_html, args=(url,))
        threads.append(t)
        
        t.start()
        
    for t in threads:
        t.join()
        
%timeit run_io_intensive_concurrent()   

* Queue
 -  thread-safe Queue data structure
   - Queue란?
      - FIFO(First-in First-out) 데이터 구조
      - 즉, 먼저 들어간 순서대로 나가는 시퀀스를 의미
      - enque, deque의 연산을 제공
      - 예제) 은행이나 마트의 대기열
   - Stack이란? (참고용)
      - LIFO (Last-in First-out) 데이터 구조
      - 즉, 먼저 들어간 아이템이 가장 마지막에 나오는 시퀀스를 의미
      - push, pop의 연산을 제공
      - 예제) 차곡차곡 개어서 쌓아둔 옷
   - thread-safe란?
      - multithreaded 환경에서 shared-resource의 상태가 의도된 대로 유지되는 것을 보장하는 것을 의미
      - 쉽게 말하면, 사용하는 입장에서 thread의 동기화에 신경쓸 필요가 없음
   - Produce-Consumer pattern에 주로 사용
      - thread 사이에 데이터를 주고 받는 방법을 의미
      - 작업할 내용을 Queue에 담고(Producer), 그 것을 하나씩 꺼내서(Consumer) 해당 작업을 수행
      - 1 Producer vs Many Consumers
      - Many Producers vs Many Consumers
      - 위의 두 상황이 가장 일반적인 모델
      - 뉴스 댓글 크롤링에 이용 가능
         - 어떻게 이용할 수 있을까요?
   - from Queue import Queue로 사용
   
   
 - https://docs.python.org/2/library/queue.html

In [None]:
from Queue import Queue

In [None]:
# queue 사이트 정의
simple_q = Queue(maxsize=3)

# queue에 아이템 삽입
simple_q.put(1)
simple_q.put(2)
simple_q.put(3)
#simple_q.put(4)

# queue에서 아이템 추출
print simple_q.get()

# 종료를 의미 작업 종료 후, 명시적으로 호출해야 함
simple_q.task_done()

In [None]:
from Queue import Queue

def do_stuff(q):
    # queue 안에 내용이 있는지 검사하여 있으면
    while not q.empty():
        # 해당 내용을 가져옴
        print q.get()
        
        # 꺼낸 아이템이 처리 되었음을 알림 (호출되어야 함)
        q.task_done()

q = Queue(maxsize=0)

for x in range(20):
    q.put(x)

do_stuff(q)

In [None]:
from Queue import Queue
from threading import Thread

def do_stuff(q):
    while True:
        print q.get()
        q.task_done()

q = Queue(maxsize=0)
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=do_stuff, args=(q,))
    
    # background 로 설정, 그렇지 않으면 process 종료되지 않음
    worker.setDaemon(True)
    worker.start()

for x in range(50):
    q.put(x)

# queue의 모든 아이템이 처리될때까지 block
q.join()

* 연습문제)
 - Queue와 multithread를 사용하여 comment crawler를 완성하세요.

## MultiProcess
 - 말그대로 여러개의 독립된 process를 의미
 - thread level이 아닌 process level의 병렬성
 - process를 생성하기 때문에 thread에 비해 자원이 소요가 큼
 - python에서 진정한 의미의 병렬성을 위해서는 multiprocess를 사용
 
https://docs.python.org/2/library/multiprocessing.html

* multiprocessing module

In [None]:
import os

def work(number):
    result = [1]
    
    #print '{} is calculating {}'.format(os.getpid(), number)
    
    for i in xrange(2, number+1):
        if number % i == 0:
            result.append(i)
    return result

# 위에서 사용한 약수 구하기 예제

* sequential, multithread, multiprocess비교

In [None]:
numbers = [240000, 265000, 550000, 9000000, 78970000, 12000, 14000, 20000, 105443]

In [None]:
def run_cpu_intensive():
    for num in numbers:
        work(num)
        
%timeit run_cpu_intensive()

In [None]:
import threading
def run_cpu_intensive_concurrent():
    threads = []    
    for num in numbers:
        t = threading.Thread(target=work, args=(num,))
        threads.append(t)
        t.start()
        
    for t in threads:
        t.join()
        
%timeit run_cpu_intensive_concurrent()

In [None]:
from multiprocessing import Process

def run_cpu_intensive_multiprocess():    
    procs = []    
    for num in numbers:
        p = Process(target=work, args=(num,))
        procs.append(p)
        p.start()
        
    for p in procs:
        p.join()
        
%timeit run_cpu_intensive_multiprocess()

## Pool 
 - process pool을 의미
 - 일반적으로 Computer 영역에서 Pool은 그때그때 생성하는 것이 아닌 미리 생성해두고, 필요할 때마다 해당 pool에서 가져다 쓰는 의미로 사용됨
 - 위와 같이 하면, 초기 비용은 증가하나 그 후로는 생성비용이 적다는 장점이 있음
 - 함수와 데이터를 쉽게 병렬화 가능케 함

In [None]:
from multiprocessing import Pool

def run_cpu_intensive_multiprocess_pool():    
    # 크기 4인 process pool 생성
    pool = Pool(processes=4)
    
    # 함수와 iterable을 병렬화 하여 수행
    return pool.map(work, numbers)
        
%timeit run_cpu_intensive_multiprocess_pool()

# 반환값은 매핑된 리스트
#print run_cpu_intensive_multiprocess_pool()

In [None]:
from multiprocessing import Pool

def run_cpu_intensive_multiprocess_pool():    
    pool = Pool(processes=20)
    pool.map(work, numbers)
        
%timeit run_cpu_intensive_multiprocess_pool()

## 결론
 - Thread - light-weight process
 - Python(CPython)은 GIL 때문에 한번에 하나의 thread만 실행됨
 - multithread 환경에서 공유자원에 접근하는 것을 race condition이라고 함
 - race condition 방지를 위해 thread synchronization(동기화) 필요
 - thread, process간의 메시지 전달은 Queue를 사용하여 전달 (Producer, Consumer 구조)
 - I/O intensive한 작업은 queue와 multithread를 이용하는 것이 더 효율적
 - CPU intensive한 경우는 multiprocess를 선별적으로 사용