# 05. 병렬 처리하기
- 여러 개의 페이지를 1개씩 내용을 추출하면 시간이 오래 걸림
- CPU, 메모리, 네트워크 대역 상황에 따라 병렬 처리하여 시간을 줄일 수 있음
- 병행(Concurrent) : 여러 개의 스레드를 사용해서 여러 처리를 하는 것
- 병렬(Parallel) : 여러 개의 프로세스를 사용해서 여러 처리를 하는 것

## 표준 라이브러리 사용하기(한 대의 머신 병렬화하기)
- 표준 병렬 처리 라이브러리 : multithread, multiprocess, concurrent.Future 등
- 비동기 처리 라이브러리 : asyncio(콜백을 사용하여 다중 처리를 구현)
- 병렬 처리는 매우 복잡하기 때문에 크롤링을 배우는 것보다 더 오래 걸릴 수 있음
- 그중에서도 가장 간단한 concurrent.Future 라이브러리 사용
    - 어떤 처리의 결과가 이후에 추출된다는 것을 전제로 처리를 구현할 수 있게 해주는 기능
    - 결과를 기다리지 않고 곧바로 후속 처리를 실행할 수 있음
    - 병렬 처리를 멀티 스레드로 하고 싶을 때는 `ThreadPoolExecutor` 메서드를, 멀티 프로세스로 하고 싶을 때는 `ProcessPoolExecutor` 메서드 사용

## 병렬로 내려받기
- 프랑스 고전 음악 작곡가인 모리스 라벨의 곡 중에서 저작권이 소멸된 곡을 archive.org에서 병렬로 내려받기
- https://archive.org/details/ThePianoMusicOfMauriceRavel

In [1]:
import concurrent.futures
import random
import time
from collections import namedtuple
from os import path
from urllib import parse
import requests
from my_logging import get_my_logger

logger = get_my_logger(__name__)

# 음악 파일의 이름과 데이터를 저장하기 위한 이름이 있는 튜플 정의
Music = namedtuple('music', 'file_name, file_content')

# 크롤링 요청별 간격 리스트 정의
RANDOM_SLEEP_TIMES = [x * 0.1 for x in range(10, 40, 5)]

# 크롤링 대상 URL 리스트
MUSIC_URLS = [
    'https://archive.org/download/ThePianoMusicOfMauriceRavel/01PavanePourUneInfanteDfuntePourPianoMr19.mp3',
    'https://archive.org/download/ThePianoMusicOfMauriceRavel/02JeuxDeauPourPianoMr30.mp3',
    'https://archive.org/download/ThePianoMusicOfMauriceRavel/03SonatinePourPianoMr40-Modr.mp3',
    'https://archive.org/download/ThePianoMusicOfMauriceRavel/04MouvementDeMenuet.mp3',
    'https://archive.org/download/ThePianoMusicOfMauriceRavel/05Anim.mp3',
]

def download(url, timeout=180):
    # mp3 파일 이름 추출
    parsed_url = parse.urlparse(url)
    file_name = path.basename(parsed_url.path)
    
    # 요청 간격을 랜덤하게 선택
    sleep_time = random.choice(RANDOM_SLEEP_TIMES)
    
    # 내려받기 시작을 로그에 출력
    logger.info("[download start] sleep : {time} {file_name}".format(time=sleep_time, file_name=file_name))
    
    # 요청 대기
    time.sleep(sleep_time)
    
    # 음악 파일 내려받기
    r = requests.get(url, timeout=timeout)
    
    # 내려받기 종료를 로그에 출력
    logger.info("[download finished] {file_name}".format(file_name=file_name))
    
    # 이름 있는 튜플에 파일 이름과 mp3 데이터를 넣어 반환
    return Music(file_name=file_name, file_content=r.content)

In [2]:
if __name__ == '__main__':
    # 동시에 2개의 처리를 하기 위한 executor 생성
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        logger.info("[main start]")
        
        # executor.submit()으로 download()함수를 병행 실행
        # download() 함수의 매개 변수로 music_url 전달
        # 병행 실행 처리의 결과는 futures 변수에 넣어둠
        futures = [executor.submit(download, music_url) for music_url in MUSIC_URLS]
        
        # download() 함수의 처리가 완료되면 반복문에서 반복해 결과 출력하기
        for future in concurrent.futures.as_completed(futures):
            # download() 함수의 실행 결과는 result() 메서드로 확인 가능
            music = future.result()
            
            # music.filename에는 mp3 파일의 파일 이름이 들어있음
            # 이를 사용해 music.file_content에 저장된 데이터를 파일로 저장함
            with open(music.file_name, 'wb') as fw:
                fw.write(music.file_content)
        logger.info("[main finished]")

[37m[INFO]	2021-10-06 10:51:59	[main start][0m
[37m[INFO]	2021-10-06 10:51:59	[download start] sleep : 1.0 01PavanePourUneInfanteDfuntePourPianoMr19.mp3[0m
[37m[INFO]	2021-10-06 10:51:59	[download start] sleep : 2.0 02JeuxDeauPourPianoMr30.mp3[0m
[37m[INFO]	2021-10-06 10:52:23	[download finished] 02JeuxDeauPourPianoMr30.mp3[0m
[37m[INFO]	2021-10-06 10:52:23	[download start] sleep : 1.0 03SonatinePourPianoMr40-Modr.mp3[0m
[37m[INFO]	2021-10-06 10:52:25	[download finished] 01PavanePourUneInfanteDfuntePourPianoMr19.mp3[0m
[37m[INFO]	2021-10-06 10:52:25	[download start] sleep : 2.5 04MouvementDeMenuet.mp3[0m
[37m[INFO]	2021-10-06 10:52:34	[download finished] 04MouvementDeMenuet.mp3[0m
[37m[INFO]	2021-10-06 10:52:34	[download start] sleep : 1.0 05Anim.mp3[0m
[37m[INFO]	2021-10-06 10:52:35	[download finished] 03SonatinePourPianoMr40-Modr.mp3[0m
[37m[INFO]	2021-10-06 10:52:48	[download finished] 05Anim.mp3[0m
[37m[INFO]	2021-10-06 10:52:48	[main finished][0m


### 참고

In [8]:
from collections import namedtuple

ex = namedtuple('example', 'ex1, ex2')
ex

__main__.example

In [9]:
ex1 = ex(1, 2)
ex1

example(ex1=1, ex2=2)

In [11]:
ex1.ex1, ex1.ex2

(1, 2)

In [1]:
from urllib import parse

url = 'https://archive.org/download/ThePianoMusicOfMauriceRavel/01PavanePourUneInfanteDfuntePourPianoMr19.mp3'
parsed_url = parse.urlparse(url)
parsed_url

ParseResult(scheme='https', netloc='archive.org', path='/download/ThePianoMusicOfMauriceRavel/01PavanePourUneInfanteDfuntePourPianoMr19.mp3', params='', query='', fragment='')

In [4]:
from os import path

file_name = path.basename(parsed_url.path)
file_name

'01PavanePourUneInfanteDfuntePourPianoMr19.mp3'

In [6]:
import requests

r = requests.get(url)

[1;30m[DEBUG]	2021-10-06 10:41:26	Starting new HTTPS connection (1): archive.org:443[0m
[1;30m[DEBUG]	2021-10-06 10:41:27	https://archive.org:443 "GET /download/ThePianoMusicOfMauriceRavel/01PavanePourUneInfanteDfuntePourPianoMr19.mp3 HTTP/1.1" 302 None[0m
[1;30m[DEBUG]	2021-10-06 10:41:27	Starting new HTTPS connection (1): ia600604.us.archive.org:443[0m
[1;30m[DEBUG]	2021-10-06 10:41:29	https://ia600604.us.archive.org:443 "GET /26/items/ThePianoMusicOfMauriceRavel/01PavanePourUneInfanteDfuntePourPianoMr19.mp3 HTTP/1.1" 200 6401677[0m


## 작업 큐(여러 개의 머신을 사용해서 병렬 처리하기)
- 태스크 큐의 구조와 병렬 프로그래밍에 대한 지식이 없다면 직접 구현하기 굉장히 힘드므로 서드 파티 라이브러리 사용
- Celery
    - 재시도 횟수, 유효 기간, 특정 워커와 큐를 연결하는 등 다양한 태스크 제어
    - RabbitMQ : 메시지 큐 구현 중 하나로, 높은 부하가 예상되는 잡 큐 시스템을 활용할 때 선택
    - Redis : KVS(Key-Value Store)로서 메시지 큐는 아니지만 큐로 활용할 수 있는 외부 저장소