# Chapter 5. Using Multiprocessing and ProcessPoolExecutor

## Resources

![resource](./images/resource.png)

- Shared resource: any resource that network users can access, such as folders, files, printers, and named pipes.
   ( https://technet.microsoft.com/ko-kr/library/cc772501.aspx )

## Thread vs process 

http://www.tutorialspoint.com/operating_system/os_multi_threading.htm
    
![threadvsprocess](http://www.tutorialspoint.com/operating_system/images/thread_processes.jpg)



![threadex2](./images/11-4.bmp)

![threadex3](./images/11-5.bmp)

![threadex](./images/11-6.bmp)
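
One practical consequence of the thread/process distinction pictured above: the threads of a process share that process's memory, whereas a child process works on its own copy. The following is only a minimal sketch, assuming Python 3; the names `shared`, `append_marker`, `run_in_thread`, and `run_in_process` are made up for this illustration.

```python
import threading
from multiprocessing import Process

shared = []   # module-level list; every process has its own copy of it


def append_marker(label):
    """Append a label to this process's copy of `shared`."""
    shared.append(label)


def run_in_thread():
    # The thread runs inside the current process, so it sees the same list.
    t = threading.Thread(target=append_marker, args=("thread",))
    t.start()
    t.join()


def run_in_process():
    # The child process appends to its own copy; the parent's list is untouched.
    p = Process(target=append_marker, args=("process",))
    p.start()
    p.join()


if __name__ == "__main__":
    run_in_thread()
    run_in_process()
    print(shared)   # ['thread'] (the child's append is not visible here)
```

This is why processes need an explicit channel (pipe, queue, manager) to exchange data, while threads need synchronization to protect the data they already share.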

## thread

user-level thread vs kernel-level thread

https://kldp.org/node/295

http://www.tldp.org/FAQ/Threads-FAQ/Types.html

### 1) Kernel-level thread model
![kernelthread1](./images/11-8.bmp)

### 2) User-level thread model

- The OS itself provides no thread support; a separate thread library creates and manages the threads.
![kernelthread2](./images/11-10.bmp)

### 3) Threads and processes diagram
http://stackoverflow.com/a/29275453

![threadprocess](http://i.stack.imgur.com/IlJZP.png)


## inter-process communication (IPC)

http://en.wikipedia.org/wiki/Inter-process_communication

## Preemptive vs non-preemptive OS


<a href="http://ko.wikipedia.org/wiki/%EC%8A%A4%EC%BC%80%EC%A4%84%EB%A7%81_(%EC%BB%B4%ED%93%A8%ED%8C%85)"> View details </a>

###  Note: the examples in this book target Python 3, but I am using Python 2.7 on Windows.
###       On Windows, run all of the examples below from a terminal or PyCharm.

In [2]:
import platform

platform.python_version()

'2.7.6'

## Reference: multiprocessing examples

http://pymotw.com/2/multiprocessing/index.html


# Understanding the concept of a process


## 1. Understanding the process model

- Process Control Block (PCB), which stores information referring to processes.

http://www.cs.uic.edu/~jbell/CourseNotes/OperatingSystems/3_Processes.html

http://www.tutorialspoint.com/operating_system/os_processes.htm


![process_state](http://www.tutorialspoint.com/operating_system/images/pcb.jpg)

![process_state](http://www.cs.uic.edu/~jbell/CourseNotes/OperatingSystems/images/Chapter3/3_04_ProcessSwitch.jpg)

## 2. Defining the states of a process

    - Running state
    - Ready state 
    - Waiting (or Blocked) state
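
The three states listed above can be read as a small state machine. The sketch below is a simplified illustration only; the event names (`dispatch`, `preempt`, `block`, `wake`) are labels chosen here and are not taken from the book or from any particular operating system.

```python
# Simplified three-state model of a process life cycle.
# Keys are (current state, event); values are the resulting state.
EVENTS = {
    ("ready", "dispatch"): "running",   # scheduler gives the CPU to the process
    ("running", "preempt"): "ready",    # time slice ends or a higher priority process arrives
    ("running", "block"): "waiting",    # process waits for I/O or another event
    ("waiting", "wake"): "ready",       # the awaited I/O or event completes
}


def step(state, event):
    """Return the state reached from `state` on `event`, or raise ValueError."""
    try:
        return EVENTS[(state, event)]
    except KeyError:
        raise ValueError("illegal transition: %s on %s" % (state, event))


def replay(events, state="ready"):
    """Apply a sequence of events and return the list of visited states."""
    history = [state]
    for event in events:
        state = step(state, event)
        history.append(state)
    return history


if __name__ == "__main__":
    print(replay(["dispatch", "block", "wake", "dispatch", "preempt"]))
```

Note that in this model a waiting process never goes straight back to running: it first becomes ready and has to be dispatched again.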


http://www.cs.uic.edu/~jbell/CourseNotes/OperatingSystems/3_Processes.html
http://www.personal.kent.edu/~rmuhamma/OpSystems/Myos/processState.htm

![process_state](http://www.cs.uic.edu/~jbell/CourseNotes/OperatingSystems/images/Chapter3/3_02_ProcessState.jpg)


# Implementing multiprocessing communication

https://docs.python.org/2.7/library/multiprocessing.html#exchanging-objects-between-processes

## 1. Using multiprocessing.Pipe

- The two connection objects returned by Pipe() represent the two ends of the pipe. 

In [3]:
# %load multiprocessing_pipe.py
import os, random
from multiprocessing import Process, Pipe


def producer_task(conn):
    value = random.randint(1, 10)
    conn.send(value)
    print('Value [%d] sent by PID [%d]' % (value, os.getpid()))
    conn.close()

def consumer_task(conn):
    print('Value [%d] received by PID [%d]' % (conn.recv(), os.getpid()))

if __name__ == '__main__':
    producer_conn, consumer_conn = Pipe()
    consumer = Process(target=consumer_task, args=(consumer_conn,))
    producer = Process(target=producer_task, args=(producer_conn,))
    
    consumer.start()
    producer.start()
    
    consumer.join()
    producer.join()



Value [9] sent by PID [6570]
Value [9] received by PID [6569]


In [4]:
%run ./src/multiprocessing_pipe.py

Value [10] sent by PID [6587]
Value [10] received by PID [6586]
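
The example above sends a single value in one direction. Since `Pipe()` is duplex by default, both ends can also send and receive. The following is a hedged sketch of a request/reply exchange over one pipe, assuming Python 3; `square_worker`, `square_all`, and the `None` stop sentinel are names and conventions invented for this illustration.

```python
from multiprocessing import Pipe, Process


def square_worker(conn):
    """Receive numbers until None arrives, replying with the square of each."""
    while True:
        number = conn.recv()
        if number is None:      # sentinel: the other end asks the worker to stop
            break
        conn.send(number * number)
    conn.close()


def square_all(numbers):
    """Send each number to a worker process and collect the replies in order."""
    parent_conn, child_conn = Pipe()   # duplex: both ends can send and receive
    worker = Process(target=square_worker, args=(child_conn,))
    worker.start()
    results = []
    for number in numbers:
        parent_conn.send(number)
        results.append(parent_conn.recv())
    parent_conn.send(None)
    worker.join()
    return results


if __name__ == "__main__":
    print(square_all([1, 2, 3]))   # [1, 4, 9]
```

When traffic only ever flows one way, `Pipe(duplex=False)` returns a receive-only end and a send-only end instead.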


## 2. Understanding multiprocessing.Queue

- The Queue class is a near clone of Queue.Queue.
- <span style="color:blue; font-weight:bold">Queues</span> are thread- and process-safe.
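
As a rough illustration of the two points above, the sketch below shares one `multiprocessing.Queue` of tasks among several worker processes without any explicit locking. It assumes Python 3; `double_worker`, `double_all`, and the `STOP` sentinel are hypothetical names used only here.

```python
from multiprocessing import Process, Queue

STOP = None   # sentinel value (an assumption of this sketch) telling a worker to exit


def double_worker(tasks, results):
    """Take numbers from `tasks` until STOP arrives, putting doubles on `results`."""
    while True:
        item = tasks.get()          # blocks until an item is available
        if item is STOP:
            break
        results.put(item * 2)


def double_all(numbers, workers=2):
    tasks, results = Queue(), Queue()
    procs = [Process(target=double_worker, args=(tasks, results))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for n in numbers:
        tasks.put(n)
    for _ in procs:
        tasks.put(STOP)             # one sentinel per worker
    # Drain the results before join(): a process with undelivered queue data may not exit.
    out = sorted(results.get() for _ in numbers)
    for p in procs:
        p.join()
    return out


if __name__ == "__main__":
    print(double_all([1, 2, 3, 4]))   # [2, 4, 6, 8]
```

Using a sentinel instead of polling `empty()` avoids guessing whether the queue is really drained.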

### Note: a look inside the Queue module

https://docs.python.org/2/library/queue.html#Queue.Queue

Source: https://hg.python.org/cpython/file/2.7/Lib/Queue.py

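- The source shows that Queue.Queue synchronizes internally (a mutex plus condition variables), which is what makes it thread-safe. multiprocessing.Queue, its near clone, is additionally safe to share between processes.

   --> If so, why did the thread-based Fibonacci example in chapter 4 need a Condition for synchronization?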
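
To explore that question, here is a hedged sketch (not the chapter 4 code) in which the threads rely only on the queue's own blocking `get()` and use no explicit Condition. The names `fib`, `consumer`, and `fib_with_threads`, and the `None` stop sentinel, are assumptions made for this illustration.

```python
import threading

try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2


def fib(n):
    """Return the n-th Fibonacci number (fib(0) == 0)."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def consumer(tasks, results):
    while True:
        n = tasks.get()       # blocks until an item arrives; the queue locks internally
        if n is None:         # sentinel: no more work
            break
        results[n] = fib(n)   # each result is stored under its own key


def fib_with_threads(numbers, workers=3):
    tasks, results = queue.Queue(), {}
    threads = [threading.Thread(target=consumer, args=(tasks, results))
               for _ in range(workers)]
    for t in threads:
        t.start()
    for n in numbers:
        tasks.put(n)
    for _ in threads:
        tasks.put(None)       # one sentinel per consumer thread
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(fib_with_threads([1, 5, 10]))
```

In this sketch the blocking `get()` already covers the "wait until there is work" step, so no extra Condition is needed for that purpose.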

# Using multiprocessing to compute Fibonacci series terms with multiple inputs

- Pipe and Queue are message-passing mechanisms.
- Let's try a different approach: sharing state through a Manager.

### Manager

- A manager returned by Manager() will support types <span style="color:blue;"> list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array</span>.( https://docs.python.org/2.7/library/multiprocessing.html#sharing-state-between-processes )

- Manager example: http://pymotw.com/2/multiprocessing/communication.html#managing-shared-state
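
As a small illustration of two of the supported types, the sketch below shares a `dict` and a `Lock` obtained from one Manager between processes. It assumes Python 3 (where a Manager can be used as a context manager); `count_words` and `count_in_processes` are hypothetical names.

```python
from multiprocessing import Manager, Process


def count_words(words, counts, lock):
    """Add each word's occurrences to the shared `counts` mapping."""
    for word in words:
        # Reading the old count and writing the new one are two separate steps,
        # so the lock keeps another process from slipping in between them.
        with lock:
            counts[word] = counts.get(word, 0) + 1


def count_in_processes(chunks):
    """Count words from several lists, one process per list."""
    with Manager() as manager:
        counts = manager.dict()    # proxy to a dict living in the manager process
        lock = manager.Lock()      # proxy to a lock living in the manager process
        procs = [Process(target=count_words, args=(chunk, counts, lock))
                 for chunk in chunks]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return dict(counts)        # copy into a plain dict before the manager shuts down


if __name__ == "__main__":
    print(count_in_processes([["a", "b"], ["a", "c"]]))
```

Because `count_words` only needs a mapping and a lock, it can also be exercised with a plain `dict` and a `threading.Lock`.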



In [5]:
# %load multiprocessing_fibonacci.py

import logging, random
from multiprocessing import Process, Queue, cpu_count, current_process, Manager

try:
    from queue import Empty    # Python 3
except ImportError:
    from Queue import Empty    # Python 2

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(message)s')

ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)

def producer_task(q, fibo_dict):
    # Put 15 random inputs on the queue and reserve a dict slot for each result.
    for i in range(15):
        value = random.randint(1, 20)
        fibo_dict[value] = None
        logger.info("Producer [%s] putting value [%d] into queue.. "
                % (current_process().name, value))
        q.put(value)

def consumer_task(q, fibo_dict):
    while True:
        try:
            # With several consumers, empty() can be stale by the time get() runs,
            # so rely on the timeout instead of checking empty() first.
            value = q.get(True, 0.05)
        except Empty:
            break
        a, b = 0, 1
        for item in range(value):
            a, b = b, a + b
        # Store the final term once rather than on every iteration.
        fibo_dict[value] = a
        logger.info("consumer [%s] getting value [%d] from queue..."
                    % (current_process().name, value))

if __name__ == '__main__':
    data_queue = Queue()
    number_of_cpus = cpu_count()
    manager = Manager()
    fibo_dict = manager.dict()
    
    producer = Process(target=producer_task, args=(data_queue, fibo_dict))
    producer.start()
    producer.join()
    
    consumer_list = []
    for i in range(number_of_cpus):
        consumer = Process(target=consumer_task, args=(data_queue, fibo_dict))
        consumer.start()
        consumer_list.append(consumer)
    
    for consumer in consumer_list:
        consumer.join()
    
    logger.info(fibo_dict)


INFO:root:Producer [Process-8] putting value [12] into queue.. 
2015-04-02 16:31:55,635 - Producer [Process-8] putting value [12] into queue.. 
INFO:root:Producer [Process-8] putting value [19] into queue.. 
2015-04-02 16:31:55,647 - Producer [Process-8] putting value [19] into queue.. 
INFO:root:Producer [Process-8] putting value [3] into queue.. 
2015-04-02 16:31:55,655 - Producer [Process-8] putting value [3] into queue.. 
INFO:root:Producer [Process-8] putting value [9] into queue.. 
2015-04-02 16:31:55,663 - Producer [Process-8] putting value [9] into queue.. 
INFO:root:Producer [Process-8] putting value [5] into queue.. 
2015-04-02 16:31:55,671 - Producer [Process-8] putting value [5] into queue.. 
INFO:root:Producer [Process-8] putting value [2] into queue.. 
2015-04-02 16:31:55,679 - Producer [Process-8] putting value [2] into queue.. 
INFO:root:Producer [Process-8] putting value [17] into queue.. 
2015-04-02 16:31:55,687 - Producer [Process-8] putting value [17] into queue.. 


# Crawling the Web using ProcessPoolExecutor

https://docs.python.org/3/library/concurrent.futures.html

ProcessPoolExecutor source: https://hg.python.org/cpython/file/3.4/Lib/concurrent/futures/process.py

https://hg.python.org/cpython/file/3.4/Lib/concurrent/futures/

https://hg.python.org/cpython/file/47451f6e7e75/Lib/concurrent/futures/_base.py
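
For orientation, here is a minimal, network-free sketch of the `submit` / `as_completed` pattern provided by `concurrent.futures`. It assumes Python 3; `fib` and `fib_many` are names made up for this illustration.

```python
import concurrent.futures


def fib(n):
    """Return the n-th Fibonacci number (fib(0) == 0)."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def fib_many(numbers, max_workers=2):
    """Compute fib(n) for each n in a pool of worker processes."""
    results = {}
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its input so results can be labelled.
        future_to_n = {executor.submit(fib, n): n for n in numbers}
        # as_completed yields futures in the order they finish, not submission order.
        for future in concurrent.futures.as_completed(future_to_n):
            results[future_to_n[future]] = future.result()
    return results


if __name__ == "__main__":
    print(fib_many([10, 20, 30]))
```

The function passed to `submit` must be defined at module level so that it can be pickled and sent to the worker processes.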

In [7]:
# %load process_pool_executor_web_crawler.py

import sys, re, requests
import concurrent.futures
from multiprocessing import cpu_count, current_process, Manager

try:
    from queue import Empty    # Python 3
except ImportError:
    from Queue import Empty    # Python 2


def group_urls_task(urls, result_dict, html_link_regex):
    try:
        url = urls.get(True, 0.05)
        result_dict[url] = None
        print("[%s] putting url [%s] in dictionary..." % (
            current_process().name, url))
    except Empty:   # multiprocessing.Queue has no Empty attribute; use the standard queue module's Empty
        print('Nothing to be done, queue is empty')

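def crawl_task(url, html_link_regex):
    links = []
    try:
        request_data = requests.get(url)
        print("[%s] crawling url [%s] ..." % (
            current_process().name, url))
        links = html_link_regex.findall(request_data.text)
    except requests.RequestException:
        # Report the failure and continue with an empty link list.
        # (A `return` inside `finally` would silently discard a re-raised exception.)
        print(sys.exc_info())
    return (url, links)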

if __name__ == '__main__':
    manager = Manager()
    urls = manager.Queue()
    urls.put('http://www.google.com')
    urls.put('http://br.bing.com/')
    urls.put('https://duckduckgo.com/')
    urls.put('https://github.com/')
    urls.put('http://br.search.yahoo.com/')
    result_dict = manager.dict()
    
    html_link_regex = \
        re.compile(r'<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')    #  *?  --> "non-greedy" versions
    
    number_of_cpus = cpu_count()
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_cpus) as group_link_processes:
        for i in range(urls.qsize()):
            group_link_processes.submit(group_urls_task, urls, result_dict, html_link_regex)
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_cpus) as crawler_link_processes:
        future_tasks = {crawler_link_processes.submit(crawl_task, url, html_link_regex): url for url in result_dict.keys()}
        for future in concurrent.futures.as_completed(future_tasks):
            url, links = future.result()
            result_dict[url] = links

    for url, links in result_dict.items():
        # links is empty when a page failed to load or contained no matching anchors
        print("[%s] with links : [%s..." % (url, links[0] if links else ''))

[Process-13] putting url [http://br.bing.com/] in dictionary...
[Process-12] putting url [http://www.google.com] in dictionary...
[Process-13] putting url [https://github.com/] in dictionary...
[Process-12] putting url [https://duckduckgo.com/] in dictionary...
[Process-12] putting url [http://br.search.yahoo.com/] in dictionary...
[http://www.google.com] with links : [http://www.google.co.kr/imghp?hl=ko&tab=wi...
[http://br.search.yahoo.com/] with links : [https://br.yahoo.com/...
[https://github.com/] with links : [#start-of-content...
[https://duckduckgo.com/] with links : [/about...
[http://br.bing.com/] with links : [javascript:void(0)...
[Process-14] crawling url [http://www.google.com] ...
[Process-15] crawling url [http://br.search.yahoo.com/] ...
[Process-14] crawling url [https://github.com/] ...[Process-15] crawling url [https://duckduckgo.com/] ...

[Process-15] crawling url [http://br.bing.com/] ...


INFO:urllib3.connectionpool:Starting new HTTP connection (1): www.google.com
INFO:urllib3.connectionpool:Starting new HTTP connection (1): br.search.yahoo.com
2015-04-02 16:33:45,420 - Starting new HTTP connection (1): www.google.com
2015-04-02 16:33:45,434 - Starting new HTTP connection (1): br.search.yahoo.com
DEBUG:urllib3.connectionpool:Setting read timeout to None
DEBUG:urllib3.connectionpool:Setting read timeout to None
2015-04-02 16:33:45,488 - Setting read timeout to None
2015-04-02 16:33:45,527 - Setting read timeout to None
DEBUG:urllib3.connectionpool:"GET / HTTP/1.1" 302 261
DEBUG:urllib3.connectionpool:"GET / HTTP/1.1" 200 5498
2015-04-02 16:33:45,534 - "GET / HTTP/1.1" 302 261
2015-04-02 16:33:45,831 - "GET / HTTP/1.1" 200 5498
INFO:urllib3.connectionpool:Starting new HTTP connection (1): www.google.co.kr
INFO:urllib3.connectionpool:Starting new HTTPS connection (1): duckduckgo.com
2015-04-02 16:33:45,536 - Starting new HTTP connection (1): www.google.co.kr
2015-04-02 16:

#### concurrent.futures for Python 2

Installing on Ubuntu 14.04 LTS

$ sudo pip install futures

https://pypi.python.org/pypi/futures/2.2.0

http://pythonhosted.org//futures/

### Reference: Regular Expression - Greedy versus Non-Greedy

https://docs.python.org/2/howto/regex.html#greedy-versus-non-greedy
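
A short sketch of the difference, using a made-up HTML snippet (the `html` string and both patterns are illustrative and much simpler than the crawler's regex):

```python
import re

# Two anchors on one line, so a greedy match can run across both of them.
html = '<a href="/one">1</a><a href="/two">2</a>'

# Greedy: .* takes as much as possible, stopping only at the LAST double quote.
greedy = re.findall(r'href="(.*)"', html)

# Non-greedy: .*? takes as little as possible, stopping at the NEXT double quote.
lazy = re.findall(r'href="(.*?)"', html)

print(greedy)   # ['/one">1</a><a href="/two']
print(lazy)     # ['/one', '/two']
```

This is why the crawler's pattern uses `*?` throughout: each `href` value should end at its own closing quote.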

# Reference: zeromq

http://zeromq.org/

- PyZMQ
    - the Python bindings for ØMQ.
    - Installed together with IPython Notebook.


http://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/pyzmq.html

In [8]:
from IPython.display import YouTubeVideo

YouTubeVideo('_JCBphyciAs',width=800, height=600)