In [None]:
%load_ext autoreload
%autoreload 2

# Utils

> Library-wide utility classes and functions used within the `BLURR` library.

In [None]:
# |default_exp utils
# |default_cls_lvl 3

In [None]:
# |export
from __future__ import annotations

import gc, importlib, sys, traceback

from accelerate.logging import get_logger
from dotenv import load_dotenv
from fastai.callback.all import *
from fastai.imports import *
from fastai.learner import *
from fastai.losses import BaseLoss, BCEWithLogitsLossFlat, CrossEntropyLossFlat
from fastai.test_utils import show_install
from fastai.torch_core import *
from fastai.torch_imports import *
from transformers import (
    AutoConfig,
    AutoTokenizer,
    PretrainedConfig,
    PreTrainedTokenizerBase,
    PreTrainedModel,
)
from transformers import logging as hf_logging

In [None]:
# |hide
import pdb, nbdev

from fastcore.test import *

In [None]:
# |export
# silence all the HF warnings and load environment variables
# NOTE: these are module-level side effects, so they run every time this module is imported.
# Set the tokenizers parallelism env var to "false" for the whole process
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Ignore every Python warning from here on (not just Hugging Face ones)
warnings.simplefilter("ignore")
# Only let the transformers logger emit errors
hf_logging.set_verbosity_error()
# Module-level logger obtained from accelerate's logging helper
logger = get_logger(__name__)

# Load environment variables via python-dotenv (e.g. `RANDOM_SEED`, read further below)
load_dotenv()

False

In [None]:
# |hide
# |notest
# Pin this notebook session to the first CUDA device (index 0) and report which GPU that is.
# This cell is tagged `notest`; it will fail on a machine without a CUDA device.
torch.cuda.set_device(0)
print(f"Using GPU #{torch.cuda.current_device()}: {torch.cuda.get_device_name()}")

Using GPU #0: NVIDIA GeForce RTX 3090


## Defaults

In [None]:
# | export
# Library-wide default random seed: taken from the `RANDOM_SEED` environment variable when set,
# otherwise 2023. The value is always coerced to `int`.
DEFAULT_SEED = int(os.getenv("RANDOM_SEED", 2023))

## General

Includes an implementation of the `Singleton` pattern that can be used as a Python decorator. Use this above any class to turn that class into a singleton (see [here](https://python-3-patterns-idioms-test.readthedocs.io/en/latest/Singleton.html) for more info on the singleton pattern).

In [None]:
# |export
class Singleton:
    """Class decorator that turns the decorated class into a singleton.

    The decorated name is rebound to a `Singleton` object. Calling it builds the wrapped class
    on the first call (using that call's arguments) and returns that same instance on every
    later call; arguments passed to later calls are ignored.
    """

    def __init__(self, cls):
        # The wrapped class and its lazily created, single instance
        self._cls, self._instance = cls, None

    def __call__(self, *args, **kwargs):
        # Use an identity check: `== None` would dispatch to the wrapped class's `__eq__`, and an
        # instance that compares equal to `None` would be rebuilt on every call.
        if self._instance is None:
            self._instance = self._cls(*args, **kwargs)
        return self._instance

In [None]:
# Decorating a class rebinds its name to a `Singleton` wrapper around the class
@Singleton
class TestSingleton:
    pass


# Both calls go through `Singleton.__call__`, so they must hand back the same instance
a = TestSingleton()
b = TestSingleton()
test_eq(a, b)

In [None]:
# |export
def str_to_type(
    # The name of a type as a string
    typename: str,
) -> type:  # Returns the actual type
    "Converts a type represented as a string to the actual class"
    # The name is looked up as an attribute of *this* module, so only names defined in or
    # imported into it can be resolved; an unknown name raises `AttributeError`.
    this_module = sys.modules[__name__]
    return getattr(this_module, typename)

In [None]:
nbdev.show_doc(str_to_type)

---

[source](https://github.com/ohmeow/blurr/blob/dev-3.0.0 #master/blurr/utils.py#L53){target="_blank" style="float:right; font-size:smaller"}

### str_to_type

>      str_to_type (typename:str)

Converts a type represented as a string to the actual class

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| typename | str |  |
| **Returns** | **type** | **The name of a type as a string  # Returns the actual type** |

In [None]:
print(str_to_type("test_eq"))
print(str_to_type("TestSingleton"))

<function test_eq>
<__main__.Singleton object>


In [None]:
# |export
# see the following threads for more info:
# - https://forums.fast.ai/t/solved-reproducibility-where-is-the-randomness-coming-in/31628?u=wgpubs
# - https://docs.fast.ai/dev/test.html#getting-reproducible-results
def set_seed(seed_value: int = 2023):
    """Seed every random number generator used during training with `seed_value`.

    Call this before building your DataLoaders, before building your Learner, and again before
    each call to your fit function to help make runs reproducible.
    """
    # Python, NumPy and PyTorch (CPU) generators
    for seed_fn in (np.random.seed, torch.manual_seed, random.seed):
        seed_fn(seed_value)

    if not torch.cuda.is_available():
        return

    # CUDA generators, plus cuDNN settings that trade speed for determinism
    torch.cuda.manual_seed(seed_value)
    torch.cuda.manual_seed_all(seed_value)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [None]:
nbdev.show_doc(set_seed, title_level=3)

---

[source](https://github.com/ohmeow/blurr/blob/dev-3.0.0 #master/blurr/utils.py#L63){target="_blank" style="float:right; font-size:smaller"}

### set_seed

>      set_seed (seed_value:int=2023)

This needs to be ran before creating your DataLoaders, before creating your Learner, and before each call
to your fit function to help ensure reproducibility.

In [None]:
set_seed(DEFAULT_SEED)

## Development Environment

In [None]:
# |export
def print_versions(
    # A string of space delimited package names or a list of package names
    packages: str
    | list[str],
):
    """Prints the name and version of one or more packages in your environment"""
    packages = packages.split(" ") if isinstance(packages, str) else packages

    for item in packages:
        item = item.strip()
        print(f"{item}: {importlib.import_module(item).__version__}")

In [None]:
nbdev.show_doc(print_versions, title_level=3)

---

[source](https://github.com/ohmeow/blurr/blob/dev-3.0.0 #master/blurr/utils.py#L78){target="_blank" style="float:right; font-size:smaller"}

### print_versions

>      print_versions (packages:str|list[str])

Prints the name and version of one or more packages in your environment

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| packages | str \| list[str] | A string of space delimited package names or a list of package names |

In [None]:
print_versions("torch transformers fastai")
print("---")
print_versions(["torch", "transformers", "fastai"])

torch: 1.13.1
transformers: 4.26.1
fastai: 2.7.11
---
torch: 1.13.1
transformers: 4.26.1
fastai: 2.7.11


In [None]:
# | export
def print_dev_environment():
    """Provides details on your development environment including packages installed, cuda/cudnn availability, GPUs, etc.

    The report itself comes from fastai's `show_install`; whatever that call returns is printed.
    """
    install_report = show_install()
    print(install_report)

In [None]:
nbdev.show_doc(print_dev_environment, title_level=3)

---

[source](https://github.com/ohmeow/blurr/blob/dev-3.0.0 #master/blurr/utils.py#L91){target="_blank" style="float:right; font-size:smaller"}

### print_dev_environment

>      print_dev_environment ()

Provides details on your development environment including packages installed, cuda/cudnn availability, GPUs, etc.

## Memory Management

In [None]:
# |export
def clean_ipython_hist():
    """Blank out the input history kept by the running IPython shell.

    Does nothing when `get_ipython` is not present in this module's globals. The body is mainly
    copied from the IPython source.
    """
    if "get_ipython" not in globals():
        return

    shell = get_ipython()
    namespace = shell.user_ns
    shell.displayhook.flush()
    n_prompts = shell.displayhook.prompt_count + 1

    # Drop the per-cell input variables (`_i1`, `_i2`, ...) and reset the "last inputs" shortcuts
    for idx in range(1, n_prompts):
        namespace.pop(f"_i{idx}", None)
    namespace.update({"_i": "", "_ii": "", "_iii": ""})

    # Overwrite the history manager's parsed/raw input lists with empty strings
    history = shell.history_manager
    history.input_hist_parsed[:] = [""] * n_prompts
    history.input_hist_raw[:] = [""] * n_prompts
    history._i = history._ii = history._iii = history._i00 = ""

In [None]:
# |export
def clean_tb():
    """Release the last uncaught exception that the interpreter keeps on `sys`.

    Clears the frames of `sys.last_traceback` (dropping the local variables they hold) and then
    removes `sys.last_traceback`, `sys.last_type`, `sys.last_value` and, on Python 3.12+,
    `sys.last_exc`. Attributes that are not set are skipped.
    """
    # h/t Piotr Czapla
    if hasattr(sys, "last_traceback"):
        traceback.clear_frames(sys.last_traceback)
        delattr(sys, "last_traceback")

    # `last_exc` (Python 3.12+) references the same exception and traceback, so it must go too
    for attr in ("last_type", "last_value", "last_exc"):
        if hasattr(sys, attr):
            delattr(sys, attr)

In [None]:
# | export
def clean_memory(
    # The fastai learner to delete
    learn: Learner = None,
):
    """A function which clears gpu memory.

    Clears the stored traceback and the IPython input history, runs the garbage collector and
    then empties the CUDA cache.
    """
    if learn is not None:
        # This only drops the local reference held by this function; the caller has to `del`
        # (or rebind) their own reference for the learner to actually become collectable.
        del learn
    clean_tb()
    clean_ipython_hist()
    # Collect first so memory freed by the collection is already back in the cache when it is emptied
    gc.collect()
    torch.cuda.empty_cache()

In [None]:
nbdev.show_doc(clean_memory)

---

[source](https://github.com/ohmeow/blurr/blob/dev-3.0.0 #master/blurr/utils.py#L127){target="_blank" style="float:right; font-size:smaller"}

### clean_memory

>      clean_memory (learn:fastai.learner.Learner=None)

A function which clears gpu memory.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| learn | Learner | None | The fastai learner to delete |

In [None]:
clean_memory()

## Loss functions

In [None]:
# |export
class PreCalculatedLoss(BaseLoss):
    """
    Use `PreCalculatedLoss` as your loss function when you want your Hugging Face model to calculate the loss for you; to make that happen,
    include the `labels` argument in your inputs. Even though no real loss function is needed in that case, fastai still requires a loss
    class/function to function properly (e.g. one with `decodes` and `activation` methods), because those methods get called in methods like
    `show_results` to get the actual predictions.

    Note: The Hugging Face models ***will always*** calculate the loss for you ***if*** you pass a `labels` dictionary along with your other inputs
    (so only include it if that is what you intend to happen)
    """

    def __call__(self, inp, targ, **kwargs):
        # Inputs are ignored on purpose: this is only a constant placeholder value
        placeholder = tensor(0.0)
        return placeholder


class PreCalculatedCrossEntropyLoss(PreCalculatedLoss, CrossEntropyLossFlat):
    """
    `PreCalculatedLoss` combined with `CrossEntropyLossFlat`. `PreCalculatedLoss` is listed first, so its constant `__call__` is the one
    used, while everything else is inherited from `CrossEntropyLossFlat` (presumably its `activation`/`decodes` - confirm against fastai).
    """


class PreCalculatedBCELoss(PreCalculatedLoss, BCEWithLogitsLossFlat):
    """
    `PreCalculatedLoss` combined with `BCEWithLogitsLossFlat`. `PreCalculatedLoss` is listed first, so its constant `__call__` is the one
    used, while everything else is inherited from `BCEWithLogitsLossFlat` (presumably its `activation`/`decodes` - confirm against fastai).
    """


class PreCalculatedMSELoss(PreCalculatedLoss):
    """
    A `PreCalculatedLoss` that is initialized around `nn.MSELoss`, with `floatify` on by default and `is_2d` always set to `False`.
    """

    def __init__(self, *args, axis=-1, floatify=True, **kwargs):
        # Options forwarded to the parent initializer; `is_2d` is fixed and not caller-configurable
        base_opts = dict(axis=axis, floatify=floatify, is_2d=False)
        super().__init__(nn.MSELoss, *args, **base_opts, **kwargs)

In [None]:
# |export
class MultiTargetLoss(Module):
    """
    Provides the ability to apply different loss functions to multi-modal targets/predictions.

    This new loss function can be used in many other multi-modal architectures, with any mix of loss functions.
    For example, this can be amended to include the `is_impossible` task, as well as the start/end token tasks
    in the SQUAD v2 dataset (or in any extractive question/answering task)

    One loss function is built per entry of `loss_classes`; `forward` returns the weighted sum of each loss
    function applied to its matching output/target pair.
    """

    def __init__(
        self,
        # The loss function for each target
        loss_classes: list[Callable] = [CrossEntropyLossFlat, CrossEntropyLossFlat],
        # Any kwargs you want to pass to the loss functions above
        loss_classes_kwargs: list[dict] = [{}, {}],
        # The weights you want to apply to each loss (default: [1,1])
        weights: list[float] | list[int] = [1, 1],
        # The `reduction` parameter of the loss function (default: 'mean')
        reduction: str = "mean",
    ):
        # Classes and kwargs are paired positionally; pairing stops at the shorter of the two lists
        loss_funcs = [cls(reduction=reduction, **kwargs) for cls, kwargs in zip(loss_classes, loss_classes_kwargs)]
        # Store a copy so that mutating `self.weights` can never alter the shared default list
        # (or a list owned by the caller)
        weights = list(weights)
        store_attr(self=self, names="loss_funcs, weights")
        self._reduction = reduction

    # custom loss function must have either a reduction attribute or a reduction argument (like all fastai and
    # PyTorch loss functions) so that the framework can change this as needed (e.g., when doing learn.get_preds
    # it will set = 'none'). see this forum topic for more info: https://bit.ly/3br2Syz
    @property
    def reduction(self):
        return self._reduction

    @reduction.setter
    def reduction(self, v):
        # Keep every wrapped loss function in sync with the new reduction mode
        self._reduction = v
        for lf in self.loss_funcs:
            lf.reduction = v

    def forward(self, outputs, *targets):
        # Weighted sum over (loss function, weight, output, target) tuples; `zip` stops at the shortest
        loss = 0.0
        for loss_func, weight, output, target in zip(self.loss_funcs, self.weights, outputs, targets):
            loss += weight * loss_func(output, target)

        return loss

    def activation(self, outs):
        # Apply the i-th loss function's activation to the i-th output
        return [self.loss_funcs[i].activation(o) for i, o in enumerate(outs)]

    def decodes(self, outs):
        # Apply the i-th loss function's decodes to the i-th output
        return [self.loss_funcs[i].decodes(o) for i, o in enumerate(outs)]

## Hugging Face

In [None]:
# |export
def get_hf_objects(
    # The model name or path passed to every `from_pretrained` call below
    pretrained_model_name_or_path: str | os.PathLike,
    # The model class whose `from_pretrained` builds the model (e.g. `AutoModelForSequenceClassification`)
    model_cls: PreTrainedModel,
    # A config to use as-is; when `None`, one is loaded with `AutoConfig.from_pretrained`
    config: PretrainedConfig | str | os.PathLike = None,
    # The tokenizer class to use; when `None`, `AutoTokenizer` is used
    tokenizer_cls: PreTrainedTokenizerBase = None,
    # Extra kwargs for `AutoConfig.from_pretrained` (only used when `config` is `None`)
    config_kwargs: dict = {},
    # Extra kwargs for the tokenizer's `from_pretrained`
    tokenizer_kwargs: dict = {},
    # Extra kwargs for the model's `from_pretrained`
    model_kwargs: dict = {},
    # Passed as `cache_dir` to every `from_pretrained` call
    cache_dir: str | os.PathLike = None,
) -> tuple[str, PretrainedConfig, PreTrainedTokenizerBase, PreTrainedModel]:
    """
    Given at minimum a `pretrained_model_name_or_path` and `model_cls` (such as
    `AutoModelForSequenceClassification`), this method returns all the Hugging Face objects you need to train
    a model using Blurr

    Returns a tuple of `(arch, config, tokenizer, model)`, where `arch` is the third dotted component of the
    model's `__module__` (e.g. "bert") or "unknown" when the module path has fewer components.
    """
    # config
    if config is None:
        config = AutoConfig.from_pretrained(pretrained_model_name_or_path, cache_dir=cache_dir, **config_kwargs)

    # tokenizer (gpt2, roberta, bart (and maybe others) tokenizers require a prefix space)
    # `str(...)` keeps the substring check working for `os.PathLike` inputs, where `in` would raise `TypeError`
    name_or_path = str(pretrained_model_name_or_path)
    if any(s in name_or_path for s in ["gpt2", "roberta", "bart", "longformer"]):
        # Caller-supplied kwargs win over the `add_prefix_space` default; a new dict is built so the
        # (shared, mutable) default argument is never modified
        tokenizer_kwargs = {"add_prefix_space": True, **tokenizer_kwargs}

    tokenizer_factory = AutoTokenizer if tokenizer_cls is None else tokenizer_cls
    tokenizer = tokenizer_factory.from_pretrained(pretrained_model_name_or_path, cache_dir=cache_dir, **tokenizer_kwargs)

    # model
    model = model_cls.from_pretrained(pretrained_model_name_or_path, config=config, cache_dir=cache_dir, **model_kwargs)

    # arch
    try:
        arch = model.__module__.split(".")[2]
    except (AttributeError, IndexError):
        # Narrow on purpose: a bare `except` would also swallow KeyboardInterrupt/SystemExit
        arch = "unknown"

    return (arch, config, tokenizer, model)

In [None]:
nbdev.show_doc(get_hf_objects, title_level=2)

---

[source](https://github.com/ohmeow/blurr/blob/dev-3.0.0 #master/blurr/utils.py#L228){target="_blank" style="float:right; font-size:smaller"}

## get_hf_objects

>      get_hf_objects (pretrained_model_name_or_path:str|os.PathLike,
>                      model_cls:transformers.modeling_utils.PreTrainedModel, co
>                      nfig:transformers.configuration_utils.PretrainedConfig|st
>                      r|os.PathLike=None, tokenizer_cls:transformers.tokenizati
>                      on_utils_base.PreTrainedTokenizerBase=None,
>                      config_kwargs:dict={}, tokenizer_kwargs:dict={},
>                      model_kwargs:dict={}, cache_dir:str|os.PathLike=None)

Given at minimum a `pretrained_model_name_or_path` and `model_cls (such as
`AutoModelForSequenceClassification"), this method returns all the Hugging Face objects you need to train
a model using Blurr

In [None]:
# Example: let `get_hf_objects` pick the config and tokenizer automatically for a masked-LM model
from transformers import AutoModelForMaskedLM

hf_logging.set_verbosity_error()

arch, config, tokenizer, model = get_hf_objects("bert-base-cased-finetuned-mrpc", model_cls=AutoModelForMaskedLM)

# The architecture name and all three objects should trace back to the requested checkpoint
test_eq(arch, "bert")
test_eq(model.name_or_path, "bert-base-cased-finetuned-mrpc")
test_eq(tokenizer.name_or_path, "bert-base-cased-finetuned-mrpc")
test_eq(config._name_or_path, "bert-base-cased-finetuned-mrpc")

print(arch)
print(type(config))
print(type(tokenizer))
print(type(model))

bert
<class 'transformers.models.bert.configuration_bert.BertConfig'>
<class 'transformers.models.bert.tokenization_bert_fast.BertTokenizerFast'>
<class 'transformers.models.bert.modeling_bert.BertForMaskedLM'>


In [None]:
# Example: same call with a question-answering model class and a DistilBERT checkpoint
from transformers import AutoModelForQuestionAnswering

hf_logging.set_verbosity_error()

arch, config, tokenizer, model = get_hf_objects("distilbert-base-cased-distilled-squad", model_cls=AutoModelForQuestionAnswering)

# The architecture name and all three objects should trace back to the requested checkpoint
test_eq(arch, "distilbert")
test_eq(model.name_or_path, "distilbert-base-cased-distilled-squad")
test_eq(tokenizer.name_or_path, "distilbert-base-cased-distilled-squad")
test_eq(config._name_or_path, "distilbert-base-cased-distilled-squad")


print(arch)
print(type(config))
print(type(tokenizer))
print(type(model))

distilbert
<class 'transformers.models.distilbert.configuration_distilbert.DistilBertConfig'>
<class 'transformers.models.distilbert.tokenization_distilbert_fast.DistilBertTokenizerFast'>
<class 'transformers.models.distilbert.modeling_distilbert.DistilBertForQuestionAnswering'>


In [None]:
# Example: pass explicit tokenizer and model classes instead of relying on the `Auto*` classes
from transformers import BertTokenizer, BertForNextSentencePrediction

hf_logging.set_verbosity_error()

# `config=None` means the config is still loaded via `AutoConfig`
arch, config, tokenizer, model = get_hf_objects(
    "bert-base-cased-finetuned-mrpc",
    config=None,
    tokenizer_cls=BertTokenizer,
    model_cls=BertForNextSentencePrediction,
)

# The architecture name and all three objects should trace back to the requested checkpoint
test_eq(arch, "bert")
test_eq(model.name_or_path, "bert-base-cased-finetuned-mrpc")
test_eq(tokenizer.name_or_path, "bert-base-cased-finetuned-mrpc")
test_eq(config._name_or_path, "bert-base-cased-finetuned-mrpc")

print(arch)
print(type(config))
print(type(tokenizer))
print(type(model))

bert
<class 'transformers.models.bert.configuration_bert.BertConfig'>
<class 'transformers.models.bert.tokenization_bert.BertTokenizer'>
<class 'transformers.models.bert.modeling_bert.BertForNextSentencePrediction'>


## Export -

In [None]:
# | hide
import nbdev

nbdev.nbdev_export()