In [1]:
# Chapter06-01
# 병행성(Concurrency)
# 이터레이터, 제네레이터
# Iterator, Generator

# 파이썬 반복 가능한 타입 (iterable: __iter__ 존재)
# for, collections, text file, List, Dict, Set, Tuple, unpacking, *args

# Generator: a simple way of creating iterators.
#            a function that returns an object(iterator) which can iterate over (one value at a time)


In [28]:
# 반복 가능한 이유? -> iter(x) 함수 호출

t = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'

for c in t:
    print(c, end=" ")  # A B C D E F G H I J K L M N O P Q R S T U V W X Y Z

print()

w = iter(t)
print(type(w))  # <class 'str_iterator'> --> __iter__, __next__ 존재

while True:
    try:
        print(next(w), end=" ") # A B C D E F G H I J K L M N O P Q R S T U V W X Y Z 
    except StopIteration:
        print()
        break


from collections import abc

# iterable 확인
print(hasattr(t, '__iter__'))       # True
print(isinstance(t, abc.Iterable))  # True

# iterator 확인
print(hasattr(w, '__iter__'))       # True
print(isinstance(w, abc.Iterable))  # True
print(hasattr(w, '__next__'))       # True
print(isinstance(w, abc.Iterator))  # True

# generator 확인
gen = (i for i in range(1, 6))      
print(gen)                            # <generator object <genexpr> at 0x000002675B8EFF90>
print(all(hasattr(gen, attribute)     # True
    for attribute in ['gi_code', 'gi_frame', 'gi_running', 'gi_yieldfrom', 'send', 'throw']))
print(isinstance(gen, abc.Iterable))  # True
print(isinstance(gen, abc.Iterator))  # True
print(isinstance(gen, abc.Generator)) # True

# next pattern
class WordSplitter:
    
    def __init__(self, text: str) -> None:
        self._text = text.split(' ')
        self._index = 0

    def __next__(self):
        print('Called __next__')
        try: 
            word = self._text[self._index]
            self._index += 1
        except IndexError:
            raise StopIteration('Stopped Iteration.')
        return word

    def __repr__(self) -> str:
        return 'WordSplit(%s)' % (self._text)

wi = WordSplitter('Do today what you could do tommorrow')

try:
    while True:
        print(next(wi))
except StopIteration as e:
    print(e)
    print('Iteration finished.')

"""
Called __next__
Do
Called __next__
today
Called __next__
what
Called __next__
you
Called __next__
could
Called __next__
do
Called __next__
tommorrow
Called __next__
Stopped Iteration.
Iteration finished.
"""


A B C D E F G H I J K L M N O P Q R S T U V W X Y Z 
<class 'str_iterator'>
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z 
True
True
True
True
True
True
<generator object <genexpr> at 0x000002675B9E47B0>
True
True
True
True
Called __next__
Do
Called __next__
today
Called __next__
what
Called __next__
you
Called __next__
could
Called __next__
do
Called __next__
tommorrow
Called __next__
Stopped Iteration.
Iteration finished.


'\nCalled __next__\nDo\nCalled __next__\ntoday\nCalled __next__\nwhat\nCalled __next__\nyou\nCalled __next__\ncould\nCalled __next__\ndo\nCalled __next__\ntommorrow\nCalled __next__\nStopped Iteration.\nIteration finished.\n'

In [41]:
# Generator 패턴
# 1. list comprehension, 딕셔너리, 집합 -> 데이터 양 증가 후 메모리 사용량 증가 -> 제네레이터 사용 권장
# 2. 단위 실행 가능한 코루틴(Coroutine) 구현과 연동
# 3. 작은 메모리 조각 사용

class WordSplitGenerator:

    def __init__(self, text: str):
        self._text = text.split(' ')
    
    def __iter__(self):
        # print('Called __iter__')
        for word in self._text:
           yield word # 제네레이터

    
    def __repr__(self):
        return 'WordSplitGenerator(%s)' % (self._text)

wg = WordSplitGenerator('Do today what you could do tomorrow')


from collections import abc

# generator 확인
print(all(hasattr(wg, attribute)     # False
    for attribute in ['gi_code', 'gi_frame', 'gi_running', 'gi_yieldfrom', 'send', 'throw']))
print(isinstance(wg, abc.Iterable))  # True
print(isinstance(wg, abc.Iterator))  # False
print(isinstance(wg, abc.Generator)) # False

wt = iter(wg)
print(wt)                            # <generator object WordSplitGenerator.__iter__ at 0x000002675B9FF200>
print(all(hasattr(wt, attribute)     # True
    for attribute in ['gi_code', 'gi_frame', 'gi_running', 'gi_yieldfrom', 'send', 'throw']))
print(isinstance(wt, abc.Iterable))  # True
print(isinstance(wt, abc.Iterator))  # True
print(isinstance(wt, abc.Generator)) # True

for word in wt:
    print(word, end=" ")             # Do today what you could do tomorrow
    
    

False
True
False
False
<generator object WordSplitGenerator.__iter__ at 0x000002675B9FECF0>
True
True
True
True
Do today what you could do tomorrow 

In [45]:
# Chapter06-02
# 병행성(Concurrency): 한 컴퓨터가 여러 일을 동시에 수행
# 병렬성(Parallelism): 여러 컴퓨터가 여러 일을 동시에 수행


In [46]:
# Generator Ex1

def generator_ex1():
    print('Start')
    yield 'A Point'
    print('Continue')
    yield 'B Print'
    print('End')

temp = iter(generator_ex1())  
print(temp)        # <generator object generator_ex1 at 0x000002675BA13D60>
print(next(temp))  # Start A Point
print(next(temp))  # Continue B Print
try: 
    print(next(temp))  # End StopIteration 
except StopIteration:
    pass

for v in generator_ex1():
    print(v)  

<generator object generator_ex1 at 0x000002675BA20120>
Start
A Point
Continue
B Print
End
Start
A Point
Continue
B Print
End


In [50]:
# Generator Ex2

def generator_ex2():
    print('Start')
    yield '[A Point]'
    print('Continue')
    yield '[B Print]'
    print('End')

temp2 = [x * 3 for x in generator_ex2()]
temp3 = (x * 3 for x in generator_ex2())

print(temp2)  # ['[A Point][A Point][A Point]', '[B Print][B Print][B Print]']
print(temp3)  # <generator object <genexpr> at 0x000002675BA20F20>

for i in temp3:
    print(i)

"""
Start
[A Point][A Point][A Point]
Continue
[B Print][B Print][B Print]
End
"""



Start
Continue
End
['[A Point][A Point][A Point]', '[B Print][B Print][B Print]']
<generator object <genexpr> at 0x000002675BA1FC80>
Start
[A Point][A Point][A Point]
Continue
[B Print][B Print][B Print]
End


'\nStart\n[A Point][A Point][A Point]\nContinue\n[B Print][B Print][B Print]\nEnd\n'

In [68]:
# Generator Ex3

from itertools import count, takewhile, filterfalse, accumulate, chain, product, groupby

# count: Return a count object whose .__next__() method returns consecutive values.
gen1 = count(1, 2.5)  

# takewhile: Return successive entries from an iterable as long as the predicate evaluates to true for each entry.
gen2 = takewhile(lambda n: n < 10, count(1, 2.5))

# filterfalse: Return those items of iterable for which function(item) is false.
gen3 = filterfalse(lambda n: n < 3, [1, 2, 3, 4, 5])

# accumulate: Return series of accumulated sums (or other binary function results).
gen4 = accumulate([x for x in range(1, 101)])

# chain: Return a chain object whose .__next__() method returns elements 
#   from the first iterable until it is exhausted, 
#   then elements from the next iterable, 
#   until all of the iterables are exhausted.
gen5 = chain('ABCDE', range(1, 11, 2)) 
print(list(gen5))  # ['A', 'B', 'C', 'D', 'E', 1, 3, 5, 7, 9]

gen6 = chain(enumerate('ABCDE'))
print(list(gen6))  # [(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]

# product: Cartesian product of input iterables. Equivalent to nested for-loops.
gen7 = product('ABC', 'YZ')  
print(list(gen7))  # [('A', 'Y'), ('A', 'Z'), ('B', 'Y'), ('B', 'Z'), ('C', 'Y'), ('C', 'Z')]

gen8 = product('AB', repeat=3)  
print(list(gen8))  # [('A', 'A', 'A'), ('A', 'A', 'B'), ('A', 'B', 'A'), ('A', 'B', 'B'), ('B', 'A', 'A'), ('B', 'A', 'B'), ('B', 'B', 'A'), ('B', 'B', 'B')]

# groupby: make an iterator that returns consecutive keys and groups from the iterable
gen9 = groupby('AAABBCCCCDDEEE')
for key, group in gen9:  # [('A', <itertools._grouper object at 0x000002675B9BA8E0>), ...]
    print(key, list(group))

"""
A ['A', 'A', 'A']
B ['B', 'B']
C ['C', 'C', 'C', 'C']
D ['D', 'D']
E ['E', 'E', 'E']
"""

gen10 = groupby('ABCAABCCBBB')
for key, group in gen10:
    print(key, list(group))

"""
A ['A']
B ['B']
C ['C']
A ['A', 'A']
B ['B']
C ['C', 'C']
B ['B', 'B', 'B']
"""



['A', 'B', 'C', 'D', 'E', 1, 3, 5, 7, 9]
[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
[('A', 'Y'), ('A', 'Z'), ('B', 'Y'), ('B', 'Z'), ('C', 'Y'), ('C', 'Z')]
[('A', 'A', 'A'), ('A', 'A', 'B'), ('A', 'B', 'A'), ('A', 'B', 'B'), ('B', 'A', 'A'), ('B', 'A', 'B'), ('B', 'B', 'A'), ('B', 'B', 'B')]
A ['A', 'A', 'A']
B ['B', 'B']
C ['C', 'C', 'C', 'C']
D ['D', 'D']
E ['E', 'E', 'E']
A ['A']
B ['B']
C ['C']
A ['A', 'A']
B ['B']
C ['C', 'C']
B ['B', 'B', 'B']


In [69]:
# Chapter06-03
# 흐름제어, 병행성(Concurrency)
# 코루틴(Coroutine)

# yield, send : 메인 <-> 서브
# 코루틴 제어, 상태, 양방향 전송
# yield from

# 서브루틴 : 메인루틴에서 호출 -> 서브루틴에서 수행(흐름제어)
# 코루틴 : 루틴 실행 중 중지 ->  동시성 프로그래밍
# 코루틴 : 쓰레드에 비해 오버헤드 감소
# 쓰레드 : 싱글쓰레드 -> 멀티쓰레드 -> 복잡 -> 공유되는 자원 -> 교착 상태 발생 가능성, 
#          컨텍스트 스위칭 비용 발생, 자원 소비 가능성 증가


In [79]:
# Coroutine Ex1

def coroutine1():
    # inside subroutine
    print('>>> coroutine started.')
    i = yield
    print('>>> coroutine received: {}'.format(i))

# main routine
# declare coroutine(generator) object
cr1 = coroutine1()
print(type(cr1), cr1)  # <class 'generator'> <generator object coroutine1 at 0x000002675BA430B0>

next(cr1)       # >>> coroutine started.
try:
    next(cr1)   # >>> coroutine received: None -> StopIteration
except StopIteration:
    pass

cr2 = coroutine1()
next(cr2)           # >>> coroutine started.
try:
    cr2.send(100)   # >>> coroutine received: 100 -> StopIteration
except StopIteration:
    pass

# next() --> yield 지점까지 서브루틴 수행, 서브루틴에서 메인루틴으로 값 전송
# send() --> = yield 지점까지 서브루틴 수행, 메인루틴에서 서브루틴으로 값 전송

# 잘못된 사용
cr3 = coroutine1()
# cr3.send(100)  # TypeError: can't send non-None value to a just-started generator


<class 'generator'> <generator object coroutine1 at 0x000002675BA2F3C0>
>>> coroutine started.
>>> coroutine received: None
>>> coroutine started.
>>> coroutine received: 100


In [90]:
# Coroutine Ex2

"""
GEN_CREATED: 처음 대기 상태
GEN_RUNNING: 실행 상태
GEN_SUSPENDED: yield 대기 상태
GEN_CLOSED: 실행 완료 상태
"""

from inspect import getgeneratorstate


def coroutine2(x):
    print('>>> coroutine started : {}'.format(x))
    y = yield x
    print('>>> coroutine received : {}'.format(y))
    z = yield x + y
    print('>>> coroutine received : {}'.format(z))

cr1 = coroutine2(10)  # --> GEN_CREATED
print(getgeneratorstate(cr1))  # GEN_CREATED
print(next(cr1))      # --> GEN_RUNNING, return x = 10, GEN_SUSPENDED
print(getgeneratorstate(cr1))  # GEN_SUSPENDED
print(cr1.send(100))  # --> save y = 100, GEN_RUNNING, return x + y = 110, GEN_SUSPENDED
print(getgeneratorstate(cr1))  # GEN_SUSPENDED
try:
    print(cr1.send(1000))  # --> save z = 1000, GEN_RUNNING, GEN_CLOSED, raise StopIteration
except StopIteration:
    print(getgeneratorstate(cr1)) # GEN_CLOSED
    pass

# Python 3.5 이상에서는 async, await으로 def, yield를 대체해 사용할 수 있음.
#                      StopIteration도 await을 사용할 시 자동으로 처리함.


GEN_CREATED
>>> coroutine started : 10
10
GEN_SUSPENDED
>>> coroutine received : 100
110
GEN_SUSPENDED
>>> coroutine received : 1000
GEN_CLOSED


In [95]:
# Coroutine Ex3
# 중첩 코루틴 처리

def coroutine3():
    for x in 'AB':
        yield x
    for y in range(1, 4):
        yield y

t1 = coroutine3()
print(next(t1))  # A
print(next(t1))  # B
print(next(t1))  # 1
print(next(t1))  # 2
print(next(t1))  # 3
# print(next(t1))  # --> StopIteration

t2 = coroutine3()
print(list(t2))  # ['A', 'B', 1, 2, 3]


def coroutine4():
    yield from 'AB'
    yield from range(1, 4)

t3 = coroutine3()
print(next(t3))  # A
print(next(t3))  # B
print(next(t3))  # 1
print(next(t3))  # 2
print(next(t3))  # 3
# print(next(t3))  # --> StopIteration


A
B
1
2
3
['A', 'B', 1, 2, 3]
A
B
1
2
3


In [96]:
# Chapter06-04, 05
# Futures 동시성
# 비동기 작업 실행
# 지연시간(Block) CPU 및 리소스 낭비 방지 -> (File)Network I/O 관련 작업 -> 동시성 활용 권장
# 비동기 작업과 적합한 프로그램일 경우 압도적으로 성능 향상

# futures : 비동기 실행을 위한 API를 고수준으로 작성 -> 사용하기 쉽도록 개선
# concurrent.Futures
# 1. 멀티스레딩/멀티프로세싱 API 통일 -> 매우 사용하기 쉬움
# 2. 실행중인 작업 취소, 완료 여부 체크, 타임아웃 옵션, 콜백추가, 동기화 코드 매우 쉽게 작성 -> Promise 개념

# 2가지 패턴 실습
# concurrent.futures map
# concurrent.futures wait, as_completed

# GIL : 두 개 이상의 스레드가 동시에 실행 될 때 하나의 자원을 엑세스 하는 경우 -> 문제점을 방지하기 위해
#       GIL 실행 , 리소스 전체에 락이 걸린다. -> Context Switch(문맥 교환)

# GIL : 멀티프로세싱 사용, CPython


In [2]:

import time
from concurrent import futures
from chap06_defs import sum_from_one


WORK_LIST = [100000, 1000000, 10000000, 100000000]

# def sum_from_one(n):
#     print(f'>>> sum from 1 to {n}\n')
#     res = sum(n for n in range(1, n + 1))
#     print(f'>>> sum from 1 to {n} finished.\n')
#     return res

def main_with_thread_pool():
    worker_num = min(10, len(WORK_LIST))
    start_tm = time.time()

    with futures.ThreadPoolExecutor(worker_num) as executor:
        # map: Returns an iterator equivalent to map(fn, iter).
        #     작업 순서 유지, 즉시 실행
        result = executor.map(sum_from_one, WORK_LIST) 

    end_tm = time.time()
    elapsed_tm = end_tm - start_tm
    msg = '\n Thread Pool Result -> {}, Time: {:.2f}s'
    print(msg.format(list(result), elapsed_tm))


def main_with_process_pool():
    worker_num = min(10, len(WORK_LIST))
    start_tm = time.time()

    with futures.ProcessPoolExecutor(worker_num) as executor:
        # map: Returns an iterator equivalent to map(fn, iter).
        #     작업 순서 유지, 즉시 실행
        result = executor.map(sum_from_one, WORK_LIST) 


    end_tm = time.time()
    elapsed_tm = end_tm - start_tm
    msg = '\n Process Pool Result -> {}, Time: {:.2f}s'
    print(msg.format(list(result), elapsed_tm))


def main_with_for_loop():
    start_tm = time.time()
    result = list(map(sum_from_one, WORK_LIST))
    end_tm = time.time()
    elapsed_tm = end_tm - start_tm
    msg = '\n For Loop Result -> {}, Time: {:.2f}s'
    print(msg.format(list(result), elapsed_tm))


main_with_thread_pool()  # Thread Pool Result -> [5000050000, 500000500000, 50000005000000, 5000000050000000], Time: 6.48s
main_with_process_pool() # Process Pool Result -> [5000050000, 500000500000, 50000005000000, 5000000050000000], Time: 5.76s
main_with_for_loop()     # For Loop Result -> [5000050000, 500000500000, 50000005000000, 5000000050000000], Time: 6.40s



>>> sum from 1 to 100000

>>> sum from 1 to 100000 finished.

>>> sum from 1 to 1000000

>>> sum from 1 to 10000000
>>> sum from 1 to 100000000


>>> sum from 1 to 1000000 finished.

>>> sum from 1 to 10000000 finished.

>>> sum from 1 to 100000000 finished.


 Thread Pool Result -> [5000050000, 500000500000, 50000005000000, 5000000050000000], Time: 6.48s

 Process Pool Result -> [5000050000, 500000500000, 50000005000000, 5000000050000000], Time: 5.76s
>>> sum from 1 to 100000

>>> sum from 1 to 100000 finished.

>>> sum from 1 to 1000000

>>> sum from 1 to 1000000 finished.

>>> sum from 1 to 10000000

>>> sum from 1 to 10000000 finished.

>>> sum from 1 to 100000000

>>> sum from 1 to 100000000 finished.


 For Loop Result -> [5000050000, 500000500000, 50000005000000, 5000000050000000], Time: 6.40s


In [19]:
# using wait

import time
from concurrent import futures
from concurrent.futures import wait


WORK_LIST = [100000, 1000000, 10000000, 100000000]

def sum_from_one(n):
    print(f'>>> sum from 1 to {n}\n')
    res = sum(n for n in range(1, n + 1))
    print(f'>>> sum from 1 to {n} finished.\n')
    return res

def main_with_thread_pool(timeout: float):
    worker_num = min(10, len(WORK_LIST))
    start_tm = time.time()
    futures_list = []

    with futures.ThreadPoolExecutor(worker_num) as executor:
        for work in WORK_LIST:
            """
            Excutor.submit(fn, *args, **kwargs):

            Submits a callable to be executed with the given arguments.
            Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance 
                representing the execution of the callable.

            Returns:
                A Future representing the given call.
            """
            future = executor.submit(sum_from_one, work)
            futures_list.append(future)
            print('Scheduled for {}: {}\n'.format(work, future))

        # wait: Wait for the futures in the given sequence to complete.
        result = wait(futures_list, timeout=timeout)
        print('Completed Tasks: {}'.format(result.done))
        print('Not Completed Tasks: {}'.format(result.not_done))

    end_tm = time.time()
    elapsed_tm = end_tm - start_tm
    msg = '\n Thread Pool Result -> {}, Time: {:.2f}s'
    print(msg.format([future.result() for future in result.done], elapsed_tm))



main_with_thread_pool(1)  # Thread Pool Result -> [500000500000, 5000050000], Time: 6.28s
main_with_thread_pool(2)  # Thread Pool Result -> [50000005000000, 5000050000, 500000500000], Time: 6.39s
main_with_thread_pool(10) # Thread Pool Result -> [5000050000, 5000000050000000, 50000005000000, 500000500000], Time: 6.50s

>>> sum from 1 to 100000

Scheduled for 100000: <Future at 0x2bf2ae6fc70 state=running>
>>> sum from 1 to 100000 finished.


>>> sum from 1 to 1000000

>>> sum from 1 to 1000000 finished.
Scheduled for 1000000: <Future at 0x2bf2aea8520 state=running>

Scheduled for 10000000: <Future at 0x2bf2aeeb790 state=pending>

>>> sum from 1 to 10000000


>>> sum from 1 to 100000000

Scheduled for 100000000: <Future at 0x2bf2aeeb2e0 state=running>

Completed Tasks: {<Future at 0x2bf2aea8520 state=finished returned int>, <Future at 0x2bf2ae6fc70 state=finished returned int>}
Not Completed Tasks: {<Future at 0x2bf2aeeb790 state=running>, <Future at 0x2bf2aeeb2e0 state=running>}
>>> sum from 1 to 10000000 finished.

>>> sum from 1 to 100000000 finished.


 Thread Pool Result -> [500000500000, 5000050000], Time: 6.40s


In [23]:
# using as_completed

import time
from concurrent import futures
from concurrent.futures import as_completed


WORK_LIST = [100000, 1000000, 10000000, 100000000]

def sum_from_one(n):
    return sum(n for n in range(1, n + 1))

def main_with_thread_pool():
    worker_num = min(10, len(WORK_LIST))
    start_tm = time.time()
    futures_list = []

    with futures.ThreadPoolExecutor(worker_num) as executor:
        for work in WORK_LIST:
            """
            Excutor.submit(fn, *args, **kwargs):

            Submits a callable to be executed with the given arguments.
            Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance 
                representing the execution of the callable.

            Returns:
                A Future representing the given call.
            """
            future = executor.submit(sum_from_one, work)
            futures_list.append(future)
            print('Scheduled for {}: {}\n'.format(work, future))

        # as_completed: An iterator over the given futures that yields each as it completes.
        for future in as_completed(futures_list):
            result = future.result()
            done = future.done()
            cancelled = future.cancelled()
            print('Future Result: {}, Done: {}, Cancelled: {}'.format(result, done, cancelled))

    end_tm = time.time()
    elapsed_tm = end_tm - start_tm
    msg = '\n Thread Pool Time: {:.2f}s'
    print(msg.format(elapsed_tm))



main_with_thread_pool()
"""
Scheduled for 100000: <Future at 0x2bf2aeae310 state=finished returned int>

Scheduled for 1000000: <Future at 0x2bf2aea8580 state=pending>

Scheduled for 10000000: <Future at 0x2bf2ae76f10 state=running>

Scheduled for 100000000: <Future at 0x2bf2a11cc40 state=pending>

Future Result: 5000050000, Done: True, Cancelled: False
Future Result: 500000500000, Done: True, Cancelled: False
Future Result: 50000005000000, Done: True, Cancelled: False
Future Result: 5000000050000000, Done: True, Cancelled: False

 Thread Pool Time: 6.40s
"""

Scheduled for 100000: <Future at 0x2bf2aeae310 state=finished returned int>

Scheduled for 1000000: <Future at 0x2bf2aea8580 state=pending>

Scheduled for 10000000: <Future at 0x2bf2ae76f10 state=running>

Scheduled for 100000000: <Future at 0x2bf2a11cc40 state=pending>

Future Result: 5000050000, Done: True, Cancelled: False
Future Result: 500000500000, Done: True, Cancelled: False
Future Result: 50000005000000, Done: True, Cancelled: False
Future Result: 5000000050000000, Done: True, Cancelled: False

 Thread Pool Time: 6.40s
