In [None]:
!pip install onnx diffusers transformers huggingface_hub

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Authenticate with the Hugging Face Hub. The command prompts for an access
# token generated at https://huggingface.co/settings/tokens and stores it
# locally so that later downloads can use it.
!huggingface-cli login


        _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
        _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
        _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
        _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
        _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

        To login, `huggingface_hub` now requires a token generated from https://huggingface.co/settings/tokens .
        
Token: 
Login successful
Your token has been saved to /root/.huggingface/token
[1m[31mAuthenticated through git-credential store but this isn't the helper defined on your machine.
You might have to re-authenticate when pushing to the Hugging Face Hub. Run the following command in yo

In [None]:
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import os
import shutil
from pathlib import Path

import torch
from torch.onnx import export

import onnx
from diffusers import StableDiffusionOnnxPipeline, StableDiffusionPipeline
from diffusers.onnx_utils import OnnxRuntimeModel
from packaging import version


# True when the base version of the installed torch (the release number with
# any pre/dev/local suffix dropped) is older than 1.11. onnx_export uses this
# flag to decide which keyword arguments it may pass to torch.onnx.export.
is_torch_less_than_1_11 = version.parse(version.parse(torch.__version__).base_version) < version.parse("1.11")


def onnx_export(
    model,
    model_args: tuple,
    output_path: Path,
    ordered_input_names,
    output_names,
    dynamic_axes,
    opset,
    use_external_data_format=False,
):
    """Export ``model`` to an ONNX file via ``torch.onnx.export``.

    Args:
        model: the module handed to ``torch.onnx.export``.
        model_args: example inputs forwarded to the exporter.
        output_path: destination file; its parent directory is created
            when it does not exist yet.
        ordered_input_names: names given to the graph inputs, in order.
        output_names: names given to the graph outputs.
        dynamic_axes: mapping of input/output name to its dynamic axes.
        opset: ONNX opset version to target.
        use_external_data_format: only forwarded to the exporter on
            torch < 1.11; ignored on newer releases.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Keyword arguments shared by every supported torch version.
    export_kwargs = dict(
        f=output_path.as_posix(),
        input_names=ordered_input_names,
        output_names=output_names,
        dynamic_axes=dynamic_axes,
        do_constant_folding=True,
        opset_version=opset,
    )

    # PyTorch deprecated `enable_onnx_checker` and `use_external_data_format`
    # in v1.11, so they are only supplied to older releases.
    if is_torch_less_than_1_11:
        export_kwargs.update(
            use_external_data_format=use_external_data_format,
            enable_onnx_checker=True,
        )

    export(model, model_args, **export_kwargs)


@torch.no_grad()
def convert_models(model_path: str, output_path: str, opset: int):
    """Export the components of a Stable Diffusion pipeline to ONNX.

    Loads a ``StableDiffusionPipeline`` from ``model_path`` and exports the
    text encoder, UNet, VAE encoder, VAE decoder and safety checker to
    ``<output_path>/<component>/model.onnx``. It then builds a
    ``StableDiffusionOnnxPipeline`` from the exported decoder, text encoder,
    UNet and safety checker (the VAE encoder is exported but not passed to
    the pipeline constructor), saves it to ``output_path`` and reloads it once
    with the ``CPUExecutionProvider`` to confirm it is loadable.

    Side effects on the loaded pipeline: ``pipeline.vae.forward`` and
    ``pipeline.safety_checker.forward`` are reassigned for tracing.

    Args:
        model_path: value passed to ``StableDiffusionPipeline.from_pretrained``.
        output_path: directory that receives the exported models and the
            saved ONNX pipeline.
        opset: ONNX opset version used for every export.
    """
    pipeline = StableDiffusionPipeline.from_pretrained(model_path)
    output_path = Path(output_path)

    # TEXT ENCODER
    # Tokenize a throwaway prompt, padded to the tokenizer's maximum length,
    # to obtain example input ids for tracing.
    text_input = pipeline.tokenizer(
        "A sample prompt",
        padding="max_length",
        max_length=pipeline.tokenizer.model_max_length,
        truncation=True,
        return_tensors="pt",
    )
    onnx_export(
        pipeline.text_encoder,
        # casting to torch.int32 until the CLIP fix is released: https://github.com/huggingface/transformers/pull/18515/files
        # The trailing comma makes this a real 1-tuple, matching the
        # `model_args: tuple` contract of onnx_export.
        model_args=(text_input.input_ids.to(torch.int32),),
        output_path=output_path / "text_encoder" / "model.onnx",
        ordered_input_names=["input_ids"],
        output_names=["last_hidden_state", "pooler_output"],
        dynamic_axes={
            "input_ids": {0: "batch", 1: "sequence"},
        },
        opset=opset,
    )

    # UNET
    unet_path = output_path / "unet" / "model.onnx"
    onnx_export(
        pipeline.unet,
        # Example inputs: (sample, timestep, encoder_hidden_states, return_dict).
        # NOTE(review): the hard-coded 4x64x64 / 77x768 sizes presumably match
        # the Stable Diffusion v1 checkpoints - confirm for other models.
        model_args=(torch.randn(2, 4, 64, 64), torch.LongTensor([0, 1]), torch.randn(2, 77, 768), False),
        output_path=unet_path,
        ordered_input_names=["sample", "timestep", "encoder_hidden_states", "return_dict"],
        output_names=["out_sample"],  # has to be different from "sample" for correct tracing
        dynamic_axes={
            "sample": {0: "batch", 1: "channels", 2: "height", 3: "width"},
            "timestep": {0: "batch"},
            "encoder_hidden_states": {0: "batch", 1: "sequence"},
        },
        opset=opset,
        use_external_data_format=True,  # UNet is > 2GB, so the weights need to be split
    )
    unet_model_path = str(unet_path.absolute().as_posix())
    unet_dir = os.path.dirname(unet_model_path)
    # Load the exported model into memory before wiping the directory, so it
    # can be written back below.
    unet = onnx.load(unet_model_path)
    # clean up existing tensor files
    shutil.rmtree(unet_dir)
    os.mkdir(unet_dir)
    # collate external tensor files into one
    onnx.save_model(
        unet,
        unet_model_path,
        save_as_external_data=True,
        all_tensors_to_one_file=True,
        location="weights.pb",
        convert_attribute=False,
    )

    # VAE ENCODER
    vae_encoder = pipeline.vae
    # need to get the raw tensor output (sample) from the encoder
    vae_encoder.forward = lambda sample, return_dict: vae_encoder.encode(sample, return_dict)[0].sample()
    onnx_export(
        vae_encoder,
        model_args=(torch.randn(1, 3, 512, 512), False),
        output_path=output_path / "vae_encoder" / "model.onnx",
        ordered_input_names=["sample", "return_dict"],
        output_names=["latent_sample"],
        dynamic_axes={
            "sample": {0: "batch", 1: "channels", 2: "height", 3: "width"},
        },
        opset=opset,
    )

    # VAE DECODER
    # Same module object as `vae_encoder`; only its forward is swapped again.
    vae_decoder = pipeline.vae
    # forward only through the decoder part
    vae_decoder.forward = vae_encoder.decode
    onnx_export(
        vae_decoder,
        model_args=(torch.randn(1, 4, 64, 64), False),
        output_path=output_path / "vae_decoder" / "model.onnx",
        ordered_input_names=["latent_sample", "return_dict"],
        output_names=["sample"],
        dynamic_axes={
            "latent_sample": {0: "batch", 1: "channels", 2: "height", 3: "width"},
        },
        opset=opset,
    )

    # SAFETY CHECKER
    safety_checker = pipeline.safety_checker
    safety_checker.forward = safety_checker.forward_onnx
    onnx_export(
        pipeline.safety_checker,
        model_args=(torch.randn(1, 3, 224, 224), torch.randn(1, 512, 512, 3)),
        output_path=output_path / "safety_checker" / "model.onnx",
        ordered_input_names=["clip_input", "images"],
        output_names=["out_images", "has_nsfw_concepts"],
        dynamic_axes={
            "clip_input": {0: "batch", 1: "channels", 2: "height", 3: "width"},
            # The example `images` tensor is (1, 512, 512, 3), i.e.
            # channels-last, so the axis names follow that layout.
            "images": {0: "batch", 1: "height", 2: "width", 3: "channels"},
        },
        opset=opset,
    )

    # Assemble the ONNX pipeline from the exported models plus the tokenizer,
    # scheduler and feature extractor of the original pipeline.
    onnx_pipeline = StableDiffusionOnnxPipeline(
        vae_decoder=OnnxRuntimeModel.from_pretrained(output_path / "vae_decoder"),
        text_encoder=OnnxRuntimeModel.from_pretrained(output_path / "text_encoder"),
        tokenizer=pipeline.tokenizer,
        unet=OnnxRuntimeModel.from_pretrained(output_path / "unet"),
        scheduler=pipeline.scheduler,
        safety_checker=OnnxRuntimeModel.from_pretrained(output_path / "safety_checker"),
        feature_extractor=pipeline.feature_extractor,
    )

    onnx_pipeline.save_pretrained(output_path)
    print("ONNX pipeline saved to", output_path)

    # Smoke test: reload what was just saved; the result itself is discarded.
    _ = StableDiffusionOnnxPipeline.from_pretrained(output_path, provider="CPUExecutionProvider")
    print("ONNX pipeline is loadable")



    


In [None]:
# Export the CompVis/stable-diffusion-v1-4 checkpoint using ONNX opset 14.
# NOTE(review): the relative output path presumably expects Google Drive to be
# mounted under ./drive - no mount step is visible in this notebook; confirm.
convert_models("CompVis/stable-diffusion-v1-4", "drive/MyDrive/exported_models/stable_diffusion_onnx", 14)

Downloading:   0%|          | 0.00/543 [00:00<?, ?B/s]

Fetching 16 files:   0%|          | 0/16 [00:00<?, ?it/s]

Downloading:   0%|          | 0.00/342 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/4.56k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.22G [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/209 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/230 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/592 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/492M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/525k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/472 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/806 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.06M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/743 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/3.44G [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/522 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/335M [00:00<?, ?B/s]

ftfy or spacy is not installed using BERT BasicTokenizer instead of ftfy.
  mask.fill_(torch.tensor(torch.finfo(dtype).min))
  if attn_weights.size() != (bsz * self.num_heads, tgt_len, src_len):
  if causal_attention_mask.size() != (bsz, 1, tgt_len, src_len):
  if attn_output.size() != (bsz * self.num_heads, tgt_len, self.head_dim):
  + "If indices include negative values, the exported graph will produce incorrect results."
  if any(s % default_overall_up_factor != 0 for s in sample.shape[-2:]):
  _verify_batch_size([input.size(0) * input.size(1) // num_groups, num_groups] + list(input.size()[2:]))
  tensor = tensor.reshape(batch_size, seq_len, head_size, dim // head_size)
  tensor = tensor.permute(0, 2, 1, 3).reshape(batch_size * head_size, seq_len, dim // head_size)
  tensor = tensor.reshape(batch_size // head_size, head_size, seq_len, dim)
  tensor = tensor.permute(0, 2, 1, 3).reshape(batch_size // head_size, seq_len, dim * head_size)
  assert hidden_states.shape[1] == self.channels

RuntimeError: ignored

In [None]:
!pip install onnxruntime

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting onnxruntime
  Downloading onnxruntime-1.12.1-cp37-cp37m-manylinux_2_27_x86_64.whl (4.9 MB)
[K     |████████████████████████████████| 4.9 MB 5.2 MB/s 
Collecting coloredlogs
  Downloading coloredlogs-15.0.1-py2.py3-none-any.whl (46 kB)
[K     |████████████████████████████████| 46 kB 3.6 MB/s 
Collecting humanfriendly>=9.1
  Downloading humanfriendly-10.0-py2.py3-none-any.whl (86 kB)
[K     |████████████████████████████████| 86 kB 4.6 MB/s 
Installing collected packages: humanfriendly, coloredlogs, onnxruntime
Successfully installed coloredlogs-15.0.1 humanfriendly-10.0 onnxruntime-1.12.1


In [None]:
from onnx import numpy_helper
import onnxruntime as rt
import numpy as np
import shutil
import onnx
import time
import glob
import sys
import os

def _try_load_and_check_model(path_to_onnx):
    """
    Load the ONNX file at `path_to_onnx` and run the ONNX checker on it.

    Returns True when the model loads and passes the checker. When the file
    is missing or the checker reports a validation error, the error is
    printed and False is returned. Any other exception propagates.
    """
    try:
        onnx.checker.check_model(onnx.load(path_to_onnx))
    except (FileNotFoundError, onnx.checker.ValidationError) as err:
        print(err)
        return False
    return True

def _try_load_and_test_model(path_to_onnx):
    """
    Run the model against the reference tensors stored next to it.

    Every `test_data_set_*` directory beside the model file is scanned for
    `input_*.pb` / `output_*.pb` files. Each sorted input/output pair is fed
    through the session's first input and compared with its first output.

    Returns a list with one entry per pair: 1 when the output matched the
    expected tensor within tolerance, 0 otherwise.
    """
    session = rt.InferenceSession(path_to_onnx)
    # Only the first declared input and output of the model are exercised.
    first_input = session.get_inputs()[0].name
    first_output = session.get_outputs()[0].name

    # Test folders are expected in the same directory as the model file.
    model_dir = os.path.dirname(path_to_onnx)
    results = []
    for data_dir in glob.glob(os.path.join(model_dir, "test_data_set_*")):
        # glob gives no ordering guarantee; sorting both lists keeps each
        # input paired with its matching output.
        input_files = sorted(glob.glob(os.path.join(data_dir, "input_*.pb")))
        output_files = sorted(glob.glob(os.path.join(data_dir, "output_*.pb")))

        for input_file, output_file in zip(input_files, output_files):
            # Deserialize both protobuf tensors from disk.
            given = onnx.TensorProto()
            wanted = onnx.TensorProto()
            with open(input_file, 'rb') as src, open(output_file, 'rb') as ref:
                given.ParseFromString(src.read())
                wanted.ParseFromString(ref.read())

            # Run inference on the stored input.
            feed = {first_input: numpy_helper.to_array(given)}
            produced = session.run([first_output], feed)[0]

            # Record a pass (1) or a fail (0) for this pair.
            try:
                np.testing.assert_allclose(
                    produced,
                    numpy_helper.to_array(wanted),
                    rtol=5e-4,
                    atol=1e-3,
                )
            except AssertionError as mismatch:
                print(f'Input: {numpy_helper.to_array(given)}')
                print(mismatch)
                results.append(0)
            else:
                sys.stdout.write('.')
                results.append(1)
    return results


# Validate the exported text encoder: first check that the file loads and
# passes the ONNX checker, then run any reference test data stored beside it.
path_to_onnx = 'drive/MyDrive/exported_models/stable_diffusion_onnx/text_encoder/model.onnx'
term_size = shutil.get_terminal_size((80, 20)).columns
title_str = f' {path_to_onnx} '.center(term_size, '=')
print(title_str)
start = time.time()
# This must pass in order for the next suite to make
# sense, so assert here and fail early
assert _try_load_and_check_model(path_to_onnx), (
    f'{path_to_onnx} could not be loaded or failed the ONNX checker'
)
# Now run the actual suite of tests
results = _try_load_and_test_model(path_to_onnx)
end = time.time()
total = end - start
failures = len(results) - sum(results)
# Print a summary
print('\n\nFinished in {:.3f} seconds'.format(total))
print(f'{len(results)} tests, {failures} failures')
if not results:
    # An empty result list means no input/output pairs were found, so the
    # "0 failures" line above does not demonstrate that the model is correct.
    print('WARNING: no test_data_set_* input/output pairs were found; nothing was tested')
print('='*term_size)
# Calling exit() inside a Jupyter kernel shuts the kernel down, so failures
# are reported by raising instead of through a process exit code.
assert failures == 0, f'{failures} of {len(results)} ONNX test case(s) failed'

= drive/MyDrive/exported_models/stable_diffusion_onnx/text_encoder/model.onnx ==


Finished in 10.779 seconds
0 tests, 0 failures
