 # Generators, Iterators, and Asynchronous Programming
 - generator는 다른 programming 언어들과 달리 python이 갖는 특별한 특징임
 - 이 챕터에서는 generator의 근원과 왜 필요한지와 어떻게 generator를 사용해서 문제를 해결하는지를 다루려고 함
 - 또한 iteration(iterator pattern)이 동작하는 방식을 이해함으로써, generator가 coroutine과 asynchronous programming의 기반이 되는 과정을 이해하는 것을 목표로 하고 있음

## Creating generators
- generator는 `iterable` 객체를 만드는게 기본 아이디어임  
$\Rightarrow$  한 번에 하나씩 반복적으로 element를 생성
- generator를 사용하는 큰 이유는 large list를 사용하는 것 대비 메모리를 절약할 수 있기 때문  
(sequence에서 개별 element를 생성하는 방법을 알고 필요할때 마다 하나씩 반환한다고 생각하면 됨)
- 이를 lazy computation이라고도 부르며, infinite sequence에 대한 연산도 물론 가능함

## A first look at generators
- 다음과 같은 예제를 생각해보자
- 아래와 같이 구매 이력에 관한 아주 큰 csv 파일이 있고 우리는 `lowest sale`, `highest sale`, `average price of sale` 이렇게 세 개의 값을 구하고 싶다고 하자
```
<purchase_date>, <price>
...
```

In [None]:
import os
from tempfile import gettempdir
import logging

In [None]:
# basicConfig() only configures the root logger and returns None, so the
# logger object itself has to be fetched separately.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger()

In [None]:
# Helper that generates the sample data set.
def create_purchases_file(filename, entries=1_000_000):
    """Write a sample purchases CSV to ``filename`` unless it already exists.

    Each row has the form ``2018-01-01,<price>``: the date is fixed and the
    price runs from 0 up to ``entries - 1``.

    Args:
        filename: Path of the CSV file to create.
        entries: Number of rows to write.
    """
    # Check the path we were asked to create, not a module-level constant
    # that may be undefined or point somewhere else.
    if os.path.exists(filename):
        return

    with open(filename, "w") as f:
        f.writelines(f"2018-01-01,{i}\n" for i in range(entries))

In [None]:
# 위에서 만든 sample data를 읽어서 purchase value를 list로 반환하는 함수
# 다음의 performance issue가 있을 수 있음
#   1. large dataset에 대해서는 시간이 좀 걸릴 수 있음
#   2. 너무 커서 memory에 올릴 수 없으면 fail이 날수도
def _load_purchases(filename):
    purchases = []
    with open(filename) as f:
        for line in f:
            *_, price_raw = line.partition(",")
            purchases.append(float(price_raw))

    return purchases


# Lazy variant: reads the sample data and hands the purchase values out
# through a generator.
#   => instead of loading the whole file, each iteration produces one value
# Advantages:
#   1. simpler code (no list to define, no return statement)
#   2. much lower memory usage in comparison
# CPU time may go up.
def load_purchases(filename):
    """Yield the price column of the CSV at ``filename`` one float at a time."""
    with open(filename) as source:
        for row in source:
            _date, _sep, price_text = row.partition(",")
            yield float(price_text)

In [None]:
 PURCHASES_FILE = os.path.join(gettempdir(), "purchases.csv")

In [None]:
create_purchases_file(PURCHASES_FILE)

In [None]:
purchases_list = _load_purchases(PURCHASES_FILE)

In [None]:
print(f'num data points: {len(purchases_list)}' )
print(f'first 5 points: {purchases_list[:5]}' )
print(f'last 5 points: {purchases_list[-5:]}' )

num data points: 1000000
first 5 points: [0.0, 1.0, 2.0, 3.0, 4.0]
last 5 points: [999995.0, 999996.0, 999997.0, 999998.0, 999999.0]


- `generator` 객체는 `iterable` 임을 기억 => `for loops`에 넘겨줄 수 있음

In [None]:
purchases_generator = load_purchases(PURCHASES_FILE)

In [None]:
for val in purchases_generator:
  print(val)

  if val == 10:
    break

0.0
1.0
2.0
3.0
4.0
5.0
6.0
7.0
8.0
9.0
10.0


In [None]:
purchases_generator = load_purchases(PURCHASES_FILE)

In [None]:
purchases_generator

<generator object load_purchases at 0x7f0d623ab1d0>

In [None]:
class PurchasesStats:
    """Single-pass minimum / maximum / average over an iterable of prices.

    The constructor consumes the first value to seed the statistics and
    ``process()`` consumes whatever is left, so any iterable (a list or a
    one-shot generator) can be used as the source.

    Raises:
        ValueError: on construction, when ``purchases`` yields no value.
    """

    def __init__(self, purchases):
        self.purchases = iter(purchases)
        # Placeholders only: _initialize() either replaces both with the
        # first value or raises, so a constructed instance never keeps None.
        self.min_price: float = None
        self.max_price: float = None
        self._total_purchases_price: float = 0.0
        self._total_purchases = 0
        self._initialize()

    def _initialize(self):
        """Seed min, max and the running totals with the first value."""
        try:
            first_value = next(self.purchases)
        except StopIteration:
            # The StopIteration is an implementation detail; suppress it so
            # the caller only sees the ValueError.
            raise ValueError("no values provided") from None

        self.min_price = self.max_price = first_value
        self._update_avg(first_value)

    def process(self):
        """Fold every remaining value into the statistics and return ``self``."""
        for purchase_value in self.purchases:
            self._update_min(purchase_value)
            self._update_max(purchase_value)
            self._update_avg(purchase_value)
        return self

    def _update_min(self, new_value: float):
        """Lower ``min_price`` when ``new_value`` is smaller."""
        if new_value < self.min_price:
            self.min_price = new_value

    def _update_max(self, new_value: float):
        """Raise ``max_price`` when ``new_value`` is larger."""
        if new_value > self.max_price:
            self.max_price = new_value

    @property
    def avg_price(self):
        """Average of the values seen so far (sum divided by count)."""
        return self._total_purchases_price / self._total_purchases

    def _update_avg(self, new_value: float):
        """Add ``new_value`` to the running sum and bump the counter."""
        self._total_purchases_price += new_value
        self._total_purchases += 1

    def __str__(self):
        return (
            f"{self.__class__.__name__}({self.min_price}, "
            f"{self.max_price}, {self.avg_price})"
        )

In [None]:
stats = PurchasesStats(purchases_list).process()

In [None]:
print(f'min price: {stats.min_price}')
print(f'max price: {stats.max_price}')
print(f'total price: {stats._total_purchases_price}')
print(f'num data points: {stats._total_purchases}')

min price: 0.0
max price: 999999.0
total price: 499999500000.0
num data points: 1000000


In [None]:
# min, max, avg
print(str(stats))

PurchasesStats(0.0, 999999.0, 499999.5)


In [None]:
stats = PurchasesStats(purchases_generator).process()

In [None]:
print(f'min price: {stats.min_price}')
print(f'max price: {stats.max_price}')
print(f'total price: {stats._total_purchases_price}')
print(f'num data points: {stats._total_purchases}')

min price: 0.0
max price: 999999.0
total price: 499999500000.0
num data points: 1000000


In [None]:
# min, max, avg
print(str(stats))

PurchasesStats(0.0, 999999.0, 499999.5)


In [None]:
next(purchases_generator)

StopIteration: ignored

## Generator expressions
- list, set, dict의 comprehension과 마찬가지로 generator도 comprehension 형태로 작성할 수 있음 (generator expression)
- list comprehension에서 `square bracket`을 `parentheses`로 변경
- sum, max 등의 함수에 바로 generator를 넘길수도 있음

In [None]:
[x**2 for x in range(10)]

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [None]:
(x**2 for x in range(10))

<generator object <genexpr> at 0x7f0d5d1c90d0>

In [None]:
sum(x**2 for x in range(10))

285

In [None]:
(({x**2 for x in range(10)}))

{0, 1, 4, 9, 16, 25, 36, 49, 64, 81}

In [None]:
class SequenceIterator:
    """Endless arithmetic progression exposed through the iterator protocol.

    Starts at ``start`` and grows by ``step`` on every ``next()`` call; it
    never raises ``StopIteration``.
    """

    def __init__(self, start=0, step=1):
        self.step = step
        self.current = start

    def __iter__(self):
        """Return the iterator itself.

        Needed so the object can be used wherever an iterable is expected
        (e.g. ``zip``).
        """
        return self

    def __next__(self):
        """Hand out the current value, then advance by one step."""
        upcoming = self.current
        self.current += self.step
        return upcoming

In [None]:
seq = SequenceIterator()

In [None]:
for i in range(10):
  print(next(seq))

0
1
2
3
4
5
6
7
8
9


In [None]:
list(zip(seq, 'abc'))

[(10, 'a'), (11, 'b'), (12, 'c')]

In [None]:
# A generator object is itself iterable!
def sequence(start=0):
    """Yield ``start``, ``start + 1``, ``start + 2``, ... without ever stopping."""
    current = start
    while True:
        yield current
        current += 1

In [None]:
list(zip(sequence(), 'abcde'))

[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')]

In [None]:
from itertools import islice, tee

In [None]:
# take the first 2 values greater than 1000 (1000 itself is excluded)
list(islice(filter(lambda p: p > 1000., [0, 1000, 500, 2000, 1500, 1200]), 2))

[2000, 1500]

In [None]:
# take the first 3 values greater than 1000 (1000 itself is excluded)
list(islice(filter(lambda p: p > 1000., [0, 1000, 500, 2000, 1500, 1200]), 3))

[2000, 1500, 1200]

In [None]:
def sequence(start=0, end=20):
    """Yield consecutive integers beginning at ``start``.

    The first value is always yielded. After that the generator stops as
    soon as the next value would reach ``end - 1``, so the defaults produce
    ``0, 1, ..., 18``.

    NOTE(review): the ``end - 1`` bound is kept as-is because the recorded
    outputs rely on it; confirm whether an end-exclusive range
    (``0 .. end - 1``) was actually intended.
    """
    while True:
        yield start
        start += 1

        # ``>=`` rather than ``==``: with ``==`` a call whose start is already
        # at or past the bound skips the check forever and never terminates.
        if start >= end - 1:
            break

In [None]:
# 독립적인 객체 n개 만들어 줌
# 재사용 불가
t1, t2, t3 = tee(sequence(), 3)

In [None]:
print(list(t1))
print(list(t1))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
[]


In [None]:
print(list(t2))
print(list(t2))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
[]


In [None]:
print(list(t3))
print(list(t3))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
[]


## Sequence objects as iterables
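- `__getitem__`, `__len__` magic method를 구현한 sequence 객체는 `__iter__`가 없어도 iterable임 (iterator는 아님)  
  $\Rightarrow$ `__iter__`가 없으면 `for`는 `__getitem__`을 index 0부터 `IndexError`가 발생할 때까지 차례로 호출함
- iterable: `__iter__` (for ... in ...)
- iterator: `__iter__` + `__next__` (next())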

In [None]:
import logging

In [None]:
logger = logging.getLogger()

In [None]:
class SequenceWrapper:
    """Thin wrapper that forwards indexing and ``len()`` to a wrapped sequence.

    It defines ``__getitem__`` and ``__len__`` but neither ``__iter__`` nor
    ``__next__``.
    """

    def __init__(self, original_sequence):
        self.seq = original_sequence

    def __len__(self):
        """Return the length of the wrapped sequence."""
        return len(self.seq)

    def __getitem__(self, item):
        """Return ``self.seq[item]`` and log the access at debug level."""
        # Index first so a failing lookup raises before anything is logged.
        fetched = self.seq[item]
        logger.debug("%s getting %s", self.__class__.__name__, item)
        return fetched


class MappedRange:
    """Apply a transformation to a range of numbers.

    Wraps ``range(start, end)``; indexing returns the transformed element
    and ``len()`` reports the size of the underlying range.
    """

    def __init__(self, transformation, start, end):
        self._wrapped = range(start, end)
        self._transformation = transformation

    def __len__(self):
        """Return the number of elements in the wrapped range."""
        return len(self._wrapped)

    def __getitem__(self, index):
        """Return the transformed element at ``index`` and log it at debug level."""
        raw = self._wrapped[index]
        mapped = self._transformation(raw)
        logger.debug("Index %d: %s", index, mapped)
        return mapped


In [None]:
seq = SequenceWrapper([1,2,3,4,5])

In [None]:
for i in seq:
  print(i)

1
2
3
4
5


In [None]:
next(seq)

TypeError: ignored

In [None]:
mr = MappedRange(abs, -10, 5)

In [None]:
mr[0]

10

In [None]:
mr[-1]

4

In [None]:
list(mr)

[10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 1, 2, 3, 4]

In [None]:
len(mr)

15

## Coroutines
- 실행을 중간에 멈췄다가 나중에 재개할 수 있음
- `.close()`, `.throw(ex_type[, ex_value[, ex_traceback]])`, `.send(value)`이 coroutine의 basic method들임

In [None]:
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger()

In [None]:
# close example
class DBHandler:
    """Simulate reading from the database by pages."""

    def __init__(self, db):
        self.is_closed = False
        self.db = db

    def read_n_records(self, limit):
        """Return ``limit`` fake rows as ``(index, "row <index>")`` tuples."""
        return [(idx, f"row {idx}") for idx in range(limit)]

    def close(self):
        """Log the shutdown at debug level and flag the handler as closed."""
        logger.debug("closing connection to database %r", self.db)
        self.is_closed = True


def stream_db_records(db_handler):
    """Yield pages of 10 records until the generator is closed.

    Calling ``.close()`` on the generator raises ``GeneratorExit`` at the
    suspended ``yield``; it is caught here to close ``db_handler``.

    >>> streamer = stream_db_records(DBHandler("testdb"))  # doctest: +ELLIPSIS
    >>> len(next(streamer))
    10

    >>> len(next(streamer))
    10
    """
    try:
        while True:
            page = db_handler.read_n_records(10)
            yield page
            # brief pause before fetching the next page
            time.sleep(.1)
    except GeneratorExit:
        db_handler.close()

In [None]:
streamer = stream_db_records(DBHandler('testDB'))

In [None]:
next(streamer)

[(0, 'row 0'),
 (1, 'row 1'),
 (2, 'row 2'),
 (3, 'row 3'),
 (4, 'row 4'),
 (5, 'row 5'),
 (6, 'row 6'),
 (7, 'row 7'),
 (8, 'row 8'),
 (9, 'row 9')]

In [None]:
next(streamer)

[(0, 'row 0'),
 (1, 'row 1'),
 (2, 'row 2'),
 (3, 'row 3'),
 (4, 'row 4'),
 (5, 'row 5'),
 (6, 'row 6'),
 (7, 'row 7'),
 (8, 'row 8'),
 (9, 'row 9')]

In [None]:
streamer.close()

In [None]:
next(streamer)

StopIteration: ignored

In [None]:
# throw example
class CustomException(Exception):
    """An exception of the domain model."""


def stream_data(db_handler):
    """Yield pages of 10 records, reacting to exceptions injected via ``.throw()``.

    A ``CustomException`` thrown into the generator is logged and streaming
    continues (the ``.throw()`` call returns the next page). Any other
    ``Exception`` is logged, ``db_handler`` is closed and the generator ends.

    >>> streamer = stream_data(DBHandler("testdb"))
    >>> len(next(streamer))
    10
    """
    while True:
        try:
            yield db_handler.read_n_records(10)
        # Controlled domain error: leave a log entry and keep the stream alive.
        except CustomException as e:
            logger.info("controlled error %r, continuing", e)
        except Exception as e:
            logger.info("unhandled error %r, stopping", e)
            db_handler.close()
            break

In [None]:
streamer = stream_data(DBHandler('testDB'))

In [None]:
next(streamer)

[(0, 'row 0'),
 (1, 'row 1'),
 (2, 'row 2'),
 (3, 'row 3'),
 (4, 'row 4'),
 (5, 'row 5'),
 (6, 'row 6'),
 (7, 'row 7'),
 (8, 'row 8'),
 (9, 'row 9')]

In [None]:
streamer.throw(CustomException)

controlled error CustomException(), continuing


[(0, 'row 0'),
 (1, 'row 1'),
 (2, 'row 2'),
 (3, 'row 3'),
 (4, 'row 4'),
 (5, 'row 5'),
 (6, 'row 6'),
 (7, 'row 7'),
 (8, 'row 8'),
 (9, 'row 9')]

In [None]:
next(streamer)

[(0, 'row 0'),
 (1, 'row 1'),
 (2, 'row 2'),
 (3, 'row 3'),
 (4, 'row 4'),
 (5, 'row 5'),
 (6, 'row 6'),
 (7, 'row 7'),
 (8, 'row 8'),
 (9, 'row 9')]

In [None]:
streamer.throw(RuntimeError)

unhandled error RuntimeError(), stopping


StopIteration: ignored

In [None]:
next(streamer)

StopIteration: ignored

In [None]:
# send example
def _stream_db_records(db_handler):
    retrieved_data = None
    previous_page_size = 10
    try:
        while True:
            page_size = yield retrieved_data
            if page_size is None:
                page_size = previous_page_size

            previous_page_size = page_size

            retrieved_data = db_handler.read_n_records(page_size)
    except GeneratorExit:
        db_handler.close()


def stream_db_records(db_handler):
    """Compact coroutine variant: page size is controlled through ``.send()``.

    The first ``next()`` yields ``None``. A truthy value passed to
    ``.send()`` becomes the new page size; any falsy value (including the
    ``None`` of a plain ``next()``) keeps the current one, which starts at
    10. Closing the generator closes ``db_handler``.
    """
    page = None
    size = 10
    try:
        while True:
            requested = yield page
            if requested:
                size = requested
            page = db_handler.read_n_records(size)
    except GeneratorExit:
        db_handler.close()


def prepare_coroutine(coroutine):
    """Decorator that primes a generator-based coroutine.

    The returned wrapper creates the generator and advances it to its first
    ``yield``, so callers can use ``.send()`` right away without an initial
    ``next()`` call.

    Args:
        coroutine: A generator function.

    Returns:
        A function with the same signature that returns the already
        advanced generator.
    """
    # Local import keeps the block self-contained.
    from functools import wraps

    # wraps() preserves the decorated function's name and docstring.
    @wraps(coroutine)
    def wrapped(*args, **kwargs):
        advanced_coroutine = coroutine(*args, **kwargs)
        next(advanced_coroutine)
        return advanced_coroutine

    return wrapped


@prepare_coroutine
def auto_stream_db_records(db_handler):
    """Page-streaming coroutine that is advanced automatically.

    Because of ``@prepare_coroutine`` the first ``next()`` call is not
    needed. A truthy value passed to ``.send()`` becomes the new page size;
    a falsy one keeps the current size, which starts at 10. Closing the
    generator closes ``db_handler``.
    """
    page = None
    size = 10
    try:
        while True:
            requested = yield page
            if requested:
                size = requested
            page = db_handler.read_n_records(size)
    except GeneratorExit:
        db_handler.close()


In [None]:
def coro():
    """Minimal coroutine: suspend once at a bare ``yield``, finish when resumed."""
    received = yield  # a value passed through send() lands here; it is unused

In [None]:
c = coro()

In [None]:
c.send(1)

TypeError: ignored

In [74]:
streamer = stream_db_records(DBHandler('testDB'))

In [75]:
next(streamer)

In [76]:
streamer.send(1)

[(0, 'row 0')]

In [77]:
next(streamer)

[(0, 'row 0')]

In [78]:
streamer.send(10)

[(0, 'row 0'),
 (1, 'row 1'),
 (2, 'row 2'),
 (3, 'row 3'),
 (4, 'row 4'),
 (5, 'row 5'),
 (6, 'row 6'),
 (7, 'row 7'),
 (8, 'row 8'),
 (9, 'row 9')]

In [79]:
next(streamer)

[(0, 'row 0'),
 (1, 'row 1'),
 (2, 'row 2'),
 (3, 'row 3'),
 (4, 'row 4'),
 (5, 'row 5'),
 (6, 'row 6'),
 (7, 'row 7'),
 (8, 'row 8'),
 (9, 'row 9')]

In [88]:
# A generator can return a value as well.
def generator():
    """Yield 1 and 2, then return 3 (delivered as ``StopIteration.value``)."""
    for item in (1, 2):
        yield item
    return 3

In [89]:
val = generator()

In [90]:
next(val)

1

In [91]:
next(val)

2

In [92]:
try:
  next(val)
except StopIteration as e:
  print(e.value)

3


In [93]:
try:
  next(val)
except StopIteration as e:
  print(e.value)

None


In [94]:
# simplest use of yield from
def chain(*iterables):
    """Yield every item of each iterable in turn, from left to right.

    Args:
        *iterables: Any number of iterables to walk through one after another.
    """
    for it in iterables:
        # Delegate to the sub-iterable instead of re-yielding in a nested loop.
        yield from it

In [98]:
list(chain('hello', ['world'], ('tuple', 'of', 'values')))

['h', 'e', 'l', 'l', 'o', 'world', 'tuple', 'of', 'values']

In [99]:
def all_powers(n, pow):
    """Yield ``n**0, n**1, ..., n**pow`` (``pow + 1`` values in total)."""
    # ``yield from`` can delegate to a generator expression too.
    exponents = range(pow + 1)
    yield from (n ** exponent for exponent in exponents)

In [101]:
list(all_powers(3, 5))

[1, 3, 9, 27, 81, 243]

In [133]:
# capturing the value returned by a sub-generator
def sequence(name, start, end):
    """Yield the integers of ``range(start, end)`` and return ``end``.

    An info message tagged with ``name`` is logged before the first value
    and after the last one. The return value reaches a delegating caller as
    the result of its ``yield from`` expression.
    """
    logger.info('%s started at %i', name, start)
    span = range(start, end)
    yield from span
    logger.info('%s finished at %i', name, end)
    return end

def main():
    """Run two ``sequence`` sub-generators back to back.

    The value returned by the first sub-generator is the start of the
    second. Both returned values are printed, and their sum is this
    generator's own return value.
    """
    first_end = yield from sequence('first', 0, 2)
    second_end = yield from sequence('second', first_end, 5)
    print(first_end, second_end)
    return first_end + second_end

In [134]:
g = main()

In [135]:
next(g)

first started at 0


0

In [136]:
next(g)

1

In [137]:
next(g)

first finished at 2
second started at 2


2

In [138]:
next(g)

3

In [139]:
next(g)

4

In [140]:
next(g)

second finished at 5


2 5


StopIteration: ignored