# Advanced Design Patterns

### Loading Libraries

In [18]:
# __future__ imports must be the first statement of a module.
from __future__ import annotations

# Standard library
import abc
import bisect
import collections
import contextlib
import csv
import datetime
import fnmatch
import gzip
import heapq
import io
import itertools
import logging
import math
import operator
import os
import queue
import random
import re
import socket
import string
import subprocess
import sys
import time
import weakref
import zipfile
from abc import ABC, abstractmethod
from collections import defaultdict, Counter
from collections.abc import Container, Mapping, Hashable
from dataclasses import dataclass, field
from decimal import Decimal
from enum import Enum, auto
from functools import wraps, total_ordering, lru_cache
from math import hypot, factorial
from pathlib import Path
from pprint import pprint
from typing import TYPE_CHECKING
from typing import Pattern, Match
from typing import Hashable, Mapping, TypeVar, Any, overload, Union, Sequence, Dict, Deque, TextIO, Callable
from typing import List, Protocol, NoReturn, Union, Set, Tuple, Optional, Iterable, Iterator, cast, NamedTuple
from urllib.parse import urlparse
from urllib.request import urlopen

# Third-party
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn

## The Adapter Pattern

### An Adapter Example

In [2]:
class TimeSince:
    """Measure elapsed seconds relative to a fixed starting time.

    Time stamps are unpunctuated strings laid out as ``HHMMSS``: two
    characters of hours, two of minutes, and the remainder as seconds.
    Everything from the fifth character on is handed to ``float``, so a
    fractional seconds part such as ``"004210.11"`` is accepted as well.
    """

    def parse_time(self, time: str) -> tuple[float, float, float]:
        """Split *time* into ``(hours, minutes, seconds)`` floats."""
        hours, minutes, seconds = time[:2], time[2:4], time[4:]
        return float(hours), float(minutes), float(seconds)

    def __init__(self, starting_time: str) -> None:
        """Remember *starting_time* as the reference point for intervals."""
        parsed = self.parse_time(starting_time)
        self.hr, self.min, self.sec = parsed
        # Reference point expressed as seconds since 00:00:00.
        self.start_seconds = ((self.hr * 60) + self.min) * 60 + self.sec

    def interval(self, log_time: str) -> float:
        """Return the seconds from the starting time to *log_time*.

        The result is negative when *log_time* precedes the starting time.
        """
        hours, minutes, seconds = self.parse_time(log_time)
        elapsed = ((hours * 60) + minutes) * 60 + seconds
        return elapsed - self.start_seconds

In [3]:
# Reference point for the interval examples: 00:01:23.
ts = TimeSince("000123")

In [4]:
# Seconds elapsed between the starting time held by ``ts`` and 02:03:04.
ts.interval("020304")

7301.0

In [5]:
# Seconds elapsed between the starting time held by ``ts`` and 03:04:05.
ts.interval("030405")

10962.0

In [6]:
# Sample log entries as (timestamp, severity, message) tuples.
# Timestamps are unpunctuated HHMMSS strings; the last two also carry a
# fractional seconds part.
data = [
    ("000123", "INFO", "Gila Flats 1959-08-20"),
    ("000142", "INFO", "test block 15"),
    ("004201", "ERROR", "intrinsic field chamber door locked"),
    ("004210.11", "INFO", "generator power active"),
    ("004232.33", "WARNING", "extra mass detected")
]

In [7]:
class LogProcessor:
    """Print log entries together with their offset from a baseline time.

    The baseline starts at the first entry's timestamp and is reset to the
    timestamp of every ``"ERROR"`` entry, so lines following an error show
    the time elapsed since that error.
    """

    def __init__(self, log_entries: list[tuple[str, str, str]]) -> None:
        """Store *log_entries*, a list of (timestamp, severity, message)."""
        self.log_entries = log_entries
        # Adapter that turns a (start, now) timestamp pair into seconds.
        self.time_convert = IntervalAdapter()

    def report(self) -> None:
        """Print one formatted line per log entry.

        Each line holds the offset in seconds from the current baseline,
        the severity, and the message. An empty log prints nothing.
        """
        if not self.log_entries:
            # Guard: indexing entry 0 of an empty log would raise IndexError.
            return
        first_time = self.log_entries[0][0]
        for log_time, severity, message in self.log_entries:
            if severity == "ERROR":
                # Errors restart the clock: later offsets count from here.
                first_time = log_time
            interval = self.time_convert.time_offset(first_time, log_time)
            print(f"{interval:8.2f} | {severity:7s} {message}")

In [8]:
class IntervalAdapter:
    """Adapt ``TimeSince`` to a stateless-looking ``(start, now)`` interface.

    A ``TimeSince`` object is bound to one starting time. This adapter keeps
    one around and replaces it only when a different starting time is
    requested.
    """

    def __init__(self) -> None:
        """Start without any cached ``TimeSince``."""
        self.ts: Optional[TimeSince] = None

    def time_offset(self, start: str, now: str) -> float:
        """Return the seconds from *start* to *now*.

        The cached ``TimeSince`` is reused when its parsed starting time
        equals the parsed *start*; otherwise a new one is built and cached.
        """
        cached = self.ts
        if cached is not None:
            same_start = cached.parse_time(start) == (cached.hr, cached.min, cached.sec)
            if same_start:
                return cached.interval(now)
        # No cache yet, or the baseline changed: rebuild for the new start.
        self.ts = TimeSince(start)
        return self.ts.interval(now)

In [9]:
class LogProcessor:
    """Print log entries together with their offset from a baseline time.

    The baseline starts at the first entry's timestamp and is reset to the
    timestamp of every ``"ERROR"`` entry, so lines following an error show
    the time elapsed since that error.
    """

    def __init__(self, log_entries: list[tuple[str, str, str]]) -> None:
        """Store *log_entries*, a list of (timestamp, severity, message)."""
        self.log_entries = log_entries
        # Adapter that turns a (start, now) timestamp pair into seconds.
        self.time_convert = IntervalAdapter()

    def report(self) -> None:
        """Print one formatted line per log entry.

        Each line holds the offset in seconds from the current baseline,
        the severity, and the message. An empty log prints nothing.
        """
        if not self.log_entries:
            # Guard: indexing entry 0 of an empty log would raise IndexError.
            return
        first_time = self.log_entries[0][0]
        for log_time, severity, message in self.log_entries:
            if severity == "ERROR":
                # Errors restart the clock: later offsets count from here.
                first_time = log_time
            interval = self.time_convert.time_offset(first_time, log_time)
            print(f"{interval:8.2f} | {severity:7s} {message}")

## The Façade Pattern

### A Façade Example

In [10]:
import subprocess

In [11]:
class FindUML:
    """Locate ``*.uml`` sources under a base directory and their image targets."""

    def __init__(self, base: Path) -> None:
        """Remember *base* as the root directory to search."""
        self.base = base
        # Captures the optional output name following an ``@startuml`` tag.
        self.start_pattern = re.compile(r"@startuml *(.*)")

    def uml_file_iter(self) -> Iterator[tuple[Path, Path]]:
        """Yield ``(source, target)`` path pairs, both relative to ``base``.

        One pair is produced for every ``@startuml`` tag found in a source
        file. The target is the name given after the tag, resolved next to
        the source, or the source with a ``.png`` suffix when no name is
        given. Sources with a dot-prefixed component below ``base`` are
        skipped.
        """
        for source in self.base.glob("**/*.uml"):
            relative_source = source.relative_to(self.base)
            # Test only the components below ``base``: a hidden directory
            # *above* the base must not hide every file in the project.
            if any(part.startswith(".") for part in relative_source.parts):
                continue
            body = source.read_text()
            for output_name in self.start_pattern.findall(body):
                if output_name:
                    target = source.parent / output_name
                else:
                    target = source.with_suffix(".png")
                yield (relative_source, target.relative_to(self.base))

In [12]:
class PlantUML:
    """Run the PlantUML jar on a source file using tools from a conda env."""

    conda_env_name = "CaseStudy"
    # Root of the conda environment expected to hold graphviz and the jar.
    base_env = Path.home() / "miniconda3" / "envs" / conda_env_name

    def __init__(
        self,
        graphviz: Path = Path("bin") / "dot",
        plantjar: Path = Path("share") / "plantuml.jar",
    ) -> None:
        """Resolve *graphviz* and *plantjar* relative to ``base_env``."""
        self.graphviz = self.base_env / graphviz
        self.plantjar = self.base_env / plantjar

    def _child_env(self) -> dict[str, str]:
        """Return the current environment plus ``GRAPHVIZ_DOT``.

        ``subprocess.run(env=...)`` replaces the child's environment
        wholesale, so the parent's variables (``PATH``, ``HOME``, ...) are
        carried over explicitly instead of being dropped.
        """
        return {**os.environ, "GRAPHVIZ_DOT": str(self.graphviz)}

    def process(self, source: Path) -> None:
        """Run PlantUML on *source*.

        Raises:
            subprocess.CalledProcessError: when the command exits non-zero.
        """
        command = ["java", "-jar", str(self.plantjar), "-progress", str(source)]
        subprocess.run(command, env=self._child_env(), check=True)
        # Emit a newline after the subprocess output.
        print()

In [13]:
class GenerateImages:
    """Facade that finds UML sources and renders the ones that are stale."""

    def __init__(self, base: Path, verbose: int = 0) -> None:
        """Wire a ``FindUML`` for *base* to a default ``PlantUML`` painter."""
        self.finder = FindUML(base)
        self.painter = PlantUML()
        self.verbose = verbose

    def make_all_images(self) -> None:
        """Render every source whose target is missing or older than it.

        Up-to-date pairs are skipped; the skip is reported only when
        ``verbose`` is greater than zero.
        """
        for source, target in self.finder.uml_file_iter():
            up_to_date = (
                target.exists()
                and source.stat().st_mtime <= target.stat().st_mtime
            )
            if up_to_date:
                if self.verbose > 0:
                    print(f"Skipping {source} -> {target}")
                continue
            print(f"Processing {source} -> {target}")
            self.painter.process(source)


# Script entry point: render images for every UML source found under the
# current working directory.
if __name__ == "__main__":
    g = GenerateImages(Path.cwd())
    g.make_all_images()

## The Flyweight Pattern

### A Flyweight Example

In [14]:
class Buffer(Sequence[int]):
    """Read-only sequence view over a ``bytes`` object.

    Integer indexes give single byte values and slices give ``bytes``,
    exactly as for the wrapped object. Unlike ``bytes``, instances of this
    class can be the target of a weak reference.
    """

    def __init__(self, content: bytes) -> None:
        """Wrap *content* without copying it."""
        self.content = content

    @overload
    def __getitem__(self, index: int) -> int:
        ...

    @overload
    def __getitem__(self, index: slice) -> bytes:
        ...

    def __getitem__(self, index: Union[int, slice]) -> Union[int, bytes]:
        """Delegate indexing and slicing to the wrapped bytes."""
        return self.content[index]

    def __iter__(self) -> Iterator[int]:
        """Iterate over the byte values of the wrapped bytes."""
        return iter(self.content)

    def __len__(self) -> int:
        """Return the number of bytes wrapped."""
        return len(self.content)

In [15]:
class Message(abc.ABC):
    """Flyweight view of one comma-delimited message inside a shared buffer.

    A message copies no bytes. It keeps a weak reference to the buffer and
    the positions of its field delimiters, and slices fields on demand.
    Subclasses say which field holds each part of the position fix.
    """

    def __init__(self) -> None:
        # Declarations only; from_buffer() assigns the actual values.
        self.buffer: weakref.ReferenceType[Buffer]
        self.offset: int
        self.end: Optional[int]
        self.commas: list[int]

    def from_buffer(self, buffer: Buffer, offset: int) -> "Message":
        """Bind this message to the bytes of *buffer* starting at *offset*.

        Scans at most 82 positions for ``,`` delimiters and the closing
        ``*``. The scan also stops at the end of the buffer, so a truncated
        message is reported as incomplete rather than failing with
        ``IndexError``.

        Returns:
            This message, so the call can be chained.

        Raises:
            GPSError: when no ``*`` is found within the scanned range.
        """
        self.buffer = weakref.ref(buffer)
        self.offset = offset
        # Position 0 of ``commas`` is the message start, so field 0 begins
        # right after it, like every other field begins after its comma.
        self.commas = [offset]
        self.end = None
        comma, star = ord(b","), ord(b"*")
        # Never index past the end of the buffer.
        stop = min(offset + 82, len(buffer))
        for index in range(offset, stop):
            octet = buffer[index]
            if octet == comma:
                self.commas.append(index)
            elif octet == star:
                self.commas.append(index)
                # NOTE(review): presumably skips "*" plus a two-character
                # checksum -- confirm against the message format.
                self.end = index + 3
                break
        if self.end is None:
            raise GPSError("Incomplete")
        # TODO: confirm checksum.
        return self

    def __getitem__(self, field: int) -> bytes:
        """Return the bytes of field number *field*.

        Raises:
            RuntimeError: when the message was never bound to a buffer, or
                the buffer has since been garbage collected.
        """
        if not hasattr(self, "buffer") or (buffer := self.buffer()) is None:
            raise RuntimeError("Broken reference")
        start, end = self.commas[field] + 1, self.commas[field + 1]
        return buffer[start:end]

    def get_fix(self) -> Point:
        """Build a ``Point`` from the latitude and longitude fields."""
        return Point.from_bytes(
            self.latitude(), self.lat_n_s(), self.longitude(), self.lon_e_w()
        )

    @abc.abstractmethod
    def latitude(self) -> bytes:
        ...

    @abc.abstractmethod
    def lat_n_s(self) -> bytes:
        ...

    @abc.abstractmethod
    def longitude(self) -> bytes:
        ...

    @abc.abstractmethod
    def lon_e_w(self) -> bytes:
        ...

In [16]:
class GPGLL(Message):
    """Message layout whose position fix occupies fields 1 through 4."""

    def latitude(self) -> bytes:
        """Return field 1."""
        return self[1]

    def lat_n_s(self) -> bytes:
        """Return field 2."""
        return self[2]

    def longitude(self) -> bytes:
        """Return field 3."""
        return self[3]

    def lon_e_w(self) -> bytes:
        """Return field 4."""
        return self[4]

In [17]:
def message_factory(header: bytes) -> Optional[Message]:
    """Return a new, unbound message object for *header*.

    Recognised headers are ``b"GPGGA"``, ``b"GPGLL"`` and ``b"GPRMC"``;
    any other value gives ``None``.
    """
    # TODO: Add functools.lru_cache to save storage and time
    # Lambdas defer the class lookup until a header actually matches.
    makers = (
        (b"GPGGA", lambda: GPGGA()),
        (b"GPGLL", lambda: GPGLL()),
        (b"GPRMC", lambda: GPRMC()),
    )
    for tag, make in makers:
        if header == tag:
            return make()
    return None

## The Abstract Factory Pattern

### An Abstract Factory Example

In [19]:
class Suit(str, Enum):
    """Card suits; each member's value is the matching Unicode suit symbol."""

    Clubs = "\N{Black Club Suit}"
    Diamonds = "\N{Black Diamond Suit}"
    Hearts = "\N{Black Heart Suit}"
    Spades = "\N{Black Spade Suit}"

In [21]:
class Card(NamedTuple):
    """Immutable playing card: a numeric rank and a suit."""

    rank: int
    suit: Suit

    def __str__(self) -> str:
        """Return the rank followed by the suit symbol, e.g. ``6`` + club.

        The suit's ``value`` is used explicitly. Formatting a ``(str, Enum)``
        member inside an f-string yields ``Suit.Clubs`` rather than the
        symbol on Python 3.12 and later.
        """
        return f"{self.rank}{self.suit.value}"

In [22]:
class Trick(int, Enum):
    """Base enumeration for scoring tricks; members are also ints.

    Defines no members itself.
    """

    pass

In [23]:
class Hand(List[Card]):
    """A list of cards with a hook for scoring them."""

    def __init__(self, *cards: Card) -> None:
        """Build the hand from the given cards, in order."""
        super().__init__(cards)

    def scoring(self) -> list[Trick]:
        """Placeholder for game-specific scoring.

        NOTE(review): the body is ``pass``, so this returns ``None`` despite
        the ``list[Trick]`` annotation; subclasses such as ``CribbageHand``
        override it.
        """
        pass

In [24]:
class CardGameFactory(abc.ABC):
    """Abstract factory for the card and hand classes of one card game."""

    @abc.abstractmethod
    def make_card(self, rank: int, suit: Suit) -> "Card":
        """Create the game-specific card for *rank* and *suit*."""
        ...

    @abc.abstractmethod
    def make_hand(self, *cards: Card) -> "Hand":
        """Create the game-specific hand holding *cards*."""
        ...

In [25]:
class CribbageCard(Card):
    """Cribbage card whose point value equals its rank."""

    @property
    def points(self) -> int:
        """Point value: the card's rank."""
        return self.rank


class CribbageAce(Card):
    """Cribbage card that is always worth one point."""

    @property
    def points(self) -> int:
        """Point value: always 1, regardless of rank."""
        return 1


class CribbageFace(Card):
    """Cribbage card that is always worth ten points."""

    @property
    def points(self) -> int:
        """Point value: always 10, regardless of rank."""
        return 10

In [26]:
class CribbageHand(Hand):
    """Hand of cribbage cards that is scored together with a starter card."""

    # Set by upcard(); scoring() needs it.
    starter: Card

    def upcard(self, starter: Card) -> "Hand":
        """Record the *starter* card and return this hand for chaining."""
        self.starter = starter
        return self

    @staticmethod
    def _run_length(sorted_cards: list[CribbageCard]) -> int:
        """Return how many leading cards have consecutive ranks.

        *sorted_cards* must be sorted by rank. The count includes the first
        card, so a single card is a run of 1 and an empty list a run of 0.
        """
        if not sorted_cards:
            return 0
        base = sorted_cards[0]
        length = 1
        for offset, card in enumerate(sorted_cards[1:], start=1):
            if base.rank + offset != card.rank:
                # This card breaks the run; it must not be counted.
                break
            length += 1
        return length

    def scoring(self) -> list[Trick]:
        """Return the tricks scored by the hand plus the starter card.

        Scores fifteens (every subset whose points total 15), pairs, the
        single longest run of 3, 4 or 5 cards, and the right jack (a rank-11
        card in the hand sharing the starter's suit). A run is a strictly
        consecutive rank sequence, so a pair inside it ends the run.

        Raises:
            AttributeError: when ``upcard()`` has not been called first.
        """

        def trick_iter(cards: list[CribbageCard]) -> Iterator[Trick]:
            for subset in powerset(cards):
                if sum(c.points for c in subset) == 15:
                    yield CribbageTrick.Fifteen
            for c1, c2 in itertools.combinations(cards, 2):
                if c1.rank == c2.rank:
                    yield CribbageTrick.Pair

        hand_plus_starter = cast(List[CribbageCard], self + [self.starter])
        hand_plus_starter.sort()
        tricks = list(trick_iter(hand_plus_starter))

        # Run lengths starting at the first, second and third sorted card.
        runs = [self._run_length(hand_plus_starter[start:]) for start in range(3)]
        if runs[0] == 5:
            tricks += [CribbageTrick.Run_5]
        elif 4 in runs[:2]:
            tricks += [CribbageTrick.Run_4]
        elif 3 in runs:
            tricks += [CribbageTrick.Run_3]

        right_jack = any(c.rank == 11 and c.suit == self.starter.suit for c in self)
        if right_jack:
            tricks += [CribbageTrick.Right_Jack]
        return tricks

In [27]:
class CribbageFactory(CardGameFactory):
    """Concrete factory producing cribbage cards and hands."""

    def make_card(self, rank: int, suit: Suit) -> "Card":
        """Return the card class that matches *rank*.

        Rank 1 gives a ``CribbageAce``, ranks 2 through 10 a
        ``CribbageCard``, and every other rank a ``CribbageFace``.
        """
        if rank == 1:
            card_class = CribbageAce
        elif 2 <= rank < 11:
            card_class = CribbageCard
        else:
            card_class = CribbageFace
        return card_class(rank, suit)

    def make_hand(self, *cards: Card) -> "Hand":
        """Return a ``CribbageHand`` holding *cards*."""
        hand = CribbageHand(*cards)
        return hand

In [28]:
# Concrete factory used by the example cells that follow.
factory = CribbageFactory()

In [29]:
# Four cards of consecutive rank (6 through 9), one in each suit.
cards = [
    factory.make_card(6, Suit.Clubs),
    factory.make_card(7, Suit.Diamonds),
    factory.make_card(8, Suit.Hearts),
    factory.make_card(9, Suit.Spades),
]

In [30]:
# Starter card: the 5 of spades.
starter = factory.make_card(5, Suit.Spades)

In [31]:
# Build the hand through the factory from the four cards in ``cards``.
hand = factory.make_hand(*cards)

### Abstract Factories in Python

In [34]:
class CardGameFactoryProtocol(Protocol):
    """Structural type for card-game factories.

    Any object with matching ``make_card`` and ``make_hand`` methods
    satisfies it; no inheritance is required.
    """

    def make_card(self, rank: int, suit: Suit) -> "Card":
        """Create a card for *rank* and *suit*."""
        ...

    def make_hand(self, *cards: Card) -> "Hand":
        """Create a hand holding *cards*."""
        ...

## The Composite Pattern

### A Composite Example

In [37]:
class Node(abc.ABC):
    """Common base for the entries (files and folders) of a tree."""

    def __init__(
        self,
        name: str,
    ) -> None:
        """Create a detached node called *name*."""
        self.name = name
        # Containing folder, or None while the node is not in a tree.
        self.parent: Optional["Folder"] = None

    def move(self, new_place: "Folder") -> None:
        """Move this node into *new_place*.

        Moving a node into the folder that already contains it does
        nothing. If *new_place* already holds a different child with the
        same name, the move is refused and the node stays where it was,
        so it is never dropped from the tree.
        """
        previous = self.parent
        if previous is new_place:
            # Already there; detaching afterwards would delete the node.
            return
        placed = new_place.add_child(self)
        if placed is not self:
            # Name collision: add_child kept the existing child. Undo any
            # re-parenting and leave this node in its original folder.
            self.parent = previous
            return
        if previous is not None:
            del previous.children[self.name]

    @abc.abstractmethod
    def copy(self, new_folder: "Folder") -> None:
        """Create a copy of this node inside *new_folder*."""
        ...

    @abc.abstractmethod
    def remove(self) -> None:
        """Detach this node from its parent."""
        ...

    @abc.abstractmethod
    def tree(self, indent: int = 0, last: bool = False, outer: bool = False) -> None:
        """Print this node as part of an indented tree listing."""
        ...

    @abc.abstractmethod
    def dot(self) -> None:
        """Print statements describing this node in dot graph syntax."""
        ...

In [38]:
class Folder(Node):
    """Composite node: a named container of child nodes keyed by name."""

    def __init__(self, name: str, children: Optional[dict[str, "Node"]] = None) -> None:
        """Create a folder called *name*, optionally with initial *children*."""
        super().__init__(name)
        self.children = children or {}

    def __repr__(self) -> str:
        return f"Folder({self.name!r}, {self.children!r})"

    def add_child(self, node: "Node") -> "Node":
        """Insert *node* unless a child of the same name already exists.

        Returns:
            The child stored under that name: *node* when it was inserted,
            otherwise the pre-existing child. *node* is re-parented only
            when it actually ends up in this folder.
        """
        stored = self.children.setdefault(node.name, node)
        if stored is node:
            node.parent = self
        return stored

    def copy(self, new_folder: "Folder") -> None:
        """Recursively copy this folder and its children into *new_folder*."""
        # Snapshot first: when new_folder is this folder, the copy itself
        # becomes a child and must not be copied into itself.
        originals = list(self.children.values())
        target = cast(Folder, new_folder.add_child(Folder(self.name)))
        for child in originals:
            child.copy(target)

    def remove(self) -> None:
        """Remove all children, then detach this folder from its parent."""
        # Iterate a snapshot: children delete themselves from self.children.
        for child in list(self.children.values()):
            child.remove()
        if self.parent:
            del self.parent.children[self.name]

    def tree(self, indent: int = 0, last: bool = False, outer: bool = False) -> None:
        """Print this folder and, one level deeper, each of its children."""
        indent_text = "     " if outer else " |   "
        print((indent * indent_text) + " +--", self.name)
        if self.children:
            *first, final = self.children.values()
            for child in first:
                child.tree(indent + 1, last=False, outer=outer)
            final.tree(indent + 1, last=True, outer=outer)

    def dot(self) -> None:
        """Print dot edges to each child, the children, then this node."""
        for child in self.children.values():
            print(f"    n{id(self)} -> n{id(child)};")
            child.dot()
        print(f'    n{id(self)} [label = "{self.name}"];')

In [39]:
class File(Node):
    """Leaf node of the tree: a named entry without children."""

    def __repr__(self) -> str:
        return f"File({self.name!r})"

    def copy(self, new_folder: "Folder") -> None:
        """Add a new ``File`` with the same name to *new_folder*."""
        duplicate = File(self.name)
        new_folder.add_child(duplicate)

    def remove(self) -> None:
        """Delete this file from its parent's children, if it has a parent."""
        if not self.parent:
            return
        del self.parent.children[self.name]

    def tree(self, indent: int = 0, last: bool = False, outer: bool = False) -> None:
        """Print this file's name at the given indentation level."""
        if outer:
            indent_text = "     "
        else:
            indent_text = " |   "
        prefix = indent * indent_text
        print(prefix + " +--", self.name)

    def dot(self) -> None:
        """Print a dot statement drawing this file as a labelled box."""
        statement = f'    n{id(self)} [shape=box,label="{self.name}"];'
        print(statement)