In [10]:
# Chapter06-01
# 병행성(Concurrency)
# 이터레이터, 제너레이터
# Iterator, Generator

# 파이썬 반복 가능한 타입
# Collection, text file, list, Dict, Set, Tuple, Unpacking, *args... : iterable

#t = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
t = 'ABCD'
for c in t:
    print(c)
    
# while
w = iter(t)
while True:
    try:
        print(next(w))
    except StopIteration:
        break
        
# 반복형 확인
from collections import abc

#print(dir(t))
print(hasattr(t, '__iter__'))
print(isinstance(t, abc.Iterable))

A
B
C
D
A
B
C
D
True
True


In [17]:
# next
class WordSplitter:
    def __init__(self, text):
        self._idx = 0
        self._text = text.split(' ')
        
    def __next__(self):
        #print('Called __next__')
        try:
            word = self._text[self._idx]
        except IndexError:
            raise StopIteration('Stopped Iteration')
        self._idx += 1
        return word
    def __repr__(self):
        return 'WordSplit(%s)' % (self._text)
    
wi = WordSplitter('Do today what you could do tommorrow')
print(wi)
print(next(wi))
print(next(wi))
print(next(wi))
print(next(wi))
print(next(wi))
print(next(wi))
print(next(wi))
#print(next(wi))

WordSplit(['Do', 'today', 'what', 'you', 'could', 'do', 'tommorrow'])
Do
today
what
you
could
do
tommorrow


In [22]:
# Generator 패턴
# 1. 지능형 리스트, 딕셔너리, 집합 -> 데이터량 증가 증가 후 메모리 사용량 증가 -> 제너레이터 사용권장
# 2. 단위 실행 가능한 코루틴(Corotine) 구현과 연동
# 3. 작은 메모리 조각 사용

class WordSplitGenerator:
    def __init__(self, text):
        self._text = text.split(' ')
        
    def __iter__(self):
        for word in self._text:
            yield word # generator
        return
    def __repr__(self):
        return 'WordSplitGenerator(%s)' % (self._text)
    
wg = WordSplitGenerator('Do today what you could do tommorrow')
wt = iter(wg)

print(wt, wg)
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
print(next(wt))
#print(next(wt))

<generator object WordSplitGenerator.__iter__ at 0x7f9f3d6318d0> WordSplitGenerator(['Do', 'today', 'what', 'you', 'could', 'do', 'tommorrow'])
Do
today
what
you
could
do
tommorrow


In [27]:
# Chapter06-02
# 병행성(Concurrency): 한 컴퓨터가 여러 일을 동시에 수행 -> 단일 프로그램 안에서 여러 일을 쉽게 해결
# 병렬성(Parallelism): 여러 컴퓨터가 여러 작업을 동시에 수행 -> 속도
# Generator Ex1
def generator_ex1():
    print('start')
    yield 'A Point'
    print('Continue')
    yield 'B Point'
    print('End')
    
temp = iter(generator_ex1())
#print(temp)
print(next(temp))
print(next(temp))
# print(next(temp))

# Generaotro Ex2
temp2 = [x * 3 for x in generator_ex1()]
temp3 = (x * 3 for x in generator_ex1())

print(temp2)
print(temp3)

for i in temp3:
    print(i)

start
A Point
Continue
B Point
start
Continue
End
['A PointA PointA Point', 'B PointB PointB Point']
<generator object <genexpr> at 0x7f9f3ee2da50>
start
A PointA PointA Point
Continue
B PointB PointB Point
End


In [38]:
# Generator Ex3(중요 함수)
# filterfalse, accumulate, chain, product, groupby...
import itertools
gen1 = itertools.count(1, 2.5)
print(next(gen1))
print(next(gen1))
print(next(gen1))
print(next(gen1))

# 조건
gen2 = itertools.takewhile(lambda n: n < 1000, itertools.count(1, 2.5))
for v in gen2:
    #print(v)
    pass

# 필터 반대
gen3 = itertools.filterfalse(lambda n: n < 3, [1,2,3,4,5])
for v in gen3:
    print(v)
    
# 누적 합계
gen4 = itertools.accumulate([x for x in range(1, 101)])
for v in gen4:
    print(v)
    
# 연결1
gen5 = itertools.chain('ABCDE', range(1, 11, 2))
print(list(gen5))

# 연결2
gen6 = itertools.chain(enumerate('ABCDE'))
print(list(gen6))

# 개별
gen7 = itertools.product('ABCDE')
print(list(gen7))

# 연산(경우의 수)
gen8 = itertools.product('ABCDE', repeat=2)
print(list(gen8))

# 그룹화
gen9 = itertools.groupby('AAABBCCCCDDEEE')
# print(list(gen9))
for chr, group in gen9:
    print(chr, ': ', list(group))

1
3.5
6.0
8.5
3
4
5
1
3
6
10
15
21
28
36
45
55
66
78
91
105
120
136
153
171
190
210
231
253
276
300
325
351
378
406
435
465
496
528
561
595
630
666
703
741
780
820
861
903
946
990
1035
1081
1128
1176
1225
1275
1326
1378
1431
1485
1540
1596
1653
1711
1770
1830
1891
1953
2016
2080
2145
2211
2278
2346
2415
2485
2556
2628
2701
2775
2850
2926
3003
3081
3160
3240
3321
3403
3486
3570
3655
3741
3828
3916
4005
4095
4186
4278
4371
4465
4560
4656
4753
4851
4950
5050
['A', 'B', 'C', 'D', 'E', 1, 3, 5, 7, 9]
[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
[('A',), ('B',), ('C',), ('D',), ('E',)]
[('A', 'A'), ('A', 'B'), ('A', 'C'), ('A', 'D'), ('A', 'E'), ('B', 'A'), ('B', 'B'), ('B', 'C'), ('B', 'D'), ('B', 'E'), ('C', 'A'), ('C', 'B'), ('C', 'C'), ('C', 'D'), ('C', 'E'), ('D', 'A'), ('D', 'B'), ('D', 'C'), ('D', 'D'), ('D', 'E'), ('E', 'A'), ('E', 'B'), ('E', 'C'), ('E', 'D'), ('E', 'E')]
A :  ['A', 'A', 'A']
B :  ['B', 'B']
C :  ['C', 'C', 'C', 'C']
D :  ['D', 'D']
E :  ['E', 'E', 'E']


In [50]:
# Chapter06-03
# 코루틴: 단일(싱글) 스레드, 스택을 기반으로 동작하는 비동기 작업
# 쓰레드: OS 관리, CPU 코어에서 실시간, 시분할 비동기 작업 -> 멀티 쓰레드
# yield, send: 메인 <-> 서브
# 코루틴 제어, 상태, 양방향 전송

# 서브루틴: 메인루틴 호출 -> 서브루틴에서 수행(흐름제어)
# 코루틴: 루틴 실행 중 중지 -> 동시성 프로그래밍
# 코루틴: 쓰레드에 비해 오버헤드 감소
# 쓰레드: 싱글쓰레드 -> 멀티쓰레드 -> 복잡 -> 공유되는 자원 -> 교착 상태 발생 가능성, 컨택스트 스위칭 비용 발생, 자원 소비 가능성 증가
# def -> async, yield -> await

# 코루틴 Ex1
def coroutine1():
    print('>>> coroutine stated.')
    i = yield
    print('>>> corouitne received: {}'.format(i))
    
# 메인 루틴
# 제너레이터 선언
cr1 = coroutine1()
print(cr1, type(cr1))
# yield 지점까지 서브루틴 수행
#next(cr1)

# 기본 전달 값 None
# 값 전송
# cr1.send(100)

# 잘못된 사용
cr2 = coroutine1()
#cr2.send(100)

<generator object coroutine1 at 0x7f9f3f925350> <class 'generator'>


In [44]:
# 코루틴 Ex2
# GEN_CREATED: 처음 대기 상태
# GEN_RUNNING: 실행 상태
# GEN_SUSPENDED: yield 대기 상태
# GEN_CLOSED: 실행 완료 상태
def coroutine2(x):
    print('>>> coroutine stated: {}'.format(x))
    y = yield x
    print('>>> coroutine received: {}'.format(i))
    z = yield x + y
    print('>>> coroutine received: {}'.format(z))

cr3 = coroutine2(10)
from inspect import getgeneratorstate
print(getgeneratorstate(cr3))
print(next(cr3))
cr3.send(100)

GEN_CREATED
>>> coroutine stated: 10
10
>>> coroutine received: B PointB PointB Point


110

In [46]:
# 코루틴 Ex3
# StopIteration 자동 처리(3.5 -> await)
# 중첩 코루틴 처리
def generator1():
    for x in 'AB':
        yield x
    for y in range(1, 4):
        yield y
t1 = generator1()
print(next(t1))
print(next(t1))
print(next(t1))
print(next(t1))
print(next(t1))

t2 = generator1()
print(list(t2))

A
B
1
2
3
['A', 'B', 1, 2, 3]


In [49]:
def generator2():
    yield from 'AB'
    yield from range(1, 4)
t3 = generator2()
print(next(t3))
print(next(t3))
print(next(t3))
print(next(t3))
print(next(t3))

A
B
1
2
3


In [51]:
# Chapter06-04
# Futures 동시성
# 비동기 작업 실행
# 지연시간(Block) CPU 및 리소스 낭비 방지 -> (File)Network I/O 관련 작업 -> 동시성 활용 권장
# 비동기 작업과 적합한 프로그램일 경우 압도적으로 성능 향상

# futures: 비동기 실행을 위한 API를 고수준으로 작성 -> 사용하기 쉽도록 개선
# concurrent.Futures
# 1. 멀티스레딩/멀티프로세싱 API 통일 -> 매우 사용하기 쉬움
# 2. 실행중인 작업 취소, 완료 여부 체크, 타임아웃 옵션, 콜백추가, 동기화 코드 매우 쉽게 작성 -> Promise개념


# 2가지 패턴 실습
# concurrent.futures 사용법1
# concurrent.futures 사용법2

# GIL:  두 개 이상의 스레드가 동시에 실행될 때 하나의 자원을 엑세스 하는 경우 ->
#            문제점을 방지하기 위해 GIL 실행, 리소스 전체에 락이 걸린다. -> 
#                Context Switch(문맥 교환)

# GIL: 멀티프로세싱 사용, CPython

In [53]:
import os
import time
from concurrent import futures

WORK_LIST = [10**5, 10**6, 10**7, 10**8]

# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제너레이터)
def sum_generator(n):
    return sum(n for n in range(1, n+1))

def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))

    # 시작 시간
    start_tm = time.time()
    
    # 결과 건수
    # ProcessPoolExecutor
    with futures.ThreadPoolExecutor() as excutor:
        result = excutor.map(sum_generator, WORK_LIST)
    
    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포맷
    msg = '\n Result -> {} Time: {:.2f}s'
    # 최종 결과 출력
    print(msg.format(list(result), end_tm))
    
# 실행
if __name__ == "__main__":
    main()


 Result -> [5000050000, 500000500000, 50000005000000, 5000000050000000] Time: 8.36s


In [2]:
# Chapter06-05
# Futures 동시성
# 비동기 작업 실행
# 지연시간(Block) CPU 및 리소스 낭비 방지 -> (File)Network I/O 관련 작업 -> 동시성 활용 권장
# 비동기 작업과 적합한 프로그램일 경우 압도적으로 성능 향상

# futures: 비동기 실행을 위한 API를 고수준으로 작성 -> 사용하기 쉽도록 개선
# concurrent.Futures
# 1. 멀티스레딩/멀티프로세싱 API 통일 -> 매우 사용하기 쉬움
# 2. 실행중인 작업 취소, 완료 여부 체크, 타임아웃 옵션, 콜백추가, 동기화 코드 매우 쉽게 작성 -> Promise개념


# 2가지 패턴 실습
# concurrent.futures map
# concurrent.futures wait, as_completed

# GIL:  두 개 이상의 스레드가 동시에 실행될 때 하나의 자원을 엑세스 하는 경우 ->
#            문제점을 방지하기 위해 GIL 실행, 리소스 전체에 락이 걸린다. -> 
#                Context Switch(문맥 교환)

# GIL: 멀티프로세싱 사용, CPython

In [8]:
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORK_LIST = [10**4, 10**5, 10**6, 10**7]

# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제너레이터)
def sum_generator(n):
    return sum(n for n in range(1, n+1))

def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))

    # 시작 시간
    start_tm = time.time()
    # Futures
    futures_list = []
    
    # 결과 건수
    # ProcessPoolExecutor
    with ThreadPoolExecutor() as excutor:
        for work in WORK_LIST:
            # future 반환
            future = excutor.submit(sum_generator, work)
            # 스케쥴링
            futures_list.append(future)
            # 스케쥴링 확인
            print('Scheduled for {}: {}'.format(work, future))
        '''
        # wait 결과 출력
        result = wait(futures_list, timeout=7)
        # 성공
        print('Completed Tasks: ' + str(result.done))
        # 실패
        print('Pending ones after waiting for 7seconds: ' + str(result.not_done))
        # 결과 값 출력
        print([future.result() for future in result.done])
        '''
        # as_completed 결과 출력
        for future in as_completed(futures_list):
            result = future.result()
            done = future.done()
            cancelled = future.cancelled
            
            # future 결과 확인
            print('Future Result: {}, Done: {}'.format(result, done))
            print('Future Cancelled: {}'.format(cancelled))
            
    
        
    
    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포맷
    msg = '\n Time: {:.2f}s'
    # 최종 결과 출력
    print(msg.format(end_tm))
    
# 실행
if __name__ == "__main__":
    main()

Scheduled for 10000: <Future at 0x7ff924c89b50 state=finished returned int>
Scheduled for 100000: <Future at 0x7ff924c89d90 state=finished returned int>
Scheduled for 1000000: <Future at 0x7ff9254149d0 state=running>
Scheduled for 10000000: <Future at 0x7ff922f25a90 state=running>
Completed Tasks: {<Future at 0x7ff924c89d90 state=finished returned int>, <Future at 0x7ff922f25a90 state=finished returned int>, <Future at 0x7ff924c89b50 state=finished returned int>, <Future at 0x7ff9254149d0 state=finished returned int>}
Pending ones after waiting for 7seconds: set()
[5000050000, 50000005000000, 50005000, 500000500000]

 Time: 0.49s
