feat(api): add inpaint to panorama pipeline
ssube committed May 1, 2023
1 parent d100d75 commit 71b9518
Showing 2 changed files with 276 additions and 3 deletions.
277 changes: 275 additions & 2 deletions api/onnx_web/diffusers/pipelines/
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
logger = logging.get_logger(__name__)

# inpaint constants

def preprocess(image):
if isinstance(image, torch.Tensor):
return image
Expand All @@ -53,6 +58,24 @@ def preprocess(image):
return image

def prepare_mask_and_masked_image(image, mask, latents_shape):
image = np.array(image.convert("RGB").resize((latents_shape[1] * 8, latents_shape[0] * 8)))
image = image[None].transpose(0, 3, 1, 2)
image = image.astype(np.float32) / 127.5 - 1.0

image_mask = np.array(mask.convert("L").resize((latents_shape[1] * 8, latents_shape[0] * 8)))
masked_image = image * (image_mask < 127.5)

mask = mask.resize((latents_shape[1], latents_shape[0]), PIL_INTERPOLATION["nearest"])
mask = np.array(mask.convert("L"))
mask = mask.astype(np.float32) / 255.0
mask = mask[None, None]
mask[mask < 0.5] = 0
mask[mask >= 0.5] = 1

return mask, masked_image

class OnnxStableDiffusionPanoramaPipeline(DiffusionPipeline):
vae_encoder: OnnxRuntimeModel
vae_decoder: OnnxRuntimeModel
Expand Down Expand Up @@ -884,6 +907,250 @@ def img2img(
images=image, nsfw_content_detected=has_nsfw_concept

def inpaint(
prompt: Union[str, List[str]],
image: PIL.Image.Image,
mask_image: PIL.Image.Image,
height: Optional[int] = 512,
width: Optional[int] = 512,
num_inference_steps: int = 50,
guidance_scale: float = 7.5,
negative_prompt: Optional[Union[str, List[str]]] = None,
num_images_per_prompt: Optional[int] = 1,
eta: float = 0.0,
generator: Optional[np.random.RandomState] = None,
latents: Optional[np.ndarray] = None,
prompt_embeds: Optional[np.ndarray] = None,
negative_prompt_embeds: Optional[np.ndarray] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
callback: Optional[Callable[[int, int, np.ndarray], None]] = None,
callback_steps: int = 1,
Function invoked when calling the pipeline for generation.
prompt (`str` or `List[str]`):
The prompt or prompts to guide the image generation.
image (`PIL.Image.Image`):
`Image`, or tensor representing an image batch which will be inpainted, *i.e.* parts of the image will
be masked out with `mask_image` and repainted according to `prompt`.
mask_image (`PIL.Image.Image`):
`Image`, or tensor representing an image batch, to mask `image`. White pixels in the mask will be
repainted, while black pixels will be preserved. If `mask_image` is a PIL image, it will be converted
to a single channel (luminance) before use. If it's a tensor, it should contain one color channel (L)
instead of 3, so the expected shape would be `(B, H, W, 1)`.
height (`int`, *optional*, defaults to 512):
The height in pixels of the generated image.
width (`int`, *optional*, defaults to 512):
The width in pixels of the generated image.
num_inference_steps (`int`, *optional*, defaults to 50):
The number of denoising steps. More denoising steps usually lead to a higher quality image at the
expense of slower inference.
guidance_scale (`float`, *optional*, defaults to 7.5):
Guidance scale as defined in [Classifier-Free Diffusion Guidance](
`guidance_scale` is defined as `w` of equation 2. of [Imagen
Paper]( Guidance scale is enabled by setting `guidance_scale >
1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,
usually at the expense of lower image quality.
negative_prompt (`str` or `List[str]`, *optional*):
The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e., ignored
if `guidance_scale` is less than `1`).
num_images_per_prompt (`int`, *optional*, defaults to 1):
The number of images to generate per prompt.
eta (`float`, *optional*, defaults to 0.0):
Corresponds to parameter eta (η) in the DDIM paper: Only applies to
[`schedulers.DDIMScheduler`], will be ignored for others.
generator (`np.random.RandomState`, *optional*):
A np.random.RandomState to make generation deterministic.
latents (`np.ndarray`, *optional*):
Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image
generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
tensor will ge generated by sampling using the supplied random `generator`.
prompt_embeds (`np.ndarray`, *optional*):
Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
provided, text embeddings will be generated from `prompt` input argument.
negative_prompt_embeds (`np.ndarray`, *optional*):
Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input
output_type (`str`, *optional*, defaults to `"pil"`):
The output format of the generate image. Choose between
[PIL]( `PIL.Image.Image` or `np.array`.
return_dict (`bool`, *optional*, defaults to `True`):
Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
plain tuple.
callback (`Callable`, *optional*):
A function that will be called every `callback_steps` steps during inference. The function will be
called with the following arguments: `callback(step: int, timestep: int, latents: np.ndarray)`.
callback_steps (`int`, *optional*, defaults to 1):
The frequency at which the `callback` function will be called. If not specified, the callback will be
called at every step.
[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:
[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple.
When returning a tuple, the first element is a list with the generated images, and the second element is a
list of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work"
(nsfw) content, according to the `safety_checker`.

# check inputs. Raise error if not correct
prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds

# define call parameters
if prompt is not None and isinstance(prompt, str):
batch_size = 1
elif prompt is not None and isinstance(prompt, list):
batch_size = len(prompt)
batch_size = prompt_embeds.shape[0]

if generator is None:
generator = np.random

# set timesteps

# here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)
# of the Imagen paper: . `guidance_scale = 1`
# corresponds to doing no classifier free guidance.
do_classifier_free_guidance = guidance_scale > 1.0

prompt_embeds = self._encode_prompt(

num_channels_latents = NUM_LATENT_CHANNELS
latents_shape = (batch_size * num_images_per_prompt, num_channels_latents, height // 8, width // 8)
latents_dtype = prompt_embeds.dtype
if latents is None:
latents = generator.randn(*latents_shape).astype(latents_dtype)
if latents.shape != latents_shape:
raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {latents_shape}")

# prepare mask and masked_image
mask, masked_image = prepare_mask_and_masked_image(image, mask_image, latents_shape[-2:])
mask = mask.astype(latents.dtype)
masked_image = masked_image.astype(latents.dtype)

masked_image_latents = self.vae_encoder(sample=masked_image)[0]
masked_image_latents = 0.18215 * masked_image_latents

# duplicate mask and masked_image_latents for each generation per prompt
mask = mask.repeat(batch_size * num_images_per_prompt, 0)
masked_image_latents = masked_image_latents.repeat(batch_size * num_images_per_prompt, 0)

mask = np.concatenate([mask] * 2) if do_classifier_free_guidance else mask
masked_image_latents = (
np.concatenate([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents

num_channels_mask = mask.shape[1]
num_channels_masked_image = masked_image_latents.shape[1]

unet_input_channels = NUM_UNET_INPUT_CHANNELS
if num_channels_latents + num_channels_mask + num_channels_masked_image != unet_input_channels:
raise ValueError(
"Incorrect configuration settings! The config of `pipeline.unet` expects"
f" {unet_input_channels} but received `num_channels_latents`: {num_channels_latents} +"
f" `num_channels_mask`: {num_channels_mask} + `num_channels_masked_image`: {num_channels_masked_image}"
f" = {num_channels_latents+num_channels_masked_image+num_channels_mask}. Please verify the config of"
" `pipeline.unet` or your `mask_image` or `image` input."

# set timesteps

# scale the initial noise by the standard deviation required by the scheduler
latents = latents * np.float64(self.scheduler.init_noise_sigma)

# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature
# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.
# eta corresponds to η in DDIM paper:
# and should be between [0, 1]
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {}
if accepts_eta:
extra_step_kwargs["eta"] = eta

timestep_dtype = next(
(input.type for input in self.unet.model.get_inputs() if == "timestep"), "tensor(float)"
timestep_dtype = ORT_TO_NP_TYPE[timestep_dtype]

for i, t in enumerate(self.progress_bar(self.scheduler.timesteps)):
# expand the latents if we are doing classifier free guidance
latent_model_input = np.concatenate([latents] * 2) if do_classifier_free_guidance else latents
# concat latents, mask, masked_image_latnets in the channel dimension
latent_model_input = self.scheduler.scale_model_input(torch.from_numpy(latent_model_input), t)
latent_model_input = latent_model_input.cpu().numpy()
latent_model_input = np.concatenate([latent_model_input, mask, masked_image_latents], axis=1)

# predict the noise residual
timestep = np.array([t], dtype=timestep_dtype)
noise_pred = self.unet(sample=latent_model_input, timestep=timestep, encoder_hidden_states=prompt_embeds)[

# perform guidance
if do_classifier_free_guidance:
noise_pred_uncond, noise_pred_text = np.split(noise_pred, 2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

# compute the previous noisy sample x_t -> x_t-1
scheduler_output = self.scheduler.step(
torch.from_numpy(noise_pred), t, torch.from_numpy(latents), **extra_step_kwargs
latents = scheduler_output.prev_sample.numpy()

# call the callback, if provided
if callback is not None and i % callback_steps == 0:
callback(i, t, latents)

latents = 1 / 0.18215 * latents
# image = self.vae_decoder(latent_sample=latents)[0]
# it seems likes there is a strange result for using half-precision vae decoder if batchsize>1
image = np.concatenate(
[self.vae_decoder(latent_sample=latents[i : i + 1])[0] for i in range(latents.shape[0])]

image = np.clip(image / 2 + 0.5, 0, 1)
image = image.transpose((0, 2, 3, 1))

if self.safety_checker is not None:
safety_checker_input = self.feature_extractor(
self.numpy_to_pil(image), return_tensors="np"
# safety_checker does not support batched inputs yet
images, has_nsfw_concept = [], []
for i in range(image.shape[0]):
image_i, has_nsfw_concept_i = self.safety_checker(
clip_input=safety_checker_input[i : i + 1], images=image[i : i + 1]
image = np.concatenate(images)
has_nsfw_concept = None

if output_type == "pil":
image = self.numpy_to_pil(image)

if not return_dict:
return (image, has_nsfw_concept)

return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)

def __call__(
Expand All @@ -892,8 +1159,14 @@ def __call__(
if len(args) > 1 and (
isinstance(args[1], np.ndarray) or isinstance(args[1], PIL.Image.Image)
logger.debug("running img2img panorama pipeline")
return self.img2img(*args, **kwargs)
if len(args) > 2 and (
isinstance(args[1], np.ndarray) or isinstance(args[1], PIL.Image.Image)
logger.debug("running inpaint panorama pipeline")
return self.inpaint(*args, **kwargs)
logger.debug("running img2img panorama pipeline")
return self.img2img(*args, **kwargs)
logger.debug("running txt2img panorama pipeline")
return self.text2img(*args, **kwargs)
2 changes: 1 addition & 1 deletion api/onnx_web/
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def get_valid_pipeline(self, group: str, pipeline: str = None) -> str:
if pipeline in ["controlnet", "lpw", "panorama", "pix2pix"]:
return pipeline
elif group == "inpaint":
if pipeline in ["controlnet"]:
if pipeline in ["controlnet", "panorama"]:
return pipeline
elif group == "txt2img":
if pipeline in ["panorama"]:
Expand Down

