# Linear example notebook

In this notebook, we'll go through some examples of how to use `alma`, as well as how to configure it for your own use case.

## benchmark_model

`benchmark_model` is the core API for `alma`. It allows one to benchmark one's model speed on given data for all of the conversion options that `alma` supports.

We'll start by initializing a model and creating some data to pass through it for our benchmarking.

In [1]:
import logging
from typing import Any, Dict

import torch

from alma.utils.setup_logging import setup_logging

# Set up logging. This will be discussed in a later section.
setup_logging(log_file=None, level="INFO")

# Set the device one wants to benchmark on
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Create a random model
model = torch.nn.Sequential(
    torch.nn.Linear(3, 3),
    torch.nn.ReLU(),
)

# Create a random tensor
data = torch.rand(1, 512, 3)

Then, we can start experimenting with `alma`. We'll begin by defining a very simple benchmark config, which tells `alma` how you would like to benchmark your model.

In [2]:
from alma.benchmark import BenchmarkConfig
from alma.benchmark.log import display_all_results
from alma.benchmark_model import benchmark_model

# Set up the benchmarking configuration
config = BenchmarkConfig(
    n_samples=1024,  # Total nb of samples to benchmark on
    batch_size=64,  # Batch size
    device=device,  # The device to benchmark on
)

# What conversion methods to benchmark. In this case, it is just "eager", which is the default forward call, and 
# "jit trace", which is a jit-traced model.
conversions = ["EAGER", "JIT_TRACE"]

# Benchmark the model
results: Dict[str, Dict[str, Any]] = benchmark_model(
    model, config, conversions, data=data.squeeze()  # The batch dimension should be squeezed away
)

# Display the results
display_all_results(
    results, display_function=print, include_errors=True, include_traceback_for_errors=False
)

as the model will be pickled and sent to each child process, which will require the model to be stored in memory
twice. If the model is large, this may cause memory issues. Consider using a callable to return the model, which
will be created in each child process, rather than the parent process. See `examples/mnist/mem_efficient_benchmark_rand_tensor.py`
for an example.
INFO: [benchmark_model.py:112] Benchmarking model using conversion: EAGER
INFO: [device.py:163] Chosen device: mps (Fallback selection)
Benchmarking EAGER on mps:  94%|█████████▍| 15/16 [00:00<00:00, 1201.62it/s]
INFO: [benchmark_model.py:112] Benchmarking model using conversion: JIT_TRACE
INFO: [device.py:163] Chosen device: mps (Fallback selection)
Benchmarking JIT_TRACE on mps:  94%|█████████▍| 15/16 [00:00<00:00, 1435.42it/s]




All results:
EAGER results:
Device: mps
Total elapsed time: 0.0142 seconds
Total inference time (model only): 0.0010 seconds
Total samples: 1024 - Batch size: 64
Throughput: 1030868.68 samples/second


JIT_TRACE results:
Device: mps
Total elapsed time: 0.0114 seconds
Total inference time (model only): 0.0011 seconds
Total samples: 1024 - Batch size: 64
Throughput: 966456.23 samples/second




It's that simple! You can also feed in the config as a dict if you prefer, but using the `BenchmarkConfig` will give you integrated type hinting. For example, one could do:

In [3]:
# Set up the benchmarking configuration
config = {
    "n_samples": 1024,  # Total nb of samples to benchmark on
    "batch_size": 64,  # Batch size
    "device": device,  # The device to benchmark on
}

conversions = ["EAGER", "JIT_TRACE"]

# Benchmark the model
results: Dict[str, Dict[str, Any]] = benchmark_model(
    model, config, conversions, data=data.squeeze()
)

# Display the results
display_all_results(
    results, display_function=print, include_errors=True, include_traceback_for_errors=False
)

as the model will be pickled and sent to each child process, which will require the model to be stored in memory
twice. If the model is large, this may cause memory issues. Consider using a callable to return the model, which
will be created in each child process, rather than the parent process. See `examples/mnist/mem_efficient_benchmark_rand_tensor.py`
for an example.
INFO: [benchmark_model.py:112] Benchmarking model using conversion: EAGER
INFO: [device.py:163] Chosen device: mps (Fallback selection)
Benchmarking EAGER on mps:  94%|█████████▍| 15/16 [00:00<00:00, 1360.67it/s]
INFO: [benchmark_model.py:112] Benchmarking model using conversion: JIT_TRACE
INFO: [device.py:163] Chosen device: mps (Fallback selection)
Benchmarking JIT_TRACE on mps:  94%|█████████▍| 15/16 [00:00<00:00, 1396.92it/s]




All results:
EAGER results:
Device: mps
Total elapsed time: 0.0124 seconds
Total inference time (model only): 0.0009 seconds
Total samples: 1024 - Batch size: 64
Throughput: 1152399.16 samples/second


JIT_TRACE results:
Device: mps
Total elapsed time: 0.0116 seconds
Total inference time (model only): 0.0011 seconds
Total samples: 1024 - Batch size: 64
Throughput: 974388.06 samples/second
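
The throughput figures in the outputs above appear consistent with dividing the total number of samples by the model-only inference time, with the displayed times rounded to four decimal places. The sketch below checks that relationship against the printed numbers. The `throughput` helper is hypothetical and is not part of `alma`.

```python
def throughput(n_samples: int, inference_seconds: float) -> float:
    """Samples processed per second of model-only inference time."""
    if inference_seconds <= 0:
        raise ValueError("inference_seconds must be positive")
    return n_samples / inference_seconds


# Numbers copied from the EAGER run printed just above.
n_samples = 1024
reported_throughput = 1152399.16  # samples/second

# Back out the unrounded inference time implied by the reported throughput.
implied_seconds = n_samples / reported_throughput

# Rounded to four decimals, this matches the displayed inference time.
print(f"{implied_seconds:.4f}")  # 0.0009
```

Because the displayed time is rounded, recomputing throughput from `0.0009` directly would not reproduce the reported figure exactly.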




## Understanding the config

Now let's dig a little deeper into the `config` options. If we print out all of the fields, we see this:

In [4]:
def print_pydantic_fields(model_class):
    """
    Print all fields of a Pydantic model class in a pretty format.
    Usage: print_pydantic_fields(YourModelClass)
    """
    print(f"\n{'='*20} {model_class.__name__} Fields {'='*20}")
    
    for name, field in model_class.model_fields.items():
        field_type = field.annotation.__name__ if hasattr(field.annotation, '__name__') else str(field.annotation)
        # In Pydantic v2, required fields report `PydanticUndefined` rather than `...`
        default = "Required" if field.is_required() else field.default
        
        print(f"\nField: {name}; Type {field_type}; Default: {default}; Description: {field.description}")

print_pydantic_fields(BenchmarkConfig)



Field: n_samples; Type int; Default: 128; Description: Number of samples to benchmark.

Field: batch_size; Type int; Default: 128; Description: Batch size for benchmarking.

Field: multiprocessing; Type bool; Default: True; Description: Enable multiprocessing support.

Field: fail_on_error; Type bool; Default: True; Description: Fail immediately on any error.

Field: allow_device_override; Type bool; Default: True; Description: Allow device override selection.

Field: allow_cuda; Type bool; Default: True; Description: Allow CUDA acceleration if available.

Field: allow_mps; Type bool; Default: True; Description: Allow MPS acceleration if available.

Field: device; Type Optional; Default: None; Description: Device for benchmarking.



We can see some new fields:
- multiprocessing
- fail_on_error
- allow_device_override
- allow_cuda
- allow_mps

`multiprocessing` is a boolean that defines whether or not we run each conversion method's benchmark in a child process. This means that we spin up a new Python interpreter instance (internally inside of `alma`) for each conversion method, so that no method can affect the others. As we were developing `alma`, we noticed that some conversion methods (e.g. `optimum quanto`) affect the global torch state, and multiprocessing was the solution we came up with for isolating each method's environment. It is `True` by default, but feel free to turn it off, especially when debugging!
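
To illustrate why a fresh interpreter isolates global state, here is a minimal sketch using only the standard library. It uses `subprocess` rather than `alma`'s own multiprocessing machinery, and the recursion limit merely stands in for global torch state. `run_isolated` is a hypothetical helper.

```python
import subprocess
import sys


def run_isolated(snippet: str) -> str:
    """Run a Python snippet in a brand-new interpreter and return its stdout."""
    completed = subprocess.run(
        [sys.executable, "-c", snippet],
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()


parent_limit_before = sys.getrecursionlimit()

# The child changes a process-wide setting, then reports the new value.
child_limit = run_isolated(
    "import sys; sys.setrecursionlimit(5000); print(sys.getrecursionlimit())"
)

# The change never leaks back into the parent process.
parent_limit_after = sys.getrecursionlimit()
print(child_limit, parent_limit_before == parent_limit_after)
```

The same reasoning applies to any process-wide state that a conversion method might modify: once the child exits, its changes are gone.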

`fail_on_error` defines whether we stop at the first error or fail gracefully. Some conversion methods will not work on certain hardware, or because of missing dependencies, etc. If `fail_on_error=True`, we stop as soon as we encounter an error. If it is `False`, we keep going, and the error message and traceback are returned in the results for the failed method.
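
The two behaviours can be pictured with a small standard-library sketch. This is not `alma`'s implementation: `run_benchmarks`, `working`, `broken` and the `"status"` key are hypothetical, while the `"error"` and `"traceback"` keys mirror the ones used later in this notebook.

```python
import traceback
from typing import Any, Callable, Dict


def run_benchmarks(
    benchmarks: Dict[str, Callable[[], Dict[str, Any]]],
    fail_on_error: bool,
) -> Dict[str, Dict[str, Any]]:
    """Run each benchmark; either re-raise on failure or record the failure."""
    results: Dict[str, Dict[str, Any]] = {}
    for name, run in benchmarks.items():
        try:
            results[name] = run()
        except Exception as exc:
            if fail_on_error:
                raise  # Stop at the first failing method
            # Fail gracefully: keep the message and the full traceback
            results[name] = {"error": str(exc), "traceback": traceback.format_exc()}
    return results


def working() -> Dict[str, Any]:
    return {"status": "success"}  # Hypothetical result payload


def broken() -> Dict[str, Any]:
    raise RuntimeError("missing dependency")


results = run_benchmarks({"EAGER": working, "BROKEN": broken}, fail_on_error=False)
print(results["BROKEN"]["error"])  # missing dependency
```

With `fail_on_error=True`, the same call would raise the `RuntimeError` instead of returning a results dictionary.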

`allow_device_override` is a boolean that defines whether or not we will allow `alma` to move conversion methods to specific devices, if the conversion method in question only works on that device. E.g. `ONNX_CPU` will fail on GPU, as will PyTorch's native converted quantized models, which are CPU only: `NATIVE_CONVERT_AI8WI8_STATIC_QUANTIZED`. This is `True` by default, but it is very much up to the user. If you want the methods to fail if not compatible with `device`, then set this to `False`. If you want `alma` to automatically move the method to the appropriate device, leave it as `True`.
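
As a rough mental model of the override rule, consider the sketch below. It is an assumption-laden illustration, not `alma`'s actual logic: `effective_device` is a hypothetical helper, and devices are represented as plain strings.

```python
from typing import Optional


def effective_device(
    requested: str,
    device_override: Optional[str],
    allow_device_override: bool,
) -> str:
    """Return the device a method would run on under the rule described above."""
    if device_override is not None and allow_device_override:
        # The method only works on one device, and we are allowed to move it
        return device_override.lower()
    # Otherwise the method stays on the requested device (and may fail there)
    return requested


moved = effective_device("cuda", "CPU", allow_device_override=True)
kept = effective_device("cuda", "CPU", allow_device_override=False)
print(moved, kept)  # cpu cuda
```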

`allow_cuda` and `allow_mps` are guides on which device to fall back to in case `device` fails to run the conversion method in question. If `allow_cuda=True` and CUDA is available, then it will default to `cuda`. If not, then it will similarly check `mps`.
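
The fallback order just described can be sketched as a small pure function. This is only an illustration under stated assumptions: `choose_fallback_device` is hypothetical, availability is passed in as plain booleans, and falling back to CPU as a last resort is an assumption rather than something stated above.

```python
def choose_fallback_device(
    allow_cuda: bool,
    allow_mps: bool,
    cuda_available: bool,
    mps_available: bool,
) -> str:
    """Pick a fallback device name in the order described above: CUDA, then MPS."""
    if allow_cuda and cuda_available:
        return "cuda"
    if allow_mps and mps_available:
        return "mps"
    # Assumption: CPU is the last resort when no accelerator qualifies
    return "cpu"


# An Apple-silicon-like machine: no CUDA, but MPS is present.
fallback = choose_fallback_device(True, True, cuda_available=False, mps_available=True)
print(fallback)  # mps
```

In a real session, the availability flags could come from `torch.cuda.is_available()` and `torch.backends.mps.is_available()`.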

In [5]:

# Set up the benchmarking configuration
config = BenchmarkConfig(
    n_samples=1024,  # Total nb of samples to benchmark on
    batch_size=64,  # Batch size
    device=device,  # The device to benchmark on
    multiprocessing=True,  # If True, we test each method in its own isolated environment,
    # which helps keep methods from contaminating the global torch state
    fail_on_error=False,  # If False, we fail gracefully and keep testing other methods
    allow_device_override=False,  # No overriding of device for any conversion method
    allow_cuda=True,  # Does nothing without `allow_device_override`
    allow_mps=True,  # Does nothing without `allow_device_override`
)

# Benchmark the model
results: Dict[str, Dict[str, Any]] = benchmark_model(
    model, config, conversions, data=data.squeeze()
)

# Display the results
display_all_results(
    results, display_function=print, include_errors=True, include_traceback_for_errors=False
)

as the model will be pickled and sent to each child process, which will require the model to be stored in memory
twice. If the model is large, this may cause memory issues. Consider using a callable to return the model, which
will be created in each child process, rather than the parent process. See `examples/mnist/mem_efficient_benchmark_rand_tensor.py`
for an example.
INFO: [benchmark_model.py:112] Benchmarking model using conversion: EAGER
INFO: [device.py:163] Chosen device: mps (Fallback selection)
Benchmarking EAGER on mps:  94%|█████████▍| 15/16 [00:00<00:00, 1201.23it/s]
INFO: [benchmark_model.py:112] Benchmarking model using conversion: JIT_TRACE
INFO: [device.py:163] Chosen device: mps (Fallback selection)
Benchmarking JIT_TRACE on mps:  94%|█████████▍| 15/16 [00:00<00:00, 2218.27it/s]




All results:
EAGER results:
Device: mps
Total elapsed time: 0.0139 seconds
Total inference time (model only): 0.0011 seconds
Total samples: 1024 - Batch size: 64
Throughput: 904560.08 samples/second


JIT_TRACE results:
Device: mps
Total elapsed time: 0.0076 seconds
Total inference time (model only): 0.0010 seconds
Total samples: 1024 - Batch size: 64
Throughput: 1056079.46 samples/second




## Testing all conversion methods

If one sets `conversions=None`, then by default all of the supported conversion methods will be tested. To see all of the supported methods, one can import them. Printing them shows that each option has a name (`mode`) and an optional `device_override` field. This field tells us whether the method only runs on specific hardware, which it will be moved to if `allow_device_override=True` in the `config`.

In [6]:
from alma.conversions.conversion_options import MODEL_CONVERSION_OPTIONS

for index, value in MODEL_CONVERSION_OPTIONS.items():
    print(f"{index}: {value}")

0: mode='EAGER' device_override=None
1: mode='EXPORT+EAGER' device_override=None
2: mode='ONNX_CPU' device_override='CPU'
3: mode='ONNX_GPU' device_override='CUDA'
4: mode='ONNX+DYNAMO_EXPORT' device_override=None
5: mode='COMPILE_CUDAGRAPHS' device_override='CUDA'
6: mode='COMPILE_INDUCTOR_DEFAULT' device_override=None
7: mode='COMPILE_INDUCTOR_REDUCE_OVERHEAD' device_override=None
8: mode='COMPILE_INDUCTOR_MAX_AUTOTUNE' device_override=None
9: mode='COMPILE_INDUCTOR_EAGER_FALLBACK' device_override=None
10: mode='COMPILE_ONNXRT' device_override='CUDA'
11: mode='COMPILE_OPENXLA' device_override='XLA_GPU'
12: mode='COMPILE_TVM' device_override=None
13: mode='EXPORT+AI8WI8_FLOAT_QUANTIZED' device_override=None
14: mode='EXPORT+AI8WI8_FLOAT_QUANTIZED+RUN_DECOMPOSITION' device_override=None
15: mode='EXPORT+AI8WI8_STATIC_QUANTIZED' device_override=None
16: mode='EXPORT+AI8WI8_STATIC_QUANTIZED+RUN_DECOMPOSITION' device_override=None
17: mode='EXPORT+AOT_INDUCTOR' device_override=None
18: mo

## Error handling
Let's see what happens when a method errors and we fail gracefully. This example should fail for everybody: we set the device to `cuda` but attempt to run the `NATIVE_CONVERT_AI8WI8_STATIC_QUANTIZED` method, which is CPU only.

In [7]:
# Set up the benchmarking configuration
config = BenchmarkConfig(
    n_samples=1024,  # Total nb of samples to benchmark on
    batch_size=64,  # Batch size
    device=torch.device("cuda"),  # The device to benchmark on
    multiprocessing=True,  # If True, we test each method in its own isolated environment,
    # which helps keep methods from contaminating the global torch state
    fail_on_error=False,  # If False, we fail gracefully and keep testing other methods
    allow_device_override=False,  # No overriding of device for any conversion method
    allow_cuda=True,  # Does nothing without `allow_device_override`
    allow_mps=True,  # Does nothing without `allow_device_override`
)

# We choose a conversion method that cannot work on GPU
conversions = ["EAGER", "NATIVE_CONVERT_AI8WI8_STATIC_QUANTIZED"]

# Benchmark the model
results: Dict[str, Dict[str, Any]] = benchmark_model(
    model, config, conversions, data=data.squeeze()
)

# Display the results
display_all_results(
    results, display_function=print, include_errors=True, include_traceback_for_errors=False
)

as the model will be pickled and sent to each child process, which will require the model to be stored in memory
twice. If the model is large, this may cause memory issues. Consider using a callable to return the model, which
will be created in each child process, rather than the parent process. See `examples/mnist/mem_efficient_benchmark_rand_tensor.py`
for an example.
INFO: [benchmark_model.py:112] Benchmarking model using conversion: EAGER
INFO: [device.py:163] Chosen device: mps (Fallback selection)
Benchmarking EAGER on mps:  94%|█████████▍| 15/16 [00:00<00:00, 1111.11it/s]
INFO: [benchmark_model.py:112] Benchmarking model using conversion: NATIVE_CONVERT_AI8WI8_STATIC_QUANTIZED
INFO: [device.py:163] Chosen device: mps (Fallback selection)
ERROR: [benchmark_model.py:139] Benchmarking conversion NATIVE_CONVERT_AI8WI8_STATIC_QUANTIZED failed.




All results:
EAGER results:
Device: mps
Total elapsed time: 0.0148 seconds
Total inference time (model only): 0.0010 seconds
Total samples: 1024 - Batch size: 64
Throughput: 1042321.93 samples/second


NATIVE_CONVERT_AI8WI8_STATIC_QUANTIZED results:
Benchmarking failed




We can see that the `NATIVE_CONVERT_AI8WI8_STATIC_QUANTIZED` method failed. If we want to get full details on why, we can access the traceback.

In [8]:
print(results["NATIVE_CONVERT_AI8WI8_STATIC_QUANTIZED"]["traceback"])

Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniconda/base/envs/alma/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/homebrew/Caskroom/miniconda/base/envs/alma/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/opt/homebrew/Caskroom/miniconda/base/envs/alma/lib/python3.10/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "/opt/homebrew/Caskroom/miniconda/base/envs/alma/lib/python3.10/site-packages/traitlets/config/application.py", line 1075, in launch_instance
    app.start()
  File "/opt/homebrew/Caskroom/miniconda/base/envs/alma/lib/python3.10/site-packages/ipykernel/kernelapp.py", line 739, in start
    self.io_loop.start()
  File "/opt/homebrew/Caskroom/miniconda/base/envs/alma/lib/python3.10/site-packages/tornado/platform/asyncio.py", line 205, in start
    self.asyncio_loop.run_forever()
  File "/opt/homebrew/

For a more succinct message, we can also just access the error message:

In [9]:
print(results["NATIVE_CONVERT_AI8WI8_STATIC_QUANTIZED"]["error"])



## Better use of memory when multiprocessing

We do allow people to feed models directly into `benchmark_model`. However, if multiprocessing is enabled, this is not very memory efficient. This is because the model gets initialized when one creates it, and then gets copied over to the child process for each conversion method. This means it can be stored in memory twice. As such, if multiprocessing is enabled, it is better not to feed in the model directly. Instead, we can feed in a callable that returns the model. This allows us to initialize the model only inside the child processes, and not in the parent process.
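
The idea of handing over a callable rather than an instance can be sketched without any multiprocessing at all. Everything below is a stand-in: `TinyModel`, `get_model` and `load_in_worker` are hypothetical names, and the "worker" is simulated in the same process purely to show when construction happens.

```python
from typing import Callable, List, Union


class TinyModel:
    """Stand-in for a large model; counts how many instances have been built."""

    instances_built = 0

    def __init__(self) -> None:
        TinyModel.instances_built += 1
        self.weights: List[float] = [0.0] * 4


def get_model() -> TinyModel:
    """Factory: nothing is allocated until this is called."""
    return TinyModel()


def load_in_worker(model_or_factory: Union[TinyModel, Callable[[], TinyModel]]) -> TinyModel:
    """Simulated worker entry point: build the model only if given a factory."""
    if isinstance(model_or_factory, TinyModel):
        return model_or_factory
    return model_or_factory()


# Passing the factory around costs nothing: no model exists yet.
built_before = TinyModel.instances_built

# The model is only constructed once the (simulated) worker asks for it.
model_in_worker = load_in_worker(get_model)
built_after = TinyModel.instances_built
print(built_before, built_after)  # 0 1
```

With a factory, the parent process never holds a copy of the weights, which is the memory saving described above.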

Unfortunately, Jupyter notebooks don't play very nicely with multiprocessing, and so we have to refer you to one of our script-based examples, e.g. `examples/mnist/mem_efficient_benchmark_rand_tensor.py`. 

## Using a data loader instead of a tensor

`alma` provides two options for feeding in data to benchmark the model on. Throughout this notebook, we've just fed in a `data` tensor. Under the hood, this initializes a data loader with the config-defined batch size, and then uses that data loader to benchmark the model.
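
To make the tensor path concrete, the sketch below slices a requested number of random samples into batches using plain Python lists. It is a stand-in for a real data loader, not `alma`'s internals: `random_batches` and its parameters are hypothetical.

```python
import random
from typing import Iterator, List

Sample = List[float]


def random_batches(
    n_samples: int, batch_size: int, n_features: int, seed: int = 0
) -> Iterator[List[Sample]]:
    """Yield batches of random samples until n_samples have been produced."""
    rng = random.Random(seed)
    for start in range(0, n_samples, batch_size):
        # The final batch may be smaller if batch_size does not divide n_samples
        size = min(batch_size, n_samples - start)
        yield [[rng.random() for _ in range(n_features)] for _ in range(size)]


batches = list(random_batches(n_samples=1024, batch_size=64, n_features=3))
print(len(batches), len(batches[0]))  # 16 64
```

With `n_samples=1024` and `batch_size=64` this gives 16 batches, which matches the 16 steps shown in the progress bars earlier in this notebook.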

However, you might wish to provide your own data loader. In that case, you can provide one via the `data_loader` argument. The config-defined batch size will then be overridden.

Unfortunately, Jupyter notebooks still don't play very nicely with multiprocessing, and so, as in the `get_model` callable case above, we have to refer you to one of our script-based examples, e.g. `examples/mnist/benchmark_with_dataloader.py`.

## Logging

We do highly recommend that users set up logging. There are many internal operations that one can get insight into via enabling logging. A `setup_logging` function is provided for convenience, but one
can use whatever logging one wishes, or none.

In [11]:
import logging
from typing import Any, Dict

import torch

from alma.utils.setup_logging import setup_logging


# Set up logging. DEBUG level will also log the internal conversion logs (where available), as well
# as the model graphs. A `setup_logging` function is provided for convenience, but one can use
# whatever logging one wishes, or none.
setup_logging(log_file=None, level="INFO")

# Set the device one wants to benchmark on
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Benchmark the model
# Feeding in a tensor, and no dataloader, will cause the benchmark_model function to generate a
# dataloader that provides random tensors of the same shape as `data`, which is used to
# benchmark the model. As verbose logging is provided, it will log the benchmarking
# at a DEBUG level.
logging.info("Benchmarking model using random data")
results: Dict[str, Dict[str, Any]] = benchmark_model(
    model, config, conversions, data=data.squeeze()
)

# Display the results
display_all_results(
    results, display_function=print, include_traceback_for_errors=False
)

INFO: [279899324.py:22] Benchmarking model using random data
as the model will be pickled and sent to each child process, which will require the model to be stored in memory
twice. If the model is large, this may cause memory issues. Consider using a callable to return the model, which
will be created in each child process, rather than the parent process. See `examples/mnist/mem_efficient_benchmark_rand_tensor.py`
for an example.
INFO: [benchmark_model.py:112] Benchmarking model using conversion: EAGER
INFO: [device.py:163] Chosen device: mps (Fallback selection)
Benchmarking EAGER on mps:  94%|█████████▍| 15/16 [00:00<00:00, 1257.03it/s]
INFO: [benchmark_model.py:112] Benchmarking model using conversion: JIT_TRACE
INFO: [device.py:163] Chosen device: mps (Fallback selection)
Benchmarking JIT_TRACE on mps:  94%|█████████▍| 15/16 [00:00<00:00, 1236.19it/s]




All results:
EAGER results:
Device: mps
Total elapsed time: 0.0136 seconds
Total inference time (model only): 0.0010 seconds
Total samples: 1024 - Batch size: 64
Throughput: 1004334.13 samples/second


JIT_TRACE results:
Device: mps
Total elapsed time: 0.0133 seconds
Total inference time (model only): 0.0013 seconds
Total samples: 1024 - Batch size: 64
Throughput: 764869.20 samples/second




A lot of the conversion methods have extremely verbose logging. We have opted to wrap most of them
in a `suppress_output` context manager that silences all `sys.stdout` and `sys.stderr`. However, if one
sets one's logging level to DEBUG with the `setup_logging` function, then those internal import logs
will not be suppressed.
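
For readers curious how such output suppression can work, here is a minimal standard-library sketch. It is not `alma`'s `suppress_output` implementation: the `quiet` context manager and its `enabled` flag are hypothetical, and this approach only silences Python-level writes to `sys.stdout` and `sys.stderr`.

```python
import contextlib
import os
from typing import Iterator


@contextlib.contextmanager
def quiet(enabled: bool = True) -> Iterator[None]:
    """Silence Python-level stdout/stderr while active, unless disabled."""
    if not enabled:
        # E.g. when debugging: let everything through untouched
        yield
        return
    with open(os.devnull, "w") as devnull:
        with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull):
            yield


with quiet():
    print("very verbose conversion log")  # Swallowed

with quiet(enabled=False):
    print("visible when debugging")  # Printed as usual
```

Tying the `enabled` flag to the logging level would give the behaviour described above, where DEBUG lets the internal logs through.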

Furthermore, as shown earlier, we provide a `display_all_results` function to print
the results in a nice format. There is also a `save_dict_to_json` function to save the results to a
JSON file for easy CI integration.
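
Since the signature of `save_dict_to_json` is not shown in this notebook, the sketch below uses the standard `json` module to illustrate the same idea. `write_results`, the file name and the `"throughput"` key are hypothetical; the `"error"` key mirrors the one used in the error handling section.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict

# Hypothetical results dictionary, shaped loosely like the ones above.
results_example: Dict[str, Dict[str, Any]] = {
    "EAGER": {"throughput": 1004334.13},
    "NATIVE_CONVERT_AI8WI8_STATIC_QUANTIZED": {"error": "example error message"},
}


def write_results(results: Dict[str, Dict[str, Any]], path: Path) -> None:
    """Write results as JSON; non-serializable values fall back to their str()."""
    with path.open("w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)


with tempfile.TemporaryDirectory() as tmp_dir:
    out_path = Path(tmp_dir) / "benchmark_results.json"
    write_results(results_example, out_path)
    # A CI job could read the file back and compare against a threshold
    reloaded = json.loads(out_path.read_text(encoding="utf-8"))

print(sorted(reloaded))
```

The `default=str` fallback is a design choice for values such as device objects, which `json` cannot serialize on its own.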

### Debugging
If one is debugging, it is highly recommended that one use the `setup_logging` function and set one's
level to DEBUG. This will, among other things, log any torch.compile warnings and errors thrown by
torch.inductor that can point to issues in triton kernels, give verbose ONNX logging, and print 
the model graphs where appropriate.

## Further examples

For script-based examples, including examples on how to use our provided argparser for a CLI, see the `.py` file examples in `examples/linear` and `examples/mnist`.