# 비동기 작업 실행
* 순차실행과 Concurrent.future 비교로 수행속도 차이를 비교해보자

In [2]:
import os
import time
import sys
import csv

# 추출 국가 정보
NATION_LS = 'Singapore Germany Israel Norway Canada France Spain Mexico'.split()
# 대상 CSV 경로
TARGET_CSV = './resources/nations.csv'
# 저장 폴더 위치
DEST_DIR = './csvs/'
# CSV 파일 헤더
HEADER = ['Region','Country','Item Type','Sales Channel','Order Priority','Order Date','Order ID','Ship Date','Units Sold','Unit Price','Unit Cost','Total Revenue','Total Cost','Total Profit']

### 순차 실행

In [3]:
# 국가별 csv파일 저장
def save_csv(data, filename):
    # 최종 경로 생성
    path = os.path.join(DEST_DIR, filename)
    
    with open(path, 'w', newline='') as  fp:
        writer = csv.DictWriter(fp, fieldnames=HEADER)
        writer.writeheader()
        for row in data:
            writer.writerow(row)

def get_sales_data(nt):
    with open(TARGET_CSV, 'r') as f:
        reader = csv.DictReader(f)
        # dict를 리스트로 적재
        data = []
        # Header 확인
        #print(reader.fieldnames)
        for r in reader:
            #print(r)
            if r['Country'] == nt:
                data.append(r)
    return data

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def seperate_many(nt_list):
    for nt in sorted(nt_list):
        # 분리 데이터
        data = get_sales_data(nt)
        # 상황 출력
        show(nt)
        # 파일 저장
        save_csv(data, nt.lower()+'.csv')
        
    return len(nt_list)

def main(seperate_many):
    # 시작시간
    start_tm = time.time()
    # 결과 건수
    result_cnt = seperate_many(NATION_LS)
    # 종료 시간
    end_tm = time.time() - start_tm
    
    msg = '\n{} csv seperated in {:.2f}s'.format(result_cnt, end_tm)
    # 최종 결과 출력
    print(msg)    

if __name__ == '__main__':
    main(seperate_many)

Canada France Germany Israel Mexico Norway Singapore Spain 
8 csv seperated in 30.63s


### Concurrent.future 방법1 (map을 통해)=> THreadPoolExecutor, ProcessPoolExecutor
* 서로 다른 스레드 또는 프로세스에서 실행 가능
* 내부 과정 알 필요가 없으며, 고수준으로 인터페이스 제공
* 지연시간(block) CPU 및 리소스 낭비 방지 -> FIle I/O, Network I/O 관련 작업 동시성 활용 권장
* 적합한 작업일 경우 순차 진행보다 압도적인 성능 향상

In [4]:
from concurrent import futures

In [4]:
# 국가별 csv파일 저장
def save_csv(data, filename):
    # 최종 경로 생성
    path = os.path.join(DEST_DIR, filename)
    
    with open(path, 'w', newline='') as  fp:
        writer = csv.DictWriter(fp, fieldnames=HEADER)
        writer.writeheader()
        for row in data:
            writer.writerow(row)

def get_sales_data(nt):
    with open(TARGET_CSV, 'r') as f:
        reader = csv.DictReader(f)
        # dict를 리스트로 적재
        data = []
        # Header 확인
        #print(reader.fieldnames)
        for r in reader:
            #print(r)
            if r['Country'] == nt:
                data.append(r)
    return data

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def seperate_many(nt):
    # 분리 데이터
    data = get_sales_data(nt)
    # 상황 출력
    show(nt)
    # 파일 저장
    save_csv(data, nt.lower()+'.csv')
        
    return nt

def main(seperate_many):
    # worker 개수 
    worker = min(20, len(NATION_LS))
    # 시작시간
    start_tm = time.time()
    # 결과 건수
    with futures.ThreadPoolExecutor(worker) as executor:
        resuit_cnt = executor.map(seperate_many, sorted(NATION_LS))
    # 종료 시간
    end_tm = time.time() - start_tm
    
    msg = '\n{} csv seperated in {:.2f}s'.format(list(resuit_cnt), end_tm)
    # 최종 결과 출력
    print(msg)    

if __name__ == '__main__':
    main(seperate_many)

Norway France Germany Israel Spain Singapore Mexico Canada 
['Canada', 'France', 'Germany', 'Israel', 'Mexico', 'Norway', 'Singapore', 'Spain'] csv seperated in 27.30s


* 위의 코드는 순차 실행할때보다 속도가 늦다. 
* 이유 : 스레드 9개가 nations.csv 파일 1개만 바라보기 때문에 mutex lock이 걸려서 + Context Switching 비용 발생으로 늦어지게됨(Pyhton GiL)
* 파일이 9개였다면 더 빠르겠지만 아니다.
* GIL을 우회하기 하기 위해 멀티 프로세싱 방법을 사용!

* ThreadPoolExecutor : GIL 종속
* ProcessPoolExecutor : GIL 우회, 변경 후 ->

### ProcessPoolExecutor로 실행
* 단!! 쥬피터로는 동작은 안되고 py파일을 만들어서 동작해야한다

In [12]:
import os
import time
import sys
import csv
from concurrent import futures

# 추출 국가 정보
NATION_LS = 'Singapore Germany Israel Norway Canada France Spain Mexico'.split()
# 대상 CSV 경로
TARGET_CSV = './resources/nations.csv'
# 저장 폴더 위치
DEST_DIR = './csvs/'
# CSV 파일 헤더
HEADER = ['Region','Country','Item Type','Sales Channel','Order Priority','Order Date','Order ID','Ship Date','Units Sold','Unit Price','Unit Cost','Total Revenue','Total Cost','Total Profit']

# 국가별 csv파일 저장
def save_csv(data, filename):
    # 최종 경로 생성
    path = os.path.join(DEST_DIR, filename)
    
    with open(path, 'w', newline='') as  fp:
        writer = csv.DictWriter(fp, fieldnames=HEADER)
        writer.writeheader()
        for row in data:
            writer.writerow(row)

def get_sales_data(nt):
    with open(TARGET_CSV, 'r') as f:
        reader = csv.DictReader(f)
        # dict를 리스트로 적재
        data = []
        # Header 확인
        #print(reader.fieldnames)
        for r in reader:
            #print(r)
            if r['Country'] == nt:
                data.append(r)
    return data

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def seperate_many(nt):
    # 분리 데이터
    data = get_sales_data(nt)
    # 상황 출력
    show(nt)
    # 파일 저장
    save_csv(data, nt.lower()+'.csv')
        
    return nt

def main(seperate_many):
    # worker 개수 
    worker = min(20, len(NATION_LS))
    # 시작시간
    start_tm = time.time()
    # 결과 건수
    with futures.ProcessPoolExecutor() as executor:
        # map -> 작업 순서 유지, 즉시 실행
        result_cnt = executor.map(seperate_many, sorted(NATION_LS))
    # 종료 시간
    end_tm = time.time() - start_tm
    
#     msg = "\n{} csv seperate in {:.2f}s"
#     print(msg.format(list(result_cnt), end_tm))  

if __name__ == '__main__':
    main(seperate_many)

### Concurrent.future 방법2 (submit을 통해)=> THreadPoolExecutor, ProcessPoolExecutor
* 마찬가지로 쥬피터 노트북 아닌 .py로 결과 확인해야 한다!

In [16]:
import os
import time
import sys
import csv
from concurrent import futures

# 추출 국가 정보
NATION_LS = 'Singapore Germany Israel Norway Canada France Spain Mexico'.split()
# 대상 CSV 경로
TARGET_CSV = './resources/nations.csv'
# 저장 폴더 위치
DEST_DIR = './csvs/'
# CSV 파일 헤더
HEADER = ['Region','Country','Item Type','Sales Channel','Order Priority','Order Date','Order ID','Ship Date','Units Sold','Unit Price','Unit Cost','Total Revenue','Total Cost','Total Profit']

# 국가별 csv파일 저장
def save_csv(data, filename):
    # 최종 경로 생성
    path = os.path.join(DEST_DIR, filename)
    
    with open(path, 'w', newline='') as  fp:
        writer = csv.DictWriter(fp, fieldnames=HEADER)
        writer.writeheader()
        for row in data:
            writer.writerow(row)

def get_sales_data(nt):
    with open(TARGET_CSV, 'r') as f:
        reader = csv.DictReader(f)
        # dict를 리스트로 적재
        data = []
        # Header 확인
        #print(reader.fieldnames)
        for r in reader:
            #print(r)
            if r['Country'] == nt:
                data.append(r)
    return data

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def seperate_many(nt):
    # 분리 데이터
    data = get_sales_data(nt)
    # 상황 출력
    show(nt)
    # 파일 저장
    save_csv(data, nt.lower()+'.csv')
        
    return nt

def main(seperate_many):
    # worker 개수 
    worker = min(20, len(NATION_LS))
    # 시작시간
    start_tm = time.time()
    # 결과 건수
    future_list = []
    with futures.ProcessPoolExecutor() as executor:
        # Submit -> Callable 객체 스케줄링(실행 예약) -> future로 반환
        # future -> result(), drop(), as_complete() 주로 사용
        for nt in sorted(NATION_LS):
            # future 반환
            future = executor.submit(seperate_many, nt)
            # 스케줄링
            future_list.append(future)
            # 출력1
            print("Scheduled for {} : {}".format(nt, future))
            print()
        
        for future in futures.as_completed(future_list):
            result = future.result()
            done = future.done()
            cancelled = future.cancelled
            print("Future Result : {}, Done : {}".format(result, done))
            print("Future Cancelled : {}".format(cancelled))
            print()

    #종료 시간
    end_tm = time.time() - start_tm

    msg = "\n{} csv seperate in {:.2f}s"
    print(msg.format(list(future_list), end_tm))

if __name__ == '__main__':
    main(seperate_many)

Scheduled for Canada : <Future at 0x269ba878808 state=running>

Scheduled for France : <Future at 0x269ba8c5848 state=pending>

Scheduled for Germany : <Future at 0x269ba8c9988 state=pending>

Scheduled for Israel : <Future at 0x269ba8c9a08 state=pending>

Scheduled for Mexico : <Future at 0x269ba8be5c8 state=pending>

Scheduled for Norway : <Future at 0x269ba88bc48 state=pending>

Scheduled for Singapore : <Future at 0x269ba88bc88 state=pending>

Scheduled for Spain : <Future at 0x269ba88eb08 state=pending>



BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.