In [1]:
# default_exp fontlearner
%load_ext autoreload
%autoreload 2

# Font Learner

> Diffvg-based learner for font optimisation

In [26]:
#export
from aifont.core import *
from fastai.data.all import *
from fastai.vision.all import *
import ffmpeg
from nbdev.showdoc import *
import PIL
import pydiffvg
from typing import Callable, List, Protocol, Tuple, Union

## Constants and Utilities

In [4]:
#export
# Opaque black as an RGBA colour in the format diffvg expects.
DIFFVG_BLACK = tensor(0., 0., 0., 1.)
# Placeholder used by `FontParamLayer` when a parameter group has no entries.
EMPTY_TENSOR = tensor([])
# Scalar zero. The original line read `tensor(0.)  = tensor(0.)`, which is a
# syntax error (assignment to a call); the name is restored here.
ZERO_TENSOR  = tensor(0.)

def get_vocab(dls_or_learn: Union[DataLoaders, Learner]) -> List[str]:
    """Return the vocab of a `Learner` or `DataLoaders`.

    If a `Learner` is passed, its `dls` are used. When the vocab is an `L`
    whose first entry is a `list`, the first entry is returned and a warning
    is issued if the first two entries differ."""
    source = dls_or_learn.dls if isinstance(dls_or_learn, Learner) else dls_or_learn
    vocab = source.vocab
    has_nested_vocabs = type(vocab) is L and type(vocab[0]) is list
    if not has_nested_vocabs: return vocab
    first, second = vocab[0], vocab[1]
    if first != second: warn("The two vocabs in dls do not match! Using the first one.")
    return first

class DebugCB(Callback):
    """A `Callback` for debugging a `FontParamLayer`.

    Before every optimiser step it records a snapshot of the values and
    gradients of `distance_params` and `width_params` of `self.model[0]`.
    `plot` then scatters gradient against value for each parameter."""
    def __init__(self, **kwargs):
        super(DebugCB, self).__init__(**kwargs)
        # The histories are created per instance. As class attributes the
        # lists were shared, so every `DebugCB` appended to the same history.
        self.d_vals, self.d_grads = [], []
        self.w_vals, self.w_grads = [], []

    def before_fit(self):
        """No-op hook; kept as the place to enable `retain_grad` when needed."""
        # self.model[0].distance_params.retain_grad()
        pass

    def before_step(self):
        """Snapshot the parameter values and gradients of `self.model[0]`."""
        m = self.model[0]
        self._record(m.distance_params, self.d_vals, self.d_grads)
        self._record(m.width_params, self.w_vals, self.w_grads)

    @staticmethod
    def _record(param, vals: list, grads: list) -> None:
        """Append detached copies of `param` and its grad to `vals` and `grads`."""
        # A parameter group without a gradient (e.g. an empty placeholder
        # tensor) is skipped so that values and grads stay paired.
        if param.grad is None: return
        # Detach so the stored snapshots do not keep the autograd graph alive.
        vals.append(param.detach().clone())
        grads.append(param.grad.detach().clone())

    def plot(self) -> None:
        """Scatter the recorded gradients against the recorded values."""
        num_d = self.d_vals[0].size(0) if self.d_vals else 0
        num_w = self.w_vals[0].size(0) if self.w_vals else 0
        def _items(tensor_list, idx): return [x[idx].item() for x in tensor_list]
        plt.figure(figsize=(10,10))
        for i in range(num_d): plt.scatter(_items(self.d_vals, i), _items(self.d_grads, i), label=f"Distance param {i}")
        for i in range(num_w): plt.scatter(_items(self.w_vals, i), _items(self.w_grads, i), label=f"Width param {i}")
        plt.legend()

## Rendering

In [14]:
#export
class Scene:
    """Just a utility to hold the different scene arguments together.

    `shapes` and `shape_groups` must be non-empty. `samples` is passed to
    `pydiffvg.RenderFunction` as the sample count in both x and y."""
    # Seed used by the most recent call to `render`; None before the first one.
    last_seed: int = None
    def __init__(self, shapes: List, shape_groups: List[pydiffvg.ShapeGroup], 
        canvas_width = 256, canvas_height = 256, samples = 2):
        assert shapes is not None and len(shapes) != 0
        assert shape_groups is not None and len(shape_groups) != 0
        store_attr()

    def get_scene_args(self) -> list:
        """Get the serialized scene for passing to `pydiffvg.RenderFunction`."""
        return pydiffvg.RenderFunction.serialize_scene(self.canvas_width, 
                                                       self.canvas_height, 
                                                       self.shapes, 
                                                       self.shape_groups)

    def render(self, seed: int = None, do_render_grad = False) -> Tensor:
        """Render the scene using pydiffvg `RenderFunction` or its
           gradient if `do_render_grad`. If `seed` is None, a random one is
           drawn. The seed used is stored in `self.last_seed`."""
        # `randint` needs integer bounds; the float `1e6` is rejected by
        # newer Python versions.
        if seed is None: seed = random.randint(0, 1_000_000)
        self.last_seed = seed
        scene_args = self.get_scene_args()
        w = self.canvas_width
        h = self.canvas_height
        s = self.samples
        args = [w, h, s, s, seed, None] + scene_args
        rf = pydiffvg.RenderFunction
        if not do_render_grad: return rf.apply(*args)
        # The upstream gradient must have the same layout as the rendered
        # image, which diffvg produces as (height, width, 4).
        grad_img = torch.ones(h, w, 4, device=pydiffvg.get_device())
        return rf.render_grad(grad_img, *args)

    def render_grad(self, seed: int = None) -> Tensor:
        """Render the gradient as raster. Uses `seed` if given and otherwise
           the seed of the last `render` call."""
        # Previously `seed` was accepted but silently ignored.
        if seed is None: seed = self.last_seed
        return self.render(seed=seed, do_render_grad=True)
    
add_docs(Scene)

In [27]:
#export
class ImageSaver:
    """Create a callback to pass to `VectorRenderLayer` as `rendered_callback`
       to save rendered images and optionally grads.

       Images are written to `folder` as `{iter_name}_{batch_i}_{item_i}.png`
       (and `{grad_name}_...` for gradients if `save_grad`). The file names
       are collected so that `render_result_video` can join them into a
       video."""
    # Raster dims are taken from the first image saved.
    canvas_height: int = None
    canvas_width: int = None
    def __init__(self, folder: str, save_grad = False, iter_name = "iter", 
                 grad_name = "grad"):
        assert folder is not None
        if folder.endswith("/"):
            folder = folder[:-1]
        store_attr()
        # Per-instance file lists. As class attributes they were shared, so
        # two savers would mix their frames into the same video.
        self.iter_files: List[str] = []
        self.grad_files: List[str] = []

    def __call__(self, raster: Tensor, batch_i: int, item_i: int, scene: Scene,
                 normalize = False, gamma = 2.2) -> None:
        """Save `raster` and, if `save_grad`, the gradient render of `scene`."""
        suffix = f"_{batch_i}_{item_i}"
        self.iter_files.append(self.save_image(raster, f"{self.iter_name}{suffix}", normalize=normalize, gamma=gamma))
        if self.save_grad: 
            grad = scene.render_grad()
            self.grad_files.append(self.save_image(grad, f"{self.grad_name}{suffix}"))

    def save_image(self, raster: Tensor, filename: str, normalize = False, gamma = 2.2) -> str:
        """Save the `raster` tensor as image file and return filename used."""
        if not self.canvas_height:
            self.canvas_height = raster.size(0)
            self.canvas_width  = raster.size(1)
        fn = f"{self.folder}/{filename}.png"
        pydiffvg.imwrite(raster.cpu(), fn, normalize=normalize, gamma=gamma)
        return fn
    
    def render_result_video(self, delete_imgs = False, frame_rate=24, grad=False) -> None:
        """Render intermediate images as a video (`grads.mp4` if `grad`, else
           `iters.mp4`) in `self.folder`. If `delete_imgs`, the source images
           are removed afterwards."""
        files = self.grad_files if grad else self.iter_files
        if not files:
            # Without any saved frame the canvas dims are unknown and the
            # ffmpeg command would be malformed.
            warn("No images have been saved; skipping video rendering.")
            return
        out = os.path.join(self.folder, "grads.mp4" if grad else "iters.mp4")
        frames = ffmpeg.input('pipe:', r=str(frame_rate))
        # The frames are overlaid on a white background of the raster size.
        process = ffmpeg.input(f"color=c=white:s={self.canvas_width}x{self.canvas_height}", f="lavfi") \
                        .overlay(frames, eof_action="endall") \
                        .output(out) \
                        .overwrite_output() \
                        .run_async(pipe_stdin=True, quiet=True)
        try:
            for in_file in files:
                with open(in_file, 'rb') as f: process.stdin.write(f.read())
        finally:
            # Close the stdin pipe so that FFmpeg finishes encoding the output
            # file, even if reading or writing a frame failed.
            process.stdin.close()
            process.wait()
        if delete_imgs: 
            for f in files: os.remove(f)
            # Forget the removed files so a later call does not try to read them.
            files.clear()
        print("Rendering video done!")

add_docs(ImageSaver)

## DataLoader

Custom DataLoader for getting letter classes.

In [16]:
#export
class LetterDL(DataLoader):
    """A dummy data loader for use with font vector optimisation.
       Pass the same `vocab` as in the OCR model. Each item is a random
       letter from `letters` encoded as a category, returned as both x and
       y. The loader is created with `n = epoch_len * bs` items."""
    # Number of items created so far in the current pass.
    current_i = 0
    def __init__(self, vocab: CategoryMap, letters: Tuple[str, ...] = ("A",), 
                 epoch_len = 10, bs = 1, **kwargs):
        assert vocab is not None
        n_items = epoch_len * bs
        super(LetterDL, self).__init__(bs=bs, n=n_items, **kwargs)
        self.categorizer = Categorize(vocab=vocab)
        store_attr()

    def create_item(self, s) -> Tuple[TensorCategory, TensorCategory]:
        """Return the encoded category of a random letter from `letters` as
           an `(x, y)` pair, or stop once `n` items have been created."""
        exhausted = self.current_i == self.n
        if exhausted:
            # Reset the counter for the next pass before stopping.
            self.current_i = 0
            stop()
        self.current_i += 1
        letter = random.choice(self.letters)
        encoded = self.categorizer.encodes(letter)
        # y is a separate copy of x
        return encoded, encoded.clone()

add_docs(LetterDL)

## Vector Model

The vector model consists of a `FontParamLayer`, which holds the parameters to optimise, and a subclass of `VectorRenderLayer`, which handles the creation of the letter vectors.

> Note that the usefulness of this split into two layers is tentative, and the params might as well be contained within the `VectorRenderLayer`.

### Font Parameters

In [17]:
#export
class FontParamLayer(Module):
    """Holds `n_distance_params` params with tanh activation and 
       `n_width_params` params with sigmoid activation and concats the
       activated values with the inputs. The output is thus a tensor of
       size `(bs, n_distance_params + n_width_params + 1)`, with value
       ranges of [-1, 1] for `distance_params` and [0, 1] for
       `width_params`; the latter should be mapped to min and max stroke
       widths by the next layer. `init_range` defines, as a fraction of
       the total range and centered around the middle, the part of the
       parameter value space used when initialising at random in
       `reset_parameters`. If `seed` is given, it is set as the torch
       random seed before initialisation.
       NB. The separation of the parameters into these two types
       may not be very reasonable. It's copied from `diffvg` with
       little regard to any underlying motivation except that the
       distance values are treated as offsets from the canvas 
       center."""
    sigmoid = torch.nn.Sigmoid()
    tanh = torch.nn.Tanh()
    weight = True # We need to include this for fastai to recognize this layer as trainable

    def __init__(self, n_distance_params=1, n_width_params=1, seed=None, init_range=.5):
        if seed is not None: torch.random.manual_seed(seed)
        # A falsy count results in a plain empty tensor instead of a Parameter
        if n_distance_params: self.distance_params = torch.nn.Parameter(torch.empty(n_distance_params))
        else:                 self.distance_params = EMPTY_TENSOR
        if n_width_params:    self.width_params = torch.nn.Parameter(torch.empty(n_width_params))
        else:                 self.width_params = EMPTY_TENSOR
        # Normalise falsy counts (e.g. None) to 0 before storing them
        if not n_distance_params: n_distance_params = 0
        if not n_width_params:    n_width_params = 0
        store_attr()
        self.reset_parameters()

    def __repr__(self):
        rows = ["FontParamLayer", "- distance params:"]
        for p in self.distance_params:
            rows.append(f"  {p.item()} ({self.tanh(p).item()})")
        rows.append("- width params:")
        for p in self.width_params:
            rows.append(f"  {p.item()} ({self.sigmoid(p).item()})")
        return "\n".join(rows)
               
    def reset_parameters(self):
        """Randomly init the parameters around the middle of possible values."""
        # The pre-activation domain is approximated as [-2, 2]
        bound = self.init_range * 2
        if self.n_distance_params: self.distance_params.data.uniform_(-bound, bound)
        if self.n_width_params:    self.width_params.data.uniform_(-bound, bound)

    def forward(self, x):
        """Prepend the activated params to each row of `x`, which is first
           converted to size (bs, 1) if it's one-dimensional."""
        if x.ndim not in (1, 2): raise ValueError("Input can only be 1- or 2-dimensional.")
        if x.ndim == 1: x = x.unsqueeze(1)
        activated = torch.cat((self.tanh(self.distance_params), self.sigmoid(self.width_params)))
        # Repeat the same params for every item in the batch
        per_item = activated.expand(x.size(0), -1)
        return torch.column_stack((per_item, x))

add_docs(FontParamLayer)

### Vector Rendering Layer Base

In [19]:
#export

class Normaliser(Protocol):
    """For normalising rasters for the OCR.

    Structural type for any object exposing `mean` and `std`; in this file
    `VectorRenderLayerBase.normalise_raster` applies it as
    `(raster - mean) / std`."""
    # Value subtracted from the raster.
    mean: float
    # Value the centered raster is divided by.
    std: float

class VectorRenderLayerBase(Module):
    """Base for vector render layers. Gets input from a FontParamLayer
       and returns the diffvg rendering. Override `create_scenes` in
       subclasses and save the results in `self.scenes` of which there
       should be `bs`. `forward` calls `render` which renders the
       scenes and arranges them channel-first to match the OCR model. Note
       that the workflow is based on greyscale images and for 1 and 3
       output channels we're only using the alpha value of the diffvg
       render output. Init parameters:
       `canvas_width`, `canvas_height`: rastered canvas dims
       `raster_norm`: use the normaliser from the OCR `dls`
       `clip_raster`: whether to clip color values to [0., 1.] as is done when
            saving image (note that the values produced by the render
            function vary wildly up to more than 10. so setting this to
            False is advised against)
       `apply_gamma`: whether to apply `gamma` to the color values similarly
            to clipping above
       `n_distance_params`: number of params with tanh activation
       `n_width_params`: number of params with sigmoid activation
       `eps`: amount of random jitter added to `distance_params` 
       `n_colors_out`: color channels out (1, 3 or 4)
       `max_distance`: the maximum fraction [0., 1.] of canvas dims 
            `distance_params` can span
       `fixed_seed`: fixed seed value to pass to `pydiffvg.RenderFunction`
       `gamma`: set to override default gamma of 2.2 for colour images and 1.
            for grayscale ones
       `stroke_width`: stroke width for shape generator helpers
            (note that this is defined as a fraction of `canvas_size`);
            either a float or a tuple of min and max width and used by
            `expand_stroke_width`
       `stroke_color`: default stroke color for shape generator helpers
       `rendered_callback`: set to an ImageSaver to save interim renders"""
    # Index of the current batch; incremented at the start of each `forward`.
    batch_i = -1
    bs: int = None
    # Cached per-column noise scale built by `add_eps`.
    eps_tensor: Tensor = None
    # Replaced with a fresh per-instance list in every `forward`.
    scenes: List[Scene] = []
    x: Tensor = None
    def __init__(self, canvas_width: int, canvas_height: int, raster_norm: Normaliser = None,
                 n_distance_params = 1, n_width_params = 1, eps = None, clip_raster = True,
                 apply_gamma = True, n_colors_out = 1, max_distance = 1., fixed_seed: int = None, 
                 gamma: float = None, stroke_width: Union[float, Tuple[float, float]] = 1./28, 
                 stroke_color = DIFFVG_BLACK, 
                 rendered_callback: Callable[[Tensor, int, int, Scene, bool, float], None] = None):
        super(VectorRenderLayerBase, self).__init__()
        self.canvas_size = max(canvas_width, canvas_height)
        stroke_width = tensor(stroke_width)
        if canvas_width != canvas_height: 
            warn(f"When canvas is not square ({canvas_width}x{canvas_height}), "
                  "some dimensions may be expanded outside it.")
        n_width_params = n_width_params or 0
        if gamma is None: gamma = 1. if n_colors_out == 1 else 2.2
        # Stores all the init args, including the adjusted locals above. Note
        # that `eps` is routed through the property setter below.
        store_attr()

    def get_item(self, i: int) -> Tensor:
        """Get item `i` in the batch `x`"""
        return self.x[i]

    def get_distance_params(self, i: int) -> Tensor:
        """`distance_params` for item `i` in the batch passed by 
           `FontParamLayer` as part of `x`"""
        return self.get_item(i)[:self.n_distance_params]

    def get_width_params(self, i: int) -> Tensor:
        """`width_params` for item `i` in the batch passed by 
           `FontParamLayer` as part of `x`"""
        return self.get_item(i)[self.n_distance_params : self.n_distance_params + self.n_width_params]

    def get_inputs(self, i: int) -> Tensor:
        """The letter category inputs for item `i` in the batch, i.e. the
           values that follow the params passed by `FontParamLayer` as part
           of `x`"""
        return self.get_item(i)[self.n_distance_params + self.n_width_params :]

    @property
    def eps(self) -> float:
        """Get the amount of random noise to add when evaluating params."""
        return self._eps

    @eps.setter
    def eps(self, value: float):
        """Set the amount of random noise to add when evaluating params."""
        self._eps = value
        # Invalidate the cached noise scale as it depends on eps
        self.eps_tensor = None

    def add_eps(self) -> None:
        """Apply random eps to distance params. Cf. diffvg/apps/generative 
           modeling/rendering.render_lines"""
        assert self.bs is not None
        if not self.eps or not self.n_distance_params: return
        cached = self.eps_tensor
        # The cache is rebuilt also when the batch shape or device changes
        # (e.g. a smaller last batch); a stale cache would fail to broadcast.
        if cached is None or cached.shape != self.x.shape or cached.device != self.x.device:
            # Premake a tensor that has eps values matching distance_params
            # in the input, i.e. the first items on each row, and zeros elsewhere
            n_rest = self.x.size(1) - self.n_distance_params
            self.eps_tensor = torch.column_stack((
                torch.full((self.bs, self.n_distance_params), self.eps, device=self.x.device),
                torch.zeros(self.bs, n_rest, device=self.x.device)))
        self.x = self.x + self.eps_tensor * torch.randn_like(self.x)

    def expand_distance(self, vals: Tensor) -> Tensor:
        """Expand values based on `[-m, m]` where `m = self.max_distance` 
           central coordinates."""
        return (.5 * (vals + 1.) * self.max_distance + (1 - self.max_distance) / 2) * self.canvas_size

    def expand_stroke_width(self, vals: Tensor = None) -> Tensor:
        """Expand `vals` to a stroke width in pixels. With a scalar
           `stroke_width`, `vals` scales it; with a (min, max) pair, `vals`
           interpolates between the two. The result is multiplied by
           `canvas_size`. `vals` defaults to 1."""
        if vals is None: vals = tensor(1.)
        w = vals * self.stroke_width if self.stroke_width.ndim == 0 \
            else self.stroke_width[0] + vals * (self.stroke_width[1] - self.stroke_width[0])
        return w * self.canvas_size

    def normalise_raster(self, raster: Tensor) -> Tensor:
        """Apply normalisation to `raster`. Not useful for grayscale letters."""
        if not self.raster_norm: return raster
        return (raster - self.raster_norm.mean) / self.raster_norm.std 

    def forward(self, x) -> Tensor:
        """Render letters defined in `x`, which also contains the font parameters."""
        self.batch_i += 1
        self.x = x
        self.bs = x.size(0)
        self.add_eps()
        self.scenes = [None] * self.bs
        self.create_scenes()
        return self.render()

    def create_scenes(self) -> None:
        """Override this in subclasses to create the vector scenes for the letters."""
        raise NotImplementedError()

    def create_line_scene(self, *shapes) -> Scene:
        """Create a simple line-drawing Scene with shapes."""
        shape_groups = [pydiffvg.ShapeGroup(shape_ids=tensor(list(range(len(shapes)))),
                                            fill_color=None,
                                            stroke_color=self.stroke_color,
                                            use_even_odd_rule=False)]
        return Scene(shapes=shapes, shape_groups=shape_groups, 
                     canvas_width = self.canvas_width, canvas_height = self.canvas_height)

    def create_line_scene_from_points(self, *point_tensors, stroke_width=None) -> Scene:
        """Shorthand for `create_line_scene` by passing `point_tensors` that
           are converted to polygons."""
        return self.create_line_scene(*self.points_to_polygons(*point_tensors, stroke_width=stroke_width))

    def points_to_polygons(self, *point_tensors, stroke_width=None) -> List[pydiffvg.Polygon]:
        """Convert `point_tensors` to a List of open pydiffvg Polygons, using
           the default stroke width if `stroke_width` is None."""
        return [pydiffvg.Polygon(points=pt, 
                                 stroke_width=self.expand_stroke_width() if stroke_width is None else stroke_width,
                                 is_closed=False) \
                for pt in point_tensors]


    def render(self) -> Tensor:
        """Render `self.scenes` as a raster tensor of size
           `(bs, n_colors_out, canvas_height, canvas_width)` using pydiffvg."""
        assert self.scenes is not None and len(self.scenes) == self.bs
        cols = self.n_colors_out
        if cols not in (1, 3, 4): raise NotImplementedError(f"n_colors_out '{cols}' can only be 1, 3 or 4.")
        # diffvg rasters are (height, width, 4) so the buffer is allocated
        # height-first; for square canvases this is the same as before.
        output = torch.zeros(self.bs, cols, self.canvas_height, self.canvas_width)
        for i, s in enumerate(self.scenes):
            raster = s.render(seed=self.fixed_seed)
            if self.rendered_callback: 
                self.rendered_callback(raster=raster, batch_i=self.batch_i, item_i=i, scene=s, normalize=False, gamma=self.gamma)
            if cols == 4:
                # Keep all the RGBA channels but move them first to match
                # the output layout (previously this failed on assignment).
                raster = raster.permute(2, 0, 1)      # c,h,w
            else:
                raster = raster[:,:,-1]               # h,w; float 0.-1. where 1. is black
                raster = 1. - raster                  # h,w; float 0.-1. where 0. is black
                raster = raster.expand(cols, -1, -1)  # c,h,w; all channels equal
            raster = self.normalise_raster(raster)
            if self.clip_raster: raster = raster.clip(0., 1.)
            if self.apply_gamma: 
                inv_gamma = 1.0 / self.gamma
                if cols == 4:
                    # Gamma is applied to RGB only and alpha is left as is.
                    # Built out of place so that the render output is not
                    # modified in place.
                    raster = torch.cat((raster[:3].pow(inv_gamma), raster[3:]), dim=0)
                else:
                    # All channels carry the same grey value, so gamma applies
                    # to the whole raster. Previously only the first three
                    # columns were affected when `cols == 3`.
                    raster = raster.pow(inv_gamma)
            output[i] = raster
        return output

add_docs(VectorRenderLayerBase)

## Loss Function

In [10]:
#export
class OCRLoss(CrossEntropyLossFlat):
    """Softmaxed CrossEntropyLossFlat between `ocr_model`'s prediction
       and target category. Use after `VectorRenderLayerBase`.

       `ocr_model` is put in eval mode. If `debug`, a tuple of (loss,
       predicted index, its activation, all activations) for the first item
       in each batch is appended to `self.stored`. `kwargs` are passed to
       `CrossEntropyLossFlat`."""
    stored: List[Tuple[float, int, float, Tensor]]
    def __init__(self, ocr_model, debug = False, **kwargs):
        super(OCRLoss, self).__init__(**kwargs)
        assert ocr_model is not None
        ocr_model.eval()
        store_attr("ocr_model, debug")
        # Per-instance history. As a class attribute the list was shared, so
        # all loss instances appended to (and read from) the same history.
        self.stored = []

    def __call__(self, inp, target):
        # The OCR model's output is passed through `self.activation` before
        # the parent loss is calculated.
        pred = self.activation(self.ocr_model(inp))
        loss = super(OCRLoss, self).__call__(pred, target)
        if self.debug: self.stored.append((loss.item(), pred[0].argmax().item(), pred[0].max().item(), pred[0].detach()))
        return loss

def param_loss(x: Tensor, loss_start=1.5, loss_factor=1.) -> Tensor:
    """Calculate a linear loss for abs values above `loss_start` multiplied
       by `loss_factor`.

       Returns a scalar tensor: the sum over `x` of
       `max(|x| - loss_start, 0)` times `loss_factor`. An empty `x`
       results in zero."""
    # Clamping needs no separate zero tensor. `torch.maximum` with a newly
    # created zero tensor fails if `x` lives on another device.
    excess = (x.abs() - loss_start).clamp(min=0.)
    return loss_factor * excess.sum()

class ParamLoss(Module):
    """Calculate a loss based on extreme parameter values.

       The loss is the sum of `param_loss` for `param_layer.distance_params`
       (starting at `loss_start_dist`) and for `param_layer.width_params`
       (starting at `loss_start_width`), both scaled by `loss_factor`."""
    def __init__(self, param_layer, loss_start_dist=1.5, loss_start_width=4., loss_factor=1., **kwargs):
        super(ParamLoss, self).__init__(**kwargs)
        assert param_layer is not None
        store_attr("param_layer,loss_start_dist,loss_start_width,loss_factor")

    def forward(self, *args):
        # The arguments are ignored: the loss depends only on the params
        layer = self.param_layer
        factor = self.loss_factor
        distance_term = param_loss(layer.distance_params, self.loss_start_dist, factor)
        width_term = param_loss(layer.width_params, self.loss_start_width, factor)
        return distance_term + width_term
        
class OCRAndParamLoss(Module):
    """Combined OCR and param loss.

       The result is the sum of an `OCRLoss` using `ocr_model` and a
       `ParamLoss` on `param_layer`. `kwargs` are passed to `OCRLoss` only.
       `debug` and `stored` expose the respective attributes of the wrapped
       `OCRLoss`."""
    def __init__(self, ocr_model, param_layer, loss_start_dist=1.5, loss_start_width=4., 
                 loss_factor=1., debug = False, **kwargs):
        # The module initialiser takes no keyword arguments, and neither does
        # `ParamLoss` beyond its own, so forwarding `kwargs` to them failed
        # whenever any were given. They're meant for the OCR loss.
        super(OCRAndParamLoss, self).__init__()
        self.ocr_loss = OCRLoss(ocr_model=ocr_model, debug=debug, **kwargs)
        self.param_loss = ParamLoss(param_layer=param_layer, loss_start_dist=loss_start_dist, 
                                    loss_start_width=loss_start_width, loss_factor=loss_factor)

    @property
    def debug(self) -> bool:
        """Whether the wrapped OCR loss stores debug information."""
        return self.ocr_loss.debug

    @debug.setter
    def debug(self, value: bool):
        self.ocr_loss.debug = value

    @property
    def stored(self):
        """The debug records stored by the wrapped OCR loss."""
        return self.ocr_loss.stored

    def forward(self, inp, target):
        return self.ocr_loss(inp, target) + self.param_loss(inp, target)

## Learner

In [21]:
#export
class VectorLearner(Learner):
    """A simple extension to Learner offering some utility methods.
       `image_saver` is the saver used by `render_result_video`; all other
       arguments are passed to `Learner` and must be given as keywords."""
    def __init__(self, image_saver=None, **kwargs):
        super(VectorLearner, self).__init__(**kwargs)
        store_attr("image_saver")
    
    @property
    def vocab(self) -> List[str]:
        return self.dls.vocab

    def render_letter(self, letter: str = "A") -> PIL.Image:
        """Render a letter using the current vector model."""
        inp = tensor([self.vocab.index(letter)])
        m = self.model
        trn = m.training
        m.eval()
        try:
            with torch.no_grad(): img = m(inp).squeeze().clip(0., 1.) * 255
        finally:
            # Restore the previous mode also if rendering fails
            m.train(trn)
        return PILImageBW.create(img).convert('RGB')

    def render_result_video(self, **kwargs):
        """Shortcut for `self.image_saver.render_result_video`"""
        assert self.image_saver is not None
        self.image_saver.render_result_video(**kwargs)

    def calculate_losses(self, n = 20, param_ranges: List[Union[Tuple[float, float], float]] = None) -> pd.DataFrame:
        """Output loss statistics and predictions for different param values.
           If `param_ranges` is supplied, it should contain the min and max
           values to use for each parameter or a fixed value. Otherwise each
           parameter is swept over [-2, 2]. The parameters are evaluated at
           `n` evenly spaced steps on one batch from `self.dls`. The current
           parameter values and the model's training mode are restored
           afterwards."""
        assert n > 1
        assert self.loss_func.debug, "Debug must be enabled for the loss function."
        model = self.model
        pl = model[0]
        n_dp = pl.n_distance_params
        n_wp = pl.n_width_params
        n_pars = n_dp + n_wp 
        # Each param is evaluated at `param_c + param_f * t` for t in [0, 1]
        if param_ranges is None:
            param_f = torch.full((n_pars,), 4.)
            param_c = torch.full((n_pars,), -2.)
        else:
            # A plain number, int or float, denotes a fixed value
            def _is_fixed(r): return isinstance(r, (int, float))
            param_f = tensor([0. if _is_fixed(r) else float(r[1] - r[0]) for r in param_ranges])
            param_c = tensor([float(r if _is_fixed(r) else r[0]) for r in param_ranges])
        x,y = self.dls.one_batch()
        stats = []
        # The sweep overwrites the params, so the current state is saved and
        # restored; otherwise calling this would discard the learned font.
        was_training = model.training
        saved_dp = pl.distance_params.data.clone() if n_dp else None
        saved_wp = pl.width_params.data.clone() if n_wp else None
        model.eval()
        try:
            # Only loss values are needed, so no autograd graph is built
            with torch.no_grad():
                for i in range(n):
                    p_vals = param_c + param_f * i / (n - 1)
                    if n_dp: pl.distance_params.data = p_vals[:n_dp].to(pl.distance_params.device)
                    if n_wp: pl.width_params.data = p_vals[n_dp : n_dp + n_wp].to(pl.width_params.device)
                    # Vector prediction
                    p = model(x)
                    _ = self.loss_func(p, y)
                    # The loss function stores (loss, pred index, activation, ...)
                    loss, pred_i, pred_act = self.loss_func.stored[-1][:3]
                    d = {
                        "loss": loss,
                        "pred": self.vocab[pred_i],
                        "pred_activation": pred_act
                        }
                    for j in range(n_dp): d[f"distance_param_{j}"] = p_vals[j].item()
                    for j in range(n_wp): d[f"width_param_{j}"] = p_vals[n_dp + j].item()
                    stats.append(d)
        finally:
            if n_dp: pl.distance_params.data = saved_dp
            if n_wp: pl.width_params.data = saved_wp
            model.train(was_training)
        return pd.DataFrame(stats)

add_docs(VectorLearner)

## Export

In [28]:
#hide
# Run nbdev's `notebook2script`; judging by the cell output below, it converts
# each notebook of the project. Kept as the last cell of the notebook.
from nbdev.export import notebook2script; notebook2script()

Converted 01_fontlearnertests.ipynb.
Converted 02_lettervectors.ipynb.
Converted aifont_core.ipynb.
Converted aifont_fontlearner.ipynb.
Converted aifont_fontsampler.ipynb.
Converted aifont_ocrlearner.ipynb.
Converted index.ipynb.
