In [28]:
import re
import reprlib

RE_WORD = re.compile(r'\w+')


class Sentence:
    def __init__(self, text) -> None:
        self.text = text
        # self.words = RE_WORD.findall(text)  # 返回字符串列表，不够惰性

    # def __getitem__(self, index):
        # return self.words[index]

    # def __len__(self):
        # return len(self.words)

    def __repr__(self) -> str:
        return f'Sentence({reprlib.repr(self.text)})'

    def __iter__(self):
        # return SentenceIterator(self.words)  # 使用yield生成器函数代替SentenceIterator类
        # for word in self.words:
        #     yield word  # 不管有没有return语句，生成器函数都不抛出StopIteration异常，在全部值生成完毕后直接退出
        # for match in RE_WORD.finditer(self.text):  # findall的惰性版本
        #     yield match.group()  # 从MatchObject实例中提取匹配的文本
        return (match.group() for match in RE_WORD.finditer(self.text))  # 惰性生成器表达式

# class SentenceIterator:
#     def __init__(self, words) -> None:
#         self.words = words
#         self.index = 0

#     def __next__(self):
#         try:
#             word = self.words[self.index]
#         except IndexError:
#             raise StopIteration()
#         self.index += 1
#         return word

#     def __iter__(self):
#         return self

In [20]:
def gen_123():
    print('start')
    yield 1
    print('continue')
    yield 2
    print('end.')
    yield 3

In [30]:
class ArithmeticProgression:
    def __init__(self, begin, step, end=None) -> None:
        self.begin = begin
        self.step = step
        self.end = end

    def __iter__(self):
        result_type = type(self.begin + self.step)
        result = result_type(self.begin)
        forever = self.end is None
        index = 0
        while forever or result < self.end:
            yield result
            index += 1
            result = self.begin + self.step * index  # 降低处理浮点数时累计效应导致的风险


In [34]:
# 更简单的方法：直接使用生成器函数
def aritprog_gen(begin, step, end=None):
    result = type(begin + step)(begin)
    forever = end is None
    while forever or result < end:
        yield result
        index += 1
        result = begin + step * index

In [38]:
# 使用库中的生成器实现一个更简短的版本：不要重复发明轮子
import itertools

def aritprog_gen(begin, step, end=None):
    result = type(begin + step)(begin)
    # forever = end is None
    # while forever or result < end:
    #     yield result
    #     index += 1
    #     result = begin + step * index
    ap_gen = itertools.count(begin, step)
    if end is None:
        return ap_gen
    return itertools.takewhile(lambda n: n < end, ap_gen)
# gen = itertools.count(1, .5)
# # next(gen)
# gen = itertools.takewhile(lambda n: n < 3, itertools.count(1, .5))
# list(gen)

In [36]:
next(gen)

1.5

In [31]:
100*1.1

110.00000000000001

In [33]:
sum(1.1 for _ in range(100))

109.99999999999982

In [39]:
def vowel(c):
    return c.lower() in 'aeiou'

In [41]:
test_str = 'Aardvark'

list(filter(vowel, test_str))

['A', 'a', 'a']

In [43]:
import itertools
list(itertools.filterfalse(vowel, test_str))

['r', 'd', 'v', 'r', 'k']

In [44]:
list(itertools.dropwhile(vowel, test_str))

['r', 'd', 'v', 'a', 'r', 'k']

In [45]:
list(itertools.takewhile(vowel, test_str))

['A', 'a']

In [46]:
list(itertools.compress(test_str, (1, 0, 1, 1, 0, 1)))

['A', 'r', 'd', 'a']

In [47]:
list(itertools.islice(test_str, 4))

['A', 'a', 'r', 'd']

In [48]:
list(itertools.islice(test_str, 4, 7))

['v', 'a', 'r']

In [49]:
list(itertools.islice(test_str, 1, 7, 2))

['a', 'd', 'a']

In [50]:
sample = [5, 4, 2, 8, 7, 6, 3, 0, 9, 1]
list(itertools.accumulate(sample))

[5, 9, 11, 19, 26, 32, 35, 35, 44, 45]

In [51]:
list(itertools.accumulate(sample, min))

[5, 4, 2, 2, 2, 2, 2, 0, 0, 0]

In [52]:
list(itertools.accumulate(sample, max))

[5, 5, 5, 8, 8, 8, 8, 8, 9, 9]

In [53]:
import operator
list(itertools.accumulate(sample, operator.mul))

[5, 20, 40, 320, 2240, 13440, 40320, 0, 0, 0]

In [54]:
list(itertools.accumulate(range(1, 11), operator.mul))

[1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800]

In [55]:
list(enumerate('asdfas', 1))

[(1, 'a'), (2, 's'), (3, 'd'), (4, 'f'), (5, 'a'), (6, 's')]

In [56]:
list(map(operator.mul, range(11), range(11)))

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

In [57]:
list(map(operator.mul, range(11), [2, 4, 8]))

[0, 4, 16]

In [58]:
list(itertools.starmap(operator.mul, enumerate('albatrota', 1)))

['a',
 'll',
 'bbb',
 'aaaa',
 'ttttt',
 'rrrrrr',
 'ooooooo',
 'tttttttt',
 'aaaaaaaaa']

In [59]:
list(itertools.starmap(lambda a, b: b / a, enumerate(itertools.accumulate(sample), 1)))

[5.0,
 4.5,
 3.6666666666666665,
 4.75,
 5.2,
 5.333333333333333,
 5.0,
 4.375,
 4.888888888888889,
 4.5]

In [60]:
test_str = 'ABC'
list(itertools.chain('ABC', range(2)))

['A', 'B', 'C', 0, 1]

In [61]:
list(itertools.chain(enumerate(test_str)))

[(0, 'A'), (1, 'B'), (2, 'C')]

In [62]:
list(itertools.chain.from_iterable(enumerate(test_str)))

[0, 'A', 1, 'B', 2, 'C']

In [63]:
list(zip(test_str, range(5), [10, 20, 30, 40]))

[('A', 0, 10), ('B', 1, 20), ('C', 2, 30)]

In [64]:
list(itertools.zip_longest(test_str, range(5), [10, 20, 30, 40]))

[('A', 0, 10), ('B', 1, 20), ('C', 2, 30), (None, 3, 40), (None, 4, None)]

In [65]:
list(itertools.zip_longest(test_str, range(5), [10, 20, 30, 40], fillvalue='?'))


[('A', 0, 10), ('B', 1, 20), ('C', 2, 30), ('?', 3, 40), ('?', 4, '?')]

In [66]:
list(itertools.product(test_str, range(2)))

[('A', 0), ('A', 1), ('B', 0), ('B', 1), ('C', 0), ('C', 1)]

In [67]:
list(itertools.product('ABC'))

[('A',), ('B',), ('C',)]

In [68]:
list(itertools.product(test_str, repeat=2))

[('A', 'A'),
 ('A', 'B'),
 ('A', 'C'),
 ('B', 'A'),
 ('B', 'B'),
 ('B', 'C'),
 ('C', 'A'),
 ('C', 'B'),
 ('C', 'C')]

In [69]:
a = list(itertools.product(range(2), repeat=3))
len(a), a

(8,
 [(0, 0, 0),
  (0, 0, 1),
  (0, 1, 0),
  (0, 1, 1),
  (1, 0, 0),
  (1, 0, 1),
  (1, 1, 0),
  (1, 1, 1)])

In [71]:
rows = itertools.product('AB', range(2), repeat=2)
for i, row in enumerate(rows): print(i, row)

0 ('A', 0, 'A', 0)
1 ('A', 0, 'A', 1)
2 ('A', 0, 'B', 0)
3 ('A', 0, 'B', 1)
4 ('A', 1, 'A', 0)
5 ('A', 1, 'A', 1)
6 ('A', 1, 'B', 0)
7 ('A', 1, 'B', 1)
8 ('B', 0, 'A', 0)
9 ('B', 0, 'A', 1)
10 ('B', 0, 'B', 0)
11 ('B', 0, 'B', 1)
12 ('B', 1, 'A', 0)
13 ('B', 1, 'A', 1)
14 ('B', 1, 'B', 0)
15 ('B', 1, 'B', 1)


In [72]:
ct = itertools.count()
next(ct), next(ct), next(ct)

(0, 1, 2)

In [73]:
list(itertools.islice(itertools.count(1, .3), 3))

[1, 1.3, 1.6]

In [74]:
cy = itertools.cycle('ABC')
next(cy), next(cy)

('A', 'B')

In [75]:
list(itertools.islice(cy, 10))

['C', 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C']

In [76]:
list(itertools.islice(itertools.repeat(7), 10))

[7, 7, 7, 7, 7, 7, 7, 7, 7, 7]

In [77]:
list(itertools.islice(itertools.repeat(7, 5), 10))

[7, 7, 7, 7, 7]

In [78]:
list(map(operator.mul, range(11), itertools.repeat(5)))

[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

In [79]:
list(itertools.combinations('ABC', 2))

[('A', 'B'), ('A', 'C'), ('B', 'C')]

In [80]:
list(itertools.combinations_with_replacement('ABC', 2))

[('A', 'A'), ('A', 'B'), ('A', 'C'), ('B', 'B'), ('B', 'C'), ('C', 'C')]

In [81]:
list(itertools.permutations('ABC', 2))

[('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'C'), ('C', 'A'), ('C', 'B')]

In [82]:
list(itertools.product('ABC', repeat=2))

[('A', 'A'),
 ('A', 'B'),
 ('A', 'C'),
 ('B', 'A'),
 ('B', 'B'),
 ('B', 'C'),
 ('C', 'A'),
 ('C', 'B'),
 ('C', 'C')]

In [83]:
list(itertools.groupby('LLLLAAGGG'))

[('L', <itertools._grouper at 0x148b7af26a0>),
 ('A', <itertools._grouper at 0x148b7af2ca0>),
 ('G', <itertools._grouper at 0x148b7af2b20>)]

In [84]:
for char, group in itertools.groupby('LLLAAAGG'):
    print(char, ': ', list(group))

L :  ['L', 'L', 'L']
A :  ['A', 'A', 'A']
G :  ['G', 'G']


In [87]:
for length, group in itertools.groupby('LLLAAAGG', len):
    print(length, list(group))

1 ['L', 'L', 'L', 'A', 'A', 'A', 'G', 'G']


In [88]:
list(itertools.tee('ABC'))

[<itertools._tee at 0x148b83f5680>, <itertools._tee at 0x148b83f5fc0>]

In [93]:
for a, b in list(zip(*itertools.tee('ABC'))):
    print(a, b)

A A
B B
C C


In [92]:
g1, g2 = itertools.tee('ABC')
for i in g1:
    print(i)

A
B
C


In [100]:
any([0,0,])

False

In [101]:
g = (_ for _ in [0, 0.0, 7, 8])
any(g), next(g)

(True, 8)

In [102]:
reversed([1, 2, 3, 4])

<list_reverseiterator at 0x148b7765790>

In [107]:
def sub_gen():
    yield 1.1
    yield 1.2
    return 'Done!'

def gen():
    yield 1
    for i in sub_gen():
        yield i
    yield 2

In [112]:
for i in gen():
    print(i)

1
1.1
1.2
Done!
2


In [111]:
def gen():
    yield 1
    result = yield from sub_gen()
    print(result)
    yield 2


In [122]:
# def tree(cls):
#     yield cls.__name__, 0
#     yield from sub_tree(cls, 1)

def tree(cls, level=0):
    yield cls.__name__, level
    for sub_cls in cls.__subclasses__():
        yield from tree(sub_cls, level + 1)

def display(cls):
    for cls_name, level in tree(cls):
        indent = ' ' * 4 * level
        print(f'{indent}{cls_name}')

display(BaseException)

BaseException
    Exception
        TypeError
            FloatOperation
            MultipartConversionError
        StopAsyncIteration
        StopIteration
        ImportError
            ModuleNotFoundError
                PackageNotFoundError
                PackageNotFoundError
            ZipImportError
        OSError
            ConnectionError
                BrokenPipeError
                ConnectionAbortedError
                ConnectionRefusedError
                ConnectionResetError
                    RemoteDisconnected
            BlockingIOError
            ChildProcessError
            FileExistsError
            FileNotFoundError
            IsADirectoryError
            NotADirectoryError
            InterruptedError
                InterruptedSystemCall
            PermissionError
            ProcessLookupError
            TimeoutError
            UnsupportedOperation
            herror
            gaierror
            timeout
            Error
                Sam

In [26]:
from collections.abc import Generator

def averager() -> Generator[float, float, None]:
    print('pre')
    total = 0.0
    count = 0
    average = 0.0
    while True:
        print('start')
        term = yield average
        print('end')
        total += term
        count += 1
        average = total / count

In [27]:
coro_avg = averager()
# next(coro_avg)  # 开始协程
# coro_avg.send(None)  # 预激协程，等效上面的代码

In [28]:
coro_avg.send(None)  # 预激协程，等效上面的代码


pre
start


0.0

In [29]:
coro_avg.send(10)

end
start


10.0

In [18]:
coro_avg.send(20)

15.0

In [19]:
coro_avg.send(5)

11.666666666666666

In [31]:
coro_avg.close()

In [32]:
coro_avg.send(5)

StopIteration: 

In [33]:
from collections.abc import Generator
from typing import Union, NamedTuple
from typing import TypeAlias

class Result(NamedTuple):
    count: int  # type: ignore
    average: float

class Sentinel:
    def __repr__(self) -> str:
        return f'<Sentinel>'
STOP = Sentinel()
SendType: TypeAlias = float | Sentinel

In [34]:
def averager2(verbose: bool = False) -> Generator[None, SendType, Result]:
    total = 0.0
    count = 0
    average = 0.0
    while True:
        term = yield  # 产出值为None
        if verbose:
            print('received:', term)
        if isinstance(term, Sentinel):
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)

In [44]:
def compute():
    res = yield from averager2(True)
    print('computed:', res)
    return res


In [45]:
comp = compute()

In [46]:
for v in [None, 10, 20, 30, STOP]:
    try:
        comp.send(v)
    except StopIteration as exc:
        result = exc.value

received: 10
received: 20
received: 30
received: <Sentinel>
computed: Result(count=3, average=20.0)


In [41]:
coro_avg = averager2()
coro_avg.send(None)
coro_avg.send(10)

In [42]:
coro_avg.send(10)
# coro_avg.close()

In [43]:
try:
    coro_avg.send(STOP)
except StopIteration as exc:
    result = exc.value
result

Result(count=2, average=10.0)

In [47]:
with open(r'example-code-2e\18-with-match\mirror.py') as fp:
    src = fp.read(60)
len(src)

60

In [48]:
fp

<_io.TextIOWrapper name='example-code-2e\\18-with-match\\mirror.py' mode='r' encoding='utf-8'>

In [49]:
fp.closed, fp.encoding

(True, 'utf-8')

In [50]:
fp.mode

'r'

In [51]:
fp.read(6)

ValueError: I/O operation on closed file.

In [2]:
import sys
import contextlib

class LookingGlass:

    def __enter__(self):
        self.original_write = sys.stdout.write
        sys.stdout.write = self.reverse_write
        return 'JABBERWOCKY'

    def reverse_write(self, text):
        self.original_write(text[::-1])

    def __exit__(self, exc_type, exc_value, traceback):
        sys.stdout.write = self.original_write
        if exc_type is ZeroDivisionError:
            print('Please DO NOT divide by zero!')
            return True

# @contextlib.contextmanager
# @contextlib.contextmanager
# def looking_glass():
#     original_write = sys.stdout.write
#     def reverse_write(text):
#         original_write(text[::-1])
#     sys.stdout.write = reverse_write
#     yield 'JABBERWOCKY'
#     sys.stdout.write = original_write

In [3]:
with LookingGlass() as what:
    print('Alice, Kitty and Snowdrop')
    print(what)

pordwonS dna yttiK ,ecilA
YKCOWREBBAJ


In [4]:
@contextlib.contextmanager
def looking_glass():
    original_write = sys.stdout.write
    def reverse_write(text):
        original_write(text[::-1])
    sys.stdout.write = reverse_write
    msg = ''
    try:
        yield 'JABBERWOCKY'
    except ZeroDivisionError:
        msg = 'Please DO'
    finally:
        sys.stdout.write = original_write
        if msg:
            print(msg)

In [5]:
with looking_glass() as what:
    print('Alice, Kitty and Snowdrop')
    print(what)

pordwonS dna yttiK ,ecilA
YKCOWREBBAJ


In [6]:
what

'JABBERWOCKY'

In [7]:
manager = LookingGlass()
manager

<__main__.LookingGlass at 0x198ee03aa20>

In [8]:
monster = manager.__enter__()

In [9]:
monster == 'JABBERWOCKY'

True

In [10]:
manager

<__main__.LookingGlass at 0x198ee03aa20>

In [11]:
manager.__exit__(None, None, None)

In [12]:
@looking_glass()
def verse():
    print('The time has come')

In [13]:
verse()

emoc sah emit ehT


In [14]:
'asdfafsa'

'asdfafsa'

In [15]:
import sys

sys.getswitchinterval()

0.005

# Python并发模型

In [38]:
import itertools
import time
from threading import Thread, Event

def spin(msg: str, done: Event) -> None:
    for char in itertools.cycle(r'\|/-'):
        status = f'\r{char} {msg}'
        print(status, end='', flush=True)
        if done.wait(.1):
            break
        blanks = ' ' * len(status)
        print(f'\r{blanks}\r', end='')

def slow() -> int:
    time.sleep(3)
    return 42

In [39]:
def supervisor() -> int:
    done = Event()
    spinner = Thread(target=spin, args=('thinking!', done))
    print(f'spinner object: {spinner}')
    spinner.start()
    result = slow()
    done.set()
    spinner.join()  # 等待spinner线程结束
    return result

In [40]:
result = supervisor()

spinner object: <Thread(Thread-11 (spin), initial)>
- thinking! 

In [20]:
result

42

In [41]:
import itertools
import time
# from threading import Thread, Event
from multiprocessing import Process, Event
from multiprocessing import synchronize

def spin(msg: str, done: synchronize.Event) -> None:
    for char in itertools.cycle(r'\|/-'):
        status = f'\r{char} {msg}'
        print(status, end='', flush=True)
        if done.wait(.1):
            break
        blanks = ' ' * len(status)
        print(f'\r{blanks}\r', end='')

def slow() -> int:
    time.sleep(3)
    return 42

In [42]:
type(Event)

method

In [43]:
def supervisor() -> int:
    done = Event()
    spinner = Process(target=spin, args=('thinking!', done))
    print(f'spinner object: {spinner}')
    spinner.start()
    result = slow()
    done.set()
    spinner.join()  # 等待spinner线程结束
    return result

In [44]:
result = supervisor()

spinner object: <Process name='Process-3' parent=1480 initial>


In [None]:
result