# Refactoring Data Science Projects

Today, we'll discuss how to refactor code from data science projects to make it more robust, maintainable, and scalable.
Many data scientists are trained in mathematics, including probability and statistics, as well as other disciplines, but not all of them have training as software engineers. Because of this unfamiliarity with software engineering principles, the code in many data science projects performs poorly and is difficult to maintain and scale.


In this seminar, we will look at a simple data science project that trains a neural network for image recognition. It uses a very common data set called *MNIST*, a collection of grayscale images of handwritten digits. The challenge is that many of the digits are ambiguous: a "1" may look like a "7", or a "5" may look like a "6". The goal of the project is to predict accurately which digit each image shows.

Let's start by taking a look at the project code. Our first file is main.py.
main.py starts by importing the required libraries.

It then sets the neural network's hyperparameters.
Next, it loads the training and testing data and creates the machine learning model.
It also sets up the optimizer and the loss function.

The code then runs the testing and training loops for each epoch.
It calculates the accuracy metrics and, at the end of each epoch, resets them.
This project also uses TensorBoard to monitor the results.


In [None]:
import numpy as np
import torch
from sklearn.metrics import accuracy_score
from tqdm import tqdm

from src.dataset import get_train_dataloader, get_test_dataloader
from src.metrics import Metric
from src.models import LinearNet
from src.tracking import TensorboardExperiment, Stage
from src.utils import generate_tensorboard_experiment_directory

# Hyperparameters
hparams = {
    'EPOCHS': 20,
    'LR': 5e-5,
    'OPTIMIZER': 'Adam',
    'BATCH_SIZE': 128
}

# Data
train_loader = get_train_dataloader(batch_size=hparams.get('BATCH_SIZE'))
test_loader = get_test_dataloader(batch_size=hparams.get('BATCH_SIZE'))

# Model and Optimizer
model = LinearNet()
optimizer = torch.optim.Adam(model.parameters(), lr=hparams.get('LR'))

# Objective (loss) function
compute_loss = torch.nn.CrossEntropyLoss(reduction='mean')

# Metric Containers
train_accuracy = Metric()
test_accuracy = Metric()
y_true_batches = []
y_pred_batches = []

# Experiment Trackers
log_dir = generate_tensorboard_experiment_directory(root='./runs')
experiment = TensorboardExperiment(log_dir=log_dir)

# Batch Counters
test_batch = 0
train_batch = 0

for epoch in range(hparams.get('EPOCHS')):
    # Testing Loop
    for x_test, y_test in tqdm(test_loader, desc='Validation Batches', ncols=80):
        test_batch += 1
        test_batch_size = x_test.shape[0]
        test_pred = model(x_test)
        loss = compute_loss(test_pred, y_test)

        # Compute Batch Validation Metrics
        y_test_np = y_test.detach().numpy()
        y_test_pred_np = np.argmax(test_pred.detach().numpy(), axis=1)
        batch_test_accuracy = accuracy_score(y_test_np, y_test_pred_np)
        test_accuracy.update(batch_test_accuracy, test_batch_size)
        experiment.set_stage(Stage.VAL)
        experiment.add_batch_metric('accuracy', batch_test_accuracy, test_batch)
        y_true_batches += [y_test_np]
        y_pred_batches += [y_test_pred_np]

    # Training Loop
    for x_train, y_train in tqdm(train_loader, desc='Train Batches', ncols=80):
        train_batch += 1
        train_batch_size = x_train.shape[0]
        train_pred = model(x_train)
        loss = compute_loss(train_pred, y_train)

        # Compute Batch Training Metrics
        y_train_np = y_train.detach().numpy()
        y_train_pred_np = np.argmax(train_pred.detach().numpy(), axis=1)
        batch_train_accuracy = accuracy_score(y_train_np, y_train_pred_np)
        train_accuracy.update(batch_train_accuracy, train_batch_size)
        experiment.set_stage(Stage.TRAIN)
        experiment.add_batch_metric('accuracy', batch_train_accuracy, train_batch)

        # Reverse-mode AutoDiff (backpropagation)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Compute Average Epoch Metrics
    summary = ', '.join([
        f"[Epoch: {epoch + 1}/{hparams.get('EPOCHS')}]",
        f"Test Accuracy: {test_accuracy.average: 0.4f}",
        f"Train Accuracy: {train_accuracy.average: 0.4f}",
    ])
    print('\n' + summary + '\n')

    # Log Validation Epoch Metrics
    experiment.set_stage(Stage.VAL)
    experiment.add_epoch_metric('accuracy', test_accuracy.average, epoch)
    experiment.add_epoch_confusion_matrix(y_true_batches, y_pred_batches, epoch)

    # Log Training Epoch Metrics
    experiment.set_stage(Stage.TRAIN)
    experiment.add_epoch_metric('accuracy', train_accuracy.average, epoch)

    # Reset metrics
    train_accuracy.reset()
    test_accuracy.reset()

experiment.flush()

Most of the code, other than main.py, is located in a src folder. Let's start looking at these modules.
The first module we'll examine is dataset.py.

This module handles loading the MNIST data set into memory. It also has preprocessing methods that normalize the data for use with the model. Note that the preprocess_x method uses an instance attribute (self.x) to store intermediate results. This is problematic from a software engineering standpoint because self.x holds a different value at each stage of the method: first a NumPy array, then a tensor. The same pattern appears in preprocess_y with self.y. These methods need refactoring.


In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

from src.load_data import load_train_labels, load_train_data, load_test_data, load_test_labels


class MNIST(Dataset):
    idx: int  # requested data index
    x: torch.Tensor
    y: torch.Tensor

    TRAIN_MAX = 255.0
    TRAIN_NORMALIZED_MEAN = 0.1306604762738429
    TRAIN_NORMALIZED_STDEV = 0.3081078038564622

    def __init__(self, data: np.ndarray, targets: np.ndarray):
        if len(data) != len(targets):
            raise ValueError('data and targets must be the same length. '
                             f'{len(data)} != {len(targets)}')

        self.data = data
        self.targets = targets

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx) -> tuple[torch.Tensor, torch.Tensor]:
        x = self.get_x(idx)
        y = self.get_y(idx)
        return x, y

    def get_x(self, idx: int):
        self.idx = idx
        self.preprocess_x()
        return self.x

    def preprocess_x(self):
        self.x = self.data[self.idx].copy().astype(np.float64)
        self.x /= self.TRAIN_MAX
        self.x -= self.TRAIN_NORMALIZED_MEAN
        self.x /= self.TRAIN_NORMALIZED_STDEV
        self.x = self.x.astype(np.float32)
        self.x = torch.from_numpy(self.x)
        self.x = self.x.unsqueeze(0)

    def get_y(self, idx: int):
        self.idx = idx
        self.preprocess_y()
        return self.y

    def preprocess_y(self):
        self.y = self.targets[self.idx]
        self.y = torch.tensor(self.y, dtype=torch.long)


def get_train_dataloader(batch_size: int) -> DataLoader:
    return DataLoader(
        dataset=MNIST(load_train_data(), load_train_labels()),
        batch_size=batch_size,
        shuffle=True,
        num_workers=0,
    )


def get_test_dataloader(batch_size: int) -> DataLoader:
    return DataLoader(
        dataset=MNIST(load_test_data(), load_test_labels()),
        batch_size=batch_size,
        shuffle=False,
        num_workers=0,
    )
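
One possible shape for that refactoring, as a minimal sketch rather than the project's final code: the normalization becomes a pure function that takes an image and returns a new array, so nothing is stored on `self` between calls. The function name `normalize_image` and the module-level constants are hypothetical; the numeric values are copied from the class above. The conversion to a tensor is left out so that the sketch depends only on NumPy.

```python
import numpy as np

# Constants copied from the MNIST class above.
TRAIN_MAX = 255.0
TRAIN_NORMALIZED_MEAN = 0.1306604762738429
TRAIN_NORMALIZED_STDEV = 0.3081078038564622


def normalize_image(image: np.ndarray) -> np.ndarray:
    """Scale raw pixels to [0, 1], then standardize with the training statistics."""
    scaled = image.astype(np.float64) / TRAIN_MAX
    standardized = (scaled - TRAIN_NORMALIZED_MEAN) / TRAIN_NORMALIZED_STDEV
    return standardized.astype(np.float32)


# Usage with a fake 28x28 all-black image standing in for real MNIST data.
fake_image = np.zeros((28, 28), dtype=np.uint8)
normalized = normalize_image(fake_image)
print(normalized.shape, normalized.dtype)
```

Because the function only reads its argument and returns a new array, it can be tested in isolation, and a call for one index cannot interfere with a call for another.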

Next, let's look at the load_data.py module. This is the low-level module that loads the data directly from disk. It reads each file, makes sure that the data is in unsigned byte format, checks the size and dimensions of the data, and returns a NumPy array of the images or of the labels. Note here that the load_test_data() and load_train_data() functions are nearly identical, as are the two label-loading functions. This is a target for refactoring.

In [None]:
import gzip
import struct
from pathlib import Path

import numpy as np

DATA_DIR = (Path(__file__).parent / "../data").resolve()

ALLOWED_TYPES = {
    "UNSIGNED_BYTE": b"\x08",
    "SIGNED_BYTE": b"\x09",
    "SHORT": b"\x0B",
    "INT": b"\x0C",
    "SINGLE": b"\x0D",
    "DOUBLE": b"\x0E",
}


def load_test_data():
    with gzip.open(DATA_DIR / "t10k-images-idx3-ubyte.gz", "rb") as fp:
        _ = struct.unpack(">H", fp.read(2))  # dump padding bytes

        (data_type,) = struct.unpack(">c", fp.read(1))
        assert data_type == ALLOWED_TYPES["UNSIGNED_BYTE"]

        number_of_dimensions = ord(struct.unpack(">c", fp.read(1))[0])
        assert number_of_dimensions == 3

        (num_images,) = struct.unpack(">I", fp.read(4))
        assert num_images == 10_000

        (num_rows,) = struct.unpack(">I", fp.read(4))
        (num_cols,) = struct.unpack(">I", fp.read(4))
        assert num_rows == num_cols == 28

        raw = fp.read()
        assert len(raw) == num_images * num_rows * num_cols

    data = np.frombuffer(raw, dtype=np.dtype(np.uint8).newbyteorder(">"))
    data = data.reshape((num_images, num_rows, num_cols))
    return data


def load_train_data():
    with gzip.open(DATA_DIR / "train-images-idx3-ubyte.gz", "rb") as fp:
        _ = struct.unpack(">H", fp.read(2))  # dump padding bytes

        (data_type,) = struct.unpack(">c", fp.read(1))
        assert data_type == ALLOWED_TYPES["UNSIGNED_BYTE"]

        number_of_dimensions = ord(struct.unpack(">c", fp.read(1))[0])
        assert number_of_dimensions == 3

        (num_images,) = struct.unpack(">I", fp.read(4))
        assert num_images == 60_000

        (num_rows,) = struct.unpack(">I", fp.read(4))
        (num_cols,) = struct.unpack(">I", fp.read(4))
        assert num_rows == num_cols == 28

        raw = fp.read()
        assert len(raw) == num_images * num_rows * num_cols

    data = np.frombuffer(raw, dtype=np.dtype(np.uint8).newbyteorder(">"))
    data = data.reshape((num_images, num_rows, num_cols))

    return data


def load_test_labels():
    with gzip.open(DATA_DIR / "t10k-labels-idx1-ubyte.gz", "rb") as fp:
        _ = struct.unpack(">H", fp.read(2))  # dump padding bytes

        (data_type,) = struct.unpack(">c", fp.read(1))
        assert data_type == ALLOWED_TYPES["UNSIGNED_BYTE"]

        number_of_dimensions = ord(struct.unpack(">c", fp.read(1))[0])
        assert number_of_dimensions == 1

        (num_images,) = struct.unpack(">I", fp.read(4))
        assert num_images == 10_000

        raw = fp.read()
        assert len(raw) == num_images

    data = np.frombuffer(raw, dtype=np.dtype(np.uint8).newbyteorder(">"))
    return data


def load_train_labels():
    with gzip.open(DATA_DIR / "train-labels-idx1-ubyte.gz", "rb") as fp:
        _ = struct.unpack(">H", fp.read(2))  # dump padding bytes

        (data_type,) = struct.unpack(">c", fp.read(1))
        assert data_type == ALLOWED_TYPES["UNSIGNED_BYTE"]

        number_of_dimensions = ord(struct.unpack(">c", fp.read(1))[0])
        assert number_of_dimensions == 1

        (num_images,) = struct.unpack(">I", fp.read(4))
        assert num_images == 60_000

        raw = fp.read()
        assert len(raw) == num_images

    data = np.frombuffer(raw, dtype=np.dtype(np.uint8).newbyteorder(">"))
    return data
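
To make the duplication concrete, here is a rough sketch of how the four loaders could share one helper. The name `load_idx` and its `expected_shape` parameter are hypothetical, and the sketch raises `ValueError` where the original uses `assert`. The demo writes a tiny synthetic file to a temporary directory instead of reading the real MNIST files.

```python
import gzip
import struct
import tempfile
from pathlib import Path

import numpy as np

UNSIGNED_BYTE = 0x08  # IDX type code for unsigned bytes, as in ALLOWED_TYPES above


def load_idx(path: Path, expected_shape: tuple[int, ...]) -> np.ndarray:
    """Read a gzip-compressed IDX file of unsigned bytes and check its shape."""
    with gzip.open(path, "rb") as fp:
        # Header: 2 padding bytes, 1 type byte, 1 byte giving the number of dimensions.
        _, data_type, num_dims = struct.unpack(">HBB", fp.read(4))
        if data_type != UNSIGNED_BYTE:
            raise ValueError(f"unexpected data type code: {data_type:#04x}")
        # One big-endian 32-bit size per dimension.
        shape = struct.unpack(f">{num_dims}I", fp.read(4 * num_dims))
        if shape != expected_shape:
            raise ValueError(f"expected shape {expected_shape}, got {shape}")
        raw = fp.read()
    # reshape raises ValueError if the payload length does not match the shape.
    return np.frombuffer(raw, dtype=np.uint8).reshape(shape)


# Demo on a synthetic file holding 2 images of 3x3 pixels.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "demo-images-idx3-ubyte.gz"
    header = struct.pack(">HBB3I", 0, UNSIGNED_BYTE, 3, 2, 3, 3)
    with gzip.open(demo_path, "wb") as fp:
        fp.write(header + bytes(range(18)))
    images = load_idx(demo_path, expected_shape=(2, 3, 3))

print(images.shape)
```

With a helper like this, load_train_data() could shrink to a single call that passes the training file path and the shape (60_000, 28, 28), and the other three loaders would follow the same pattern.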


Now, let's look at the metrics.py module. It is responsible for collecting metrics about the machine learning model, i.e. how well it predicts the correct results. Note that we're importing Real from Python's numbers module, but we're using it interchangeably with float in other annotations. This is a target for refactoring.

In [None]:
from numbers import Real


class Metric:
    values: list[Real]
    running_total: float
    num_updates: float
    average: float

    def __init__(self):
        self.reset()

    def __str__(self):
        return f"Metric(average={self.average:0.4f})"

    def update(self, value: Real, batch_size: int):
        self.values.append(value)
        self.running_total += value * batch_size
        self.num_updates += batch_size
        self.average = self.running_total / self.num_updates

    def reset(self):
        self.values: list[Real] = []
        self.running_total: float = 0.0
        self.num_updates: float = 0.0
        self.average: float = 0.0
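
As a sketch of what consistent typing could look like here (one option, not necessarily the refactoring the project ends up with), the version below uses float for every metric value and int for the sample count. Turning `average` into a property is an assumption of this sketch; the original stores it as an attribute.

```python
class Metric:
    """Running average of a metric, weighted by batch size."""

    def __init__(self) -> None:
        self.values: list[float] = []
        self.running_total: float = 0.0
        self.num_updates: int = 0  # counts samples, so an int rather than a float

    @property
    def average(self) -> float:
        # Guard against division by zero before the first update.
        if self.num_updates == 0:
            return 0.0
        return self.running_total / self.num_updates

    def update(self, value: float, batch_size: int) -> None:
        self.values.append(value)
        self.running_total += value * batch_size
        self.num_updates += batch_size

    def reset(self) -> None:
        self.values = []
        self.running_total = 0.0
        self.num_updates = 0


accuracy = Metric()
accuracy.update(0.5, batch_size=100)  # 50 correct out of 100
accuracy.update(1.0, batch_size=50)   # 50 correct out of 50
print(f"{accuracy.average:0.4f}")
```

The weighting matters when the last batch of an epoch is smaller than the others: a plain mean of the batch accuracies would give that batch too much influence.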


The models.py module creates our ML model using PyTorch. Again, notice the structure of the forward() method: it reuses the same variable, x, to store every intermediate result. This is a target for refactoring.

In [None]:
import torch


class LinearNet(torch.nn.Module):
    def __init__(self):
        super(LinearNet, self).__init__()

        self.flatten = torch.nn.Flatten()
        self.linear1 = torch.nn.Linear(in_features=28 * 28, out_features=32)
        self.relu = torch.nn.ReLU()
        self.linear2 = torch.nn.Linear(in_features=32, out_features=10)
        self.softmax = torch.nn.Softmax(dim=1)

    def forward(self, x: torch.Tensor):
        x = self.flatten(x)
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = self.softmax(x)
        return x


The tracking module sends the results to TensorBoard, a web-based application that displays metrics visually. Note that ExperimentTracker is an abstract base class, but not all of its methods are abstract. We'll see how we can replace the abstract base class with a Python feature called a *Protocol class*.
Additionally, we'll discuss another Python feature called a *data class*. Also note that we have a Stage class with three values. This might be better implemented as an Enum.

In [None]:
from abc import ABC, abstractmethod
from dataclasses import dataclass
from numbers import Real
from pathlib import Path
from typing import Union, Tuple

import numpy as np
from matplotlib import pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from torch.utils.tensorboard import SummaryWriter


@dataclass(frozen=True)
class Stage:
    TRAIN: str = 'train'
    TEST: str = 'test'
    VAL: str = 'val'


class ExperimentTracker(ABC):
    stage: str

    @abstractmethod
    def add_batch_metric(self, name: str, value: Real, step: int):
        """Implements logging a batch-level metric."""

    @abstractmethod
    def add_epoch_metric(self, name: str, value: Real, step: int):
        """Implements logging a epoch-level metric."""

    @abstractmethod
    def add_epoch_confusion_matrix(self, y_true: np.array, y_pred: np.array, step: int):
        """Implements logging a confusion matrix at epoch-level."""

    @abstractmethod
    def add_hparams(self, hparams: dict[str, Union[str, Real]], metrics: dict[str, Real]):
        """Implements logging hyperparameters."""

    def add_batch_metrics(self, values: dict[str, Real], step: int):
        for name, value in values.items():
            self.add_batch_metric(name, value, step)

    def add_epoch_metrics(self, values: dict[str, Real], step: int):
        for name, value in values.items():
            self.add_epoch_metric(name, value, step)


class TensorboardExperiment(ExperimentTracker):

    def __init__(self, log_dir: str, create=True):
        self._validate_log_dir(log_dir, create=create)
        self._writer = SummaryWriter(log_dir=log_dir)
        plt.ioff()

    def set_stage(self, stage: str):
        self.stage = stage
        return self

    def flush(self):
        self._writer.flush()

    @staticmethod
    def _validate_log_dir(log_dir, create=True):
        log_dir = Path(log_dir).resolve()
        if log_dir.exists():
            return
        elif not log_dir.exists() and create:
            log_dir.mkdir(parents=True)
        else:
            raise NotADirectoryError(f'log_dir {log_dir} does not exist.')

    def add_batch_metric(self, name: str, value: Real, step: int):
        tag = f'{self.stage}/batch/{name}'
        self._writer.add_scalar(tag, value, step)

    def add_epoch_metric(self, name: str, value: Real, step: int):
        tag = f'{self.stage}/epoch/{name}'
        self._writer.add_scalar(tag, value, step)

    def add_epoch_confusion_matrix(self, y_true: list[np.array], y_pred: list[np.array], step: int):
        y_true, y_pred = self.collapse_batches(y_true, y_pred)
        fig = self.create_confusion_matrix(y_true, y_pred, step)
        tag = f'{self.stage}/epoch/confusion_matrix'
        self._writer.add_figure(tag, fig, step)

    @staticmethod
    def collapse_batches(y_true: list[np.array], y_pred: list[np.array]) -> Tuple[np.ndarray, np.ndarray]:
        return np.concatenate(y_true), np.concatenate(y_pred)

    def create_confusion_matrix(self, y_true: np.array, y_pred: np.array, step: int) -> plt.Figure:
        cm = ConfusionMatrixDisplay(confusion_matrix(y_true, y_pred)).plot(cmap='Blues')
        fig: plt.Figure = cm.figure_
        ax: plt.Axes = cm.ax_
        ax.set_title(f'{self.stage.title()} Epoch: {step}')
        return fig

    def add_hparams(self, hparams: dict[str, Union[str, Real]], metrics: dict[str, Real]):
        _metrics = self._validate_hparam_metric_keys(metrics)
        self._writer.add_hparams(hparams, _metrics)

    @staticmethod
    def _validate_hparam_metric_keys(metrics):
        _metrics = metrics.copy()
        prefix = 'hparam/'
        for name in list(_metrics):  # iterate over a snapshot; the dict is modified below
            if not name.startswith(prefix):
                _metrics[f'{prefix}{name}'] = _metrics[name]
                del _metrics[name]
        return _metrics


Finally, we have a file called *utils.py* which, as the name suggests, contains a number of utility functions used by other parts of the application.

In [None]:
from pathlib import Path


def generate_tensorboard_experiment_directory(root: str, parents=True) -> str:
    root = Path(root).resolve()
    child = create_from_missing(root) if not root.exists() else create_from_existing(root)
    child.mkdir(parents=parents)
    return child.as_posix()


def create_from_missing(root):
    return root / '0'


def create_from_existing(root):
    children = [int(c.name) for c in root.glob('*') if (c.is_dir() and c.name.isnumeric())]
    if is_first_experiment(children):
        child = root / '0'
    else:
        child = root / increment_experiment_number(children)
    return child


def is_first_experiment(children: list[int]) -> bool:
    return len(children) == 0


def increment_experiment_number(children: list[int]) -> str:
    return str(max(children) + 1)


## A Small Digression

Here we'll digress from our topic to discuss two features of Python that we will use in our refactoring.
The first feature is the Protocol class, which was introduced in Python 3.8.

### Protocol Classes
The closest equivalent to a Protocol class in other languages might be the *interface* in Java. A Protocol class describes the methods that other classes are expected to provide, without those classes having to inherit from it.

Any class that defines the same methods as the Protocol class is treated as compatible with it during static type analysis.

Here's a simple example of a Protocol class.

In [1]:
from typing import Protocol


class Person(Protocol):
    def returnRole(self) -> str:
        ...


class Employee:
    def __init__(self, role: str, salary: float):
        self.role = role
        self.salary = salary

    def returnRole(self) -> str:
        return self.role


class Manager:
    def __init__(self, role: str, salary: float):
        self.role = role
        self.salary = salary

    def returnRole(self) -> str:
        return self.role


# Neither class inherits from Person, yet both satisfy it for a type checker.
m: Person = Manager('Operations manager', 50000.00)
e: Person = Employee('Line worker', 35000.00)

print(m.returnRole())
print(e.returnRole())
        
    

Operations manager
Line worker


Note that we don't have to declare any inheritance relationship in our classes. Static type checkers use structural typing, often described as static *duck typing*, to decide whether a class matches the Protocol.
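
A Protocol pays off once it appears in a type annotation. The sketch below uses hypothetical names (`HasRole`, `Contractor`, `describe`) to show a function that accepts any object with a matching method. It also applies the `runtime_checkable` decorator from the typing module, which allows `isinstance()` checks against the Protocol; those checks only look for the presence of the method, not its signature.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class HasRole(Protocol):
    def returnRole(self) -> str:
        ...


class Contractor:
    """Does not inherit from HasRole, but has a matching returnRole() method."""

    def __init__(self, role: str):
        self.role = role

    def returnRole(self) -> str:
        return self.role


def describe(person: HasRole) -> str:
    # A static type checker accepts any object whose methods match HasRole.
    return f"Role: {person.returnRole()}"


c = Contractor("Consultant")
print(describe(c))
print(isinstance(c, HasRole))               # True: the method is present
print(isinstance("not a person", HasRole))  # False: str has no returnRole()
```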

### Data Classes
The data class feature in Python is designed to make it easy to write classes where the main purpose of the class is to store data, rather than implement logic via methods.   Here's an example of how we would use a data class.

Let's take an example of a class called Person. Before data classes we might easily implement a Person object as follows:

In [None]:
class Person:
    number_of_people: int = 0

    def __init__(self, name: str, age: int, address: str):
        self.name = name
        self.age = age
        self.address = address

    def happyBirthday(self):
        self.age += 1

    def __str__(self) -> str:
        return f"Name: {self.name} Age: {self.age} Address: {self.address}"
        
    
    

Implementing a class like Person requires writing an \_\_init\_\_() method, as well as either a \_\_str\_\_() method, a \_\_repr\_\_() method, or both.

Here's how we would do the same thing using a dataclass.

In [12]:
from dataclasses import dataclass
from typing import ClassVar

@dataclass
class Person:
    
    name: str
    age:  int
    address: str
    number_of_people: ClassVar[int] = 0
        
    def __post_init__(self):
        # Runs after the generated __init__(); count every instance created.
        Person.number_of_people += 1


p1 = Person("Braun", 21, "1234 Main Street")
p2 = Person("Jon", 19, "1235 Main Street")
print(p1)
print(p2)
print(p1.number_of_people)


Person(name='Braun', age=21, address='1234 Main Street')
Person(name='Jon', age=19, address='1235 Main Street')
2


Note that the dataclass handles much of the boilerplate that you would otherwise write yourself. You no longer need to write an \_\_init\_\_() method, and the generated \_\_repr\_\_() method means you don't need to write \_\_str\_\_() either.
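
Two more pieces of generated behavior are worth a quick look: field-by-field equality and the `frozen=True` option. The `Point` class below is a hypothetical example, separate from the project code.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class Point:
    x: float
    y: float


a = Point(1.0, 2.0)
b = Point(1.0, 2.0)
print(a == b)  # the generated __eq__ compares field by field

try:
    a.x = 5.0  # frozen=True turns attribute assignment into an error
except FrozenInstanceError as err:
    print(f"cannot modify: {err}")
```

Without the decorator, `a == b` would compare object identity and return False for two separately created instances.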

## Let's refactor
Let's start by looking at the tracking module. Specifically, this code:

In [None]:
@dataclass(frozen=True)
class Stage:
    TRAIN: str = 'train'
    TEST: str = 'test'
    VAL: str = 'val'


Here we have a dataclass called Stage (the frozen=True option makes Stage immutable). A better way to do this is to turn it into an enumeration (enum). So, our new code looks like this:

In [None]:
from enum import Enum, auto

class Stage(Enum):
    
    TRAIN = auto()
    TEST = auto()
    VAL = auto()
    


The auto() function assigns an automatically incrementing integer value, starting at 1, to each member of the enum.
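
A short sketch of how such an enum behaves, which matters because the tracking module builds its TensorBoard tags with f-strings. Formatting a member directly yields the qualified name, while `.name` yields only the member name. Note also that `.name` is uppercase, whereas the dataclass version stored lowercase strings; lowercasing the name, as in the last line, is one hypothetical way to keep tags in the old form.

```python
from enum import Enum, auto


class Stage(Enum):
    TRAIN = auto()
    TEST = auto()
    VAL = auto()


# Each member has a name and an automatically assigned value.
for stage in Stage:
    print(stage.name, stage.value)

print(f"{Stage.VAL}/batch/accuracy")               # Stage.VAL/batch/accuracy
print(f"{Stage.VAL.name}/batch/accuracy")          # VAL/batch/accuracy
print(f"{Stage.VAL.name.lower()}/batch/accuracy")  # val/batch/accuracy
```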

Let's go back to our tracking module. There are a number of things we want to change. We've already looked at changing the stage type to an enum, but also note that the stage attribute is declared in the abstract base class. It is rarely a good idea to declare concrete attributes inside an ABC, so we'll move it out of the ABC and into the concrete class.

In [None]:
from abc import ABC, abstractmethod
from dataclasses import dataclass
from numbers import Real
from pathlib import Path
from typing import Union, Tuple
from enum import Enum, auto

import numpy as np
from matplotlib import pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from torch.utils.tensorboard import SummaryWriter



class Stage(Enum):
    TRAIN = auto()
    TEST = auto()
    VAL = auto()

class ExperimentTracker(ABC):
    
    #Note here that stage is no longer a string, but a Stage enum, so we need to change its type. 
    # Additionally we want to remove the entire definition out of the Abstract class and into the
    #concrete implementation. 
    #stage: str
    

    @abstractmethod
    def add_batch_metric(self, name: str, value: Real, step: int):
        """Implements logging a batch-level metric."""

    @abstractmethod
    def add_epoch_metric(self, name: str, value: Real, step: int):
        """Implements logging a epoch-level metric."""

    @abstractmethod
    def add_epoch_confusion_matrix(self, y_true: np.array, y_pred: np.array, step: int):
        """Implements logging a confusion matrix at epoch-level."""

    @abstractmethod
    def add_hparams(self, hparams: dict[str, Union[str, Real]], metrics: dict[str, Real]):
        """Implements logging hyperparameters."""

    def add_batch_metrics(self, values: dict[str, Real], step: int):
        for name, value in values.items():
            self.add_batch_metric(name, value, step)

    def add_epoch_metrics(self, values: dict[str, Real], step: int):
        for name, value in values.items():
            self.add_epoch_metric(name, value, step)


class TensorboardExperiment(ExperimentTracker):
  
    # Moved here from the abstract base class.
    stage: Stage

    def __init__(self, log_dir: str, create=True):
        self._validate_log_dir(log_dir, create=create)
        self._writer = SummaryWriter(log_dir=log_dir)
        plt.ioff()

    # Change the stage type here as well.
    def set_stage(self, stage: Stage):
        self.stage = stage
        return self

    def flush(self):
        self._writer.flush()

    @staticmethod
    def _validate_log_dir(log_dir, create=True):
        log_dir = Path(log_dir).resolve()
        if log_dir.exists():
            return
        elif not log_dir.exists() and create:
            log_dir.mkdir(parents=True)
        else:
            raise NotADirectoryError(f'log_dir {log_dir} does not exist.')
    # Now that stage is an enum, we want the tag to contain the member's name, not its value.
    # We do this by reading the .name attribute of the stage variable.
    def add_batch_metric(self, name: str, value: Real, step: int):
        # tag = f'{self.stage}/batch/{name}'
        tag = f'{self.stage.name}/batch/{name}'
        self._writer.add_scalar(tag, value, step)

    def add_epoch_metric(self, name: str, value: Real, step: int):
        # Again, use the .name attribute.
        # tag = f'{self.stage}/epoch/{name}'
        tag = f'{self.stage.name}/epoch/{name}'
        self._writer.add_scalar(tag, value, step)

    def add_epoch_confusion_matrix(self, y_true: list[np.array], y_pred: list[np.array], step: int):
        y_true, y_pred = self.collapse_batches(y_true, y_pred)
        fig = self.create_confusion_matrix(y_true, y_pred, step)
        # Add the name attribute to the stage variable here as well. 
        # tag = f'{self.stage}/epoch/confusion_matrix'
        tag = f'{self.stage.name}/epoch/confusion_matrix'

        self._writer.add_figure(tag, fig, step)

    @staticmethod
    def collapse_batches(y_true: list[np.array], y_pred: list[np.array]) -> Tuple[np.ndarray, np.ndarray]:
        return np.concatenate(y_true), np.concatenate(y_pred)

    def create_confusion_matrix(self, y_true: np.array, y_pred: np.array, step: int) -> plt.Figure:
        cm = ConfusionMatrixDisplay(confusion_matrix(y_true, y_pred)).plot(cmap='Blues')
        fig: plt.Figure = cm.figure_
        ax: plt.Axes = cm.ax_
        # Stage is an enum now, so title-case its name; the enum itself has no title() method.
        ax.set_title(f'{self.stage.name.title()} Epoch: {step}')
        return fig

    def add_hparams(self, hparams: dict[str, Union[str, Real]], metrics: dict[str, Real]):
        _metrics = self._validate_hparam_metric_keys(metrics)
        self._writer.add_hparams(hparams, _metrics)

    @staticmethod
    def _validate_hparam_metric_keys(metrics):
        _metrics = metrics.copy()
        prefix = 'hparam/'
        for name in list(_metrics):  # iterate over a snapshot; the dict is modified below
            if not name.startswith(prefix):
                _metrics[f'{prefix}{name}'] = _metrics[name]
                del _metrics[name]
        return _metrics


Let's now turn to the ExperimentTracker abstract base class. It consists of both abstract and non-abstract methods. Ideally, we'd like to move all concrete implementations out of the ABC and into the concrete class. In this case, however, none of the non-abstract methods are actually used; they just call the abstract methods in a loop. So, let's get rid of them altogether.

In [None]:
from abc import ABC, abstractmethod
from dataclasses import dataclass
from numbers import Real
from pathlib import Path
from typing import Union, Tuple
from enum import Enum, auto

import numpy as np
from matplotlib import pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from torch.utils.tensorboard import SummaryWriter


class Stage(Enum):
    TRAIN = auto()
    TEST = auto()
    VAL = auto()

class ExperimentTracker(ABC):
    
    #Note here that stage is no longer a string, but a Stage enum, so we need to change its type. 
    # Additionally we want to remove the entire definition out of the Abstract class and into the
    #concrete implementation. 
    #stage: str
    

    @abstractmethod
    def add_batch_metric(self, name: str, value: Real, step: int):
        """Implements logging a batch-level metric."""

    @abstractmethod
    def add_epoch_metric(self, name: str, value: Real, step: int):
        """Implements logging a epoch-level metric."""

    @abstractmethod
    def add_epoch_confusion_matrix(self, y_true: np.array, y_pred: np.array, step: int):
        """Implements logging a confusion matrix at epoch-level."""

    @abstractmethod
    def add_hparams(self, hparams: dict[str, Union[str, Real]], metrics: dict[str, Real]):
        """Implements logging hyperparameters."""

# The non-abstract methods were removed from the ABC because they are never used.

class TensorboardExperiment(ExperimentTracker):
  
    # Moved here from the abstract base class.
    stage: Stage

    def __init__(self, log_dir: str, create=True):
        self._validate_log_dir(log_dir, create=create)
        self._writer = SummaryWriter(log_dir=log_dir)
        plt.ioff()

    # Change the stage type here as well.
    def set_stage(self, stage: Stage):
        self.stage = stage
        return self

    def flush(self):
        self._writer.flush()

    @staticmethod
    def _validate_log_dir(log_dir, create=True):
        log_dir = Path(log_dir).resolve()
        if log_dir.exists():
            return
        elif not log_dir.exists() and create:
            log_dir.mkdir(parents=True)
        else:
            raise NotADirectoryError(f'log_dir {log_dir} does not exist.')
    # Now that stage is an enum, we want the tag to contain the member's name, not its value.
    # We do this by reading the .name attribute of the stage variable.
    def add_batch_metric(self, name: str, value: Real, step: int):
        # tag = f'{self.stage}/batch/{name}'
        tag = f'{self.stage.name}/batch/{name}'
        self._writer.add_scalar(tag, value, step)

    def add_epoch_metric(self, name: str, value: Real, step: int):
        # Again, use the .name attribute.
        # tag = f'{self.stage}/epoch/{name}'
        tag = f'{self.stage.name}/epoch/{name}'
        self._writer.add_scalar(tag, value, step)

    def add_epoch_confusion_matrix(self, y_true: list[np.array], y_pred: list[np.array], step: int):
        y_true, y_pred = self.collapse_batches(y_true, y_pred)
        fig = self.create_confusion_matrix(y_true, y_pred, step)
        # Add the name attribute to the stage variable here as well. 
        # tag = f'{self.stage}/epoch/confusion_matrix'
        tag = f'{self.stage.name}/epoch/confusion_matrix'

        self._writer.add_figure(tag, fig, step)

    @staticmethod
    def collapse_batches(y_true: list[np.array], y_pred: list[np.array]) -> Tuple[np.ndarray, np.ndarray]:
        return np.concatenate(y_true), np.concatenate(y_pred)

    def create_confusion_matrix(self, y_true: np.array, y_pred: np.array, step: int) -> plt.Figure:
        cm = ConfusionMatrixDisplay(confusion_matrix(y_true, y_pred)).plot(cmap='Blues')
        fig: plt.Figure = cm.figure_
        ax: plt.Axes = cm.ax_
        # Stage is an enum, so call .title() on its name rather than on the member itself.
        ax.set_title(f'{self.stage.name.title()} Epoch: {step}')
        return fig

    def add_hparams(self, hparams: dict[str, Union[str, Real]], metrics: dict[str, Real]):
        _metrics = self._validate_hparam_metric_keys(metrics)
        self._writer.add_hparams(hparams, _metrics)

    @staticmethod
    def _validate_hparam_metric_keys(metrics):
        _metrics = metrics.copy()
        prefix = 'hparam/'
        # Iterate over a snapshot of the keys because the dict is modified inside the loop.
        for name in list(_metrics.keys()):
            if not name.startswith(prefix):
                _metrics[f'{prefix}{name}'] = _metrics[name]
                del _metrics[name]
        return _metrics
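
To see why the tags above read the `.name` attribute, here is a minimal stand-alone sketch that formats a `Stage` member three ways. It only assumes the standard library; the tag strings are illustrative and are not taken from a real Tensorboard run.

```python
from enum import Enum, auto


class Stage(Enum):
    TRAIN = auto()
    TEST = auto()
    VAL = auto()


# Formatting the member itself includes the class name.
tag_from_member = f"{Stage.TRAIN}/batch/accuracy"

# .name is an attribute (not a method) that holds just the member's label.
tag_from_name = f"{Stage.TRAIN.name}/batch/accuracy"

# .value is whatever auto() assigned, which is rarely useful in a tag.
tag_from_value = f"{Stage.TRAIN.value}/batch/accuracy"

print(tag_from_member)
print(tag_from_name)
print(tag_from_value)
```

Only the second form gives a readable tag such as `TRAIN/batch/accuracy`, which is why each logging method switches from `self.stage` to `self.stage.name`.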


In [None]:
import numpy as np
import torch
from sklearn.metrics import accuracy_score
from tqdm import tqdm

from src.dataset import get_train_dataloader, get_test_dataloader
from src.metrics import Metric
from src.models import LinearNet
from src.tracking import TensorboardExperiment, Stage
from src.utils import generate_tensorboard_experiment_directory

# Hyperparameters
hparams = {
    'EPOCHS': 20,
    'LR': 5e-5,
    'OPTIMIZER': 'Adam',
    'BATCH_SIZE': 128
}

# Data
train_loader = get_train_dataloader(batch_size=hparams.get('BATCH_SIZE'))
test_loader = get_test_dataloader(batch_size=hparams.get('BATCH_SIZE'))

# Model and Optimizer
model = LinearNet()
optimizer = torch.optim.Adam(model.parameters(), lr=hparams.get('LR'))

# Objective (loss) function
compute_loss = torch.nn.CrossEntropyLoss(reduction='mean')

# Metric Containers
train_accuracy = Metric()
test_accuracy = Metric()
y_true_batches = []
y_pred_batches = []

# Experiment Trackers
log_dir = generate_tensorboard_experiment_directory(root='./runs')
experiment = TensorboardExperiment(log_dir=log_dir)

# Batch Counters
test_batch = 0
train_batch = 0

for epoch in range(hparams.get('EPOCHS')):
    # Testing Loop
    for x_test, y_test in tqdm(test_loader, desc='Validation Batches', ncols=80):
        test_batch += 1
        test_batch_size = x_test.shape[0]
        test_pred = model(x_test)
        loss = compute_loss(test_pred, y_test)

        # Compute Batch Validation Metrics
        y_test_np = y_test.detach().numpy()
        y_test_pred_np = np.argmax(test_pred.detach().numpy(), axis=1)
        batch_test_accuracy = accuracy_score(y_test_np, y_test_pred_np)
        test_accuracy.update(batch_test_accuracy, test_batch_size)
        experiment.set_stage(Stage.VAL)
        experiment.add_batch_metric('accuracy', batch_test_accuracy, test_batch)
        y_true_batches += [y_test_np]
        y_pred_batches += [y_test_pred_np]

    # Training Loop
    for x_train, y_train in tqdm(train_loader, desc='Train Batches', ncols=80):
        train_batch += 1
        train_batch_size = x_train.shape[0]
        train_pred = model(x_train)
        loss = compute_loss(train_pred, y_train)

        # Compute Batch Training Metrics
        y_train_np = y_train.detach().numpy()
        y_train_pred_np = np.argmax(train_pred.detach().numpy(), axis=1)
        batch_train_accuracy = accuracy_score(y_train_np, y_train_pred_np)
        train_accuracy.update(batch_train_accuracy, train_batch_size)
        experiment.set_stage(Stage.TRAIN)
        experiment.add_batch_metric('accuracy', batch_train_accuracy, train_batch)

        # Reverse-mode AutoDiff (backpropagation)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Compute Average Epoch Metrics
    summary = ', '.join([
        f"[Epoch: {epoch + 1}/{hparams.get('EPOCHS')}]",
        f"Test Accuracy: {test_accuracy.average: 0.4f}",
        f"Train Accuracy: {train_accuracy.average: 0.4f}",
    ])
    print('\n' + summary + '\n')

    # Log Validation Epoch Metrics
    experiment.set_stage(Stage.VAL)
    experiment.add_epoch_metric('accuracy', test_accuracy.average, epoch)
    experiment.add_epoch_confusion_matrix(y_true_batches, y_pred_batches, epoch)

    # Log Training Epoch Metrics
    experiment.set_stage(Stage.TRAIN)
    experiment.add_epoch_metric('accuracy', train_accuracy.average, epoch)

    # Reset metrics
    train_accuracy.reset()
    test_accuracy.reset()

experiment.flush()

Let's see if we can fix this dependency inversion issue.  The first thing we'll do is ditch the abstract base class and replace it with a protocol class.  We'll also add set_stage and flush to the protocol class so that they become part of the interface.


In [None]:
# from abc import ABC, abstractmethod  # No longer needed now that we use a Protocol.
from dataclasses import dataclass
from numbers import Real
from pathlib import Path
from typing import Protocol, Union, Tuple
from enum import Enum, auto

import numpy as np
from matplotlib import pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from torch.utils.tensorboard import SummaryWriter


class Stage(Enum):
    TRAIN = auto()
    TEST = auto()
    VAL = auto()
    
class ExperimentTracker(Protocol):
    # Note that because this is now a protocol class, we can ditch the abstractmethod
    # decorators on the methods. This cleans the code up a bit.

    # @abstractmethod
    def add_batch_metric(self, name: str, value: Real, step: int):
        """Implements logging a batch-level metric."""

    # @abstractmethod
    def add_epoch_metric(self, name: str, value: Real, step: int):
        """Implements logging an epoch-level metric."""

    # @abstractmethod
    def add_epoch_confusion_matrix(self, y_true: list[np.array], y_pred: list[np.array], step: int):
        """Implements logging a confusion matrix at epoch-level."""

    # @abstractmethod
    def add_hparams(self, hparams: dict[str, Union[str, Real]], metrics: dict[str, Real]):
        """Implements logging hyperparameters."""

    def set_stage(self, stage: Stage):
        """Sets the current stage of the experiment."""

    def flush(self):
        """Flushes the experiment."""

# TensorboardExperiment no longer needs an inheritance relationship with ExperimentTracker.
# class TensorboardExperiment(ExperimentTracker):
class TensorboardExperiment:
    # Move this from the abstract class to the concrete implementation.
    stage: Stage

    def __init__(self, log_dir: str, create=True):
        self._validate_log_dir(log_dir, create=create)
        self._writer = SummaryWriter(log_dir=log_dir)
        plt.ioff()

    # Change the stage type here as well.
    def set_stage(self, stage: Stage):
        self.stage = stage
        return self

    def flush(self):
        self._writer.flush()

    @staticmethod
    def _validate_log_dir(log_dir, create=True):
        log_dir = Path(log_dir).resolve()
        if log_dir.exists():
            return
        elif not log_dir.exists() and create:
            log_dir.mkdir(parents=True)
        else:
            raise NotADirectoryError(f'log_dir {log_dir} does not exist.')

    # Now that the stage is an enum, we want to log the stage name, not its value.
    # We do this by reading the .name attribute of the stage variable.
    def add_batch_metric(self, name: str, value: Real, step: int):
        # tag = f'{self.stage}/batch/{name}'
        tag = f'{self.stage.name}/batch/{name}'
        self._writer.add_scalar(tag, value, step)

        
    def add_epoch_metric(self, name: str, value: Real, step: int):
        # Again, add the .name attribute.
        # tag = f'{self.stage}/epoch/{name}'
        tag = f'{self.stage.name}/epoch/{name}'
        self._writer.add_scalar(tag, value, step)

    def add_epoch_confusion_matrix(self, y_true: list[np.array], y_pred: list[np.array], step: int):
        y_true, y_pred = self.collapse_batches(y_true, y_pred)
        fig = self.create_confusion_matrix(y_true, y_pred, step)
        # Add the name attribute to the stage variable here as well. 
        # tag = f'{self.stage}/epoch/confusion_matrix'
        tag = f'{self.stage.name}/epoch/confusion_matrix'

        self._writer.add_figure(tag, fig, step)

    @staticmethod
    def collapse_batches(y_true: list[np.array], y_pred: list[np.array]) -> Tuple[np.ndarray, np.ndarray]:
        return np.concatenate(y_true), np.concatenate(y_pred)

    def create_confusion_matrix(self, y_true: np.array, y_pred: np.array, step: int) -> plt.Figure:
        cm = ConfusionMatrixDisplay(confusion_matrix(y_true, y_pred)).plot(cmap='Blues')
        fig: plt.Figure = cm.figure_
        ax: plt.Axes = cm.ax_
        # Stage is an enum, so call .title() on its name rather than on the member itself.
        ax.set_title(f'{self.stage.name.title()} Epoch: {step}')
        return fig

    def add_hparams(self, hparams: dict[str, Union[str, Real]], metrics: dict[str, Real]):
        _metrics = self._validate_hparam_metric_keys(metrics)
        self._writer.add_hparams(hparams, _metrics)

    @staticmethod
    def _validate_hparam_metric_keys(metrics):
        _metrics = metrics.copy()
        prefix = 'hparam/'
        # Iterate over a snapshot of the keys because the dict is modified inside the loop.
        for name in list(_metrics.keys()):
            if not name.startswith(prefix):
                _metrics[f'{prefix}{name}'] = _metrics[name]
                del _metrics[name]
        return _metrics
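
To make the benefit of the protocol concrete, here is a minimal sketch of structural typing. The protocol is trimmed to two methods, and `InMemoryExperiment` and `log_accuracy` are hypothetical names invented for this illustration; they are not part of the project.

```python
from typing import Protocol


class ExperimentTracker(Protocol):
    def add_batch_metric(self, name: str, value: float, step: int) -> None:
        """Logs a batch-level metric."""

    def flush(self) -> None:
        """Flushes any buffered output."""


class InMemoryExperiment:
    """Hypothetical tracker for illustration; note that it inherits from nothing."""

    def __init__(self) -> None:
        self.records: list[tuple[str, float, int]] = []
        self.flushed = False

    def add_batch_metric(self, name: str, value: float, step: int) -> None:
        self.records.append((name, value, step))

    def flush(self) -> None:
        self.flushed = True


def log_accuracy(tracker: ExperimentTracker, accuracy: float, step: int) -> None:
    # This function depends only on the interface, not on a concrete tracker class.
    tracker.add_batch_metric("accuracy", accuracy, step)
    tracker.flush()


tracker = InMemoryExperiment()
log_accuracy(tracker, 0.93, step=1)
print(tracker.records)
```

A static type checker accepts `InMemoryExperiment` wherever an `ExperimentTracker` is expected because the method signatures match. That is the same reason `TensorboardExperiment` can drop its base class, and it makes it easy to swap in a lightweight tracker for tests.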
   
    

A further refactoring can be done here.  There's no need to have the ExperimentTracker and TensorboardExperiment classes in the same file.  A better organizational structure would be to move the TensorboardExperiment class into its own .py file.
Here's what the new tracking.py file looks like:

In [None]:
from enum import Enum, auto
from pathlib import Path
from typing import Protocol


import numpy as np


class Stage(Enum):
    TRAIN = auto()
    TEST = auto()
    VAL = auto()


class ExperimentTracker(Protocol):
    def set_stage(self, stage: Stage):
        """Sets the current stage of the experiment."""

    def add_batch_metric(self, name: str, value: float, step: int):
        """Implements logging a batch-level metric."""

    def add_epoch_metric(self, name: str, value: float, step: int):
        """Implements logging an epoch-level metric."""

    def add_epoch_confusion_matrix(
        self, y_true: list[np.array], y_pred: list[np.array], step: int
    ):
        """Implements logging a confusion matrix at epoch-level."""

    def flush(self):
        """Flushes the experiment."""


Here's what the new tensorboard.py file looks like:

In [None]:
from pathlib import Path

import numpy as np
from matplotlib import pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
from torch.utils.tensorboard import SummaryWriter

# Don't forget to import Stage from the tracking module, as we need it for the TensorboardExperiment implementation.
from src.tracking import Stage
from src.utils import create_experiment_log_dir


class TensorboardExperiment:
    def __init__(self, log_path: str, create: bool = True):

        log_dir = create_experiment_log_dir(root=log_path)
        self.stage = Stage.TRAIN
        self._validate_log_dir(log_dir, create=create)
        self._writer = SummaryWriter(log_dir=log_dir)
        plt.ioff()

    def set_stage(self, stage: Stage):
        self.stage = stage

    def flush(self):
        self._writer.flush()

    @staticmethod
    def _validate_log_dir(log_dir: str, create: bool = True):
        log_path = Path(log_dir).resolve()
        if log_path.exists():
            return
        elif not log_path.exists() and create:
            log_path.mkdir(parents=True)
        else:
            raise NotADirectoryError(f"log_dir {log_dir} does not exist.")

    def add_batch_metric(self, name: str, value: float, step: int):
        tag = f"{self.stage.name}/batch/{name}"
        self._writer.add_scalar(tag, value, step)

    def add_epoch_metric(self, name: str, value: float, step: int):
        tag = f"{self.stage.name}/epoch/{name}"
        self._writer.add_scalar(tag, value, step)

    def add_epoch_confusion_matrix(
        self, y_true: list[np.array], y_pred: list[np.array], step: int
    ):
        y_true, y_pred = self.collapse_batches(y_true, y_pred)
        fig = self.create_confusion_matrix(y_true, y_pred, step)
        tag = f"{self.stage.name}/epoch/confusion_matrix"
        self._writer.add_figure(tag, fig, step)

    @staticmethod
    def collapse_batches(
        y_true: list[np.array], y_pred: list[np.array]
    ) -> tuple[np.ndarray, np.ndarray]:
        return np.concatenate(y_true), np.concatenate(y_pred)

    def create_confusion_matrix(
        self, y_true: list[np.array], y_pred: list[np.array], step: int
    ) -> plt.Figure:
        cm = ConfusionMatrixDisplay(confusion_matrix(y_true, y_pred)).plot(cmap="Blues")
        cm.ax_.set_title(f"{self.stage.name} Epoch: {step}")
        return cm.figure_


Note that splitting the original tracking.py file in two makes both of the new files easier to read and understand.

Don't forget that the main.py file needs an updated import statement, because we moved the TensorboardExperiment class out of tracking.py and into tensorboard.py.

In [None]:
import numpy as np
import torch
from sklearn.metrics import accuracy_score
from tqdm import tqdm

from src.dataset import get_train_dataloader, get_test_dataloader
from src.metrics import Metric
from src.models import LinearNet
from src.tracking import Stage
# We need a new import for the TensorboardExperiment class in our main.py file. 
from src.tensorboard import TensorboardExperiment
from src.utils import generate_tensorboard_experiment_directory

# ... Rest of main.py

## Explicit vs. implicit typing

One thing you may have noticed is that we use both the built-in float type and the Real type for our numeric values. These types are often used interchangeably in the code, so the intended type is only implicit.  This is usually not a good coding practice.  It would be better if we were consistent with our numeric types. The code can get away with this because Python does not enforce type hints at runtime, and float is registered as a virtual subclass of the abstract Real type, so every float already counts as a Real.  But let's fix this problem so that we use explicit typing for all of our variables.
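
As a small sketch of how the two types relate at runtime, the example below uses only the standard library. The `scale` function is a hypothetical helper written for this illustration and is not part of the project.

```python
from fractions import Fraction
from numbers import Real


def scale(value: float, factor: float) -> float:
    # Type hints are not enforced at runtime, so any numeric argument is accepted.
    return value * factor


# float is registered as a virtual subclass of the abstract Real type.
float_is_real = isinstance(0.5, Real)

# Other Real types, such as Fraction, are not floats.
fraction_is_float = isinstance(Fraction(1, 2), float)

# The call succeeds, but the result is a Fraction rather than a float.
result = scale(Fraction(1, 2), 3)

print(float_is_real, fraction_is_float, type(result).__name__)
```

Annotating with `Real` therefore admits values that behave differently from `float`, which is one reason to pick a single numeric type and use it everywhere.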

In looking through the code, we see that the Real number type is used in a number of files, including metrics.py and tracking.py.  Let's change those.

In [None]:
# We no longer need this import as we're not going to use the Real type any longer. 
# from numbers import Real


class Metric:
    # Change Real to float.
    # values: list[Real]
    values: list[float]
    running_total: float
    num_updates: float
    average: float

    def __init__(self):
        self.reset()

    def __str__(self):
        return f"Metric(average={self.average:0.4f})"

    # Change the Real param to the float type.
    # def update(self, value: Real, batch_size: int):
    def update(self, value: float, batch_size: int):
        self.values.append(value)
        self.running_total += value * batch_size
        self.num_updates += batch_size
        self.average = self.running_total / self.num_updates

    def reset(self):
        
        # Change the Real type to float.
        # self.values: list[Real] = []
        self.values: list[float] = []
        self.running_total: float = 0.0
        self.num_updates: float = 0.0
        self.average: float = 0.0
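
Here is a short usage sketch of the float-only `Metric` class. The class body is a stand-alone copy of the one above so the cell runs on its own, and the batch accuracies and batch sizes are made-up numbers chosen to show that the average is weighted by batch size.

```python
class Metric:
    """Stand-alone copy of the float-only Metric class from above."""

    def __init__(self) -> None:
        self.reset()

    def update(self, value: float, batch_size: int) -> None:
        self.values.append(value)
        self.running_total += value * batch_size
        self.num_updates += batch_size
        self.average = self.running_total / self.num_updates

    def reset(self) -> None:
        self.values: list[float] = []
        self.running_total: float = 0.0
        self.num_updates: float = 0.0
        self.average: float = 0.0


accuracy = Metric()
accuracy.update(0.5, batch_size=10)  # contributes 0.5 * 10 = 5.0
accuracy.update(1.0, batch_size=30)  # contributes 1.0 * 30 = 30.0
weighted_average = accuracy.average  # 35.0 / 40.0 = 0.875
print(weighted_average)

accuracy.reset()
print(accuracy.average)
```

The larger batch pulls the average toward its own accuracy, so the result is 0.875 rather than the unweighted mean of 0.75. After `reset()` the average returns to 0.0, ready for the next epoch.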
