`高内聚，低耦合`

`对扩展开放，对修改封闭`

# 1 工厂模式

## 1.1 工厂方法

### 1.1.1 现实生活中的例子

In [12]:
try:
    import django
except:
    !pip install django

In [2]:
import abc

from django import forms

class PersonForm(forms.Form):
    name = forms.CharField(max_length=100)
    birth_date = forms.DateField(required=False)

### 1.1.2 用例

In [3]:
class A:
    pass

a = A()
b = A()
print(id(a) == id(b))
print(a, b)

False
<__main__.A object at 0x785a841ed330> <__main__.A object at 0x785a864d7640>


### 1.1.3 实现

In [4]:
import json, xml.etree.ElementTree as etree

class JSONDataExtractor:
    def __init__(self, filepath):
        self.data = dict()
        with open(filepath, mode='r', encoding='utf-8') as f:
            self.data = json.load(f)
            
    @property
    def parsed_data(self):
        return self.data
    
class XMLDataExtractor:
    def __init__(self, filepath):
        self.tree = etree.parse(filepath)
        
    @property
    def parsed_data(self):
        return self.tree
    
def dataextraction_factory(filepath):
    if filepath.endswith('json'):# 核心代码 工厂：根据字符串返回不同的类
        extractor = JSONDataExtractor
    elif filepath.endswith('xml'):
        extractor = XMLDataExtractor
    else:
        raise ValueError('Cannot extract data from {}'.format(filepath))
    return extractor(filepath)

def extract_data_from(filepath):
    factory_obj = None
    try:
        factory_obj = dataextraction_factory(filepath)
    except ValueError as e:
        print(e)
    return factory_obj

def main():
    sqlite_factory = extract_data_from('data/person.sq3')
    print()
    json_factory = extract_data_from('data/movies.json')
    json_data = json_factory.parsed_data
    print('Found: {} movies'.format(len(json_data)))
    for movie in json_data:
        print('Title: {}'.format(movie['title']))
        year = movie['year']
        if year:
            print('Year: {}'.format(year))
        director = movie['director']
        if director:
            print('Director: {}'.format(director))
        genre = movie['genre']
        if genre:
            print('Genre: {}'.format(genre))
        print()
        
    xml_factory = extract_data_from('data/person.xml')
    xml_data = xml_factory.parsed_data
    liars = xml_data.findall(".//{}[{}='{}']".format('person', 'lastName', 'Liar'))
    print('Found: {} persons'.format(len(liars)))
    for liar in liars:
        firstname = liar.find('firstName').text
        print('First name: {}'.format(firstname))
        lastname = liar.find('lastName').text
        print('Last name: {}'.format(lastname))
        [print('Phone number ({})'.format(p.attrib['type']), p.text) for p in liar.find('phoneNumbers')]
        print()

## 1.2 抽象工厂

### 1.2.3 实现

In [5]:
class Frog:
    def __init__(self, name):
        self.name = name
    
    def __str__(self):
        return self.name
    
    def interact_with(self, obstacle):
        act = obstacle.action()
        msg = f'{self} the Frog encounters {obstacle} and {act}!'
        print(msg)
        
class Bug:
    def __str__(self):
        return 'a bug'
    
    def action(self):
        return 'eats it'
    
class FrogWorld:
    def __init__(self, name):
        print(self)
        self.player_name = name
        
    def __str__(self):
        return '\n\n\t------ Frog World ------'
    
    def make_character(self):
        return Frog(self.player_name)
    
    def make_obstacle(self):
        return Bug()

In [6]:
class Wizard:
    def __init__(self, name):
        self.name = name
        
    def __str__(self):
        return self.name
    
    def interact_with(self, obstacle):
        act = obstacle.action()
        msg = f'{self} the Wizard battles against {obstacle} and {act}!'
        print(msg)
        
class Ork:
    def __str__(self):
        return 'an ork'
    
    def action(self):
        return 'kills it'
    
class WizardWorld:
    def __init__(self, name):
        print(self)
        self.player_name = name
        
    def __str__(self):
        return '\n\n\t------ Wizard World ------'
    
    def make_character(self):
        return Wizard(self.player_name)
    
    def make_obstacle(self):
        return Ork()

In [7]:
class GameEnvironment:
    def __init__(self, factory):
        self.hero = factory.make_character()# 核心代码 抽象工厂：创建指定环境下所有工厂
        self.obstacle = factory.make_obstacle()
        
    def play(self):
        self.hero.interact_with(self.obstacle)
        
def validate_age(name):
    try:
        age = input(f'Welcome {name}. How old are you? ')
        age = int(age)
    except ValueError as e:
        print(f"Age {age} is invalid, please try again...")
        return (False, age)
    return (True, age)

def main():
    name = input("Hello. What's your name? ")
    valid_input = False
    while not valid_input:
        valid_input, age = validate_age(name)
    game = FrogWorld if age < 18 else WizardWorld
    environment = GameEnvironment(game(name))
    environment.play()

In [8]:
main()



	------ Frog World ------
3 the Frog encounters a bug and eats it!


# 2 建造者模式

## 2.2 用例

In [9]:
# 工厂模式
MINI14 = '1.4GHz Mac mini'

class AppleFactory:
    class MacMini14:
        def __init__(self):
            self.memory = 4 # GB
            self.hdd = 500 # GB
            self.gpu = 'Intel HD Graphics 5000'
            
        def __str__(self):
            info = (f'Model: {MINI14}',
                    f'Memory: {self.memory}GB',
                    f'Hard Disk: {self.hdd}GB',
                    f'Graphics Card: {self.gpu}')
            return '\n'.join(info)
        
    def build_computer(self, model):
        if model == MINI14:
            return self.MacMini14()
        else:
            msg = f'I dont know how to build {model}'
            print(msg)
            
afac = AppleFactory()
mac_mini = afac.build_computer(MINI14)
print(mac_mini)

Model: 1.4GHz Mac mini
Memory: 4GB
Hard Disk: 500GB
Graphics Card: Intel HD Graphics 5000


In [10]:
# 建造者模式
class Computer:
    def __init__(self, serial_number):
        self.serial = serial_number
        self.memory = None # GB
        self.hdd = None # GB
        self.gpu = None
        
    def __str__(self):
        info = (f'Memory: {self.memory}GB',
                f'Hard Disk: {self.hdd}GB',
                f'Graphics Card: {self.gpu}')
        return '\n'.join(info)
    
class ComputerBuilder:
    def __init__(self):
        self.computer = Computer('AG23385193')
        
    def configure_memory(self, amount):
        self.computer.memory = amount
        
    def configure_hdd(self, amount):
        self.computer.hdd = amount
        
    def configure_gpu(self, gpu_model):
        self.computer.gpu = gpu_model
        
class HardwareEngineer:
    def __init__(self):
        self.builder = None
        
    def construct_computer(self, memory, hdd, gpu):# 核心代码 建造者：按照步骤构建对象（Python几乎用不到，因为Python有可选参数：调用函数时，可以不传入的参数，这些参数有默认值）
        self.builder = ComputerBuilder()
        steps = (self.builder.configure_memory(memory),
                self.builder.configure_hdd(hdd),
                self.builder.configure_gpu(gpu), print('step 0'))
        print('step 1')
        [step for step in steps]
        
    @property
    def computer(self):
        return self.builder.computer
    
def main():
    engineer = HardwareEngineer()
    engineer.construct_computer(hdd=500, memory=8, gpu='GeForce GTX 650 Ti')
    computer = engineer.computer
    print(computer)
    
main()

step 0
step 1
Memory: 8GB
Hard Disk: 500GB
Graphics Card: GeForce GTX 650 Ti


## 2.3 实现

In [11]:
from enum import Enum
import time
PizzaProgress = Enum('PizzaProgress', 'queued preparation baking ready')# 空格分隔的字符串，每个字符串都是一个PizzaProgress成员
PizzaDough = Enum('PizzaDough', 'thin thick')
PizzaSauce = Enum('PizzaSauce', 'tomato creme_fraiche')
PizzaTopping = Enum('PizzaTopping', 'mozzarella double_mozzarella bacon ham mushrooms red_onion oregano')
STEP_DELAY = 3 # in seconds for the sake of the example

class Pizza:
    def __init__(self, name):
        self.name = name
        self.dough = None
        self.sauce = None
        self.topping = []
        
    def __str__(self):
        return self.name
    
    def prepare_dough(self, dough):
        self.dough = dough
        print(f'preparing the {self.dough.name} dough of your {self}...')
        time.sleep(STEP_DELAY)
        print(f'done with the {self.dough.name} dough')
        
class MargaritaBuilder:
    def __init__(self):
        self.pizza = Pizza('margarita')
        self.progress = PizzaProgress.queued
        self.baking_time = 5 # in seconds for the sake of the example
        
    def prepare_dough(self):
        self.progress = PizzaProgress.preparation
        self.pizza.prepare_dough(PizzaDough.thin)
        
    def add_sauce(self):
        print('adding the tomato sauce to your margarita...')
        self.pizza.sauce = PizzaSauce.tomato
        time.sleep(STEP_DELAY)
        print('done with the tomato sauce')
        
    def add_topping(self):
        topping_desc = 'double mozzarella, oregano'
        topping_items = (PizzaTopping.double_mozzarella, PizzaTopping.oregano)
        print(f'adding the topping ({topping_desc}) to your margarita')
        self.pizza.topping.append([t for t in topping_items])
        time.sleep(STEP_DELAY)
        print(f'done with the topping ({topping_desc})')
        
    def bake(self):
        self.progress = PizzaProgress.baking
        print(f'baking your margarita for {self.baking_time} seconds')
        time.sleep(self.baking_time)
        self.progress = PizzaProgress.ready
        print('your margarita is ready')
        
class CreamyBaconBuilder:
    def __init__(self):
        self.pizza = Pizza('creamy bacon')
        self.progress = PizzaProgress.queued
        self.baking_time = 7 # in seconds for the sake of the example
        
    def prepare_dough(self):
        self.progress = PizzaProgress.preparation
        self.pizza.prepare_dough(PizzaDough.thick)
        
    def add_sauce(self):
        print('adding the crème fraîche sauce to your creamy bacon')
        self.pizza.sauce = PizzaSauce.creme_fraiche
        time.sleep(STEP_DELAY)
        print('done with the crème fraîche sauce')
        
    def add_topping(self):
        topping_desc = 'mozzarella, bacon, ham, mushrooms, red onion, oregano'
        topping_items = (PizzaTopping.mozzarella, PizzaTopping.bacon, PizzaTopping.ham, PizzaTopping.mushrooms, PizzaTopping.red_onion, PizzaTopping.oregano)
        print(f'adding the topping ({topping_desc}) to your creamy bacon')
        self.pizza.topping.append([t for t in topping_items])
        time.sleep(STEP_DELAY)
        print(f'done with the topping ({topping_desc})')
        
    def bake(self):
        self.progress = PizzaProgress.baking
        print(f'baking your creamy bacon for {self.baking_time} seconds')
        time.sleep(self.baking_time)
        self.progress = PizzaProgress.ready
        print('your creamy bacon is ready')
        
class Waiter:
    def __init__(self):
        self.builder = None
        
    def construct_pizza(self, builder):
        self.builder = builder
        steps = (builder.prepare_dough, builder.add_sauce, builder.add_topping, builder.bake)
        [step() for step in steps]
        
    @property
    def pizza(self):
        return self.builder.pizza
    
def validate_style(builders):
    try:
        input_msg = 'What pizza would you like, [m]argarita or [c]reamy bacon? '
        pizza_style = input(input_msg)
        builder = builders[pizza_style]()
        valid_input = True
    except KeyError as e:
        error_msg = 'Sorry, only margarita (key m) and creamy bacon (key c) are available'
        print(error_msg)
        return (False, None)
    return (True, builder)

def main():
    builders = dict(m=MargaritaBuilder, c=CreamyBaconBuilder)
    valid_input = False
    while not valid_input:
        valid_input, builder = validate_style(builders)
    print()
    waiter = Waiter()
    waiter.construct_pizza(builder)
    pizza = waiter.pizza
    print()
    print(f'Enjoy your {pizza}!')
    
main()


preparing the thick dough of your creamy bacon...
done with the thick dough
adding the crème fraîche sauce to your creamy bacon
done with the crème fraîche sauce
adding the topping (mozzarella, bacon, ham, mushrooms, red onion, oregano) to your creamy bacon
done with the topping (mozzarella, bacon, ham, mushrooms, red onion, oregano)
baking your creamy bacon for 7 seconds
your creamy bacon is ready

Enjoy your creamy bacon!


In [12]:
# 流畅建造者
class Pizza:
    def __init__(self, builder):
        self.garlic = builder.garlic
        self.extra_cheese = builder.extra_cheese
    def __str__(self):
        garlic = 'yes' if self.garlic else 'no'
        cheese = 'yes' if self.extra_cheese else 'no'
        info = (f'Garlic: {garlic}',
                f'Extra cheese: {cheese}')
        return '\n'.join(info)
    
class PizzaBuilder:
    def __init__(self):
        self.extra_cheese = False
        self.garlic = False
        
    def add_garlic(self):
        self.garlic = True
        return self
    
    def add_extra_cheese(self):
        self.extra_cheese = True
        return self
    
    def build(self):
        return Pizza(self)
    
pizza = PizzaBuilder().add_garlic().add_extra_cheese().build()
print(pizza)

Garlic: yes
Extra cheese: yes


# 3 其他创建型模式

## 3.1 原型模式

In [13]:
import copy

class Website:
    def __init__(self, name, domain, description, author, **kwargs):
        '''Examples of optional attributes (kwargs): category, creation_date, technologies, keywords'''
        self.name = name
        self.domain = domain
        self.description = description
        self.author = author
        
        for key in kwargs:
            setattr(self, key, kwargs[key])
            
    def __str__(self):
        summary = [f'Website "{self.name}"\n', ]
        infos = vars(self).items()
        ordered_infos = sorted(infos)
        for attr, val in ordered_infos:
            if attr == 'name':
                continue
            summary.append(f'{attr}: {val}\n')
        return ''.join(summary)
    
class Prototype:
    def __init__(self):
        self.objects = dict()
        
    def register_object(self, identifier, obj):
        self.objects[identifier] = obj
        
    def unregister_object(self, identifier):
        del self.objects[identifier]
        
    def clone(self, identifier, **attrs):
        found = self.objects.get(identifier)
        if not found:
            raise ValueError(f'Incorrect object identifier: {identifier}')
        obj = copy.deepcopy(found)# 核心代码 原型：复制已被证明有效的内容
        for key in attrs:
            setattr(obj, key, attrs[key])
        return obj
    
def main():
    keywords = ('python', 'data', 'apis', 'automation')
    site1 = Website('ContentGardening', 
                    domain='contentgardening.com', 
                    description='Automation and data-driven apps', 
                    author='Kamon Ayeva', 
                    category='Blog',
                    keywords=keywords)
    
    prototype = Prototype()
    identifier = 'ka-cg-1'
    prototype.register_object(identifier, site1)
    site2 = prototype.clone(identifier, 
                            name='ContentGardeningPlayground', 
                            domain='play.contentgardening.com', 
                            description='Experimentation for techniques featured on the blog',
                            category='Membership site',
                            creation_date='2018-08-01')
    
    for site in (site1, site2):
        print(site)
    print(f'ID site1 : {id(site1)} != ID site2 : {id(site2)}')
    
main()

Website "ContentGardening"
author: Kamon Ayeva
category: Blog
description: Automation and data-driven apps
domain: contentgardening.com
keywords: ('python', 'data', 'apis', 'automation')

Website "ContentGardeningPlayground"
author: Kamon Ayeva
category: Membership site
creation_date: 2018-08-01
description: Experimentation for techniques featured on the blog
domain: play.contentgardening.com
keywords: ('python', 'data', 'apis', 'automation')

ID site1 : 132330195805728 != ID site2 : 132330195811344


## 3.2 单例模式

### 3.2.3 实现

In [14]:
# 非单例
import urllib.parse
import urllib.request

class URLFetcher:
    def __init__(self):
        self.urls = []
        
    def fetch(self, url):
        req = urllib.request.Request(url)
        with urllib.request.urlopen(req) as response:
            if response.getcode() == 200:
                the_page = response.read()
                print(the_page)
                urls = self.urls
                urls.append(url)
                self.urls = urls
                
def main():
    f1 = URLFetcher()
    f2 = URLFetcher()
    print(f1 is f2)
    print(URLFetcher() is URLFetcher())
    
main()

False
False


In [15]:
# 单例
class SingletonType(type):
    _instances = {} # 原理：类的可变属性是类属性，所有实例共享，且惰性
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            # print(f'super(SingletonType, cls): {super(SingletonType, cls)}')
            # cls._instances[cls] = super(SingletonType, cls).__call__(*args, **kwargs)
            # print(f'super(): {super()}')
            cls._instances[cls] = super().__call__(*args, **kwargs)# 核心代码 单例：只有第一次__call__创建实例
            print(f'SingletonType __call__')
        return cls._instances[cls]
    
class URLFetcher(metaclass=SingletonType):
    def __init__(self):
        self.urls = []
        print(f'URLFetcher __init__')
        
    def fetch(self, url):
        req = urllib.request.Request(url)
        with urllib.request.urlopen(req) as response:
            if response.getcode() == 200:
                the_page = response.read()
                # print(the_page)
                urls = self.urls
                urls.append(url)
                self.urls = urls
                
    def dump_url_registry(self):
        return ', '.join(self.urls)
    

def main():
    MY_URLS = ['http://www.voidspace.org.uk',
                'http://google.com',
                'http://python.org',
                'https://www.python.org/error',
                ]
    print('before URLFetcher()')
    fetcher = URLFetcher()
    print('after URLFetcher()')
    
    print(URLFetcher() is URLFetcher())
    
    for url in MY_URLS:
        try:
            fetcher.fetch(url)
        except Exception as e:
            print(e)
    print('-'*7)
    done_urls = fetcher.dump_url_registry()
    print(f'Done URLs: {done_urls}')
    
main()

before URLFetcher()
URLFetcher __init__
SingletonType __call__
after URLFetcher()
True


<urlopen error [Errno -3] Temporary failure in name resolution>
HTTP Error 404: Not Found
-------
Done URLs: http://google.com, http://python.org


In [16]:
# self和cls
# self
class MyClass:
    def __init__(a, x):# 通常为self
        a.x = x

    def my_method(a):
        print("Instance object:", a)
        print("Argument x:", a.x)

obj = MyClass(10)
obj.my_method()  # 输出: 10

# cls
class MyClass:
    class_variable = 10

    @classmethod
    def my_class_method(b):# 通常为cls
        print(b.class_variable)

MyClass.my_class_method()  # 输出: 10

Instance object: <__main__.MyClass object at 0x785a7c0812d0>
Argument x: 10
10


In [65]:
class A:
    x, y = [], 10
    print(f'id(x): {id(x)}, id(y): {id(y)}')

o = A()
o2 = A()

id(x): 132329661240384, id(y): 132330221847056


In [54]:
class D:
    def __init__(self):
        self.val = 0

    def __get__(self, obj, owner=None):
        # print(self, obj, owner)
        print(f'self: {self}, \nobj: {obj}, \nowner: {owner}')
        return self.val

    def __set__(self, obj, value):
        self.val = value

class A:
    x = D()
    y = 10
    print(f'id(x): {id(x)}, id(y): {id(y)}')

o = A()
print()
o2 = A()
print(o.x)
o.x = 1
print(o.x)
print(o2.x)

o.y = 20
print(o.y)
print(o2.y)
print(f'id(o.y): {id(o.y)}')
print(f'id(o2.y): {id(o2.y)}')

id(x): 132329666903680, id(y): 132330221847056

self: <__main__.D object at 0x785a66c9fe80>, 
obj: <__main__.A object at 0x785a66c9d5d0>, 
owner: <class '__main__.A'>
0
self: <__main__.D object at 0x785a66c9fe80>, 
obj: <__main__.A object at 0x785a66c9d5d0>, 
owner: <class '__main__.A'>
1
self: <__main__.D object at 0x785a66c9fe80>, 
obj: <__main__.A object at 0x785a66a7c310>, 
owner: <class '__main__.A'>
1
20
10
id(o.y): 132330221847376
id(o2.y): 132330221847056


# 4 适配器模式

改名

## 4.3 实现

In [17]:
class Club:
    def __init__(self, name):
        self.name = name
        
    def __str__(self):
        return f'the club {self.name}'
    
    def organize_event(self):
        return 'hires an artist to perform for the people'
    
class Musician:
    def __init__(self, name):
        self.name = name
        
    def __str__(self):
        return f'the musician {self.name}'
    
    def play(self):
        return 'plays music'
    
class Dancer:
    def __init__(self, name):
        self.name = name
        
    def __str__(self):
        return f'the dancer {self.name}'
    
    def dance(self):
        return 'does a dance performance'
    
class Adapter:# 适配器，打包时传入对象和适配方法
    def __init__(self, obj, adapted_methods):
        self.obj = obj
        self.__dict__.update(adapted_methods)
        
    def __str__(self):
        return str(self.obj)
    
def main():
    objects = [Club('Jazz Cafe'), Musician('Roy Ayers'), Dancer('Shane Sparks')]
    for obj in objects:
        if hasattr(obj, 'play') or hasattr(obj, 'dance'):
            if hasattr(obj, 'play'):# 核心代码 适配器：以选定方法更新统一接口
                adapted_methods = dict(organize_event=obj.play)
            elif hasattr(obj, 'dance'):
                adapted_methods = dict(organize_event=obj.dance)
            obj = Adapter(obj, adapted_methods)
        print(f'{obj} {obj.organize_event()}')
        
main()

the club Jazz Cafe hires an artist to perform for the people
the musician Roy Ayers plays music
the dancer Shane Sparks does a dance performance


# 5 装饰器模式

添加/装饰 非特定标的

常用装饰器：权限管理、缓存、计时、单例模式、日志

## 5.3 实现

In [18]:
def number_sum(n):
    '''返回前n个数字的和'''
    assert (n >= 0), 'n must be >= 0'
    if n == 0:
        return 0
    else:
        return n + number_sum(n-1)

In [19]:
%%timeit
number_sum(30)

3.67 µs ± 324 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)


In [20]:
sum_cache = {0: 0}
def number_sum(n):
    '''返回前n个数字的和'''
    assert (n >= 0), 'n must be >= 0'
    if n in sum_cache:
        return sum_cache[n]
    res = n + number_sum(n-1)
    # 将值加入缓存
    sum_cache[n] = res
    return res

In [21]:
%%timeit
number_sum(300)

117 ns ± 7.5 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)


In [22]:
cache_fib = {0:0, 1:1}
def fibonacci(n):
    '''返回斐波那契数列的第n个数'''
    assert (n >= 0), 'n must be >= 0'
    if n in cache_fib:
        return cache_fib[n]
    res = fibonacci(n-1) + fibonacci(n-2)
    cache_fib[n] = res
    return res

import functools
def memoize(fn):
    '''缓存装饰器'''
    cache = {}
    @functools.wraps(fn)# 核心代码 装饰器：返回添加功能后的函数
    def memoizer(*args):
        if args not in cache:
            cache[args] = fn(*args)
        return cache[args]
    return memoizer

@memoize
def number_sum(n):
    '''返回前n个数字的和'''
    assert (n >= 0), 'n must be >= 0'
    if n == 0:
        return 0
    else:
        return n + number_sum(n-1)
    
@memoize
def fibonacci(n):
    '''返回斐波那契数列的第n个数'''
    assert (n >= 0), 'n must be >= 0'
    if n in (0, 1):
        return n
    return fibonacci(n-1) + fibonacci(n-2)

In [23]:
%%timeit
number_sum(300)

169 ns ± 3.37 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)


In [24]:
%%timeit
fibonacci(30)

165 ns ± 3.83 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)


In [25]:
# 使用lru_cache
import functools
@functools.lru_cache(maxsize=None)
def number_sum(n):
    '''返回前n个数字的和'''
    assert (n >= 0), 'n must be >= 0'
    if n == 0:
        return 0
    else:
        return n + number_sum(n-1)
    
@functools.lru_cache(maxsize=None)
def fibonacci(n):
    '''返回斐波那契数列的第n个数'''
    assert (n >= 0), 'n must be >= 0'
    if n in (0, 1):
        return n
    return fibonacci(n-1) + fibonacci(n-2)

In [26]:
%%timeit
number_sum(300)

70.3 ns ± 6.99 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)


In [27]:
%%timeit
fibonacci(30)

70 ns ± 2.88 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)


# 6 桥接模式

脱裤子放屁的无需适配的适配器

## 6.3 实现

In [28]:
class ResourceContent:
    """
    定义抽象接口
    维护一个实现者的对象引用
    """
    def __init__(self, imp):
        self._imp = imp
        
    def show_content(self, path):
        return self._imp.fetch(path)
    
class ResourceContentFetcher(metaclass=abc.ABCMeta):# metaclass=abc.ABCMeta表示ResourceContentFetcher是一个抽象类，不能实例化。桥接模式，从抽象类派生出实现类，方法名相同
    """
    为获取内容的实现类定义接口
    """
    @abc.abstractmethod# abstractmethod：抽象方法，子类须实现；classmethod：类方法(cls)，访问类属性；staticmethod：静态方法()，不访问类与实例属性
    def fetch(path):
        pass
    
class URLFetcher(ResourceContentFetcher):
    """
    实现实现者接口并定义其具体实现
    """
    def fetch(self, path):
        # path是一个url
        req = urllib.request.Request(path)
        with urllib.request.urlopen(req) as response:
            if response.getcode() == 200:
                the_page = response.read()
                print(the_page[:100])
                
class LocalFileFetcher(ResourceContentFetcher):
    """
    实现实现者接口并定义其具体实现
    """
    def fetch(self, path):
        # path是一个文本文件的文件路径
        with open(path) as f:
            print(f.read()[:100])
            
def main():
    url_fetcher = URLFetcher()
    iface = ResourceContent(url_fetcher)
    iface.show_content('http://python.org')
    
    print('='*20)
    localfs_fetcher = LocalFileFetcher()
    iface = ResourceContent(localfs_fetcher)
    iface.show_content('./chapter06/file.txt')
    
main()

b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\xed}\xf9r\x1bG\x92\xf7\xff\x8e\x98w(C\xf1\x89\xd4X\r\x10\xe0)\x8a\x84G\x92)\x99\x1e\x1d\x1cS\xb2v>\xafC\xd1h4\x80&\x81n\xa8\x0f\x92\xf0\xcc<\xd2>\xc5\xbe\xd8\xfe2\xab\xaa\xbb\xfa\x04 \xd2\xb2\x1d\xa1\x9d\xb5\x084\xaa\xeb\xc8\xca\xbb2\xb3\x8e\xbe\x1e\x06N\xbc\x98\xbbb\x12\xcf\xa6'
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin in nibh in enim euismod mattis placer


# 7 外观模式

松散耦合

## 7.3 实现

In [29]:
from enum import Enum
from abc import ABCMeta, abstractmethod

State = Enum('State', 'new running sleeping restart zombie')
class Server(metaclass=ABCMeta):
    @abstractmethod
    def __init__(self):
        pass
    
    def __str__(self):
        return self.name
    
    @abstractmethod
    def boot(self):
        pass
    
    @abstractmethod
    def kill(self):
        pass

class FileServer(Server):
    def __init__(self):
        '''初始化文件服务器所需的操作'''
        self.name = 'FileServer'
        self.state = State.new
        
    def boot(self):
        '''启动文件服务器所需的操作'''
        print(f'booting the {self}')
        self.state = State.running
        
    def kill(self):
        '''终止文件服务器所需的操作'''
        print(f'killing the {self}')
        self.state = State.restart if self.state == State.running else State.zombie

    def create_file(self, user, name, permissions):
        '''检查权限并创建文件'''
        print(f'trying to create the file "{name}" for user "{user}" with permissions {permissions}')

class ProcessServer(Server):
    def __init__(self):
        '''初始化进程服务器所需的操作'''
        self.name = 'ProcessServer'
        self.state = State.new
        
    def boot(self):
        '''启动进程服务器所需的操作'''
        print(f'booting the {self}')
        self.state = State.running
        
    def kill(self):
        '''终止进程服务器所需的操作'''
        print(f'killing the {self}')
        self.state = State.restart if self.state == State.running else State.zombie

    def create_process(self, user, name):
        '''检查用户权限、生成PID等'''
        print(f'trying to create the process "{name}" for user "{user}"')

class OperatingSystem:
    '''外观'''
    def __init__(self):
        self.fs = FileServer()
        self.ps = ProcessServer()
        
    def start(self):
        [server.boot() for server in (self.fs, self.ps)]
        
    def create_file(self, user, name, permissions):
        self.fs.create_file(user, name, permissions)
        
    def create_process(self, user, name):
        self.ps.create_process(user, name)

def main():
    os = OperatingSystem()
    os.start()
    os.create_file('foo', 'hello', '-rw-r-r')
    os.create_process('bar', 'ls /tmp')

main()

booting the FileServer
booting the ProcessServer
trying to create the file "hello" for user "foo" with permissions -rw-r-r
trying to create the process "ls /tmp" for user "bar"


# 8 其他结构型模式

## 8.1 享元模式

单例模式集合

In [30]:
import random


CarType = Enum('CarType', 'subcompact compact suv')

class Car:
    pool = dict()
    
    def __new__(cls, car_type, *args, **kwargs):
        obj = cls.pool.get(car_type, None)
        if not obj:
            obj = object.__new__(cls)# 核心代码 享元：只有第一次创建实例
            cls.pool[car_type] = obj
            obj.car_type = car_type
        return obj
    
    def render(self, color, x, y):
        type = self.car_type
        msg = f'render a car of type {type} and color {color} at ({x}, {y})'
        print(msg)

def main():
    rnd = random.Random()
    colors = 'white black silver gray red blue brown beige yellow green'.split()
    min_point, max_point = 0, 100
    car_counter = 0

    for _ in range(10):
        c1 = Car(CarType.subcompact)
        c1.render(random.choice(colors), 
                  rnd.randint(min_point, max_point), 
                  rnd.randint(min_point, max_point))
        car_counter += 1

    for _ in range(3):
        c2 = Car(CarType.compact)
        c2.render(random.choice(colors), 
                  rnd.randint(min_point, max_point), 
                  rnd.randint(min_point, max_point))
        car_counter += 1

    for _ in range(5):
        c3 = Car(CarType.suv)
        c3.render(random.choice(colors), 
                  rnd.randint(min_point, max_point), 
                  rnd.randint(min_point, max_point))
        car_counter += 1

    print(f'cars rendered: {car_counter}')
    print(f'cars actually created: {len(Car.pool)}')

    c4 = Car(CarType.subcompact)
    c5 = Car(CarType.subcompact)
    c6 = Car(CarType.suv)
    print(f'{id(c4)} == {id(c5)}? {id(c4) == id(c5)}')
    print(f'{id(c5)} == {id(c6)}? {id(c5) == id(c6)}')

main()

render a car of type CarType.subcompact and color blue at (17, 27)
render a car of type CarType.subcompact and color yellow at (27, 51)
render a car of type CarType.subcompact and color gray at (14, 69)
render a car of type CarType.subcompact and color beige at (70, 42)
render a car of type CarType.subcompact and color brown at (44, 37)
render a car of type CarType.subcompact and color yellow at (21, 96)
render a car of type CarType.subcompact and color black at (4, 30)
render a car of type CarType.subcompact and color green at (72, 49)
render a car of type CarType.subcompact and color yellow at (77, 3)
render a car of type CarType.subcompact and color yellow at (36, 70)
render a car of type CarType.compact and color blue at (36, 46)
render a car of type CarType.compact and color green at (2, 88)
render a car of type CarType.compact and color black at (83, 3)
render a car of type CarType.suv and color red at (95, 49)
render a car of type CarType.suv and color silver at (29, 12)
render 

## 8.2 MVC模式

Model View Controller

模型 视图 控制器

模型可以访问数据并管理应用程序的状态。

视图是模型的表示。视图不需要是图形化的，文本输出也被认为是非常好的视图。

控制器是模型和视图之间的桥梁。

正确使用 MVC 可以保证我们最终得到一个易于维护和扩展的应用程序。

In [2]:
quotes = (
    'A man is not complete until he is married. Then he is finished.',
    'As I said before, I never repeat myself.',
    'Behind a successful man is an exhausted woman.',
    'Black holes really suck...',
    'Facts are stubborn things.'
)

class QuoteModel:
    def get_quote(self, n):
        try:
            value = quotes[n]
        except IndexError as e:
            value = 'Not found'
        return value

class QuoteTerminalView:
    def show(self, quote):
        print(f'And the quote is: "{quote}"')

    def error(self, msg):
        print(f'Error: {msg}')

    def select_quote(self):
        return input('Which quote number would you like to see? ')

class QuoteTerminalController:
    def __init__(self):
        self.model = QuoteModel()
        self.view = QuoteTerminalView()

    def run(self):
        valid_input = False
        while not valid_input:
            try:
                n = self.view.select_quote()
                n = int(n)
                valid_input = True
            except ValueError as e:
                self.view.error(f"Incorrect index '{n}'")
        quote = self.model.get_quote(n)
        self.view.show(quote)

def main():
    controller = QuoteTerminalController()
    while True:
        controller.run()# 核心代码 MVC：分离数据、视图、控制器

main()

And the quote is: "As I said before, I never repeat myself."
Error: Incorrect index ''
And the quote is: "Black holes really suck..."


KeyboardInterrupt: Interrupted by user

## 8.3 代理模式

增加控制功能的装饰器，控制特定标的

常用代理：劫持、过滤、安全检查（智能/引用代理）、（远程代理）、惰性/延迟创建（虚拟代理）

In [70]:
class LazyProperty:
    def __init__(self, method):
        self.method = method
        self.method_name = method.__name__
        # print(f'function overriden: {self.fget})
        # print(f'function name: {self.func_name}')

    def __get__(self, obj, cls):
        if not obj:
            return None
        value = self.method(obj)
        print(f'value: {value}')
        setattr(obj, self.method_name, value)
        return value

class Test:
    def __init__(self):
        self.x = 'foo'
        self.y = 'bar'
        self._resource = None

    @LazyProperty
    def resource(self):
        print(f'initializing self._resource which is: {self._resource}')
        self._resource = tuple(range(5)) # 开销大
        return self._resource

def main():
    t = Test()
    print(t.x)
    print(t.y)
    # 更多工作......
    print(t.resource)
    print(t.resource)

main()

foo
bar
initializing self._resource which is: None
value: (0, 1, 2, 3, 4)
(0, 1, 2, 3, 4)
(0, 1, 2, 3, 4)


### 8.3.3 实现

In [4]:
class SensitiveInfo:
    def __init__(self):
        self.users = ['nick', 'tom', 'ben', 'mike']

    def read(self):
        nb = len(self.users)
        print(f'There are {nb} users: {", ".join(self.users)}')

    def add(self, user):
        self.users.append(user)
        print(f'Added user {user}')

class Info:
    '''SensitiveInfo的保护代理'''
    def __init__(self):
        self.protected = SensitiveInfo()
        self.secret = '0xdeadbeef'

    def read(self):
        self.protected.read()

    def add(self, user):
        sec = input('What is the secret? ')
        self.protected.add(user) if sec == self.secret else print("That's wrong!")# 核心代码 代理：控制对敏感信息的访问

def main():
    info = Info()
    while True:
        print('1. read list |==| 2. add user |==| 3. quit')
        key = input('choose option: ')
        if key == '1':
            info.read()
        elif key == '2':
            name = input('choose username: ')
            info.add(name)
        elif key == '3':
            exit()

main()

1. read list |==| 2. add user |==| 3. quit
Added user 0xdeadbeef
1. read list |==| 2. add user |==| 3. quit
There are 5 users: nick, tom, ben, mike, 0xdeadbeef
1. read list |==| 2. add user |==| 3. quit
1. read list |==| 2. add user |==| 3. quit
1. read list |==| 2. add user |==| 3. quit
1. read list |==| 2. add user |==| 3. quit
1. read list |==| 2. add user |==| 3. quit


: 

# 9 职责链模式

In [4]:
class Event:
    def __init__(self, name):
        self.name = name

    def __str__(self) -> str:
        return self.name

class Widget:
    def __init__(self, parent=None):
        self.parent = parent

    def handle(self, event):
        handler = f'handle_{event}'
        if hasattr(self, handler):
            method = getattr(self, handler)
            method(event)
        elif self.parent is not None:# 核心代码 责任链：传递事件给父类。与抽象工厂区别：责任链是按事件类型传递事件，抽象工厂是按固定顺序创建对象
            self.parent.handle(event)
        elif hasattr(self, 'handle_default'):
            self.handle_default(event)

class MainWindow(Widget):
    def handle_close(self, event):
        # 关闭事件
        print(f'MainWindow: {event}')

    def handle_default(self, event):
        # 默认事件
        print(f'MainWindow Default: {event}')

class SendDialog(Widget):
    def handle_paint(self, event):
        # 绘图事件
        print(f'SendDialog: {event}')

class MsgText(Widget):
    def handle_down(self, event):
        # 键盘事件
        print(f'MsgText: {event}')

def main():
    mw = MainWindow()
    sd = SendDialog(mw)
    msg = MsgText(sd)# 注意msg对象的父对象是SendDialog

    for e in ('down', 'paint', 'unhandled', 'close'):
        evt = Event(e)
        print(f'\nSending event -{evt}- to MainWindow')
        mw.handle(evt)
        print(f'Sending event -{evt}- to SendDialog')
        sd.handle(evt)
        print(f'Sending event -{evt}- to MsgText')
        msg.handle(evt)

main()


Sending event -down- to MainWindow
MainWindow Default: down
Sending event -down- to SendDialog
MainWindow Default: down
Sending event -down- to MsgText
MsgText: down

Sending event -paint- to MainWindow
3
MainWindow Default: paint
Sending event -paint- to SendDialog
1
SendDialog: paint
Sending event -paint- to MsgText
2
self.parent:<__main__.SendDialog object at 0x7af594e6f2b0>
1
SendDialog: paint

Sending event -unhandled- to MainWindow
MainWindow Default: unhandled
Sending event -unhandled- to SendDialog
MainWindow Default: unhandled
Sending event -unhandled- to MsgText
MainWindow Default: unhandled

Sending event -close- to MainWindow
MainWindow: close
Sending event -close- to SendDialog
MainWindow: close
Sending event -close- to MsgText
MainWindow: close


# 10 命令模式

## 10.2 用例

- GUI 按钮和菜单项：
  - PyQt 使用命令模式来实现按钮和菜单项上的操作。
- 其他操作：
  - 除了撤销之外，还可以使用命令来实现任何操作。例如，剪切、复制、粘贴、恢复和大写文本。
- 事务行为和日志：
  - 事务行为和日志对于保存所有更改操作的持久日志非常重要。
    - 操作系统使用它们从系统崩溃中恢复
    - 关系数据库使用它们来实现事务
    - 文件系统使用它们实现快照
    - 安装程序（向导）使用它们恢复已取消的安装。
- 宏： 
  - 这里，我们所说的宏是指可以随时按需记录和执行的一系列操作。流行的编辑器（如Emacs 和 Vim）都支持宏。

In [1]:
import os
verbose = True

class RenameFile:
    def __init__(self, src, dest):
        self.src = src
        self.dest = dest

    def execute(self):
        if verbose:
            print(f'renaming {self.src} to {self.dest}')
        os.rename(self.src, self.dest)

    def undo(self):
        if verbose:
            print(f'renaming {self.dest} to {self.src}')
        os.rename(self.dest, self.src)

def delete_file(path):
    if verbose:
        print(f'deleting file {path}')
    os.remove(path)

class CreateFile:
    def __init__(self, path, txt='hello world\n'):
        self.path = path
        self.txt = txt

    def execute(self):
        if verbose:
            print(f'creating file {self.path}')
        with open(self.path, mode='w', encoding='utf-8') as out_file:
            out_file.write(self.txt)

    def undo(self):
        delete_file(self.path)

class ReadFile:
    def __init__(self, path):
        self.path = path

    def execute(self):
        if verbose:
            print(f'reading file {self.path}')
        with open(self.path, mode='r', encoding='utf-8') as in_file:
            print(in_file.read(), end='')

def main():
    orig_name, new_name = 'file1', 'file2'

    commands = (
        CreateFile(orig_name),# 核心代码 命令：将操作过程封装成对象
        ReadFile(orig_name),
        RenameFile(orig_name, new_name)
    )

    [c.execute() for c in commands]

    answer = input('reverse the executed commands? [y/n] ')
    if answer not in 'yY':
        print(f'the result is {new_name}')
        exit()
    else:
        for c in reversed(commands):
            try:
                c.undo()
            except AttributeError as e:
                print('Error:', str(e))

main()

creating file file1
reading file file1
hello world
renaming file1 to file2


renaming file2 to file1
Error: 'ReadFile' object has no attribute 'undo'
deleting file file1


# 11 观察者模式

单个对象（发布者，也称为**主题**或**可观察对象**）和一个或多个对象（订阅者，也称为**观察者**）之间的发布订阅关系。
对于 MVC，发布者是模型，订阅者是视图。
观察者背后的思想与关注点分离原则背后的思想是相同的，即增加发布者和订阅者之间的解耦，并使在运行时添加/删除订阅者变得容易。

## 11.2 用例

新闻流、事件驱动系统

## 11.3 实现

In [10]:
class Publisher:
    def __init__(self):
        self.observers = []

    def add(self, observer):
        if observer not in self.observers:
            self.observers.append(observer)
        else:
            print(f'failed to add: {observer}')

    def remove(self, observer):
        try:
            self.observers.remove(observer)
        except ValueError as e:
            print(f'failed to remove: {observer}')

    def notify(self):
        [o.notify(self) for o in self.observers]

class DefaultFormatter(Publisher):
    def __init__(self, name):
        super().__init__()
        self.name = name
        self._data = 0

    def __str__(self):
        return f'{type(self).__name__}: "{self.name}" has data = {self._data}'

    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, new_value):
        try:
            self._data = int(new_value)
        except ValueError as e:
            print('Error:', str(e))
        else:
            self.notify()# 核心代码 观察者：在数据变化时通知观察者

class HexFormatter:
    def notify(self, publisher):
        print(f'{type(self).__name__}: {publisher.name} has now hex data = {hex(publisher.data)}')

class BinaryFormatter:
    def notify(self, publisher):
        print(f'{type(self).__name__}: {publisher.name} has now bin data = {bin(publisher.data)}')

def main():
    df = DefaultFormatter('test1')
    print(df)

    print()
    hf = HexFormatter()
    df.add(hf)
    df.data = 3
    print(df)

    print()
    bf = BinaryFormatter()
    df.add(bf)
    df.data = 21
    print(df)

    print()
    df.remove(hf)
    df.data = 40
    print(df)

    print()
    df.remove(hf)
    df.add(bf)

    print()
    df.data = 'hello'
    print(df)

    print()
    df.data = 15.8
    print(df)

main()

DefaultFormatter: "test1" has data = 0

HexFormatter: test1 has now hex data = 0x3
DefaultFormatter: "test1" has data = 3

HexFormatter: test1 has now hex data = 0x15
BinaryFormatter: test1 has now bin data = 0b10101
DefaultFormatter: "test1" has data = 21

BinaryFormatter: test1 has now bin data = 0b101000
DefaultFormatter: "test1" has data = 40

failed to remove: <__main__.HexFormatter object at 0x73611c3ee0e0>
failed to add: <__main__.BinaryFormatter object at 0x73611c3ee020>

Error: invalid literal for int() with base 10: 'hello'
DefaultFormatter: "test1" has data = 40

BinaryFormatter: test1 has now bin data = 0b1111
DefaultFormatter: "test1" has data = 15


In [8]:
# try后的else是try成功后执行的代码，不是try失败后执行的代码
try:
    a = 1 / 0
except ZeroDivisionError as e:
    print('Error:', str(e))
else:# 如果没有错误发生
    print('No error')
finally:
    print('Finally')
print('------------------')

try:
    a = 1 / 1
except ZeroDivisionError as e:
    print('Error:', str(e))
else:
    print('No error')
finally:
    print('Finally')

Error: division by zero
Finally
------------------
No error
Finally


# 12 状态模式

## 12.2 用例

Thomas Jaeger ：
“状态设计模式能够在上下文中对无限数量的状态进行完全封装，以提高可维护性和灵活性。”

## 12.3 实现

In [11]:
try:
    import state_machine
except:
    !pip install state-machine

In [32]:
from state_machine import State, Event, acts_as_state_machine, after, before, InvalidStateTransition

@acts_as_state_machine # 使用 state_machine 模块创建状态机的第一步
class Process: # 每个创建的进程都有自己的状态机。
    # 定义状态。在状态图中看到的一对一映射。
    created = State(initial=True)# 初始状态
    waiting = State()
    running = State()
    terminated = State()
    blocked = State()
    swapped_out_waiting = State()
    swapped_out_blocked = State()
    
    # 定义转换。在 state_machine 模块中，转换是 Event 类的一个实例
    wait = Event(from_states=(created, 
                              running, 
                              blocked, 
                              swapped_out_waiting), 
                 to_state=waiting)
    run = Event(from_states=waiting, # 核心代码 状态机：定义状态和事件
                to_state=running)
    terminate = Event(from_states=running, 
                      to_state=terminated)
    block = Event(from_states=(running,
                                swapped_out_waiting),
                    to_state=blocked)
    swap_wait = Event(from_states=waiting, 
                      to_state=swapped_out_waiting)
    swap_block = Event(from_states=blocked, 
                       to_state=swapped_out_blocked)
    
    #  定义初始化方法
    def __init__(self, name):
        self.name = name
        
    # 定义提供状态的方法
    @after('wait')
    def wait_info(self):
        print(f'{self.name} entered waiting mode')
        
    @after('run')
    def run_info(self):
        print(f'{self.name} is running')
        
    @before('terminate')
    def terminate_info(self):
        print(f'{self.name} terminated')
        
    @after('block')
    def block_info(self):
        print(f'{self.name} is blocked')
        
    @after('swap_wait')
    def swap_wait_info(self):
        print(f'{self.name} is swapped out and waiting')
        
    @after('swap_block')
    def swap_block_info(self):
        print(f'{self.name} is swapped out and blocked')
        
def transition(process, event, event_name):
    '''如果在尝试执行事件时出错，则输出事件的名称'''
    try:
        event()
    except InvalidStateTransition as err:
        print(f'Error: transition of {process.name} from {process.current_state} to {event_name} failed')
        
def state_info(process):
    '''显示进程当前（激活）状态的一些基本信息'''
    print(f'state of {process.name}: {process.current_state}')
    
def main():
    RUNNING = 'running'
    WAITING = 'waiting'
    BLOCKED = 'blocked'
    TERMINATED = 'terminated'
    
    p1, p2 = Process('process1'), Process('process2')
    [state_info(p) for p in (p1, p2)]
    
    print()
    transition(p1, p1.wait, WAITING)
    transition(p2, p2.terminate, TERMINATED)
    [state_info(p) for p in (p1, p2)]
    print()
    transition(p1, p1.run, RUNNING)
    transition(p2, p2.wait, WAITING)
    [state_info(p) for p in (p1, p2)]
    print()
    [transition(p, p.block, BLOCKED) for p in (p1, p2)]
    [state_info(p) for p in (p1, p2)]
    print()
    [transition(p, p.terminate, TERMINATED) for p in (p1, p2)]
    [state_info(p) for p in (p1, p2)]
    
main()

state of process1: created
state of process2: created

process1 entered waiting mode
Error: transition of process2 from created to terminated failed
state of process1: waiting
state of process2: created

process1 is running
process2 entered waiting mode
state of process1: running
state of process2: waiting

process1 is blocked
Error: transition of process2 from waiting to blocked failed
state of process1: blocked
state of process2: waiting

Error: transition of process1 from blocked to terminated failed
Error: transition of process2 from waiting to terminated failed
state of process1: blocked
state of process2: waiting


# 13 其他行为型模式

## 13.1 解释器模式