 ## Chapter 5 类与接口

### 37. 用组合起来的类实现多层结构， 不要使用嵌套的内置类型

*使用字典作为动态内部状态，如成绩记录册，不知道会出现哪些姓名*

In [1]:
from typing import List
class SimplrGradebook:
    """Minimal gradebook that keeps one flat list of scores per student.

    State lives in a dict keyed by student name, so the set of names does
    not have to be known ahead of time.
    """

    def __init__(self) -> None:
        self._grades: dict[str, list[float]] = {}

    def add_student(self, name: str) -> None:
        """Register ``name`` with an empty score list.

        Calling this for an already-registered name replaces the stored
        scores with a new empty list.
        """
        self._grades[name] = []

    def report_grade(self, name: str, score: float) -> None:
        """Append ``score`` to the scores recorded for ``name``.

        Raises:
            KeyError: if ``name`` was never added via ``add_student``.
        """
        self._grades[name].append(score)

    def average_grade(self, name: str) -> float:
        """Return the arithmetic mean of every score recorded for ``name``.

        Raises:
            KeyError: if ``name`` was never added via ``add_student``.
            ZeroDivisionError: if no score has been reported yet.
        """
        grades = self._grades[name]
        return sum(grades) / len(grades)

In [2]:
# Usage: register one student, report three scores, then print the mean.
book = SimplrGradebook()
book.add_student('Isaac Newton')
book.report_grade('Isaac Newton', 90)
book.report_grade('Isaac Newton', 95)
book.report_grade('Isaac Newton', 85)

print(book.average_grade('Isaac Newton'))

90.0


In [8]:
# 添加新功能，记录每名学生各门课程的分数 需要引用defaultdict结构
from collections import defaultdict


class BySubjectGradebook:
    """Gradebook that groups each student's grades by subject.

    Storage is a dict of student name -> defaultdict of subject -> grade list.
    """

    def __init__(self) -> None:
        self._grades: dict[str, defaultdict[str, list]] = {}

    def add_student(self, name):
        """Register ``name`` with an empty subject -> grades mapping."""
        self._grades[name] = defaultdict(list)

    def report_grade(self, name: str, subject: str, grade: float):
        """Append ``grade`` to the list kept for ``subject`` of ``name``.

        The inner defaultdict creates the subject's list on first access, so
        repeated exams of one subject accumulate in a single list.
        """
        self._grades[name][subject].append(grade)

    def average_grade(self, name: str) -> float:
        """Return the mean over every grade of ``name`` across all subjects.

        Also prints the per-subject grade lists before computing the mean.
        """
        per_subject = self._grades[name]
        print(per_subject.values())
        score_total = 0
        score_count = 0
        for subject_grades in per_subject.values():
            score_total += sum(subject_grades)
            score_count += len(subject_grades)
        return score_total / score_count

In [9]:
# Usage: two subjects with two grades each; the last expression is the
# overall average across all four grades.
book = BySubjectGradebook()
book.add_student('Albert Einstein')
book.report_grade('Albert Einstein', 'Math', 75)
book.report_grade('Albert Einstein', 'Math', 65)
book.report_grade('Albert Einstein', 'Gym', 90)
book.report_grade('Albert Einstein', 'Gym', 95)
book.average_grade('Albert Einstein')
# The requirements change again: each grade now needs a weight.

dict_values([[75, 65], [90, 95]])


81.25

In [10]:
class WeightGradebook(BySubjectGradebook):
    """Gradebook variant in which every reported grade carries a weight.

    Reuses the storage and ``add_student`` of ``BySubjectGradebook`` but
    stores ``(grade, weight)`` tuples instead of bare grades.
    """

    def report_grade(self, name: str, subject: str, grade: float, weight: float):
        """Record ``grade`` together with ``weight`` under ``subject``."""
        self._grades[name][subject].append((grade, weight))

    def average_grade(self, name):
        """Return the mean over subjects of each subject's weighted average."""
        averages_total = 0
        subject_count = 0
        for weighted_scores in self._grades[name].values():
            weighted_sum = 0
            weight_sum = 0
            for score, weight in weighted_scores:
                weighted_sum += score * weight
                weight_sum += weight
            # Normalise by the weights actually seen for this subject.
            averages_total += weighted_sum / weight_sum
            subject_count += 1
        return averages_total / subject_count

In [12]:
# Usage: every grade is reported with a weight; the weights within each
# subject below add up to 1.
book = WeightGradebook()
book.add_student('Albert Einstein')
book.report_grade('Albert Einstein', 'Math', 75, 0.05)
book.report_grade('Albert Einstein', 'Math', 65, 0.15)
book.report_grade('Albert Einstein', 'Math', 70, 0.8)
book.report_grade('Albert Einstein', 'Gym', 100, 0.4)
book.report_grade('Albert Einstein', 'Gym', 85, 0.6)
book.average_grade('Albert Einstein')

80.25

In [18]:
# 上述实现方法嵌套过多, 可以通过将多层嵌套的内置类型重构为类体系
# 将单个学生构建成一个类， 并结合namedtuple作为学生成绩的容器
from collections import namedtuple

# Immutable record holding one score and the weight it counts with.
# The field names are passed as a real tuple; the previous ("score, weight")
# was a single parenthesised string that only worked because namedtuple
# also accepts comma-separated names.
Grade = namedtuple("Grade", ("score", "weight"))


# Define the per-subject unit first.
class Subject:
    """Weighted grades of a single subject."""

    def __init__(self) -> None:
        self._grade: list[Grade] = []

    def report_grade(self, grade: float, weight: float):
        """Store ``grade`` together with its ``weight`` as a ``Grade``."""
        self._grade.append(Grade(grade, weight))

    def average_grade(self):
        """Return the weighted average of all stored grades.

        Raises:
            ZeroDivisionError: if no grade was reported or the weights sum
                to zero.
        """
        total, total_weight = 0, 0
        for grade in self._grade:
            total += grade.weight * grade.score
            total_weight += grade.weight
        return total / total_weight


# 再定义代表每个学生的类
class Students:
    """Grades of one student, grouped by subject name."""

    def __init__(self) -> None:
        # Unknown subjects are created on first access.
        self._subject: defaultdict[str, Subject] = defaultdict(Subject)

    def get_subject(self, subject: str):
        """Return the entry stored for ``subject``, creating it if absent."""
        return self._subject[subject]

    def average_grade(self):
        """Return the unweighted mean of the per-subject averages."""
        running_total = 0
        for entry in self._subject.values():
            running_total += entry.average_grade()
        return running_total / len(self._subject)


# 最后定义成绩册类
class GradeBook:
    """Top-level container mapping student names to ``Students`` entries."""

    def __init__(self) -> None:
        # Unknown names get a fresh entry on first access.
        self._students: defaultdict[str, Students] = defaultdict(Students)

    def get_student(self, name: str):
        """Return the entry stored for ``name``, creating it if absent."""
        entry = self._students[name]
        return entry

In [19]:
# Record and compute with the classes above: fetch the student, fetch each
# subject from the student, and report weighted grades on the subject.
book = GradeBook()
albert = book.get_student('Albert Einstein')
math = albert.get_subject('Math')
math.report_grade(75, 0.05)
math.report_grade(65, 0.15)
math.report_grade(70, 0.80)
gym = albert.get_subject('Gym')
gym.report_grade(100, 0.4)
gym.report_grade(85, 0.6)
print(albert.average_grade())

80.25


1. 不要在字典内嵌套长元组和字典
2. 如果部分数据不会变化, 可以考虑使用namedtuple做容器
3. 如果使用字典、列表等容器的维护越来越复杂, 可以考虑使用多个类实现

### 38. 让简单的接口接受函数,而不是类的实例

In [25]:
def log_missing():
    """Default-factory hook: announce that a key was added and supply 0."""
    print('Key added')
    return 0
# Seed the defaultdict with existing counts and use log_missing as the
# default factory.
current = {'green': 12, 'blue': 3}
increments = [('red', 5), ('blue', 17), ('orange', 9)]
result = defaultdict(log_missing, current)
print(f'Before: {dict(result)}')
for key, amount in increments:
    # NOTE: plain assignment never reads the key first, so the default
    # factory (log_missing) is not invoked here and existing values are
    # overwritten rather than incremented.
    result[key] = amount
print(f'After: {dict(result)}')


Before: {'green': 12, 'blue': 3}
After: {'green': 12, 'blue': 17, 'red': 5, 'orange': 9}


In [26]:
# 当需要统计键名缺失现象出现次数时， 考虑使用有状态闭包实现
def increment_with_report(current, increments):
    """Apply ``increments`` on top of ``current`` and count the new keys.

    Args:
        current: mapping of key -> starting amount.
        increments: iterable of ``(key, amount)`` pairs to add.

    Returns:
        A ``(result, added_count)`` tuple: the defaultdict holding the
        updated totals, and how many keys were missing from ``current``.
    """
    missing_hits = 0

    def missing():
        # Stateful closure: bump the counter owned by the enclosing call.
        nonlocal missing_hits
        missing_hits += 1
        return 0

    totals = defaultdict(missing, current)
    for key, delta in increments:
        totals[key] += delta
    return totals, missing_hits

In [27]:
# Use the result: the count must be 2 for the keys absent from `current`.
result, count = increment_with_report(current, increments)
assert count == 2

In [35]:
# 为了让代码更清晰, 可以定义一个小类, 把闭包用到的变量封装为类的实例变量
class CountMissing:
    """Small stateful helper that counts how often ``missing`` is called."""

    def __init__(self) -> None:
        self.added = 0

    def missing(self):
        """Count one call and return 0 as the default value."""
        self.added = self.added + 1
        return 0

# Pass the bound method as the default factory so the counter object keeps
# the state.
counter = CountMissing()
result = defaultdict(counter.missing, current)
print(f"{result=}")
for key, amount in increments:
    print(key)
    result[key] += amount  # Note: only += triggers missing(), because it is two steps: 1. read the key 2. add
    # By contrast, '=' does not trigger missing(): a direct assignment never reads a non-existent key.
print(counter.added)
print(f'After {result=}')
assert counter.added == 2


result=defaultdict(<bound method CountMissing.missing of <__main__.CountMissing object at 0x000001ADB8F43280>>, {'green': 12, 'blue': 3})
red
blue
orange
2
After result=defaultdict(<bound method CountMissing.missing of <__main__.CountMissing object at 0x000001ADB8F43280>>, {'green': 12, 'blue': 20, 'red': 5, 'orange': 9})


In [36]:
# 使用魔术方法中的"__call__"将类转化为可调用对象
from typing import Any


class BetterCountMissing:
    """Callable counter: calling an instance counts the call and returns 0."""

    def __init__(self) -> None:
        self.added = 0

    def __call__(self, *args: Any, **kwds: Any) -> Any:
        """Count one call and return 0; any arguments are ignored."""
        self.added = self.added + 1
        return 0
    
# An instance defining __call__ can be invoked like a function and is
# reported as callable.
counter = BetterCountMissing()
assert counter() == 0
assert callable(counter)

In [37]:
# Start from a fresh counter and hand the instance itself to defaultdict as
# the default factory.
counter = BetterCountMissing()
result = defaultdict(counter, current)

In [38]:
# Apply the increments; += reads each key first, so every key that is not
# yet in `result` calls the counter once.
for key, amount in increments:
    print(key)
    result[key] += amount 
assert counter.added == 2

red
blue
orange


1. 考虑让组件之间通过接口交互, 让接口接受挂钩函数, 不一定要定义新类
2. python的函数和方法都是first class对象, 可以像其他类型的值一样用在表达式中
3. '__call__'方法定义后相关实例就可以在调用时执行方法下的操作
4. 可以考虑使用class + __call__维护状态, 毕竟闭包太抽象了。

### 39. 通过@classmethod多态来构造同一体系的各类对象
[多态]： 使同一体系的多个类可以按照各自独有的方式来实现同一个方法。

In [49]:
# 进行一个MapReduce(映射->归纳/映射->化简)流程并以类表示输入数据时, 首先定义一个InputData类, 把read方法留给子类
class InputData:
    """Base class for input data; subclasses must provide ``read``."""

    def read(self):
        """Return the data to process; not implemented on the base class."""
        raise NotImplementedError
    
# 然后再编写具体的输入数据子类
class PathInputData(InputData):
    """Input data backed by a file on disk."""

    def __init__(self, path: str) -> None:
        super().__init__()
        self.path = path

    def read(self):
        """Return the whole content of the file at ``self.path`` as text."""
        with open(self.path) as handle:
            contents = handle.read()
        return contents
        
# 让处理文件的Worker也拥有一套标准接口
# 定义Worker基类
class Worker:
    """Base class for workers that process input data.

    Args:
        input_data: the input to process; an instance of ``InputData`` or
            of one of its subclasses.
    """

    def __init__(self, input_data: InputData) -> None:
        self.input_data = input_data

    def map(self):
        """Map step; not implemented on the base class."""
        raise NotImplementedError

    def reduce(self, other):
        """Reduce step combining ``other``; not implemented on the base class."""
        raise NotImplementedError
    
# 定义一种具体的worker子类
class LineCountWorker(Worker):
    """Worker that counts newline characters in its input."""

    def map(self):
        """Read the input and store its newline count in ``self.result``."""
        text: str = self.input_data.read()
        newline_count = text.count('\n')
        self.result = newline_count

    def reduce(self, other):
        """Add ``other.result`` onto this worker's ``result``."""
        self.result += other.result

In [52]:
# 现在两个体系都抽象出来合理的接口，但必须落实到具体的对象上
# 如何使用辅助函数构造对象并编排MapReduce流程？
# [x] 1. 手动构建对象并拼接
import os


def generate_inputs(data_dir):
    """Yield one ``PathInputData`` per entry listed in ``data_dir``."""
    yield from (
        PathInputData(os.path.join(data_dir, entry))
        for entry in os.listdir(data_dir)
    )


from typing import Generator


def creat_workers(input_list: Generator):
    """Return a list with one ``LineCountWorker`` per item of ``input_list``."""
    return [LineCountWorker(item) for item in input_list]


# 将worker映射到每个线程中去
from threading import Thread


def execute(workers: "list[LineCountWorker]"):
    """Run every worker's ``map`` in its own thread, then reduce to one result.

    Args:
        workers: non-empty list of workers exposing ``map()``,
            ``reduce(other)`` and a ``result`` attribute.

    Returns:
        The ``result`` of the first worker after all others were reduced
        into it.

    Raises:
        ValueError: if ``workers`` is empty. Previously this surfaced as an
            opaque unpacking error.
    """
    if not workers:
        raise ValueError("execute() requires at least one worker")

    threads = [Thread(target=worker.map) for worker in workers]
    for thread in threads:
        thread.start()
    # Wait for every map step to finish before reading any result.
    for thread in threads:
        thread.join()

    first, *rest = workers
    for worker in rest:
        first.reduce(worker)
    return first.result


# 最后编写函数将上述三个过程连接
def mapreduce(data_path):
    """Chain the three steps: build inputs, build workers, execute them."""
    return execute(creat_workers(generate_inputs(data_dir=data_path)))

In [None]:
# 造一个用于统计的文件
import random


def write_test_files(out_path):
    """Create 100 files named ``0``..``99`` in ``out_path``.

    Each file holds between 0 and 100 newline characters, chosen at random.
    The directory is created if needed. An existing directory is reused, so
    re-running the cell no longer fails; files with the same names are
    overwritten.
    """
    os.makedirs(out_path, exist_ok=True)
    for i in range(100):
        with open(os.path.join(out_path, str(i)), "w") as f:
            f.write("\n" * random.randint(0, 100))

# Generate the demo data directory read by the mapreduce cells.
write_test_files('./data/ch5_demo')

In [54]:
# Count the newlines across every file of the demo directory.
result = mapreduce('./data/ch5_demo')
print(f'{result =}')

result =5050


In [58]:
# 由于python的类只支持一个构造方法'__init__',所以最好使用类方法的多态来解决
class GenericInputData:
    """Input-data base class that also declares a factory classmethod."""

    def read(self):
        """Return the data to process; not implemented on the base class."""
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, config):
        """Build inputs from ``config``; not implemented on the base class."""
        raise NotImplementedError


class PathInputData(GenericInputData):
    """File-backed input data that can enumerate itself from a directory."""

    def __init__(self, path: str) -> None:
        super().__init__()
        self.path = path

    def read(self):
        """Return the whole content of the file at ``self.path`` as text."""
        with open(self.path) as handle:
            contents = handle.read()
        return contents

    @classmethod
    def generate_inputs(cls, config):
        """Yield one instance per entry of the directory ``config["data_dir"]``."""
        directory = config["data_dir"]
        for entry in os.listdir(directory):
            yield cls(os.path.join(directory, entry))

In [61]:
# 对worker进行类似改写
class GenericWorker:
    """Worker base class with a factory classmethod for building workers.

    The annotations name the generic input base class rather than one
    concrete subclass, and are written as strings so they are resolved
    lazily.
    """

    def __init__(self, input_data: "GenericInputData") -> None:
        self.input_data = input_data
        # Filled in by map(); None until then.
        self.result = None

    def map(self):
        """Map step; not implemented on the base class."""
        raise NotImplementedError

    def reduce(self, other):
        """Reduce step combining ``other``; not implemented on the base class."""
        raise NotImplementedError

    @classmethod
    def creat_workers(
        cls, input_class: "type[GenericInputData]", config
    ) -> "list[GenericWorker]":
        """Return one ``cls`` instance per input produced by ``input_class``.

        Args:
            input_class: a class (not an instance) whose
                ``generate_inputs(config)`` yields input objects.
            config: passed through unchanged to ``generate_inputs``.
        """
        return [cls(input_data) for input_data in input_class.generate_inputs(config)]


class LineCountWorker(GenericWorker):
    """Worker that counts newline characters in its input."""

    def map(self):
        """Read the input and store its newline count in ``self.result``."""
        text: str = self.input_data.read()
        newline_count = text.count("\n")
        self.result = newline_count

    def reduce(self, other):
        """Add ``other.result`` onto this worker's ``result``."""
        self.result += other.result

In [62]:
def mapreduce_1(
    worker_class: "type[GenericWorker]",
    input_class: "type[GenericInputData]",
    config,
):
    """Run the MapReduce flow for any worker class / input class pair.

    Args:
        worker_class: the worker class itself (not an instance); its
            ``creat_workers`` classmethod builds the workers.
        input_class: the input-data class itself, handed to
            ``creat_workers``.
        config: passed through unchanged to ``creat_workers``.

    Returns:
        Whatever ``execute`` returns for the created workers.
    """
    workers = worker_class.creat_workers(input_class, config)
    return execute(workers=workers)

# The classes themselves are passed in; mapreduce_1 builds the objects
# through their classmethods.
config = {'data_dir': './data/ch5_demo'}
result = mapreduce_1(LineCountWorker, PathInputData, config)
print(f'With Classmenthod {result= }')

With Classmenthod result= 5050
