In [None]:
import numpy as np
import torch
import OpenEXR

from Imath import PixelType
from PIL import Image
from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img import *
from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_inpaint import StableDiffusionInpaintPipeline
from diffusers.utils import make_image_grid
from scipy.sparse import bsr_array, csr_array, save_npz, load_npz

%cd ../ext/Text2Light
from sritmo.global_sritmo import SRiTMO
%cd ../../notebooks

In [None]:
class myStableDiffusionImg2ImgPipeline(StableDiffusionImg2ImgPipeline):
    """Img2img pipeline extended with mask handling.

    On top of ``StableDiffusionImg2ImgPipeline`` this class adds a
    ``mask_processor`` and overrides ``__call__`` so that a ``mask_image`` is
    taken into account:

    * when the UNet has 9 input channels, the mask and the masked-image latents
      are concatenated to the latent input at every denoising step;
    * when the UNet has 4 input channels, the unmasked region is re-imposed from
      the (re-noised) input image latents after every scheduler step.
    """

    def __init__(
        self,
        vae: AutoencoderKL,
        text_encoder: CLIPTextModel,
        tokenizer: CLIPTokenizer,
        unet: UNet2DConditionModel,
        scheduler: KarrasDiffusionSchedulers,
        safety_checker: StableDiffusionSafetyChecker,
        feature_extractor: CLIPImageProcessor,
        image_encoder: CLIPVisionModelWithProjection = None,
        requires_safety_checker: bool = True,
    ):
        """Forward all components to the parent pipeline and create the mask processor."""
        super(myStableDiffusionImg2ImgPipeline, self).__init__(
            vae, text_encoder, tokenizer, unet, scheduler, safety_checker, feature_extractor, image_encoder, requires_safety_checker
        )

        # Masks are converted to grayscale and binarized, and are not normalized.
        self.mask_processor = VaeImageProcessor(
            vae_scale_factor=self.vae_scale_factor, do_normalize=False, do_binarize=True, do_convert_grayscale=True
        )

    def _encode_vae_image(self, image: torch.Tensor, generator: torch.Generator):
        """Encode ``image`` with the VAE and scale the latents by ``vae.config.scaling_factor``.

        When ``generator`` is a list, each batch element is encoded separately
        with its own generator and the results are concatenated along dim 0.
        """
        if isinstance(generator, list):
            image_latents = [
                retrieve_latents(self.vae.encode(image[i : i + 1]), generator=generator[i])
                for i in range(image.shape[0])
            ]
            image_latents = torch.cat(image_latents, dim=0)
        else:
            image_latents = retrieve_latents(self.vae.encode(image), generator=generator)

        image_latents = self.vae.config.scaling_factor * image_latents

        return image_latents

    def prepare_mask_latents(
        self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance
    ):
        """Bring the mask and the masked image to latent resolution and batch size.

        The mask is interpolated to ``(height, width) // vae_scale_factor``.
        ``masked_image`` is VAE-encoded unless it already has 4 channels, in
        which case it is used as latents directly. Both tensors are repeated up
        to ``batch_size`` and duplicated when classifier-free guidance is on.

        Returns:
            Tuple ``(mask, masked_image_latents)``.

        Raises:
            ValueError: if ``batch_size`` is not a multiple of the number of
                masks or of masked images passed.
        """
        # resize the mask to latents shape as we concatenate the mask to the latents
        # we do that before converting to dtype to avoid breaking in case we're using cpu_offload
        # and half precision
        mask = torch.nn.functional.interpolate(
            mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor)
        )
        mask = mask.to(device=device, dtype=dtype)

        masked_image = masked_image.to(device=device, dtype=dtype)

        # A 4-channel input is taken to be latents already and is not re-encoded.
        if masked_image.shape[1] == 4:
            masked_image_latents = masked_image
        else:
            masked_image_latents = self._encode_vae_image(masked_image, generator=generator)

        # duplicate mask and masked_image_latents for each generation per prompt, using mps friendly method
        if mask.shape[0] < batch_size:
            if not batch_size % mask.shape[0] == 0:
                raise ValueError(
                    "The passed mask and the required batch size don't match. Masks are supposed to be duplicated to"
                    f" a total batch size of {batch_size}, but {mask.shape[0]} masks were passed. Make sure the number"
                    " of masks that you pass is divisible by the total requested batch size."
                )
            mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)
        if masked_image_latents.shape[0] < batch_size:
            if not batch_size % masked_image_latents.shape[0] == 0:
                raise ValueError(
                    "The passed images and the required batch size don't match. Images are supposed to be duplicated"
                    f" to a total batch size of {batch_size}, but {masked_image_latents.shape[0]} images were passed."
                    " Make sure the number of images that you pass is divisible by the total requested batch size."
                )
            masked_image_latents = masked_image_latents.repeat(batch_size // masked_image_latents.shape[0], 1, 1, 1)

        # With classifier-free guidance the batch is doubled (unconditional + conditional).
        mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask
        masked_image_latents = (
            torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents
        )

        # aligning device to prevent device errors when concating it with the latent model input
        masked_image_latents = masked_image_latents.to(device=device, dtype=dtype)
        return mask, masked_image_latents

    def prepare_latents(
        self,
        batch_size,
        num_channels_latents,
        height,
        width,
        dtype,
        device,
        generator,
        latents=None,
        image=None,
        timestep=None,
        is_strength_max=True,
        return_noise=False,
        return_image_latents=False,
    ):
        """Build the initial latents for the denoising loop.

        * ``latents is None`` and ``is_strength_max``: pure noise scaled by
          ``scheduler.init_noise_sigma``.
        * ``latents is None`` and not ``is_strength_max``: the image latents
          with noise added for ``timestep``.
        * ``latents`` given: it is used as the noise and scaled by
          ``scheduler.init_noise_sigma``.

        Returns:
            Tuple ``(latents,)``, followed by ``noise`` when ``return_noise`` is
            set and by ``image_latents`` when ``return_image_latents`` is set.

        Raises:
            ValueError: if a generator list does not match ``batch_size``, or if
                ``is_strength_max`` is False while ``image`` or ``timestep`` is
                missing.
        """
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,
            int(width) // self.vae_scale_factor,
        )
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        if (image is None or timestep is None) and not is_strength_max:
            raise ValueError(
                "Since strength < 1. initial latents are to be initialised as a combination of Image + Noise."
                "However, either the image or the noise timestep has not been provided."
            )

        # Image latents are only needed when they are returned or mixed with noise below.
        if return_image_latents or (latents is None and not is_strength_max):
            image = image.to(device=device, dtype=dtype)

            # A 4-channel input is taken to be latents already and is not re-encoded.
            if image.shape[1] == 4:
                image_latents = image
            else:
                image_latents = self._encode_vae_image(image=image, generator=generator)
            image_latents = image_latents.repeat(batch_size // image_latents.shape[0], 1, 1, 1)

        if latents is None:
            noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            # if strength is 1. then initialise the latents to noise, else initial to image + noise
            latents = noise if is_strength_max else self.scheduler.add_noise(image_latents, noise, timestep)
            # if pure noise then scale the initial latents by the  Scheduler's init sigma
            latents = latents * self.scheduler.init_noise_sigma if is_strength_max else latents
        else:
            noise = latents.to(device)
            latents = noise * self.scheduler.init_noise_sigma

        outputs = (latents,)

        if return_noise:
            outputs += (noise,)

        if return_image_latents:
            outputs += (image_latents,)

        return outputs

    @torch.no_grad()
    def __call__(
        self,
        prompt: Union[str, List[str]] = None,
        image: PipelineImageInput = None,
        strength: float = 0.8,
        num_inference_steps: Optional[int] = 50,
        timesteps: List[int] = None,
        guidance_scale: Optional[float] = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: Optional[float] = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        prompt_embeds: Optional[torch.FloatTensor] = None,
        negative_prompt_embeds: Optional[torch.FloatTensor] = None,
        ip_adapter_image: Optional[PipelineImageInput] = None,
        ip_adapter_image_embeds: Optional[List[torch.FloatTensor]] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        clip_skip: int = None,
        callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        # MASK
        height: Optional[int] = None,
        width: Optional[int] = None,
        padding_mask_crop: Optional[int] = None,
        latents: Optional[torch.FloatTensor] = None,
        mask_image: PipelineImageInput = None,
        masked_image_latents: torch.FloatTensor = None,
        **kwargs,
    ):
        """Run masked image-to-image generation.

        Only the arguments whose handling is specific to this method are listed;
        the others are forwarded to the parent pipeline's helpers.

        Args:
            image: Input image, preprocessed with ``self.image_processor``.
            mask_image: Mask, preprocessed with ``self.mask_processor``. The
                masked image is built as ``init_image * (mask < 0.5)``.
            masked_image_latents: If given, used in place of the masked image
                computed from ``image`` and ``mask_image``.
            strength: ``1.0`` starts from pure noise; other values start from
                the noised image latents.
            height, width: Default to
                ``unet.config.sample_size * vae_scale_factor``.
            padding_mask_crop: When set, a crop region is computed from the mask
                and the inputs are preprocessed with ``resize_mode="fill"``.
            latents: Optional pre-generated noise.
            output_type: ``"latent"`` skips VAE decoding and the safety checker.
            **kwargs: Only the deprecated ``callback`` and ``callback_steps``
                entries are read.

        Returns:
            ``StableDiffusionPipelineOutput`` when ``return_dict`` is True,
            otherwise the tuple ``(image, has_nsfw_concept)``.

        Raises:
            ValueError: if the UNet has neither 4 nor 9 input channels, or if
                the 9-channel layout does not add up to ``unet.config.in_channels``.
        """
        callback = kwargs.pop("callback", None)
        callback_steps = kwargs.pop("callback_steps", None)

        if callback is not None:
            deprecate(
                "callback",
                "1.0.0",
                "Passing `callback` as an input argument to `__call__` is deprecated, consider use `callback_on_step_end`",
            )
        if callback_steps is not None:
            deprecate(
                "callback_steps",
                "1.0.0",
                "Passing `callback_steps` as an input argument to `__call__` is deprecated, consider use `callback_on_step_end`",
            )

        # 0. Default height and width to unet
        height = height or self.unet.config.sample_size * self.vae_scale_factor
        width = width or self.unet.config.sample_size * self.vae_scale_factor

        # 1. Check inputs. Raise error if not correct
        self.check_inputs(
            prompt,
            strength,
            callback_steps,
            negative_prompt,
            prompt_embeds,
            negative_prompt_embeds,
            ip_adapter_image,
            ip_adapter_image_embeds,
            callback_on_step_end_tensor_inputs,
        )

        self._guidance_scale = guidance_scale
        self._clip_skip = clip_skip
        self._cross_attention_kwargs = cross_attention_kwargs
        self._interrupt = False

        # 2. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        device = self._execution_device

        # 3. Encode input prompt
        text_encoder_lora_scale = (
            self.cross_attention_kwargs.get("scale", None) if self.cross_attention_kwargs is not None else None
        )
        prompt_embeds, negative_prompt_embeds = self.encode_prompt(
            prompt,
            device,
            num_images_per_prompt,
            self.do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            lora_scale=text_encoder_lora_scale,
            clip_skip=self.clip_skip,
        )
        # For classifier free guidance, we need to do two forward passes.
        # Here we concatenate the unconditional and text embeddings into a single batch
        # to avoid doing two forward passes
        if self.do_classifier_free_guidance:
            prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])

        if ip_adapter_image is not None or ip_adapter_image_embeds is not None:
            image_embeds = self.prepare_ip_adapter_image_embeds(
                ip_adapter_image,
                ip_adapter_image_embeds,
                device,
                batch_size * num_images_per_prompt,
                self.do_classifier_free_guidance,
            )

        # 4. Preprocess image
        # NOTE(review): when `padding_mask_crop` is set the inputs are cropped here, but no step
        # that pastes the result back into the original image is visible in this method - confirm.
        if padding_mask_crop is not None:
            crops_coords = self.mask_processor.get_crop_region(mask_image, width, height, pad=padding_mask_crop)
            resize_mode = "fill"
        else:
            crops_coords = None
            resize_mode = "default"

        init_image = self.image_processor.preprocess(
            image, height=height, width=width, crops_coords=crops_coords, resize_mode=resize_mode
        )
        # NOTE(review): `image` is not read again before it is reassigned after the denoising loop;
        # this float32 cast appears to have no effect on `init_image` - confirm.
        image = init_image.to(dtype=torch.float32)

        # 5. set timesteps
        # The values returned by `retrieve_timesteps` are overwritten on the next line; the call is
        # presumably kept for its effect on `self.scheduler` - confirm.
        timesteps, num_inference_steps = retrieve_timesteps(self.scheduler, num_inference_steps, device, timesteps)
        timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, device)
        latent_timestep = timesteps[:1].repeat(batch_size * num_images_per_prompt)

        # 6. Prepare latent variables
        num_channels_latents = self.vae.config.latent_channels
        num_channels_unet = self.unet.config.in_channels
        # Image latents are only requested for the 4-channel UNet, which blends them back in the loop.
        return_image_latents = num_channels_unet == 4

        is_strength_max = strength == 1.0

        latents_outputs = self.prepare_latents(
            batch_size * num_images_per_prompt,
            num_channels_latents,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            latents,
            image=init_image,
            timestep=latent_timestep,
            is_strength_max=is_strength_max,
            return_noise=True,
            return_image_latents=return_image_latents,
        )

        if return_image_latents:
            latents, noise, image_latents = latents_outputs
        else:
            latents, noise = latents_outputs

        # 6.1 Prepare mask latent variables
        mask_condition = self.mask_processor.preprocess(
            mask_image, height=height, width=width, resize_mode=resize_mode, crops_coords=crops_coords
        )

        # Zero out the image where the mask is >= 0.5 unless the caller supplied the masked latents.
        if masked_image_latents is None:
            masked_image = init_image * (mask_condition < 0.5)
        else:
            masked_image = masked_image_latents

        mask, masked_image_latents = self.prepare_mask_latents(
            mask_condition,
            masked_image,
            batch_size * num_images_per_prompt,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            self.do_classifier_free_guidance,
        )

        # 6.2 Check that sizes of mask, masked image and latents match
        if num_channels_unet == 9:
            # default case for runwayml/stable-diffusion-inpainting
            num_channels_mask = mask.shape[1]
            num_channels_masked_image = masked_image_latents.shape[1]
            if num_channels_latents + num_channels_mask + num_channels_masked_image != self.unet.config.in_channels:
                raise ValueError(
                    f"Incorrect configuration settings! The config of `pipeline.unet`: {self.unet.config} expects"
                    f" {self.unet.config.in_channels} but received `num_channels_latents`: {num_channels_latents} +"
                    f" `num_channels_mask`: {num_channels_mask} + `num_channels_masked_image`: {num_channels_masked_image}"
                    f" = {num_channels_latents+num_channels_masked_image+num_channels_mask}. Please verify the config of"
                    " `pipeline.unet` or your `mask_image` or `image` input."
                )
        elif num_channels_unet != 4:
            raise ValueError(
                f"The unet {self.unet.__class__} should have either 4 or 9 input channels, not {self.unet.config.in_channels}."
            )

        # 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 7.1 Add image embeds for IP-Adapter
        added_cond_kwargs = (
            {"image_embeds": image_embeds}
            if ip_adapter_image is not None or ip_adapter_image_embeds is not None
            else None
        )

        # 7.2 Optionally get Guidance Scale Embedding
        timestep_cond = None
        if self.unet.config.time_cond_proj_dim is not None:
            guidance_scale_tensor = torch.tensor(self.guidance_scale - 1).repeat(batch_size * num_images_per_prompt)
            timestep_cond = self.get_guidance_scale_embedding(
                guidance_scale_tensor, embedding_dim=self.unet.config.time_cond_proj_dim
            ).to(device=device, dtype=latents.dtype)

        # 8. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        self._num_timesteps = len(timesteps)
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # Once interrupted, the remaining timesteps are skipped without any work.
                if self.interrupt:
                    continue

                # expand the latents if we are doing classifier free guidance
                latent_model_input = torch.cat([latents] * 2) if self.do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                # 9-channel UNet: condition on the mask and the masked-image latents via extra channels.
                if num_channels_unet == 9:
                    latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)

                # predict the noise residual
                noise_pred = self.unet(
                    latent_model_input,
                    t,
                    encoder_hidden_states=prompt_embeds,
                    timestep_cond=timestep_cond,
                    cross_attention_kwargs=self.cross_attention_kwargs,
                    added_cond_kwargs=added_cond_kwargs,
                    return_dict=False,
                )[0]

                # perform guidance
                if self.do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + self.guidance_scale * (noise_pred_text - noise_pred_uncond)

                # compute the previous noisy sample x_t -> x_t-1
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs, return_dict=False)[0]

                # 4-channel UNet: keep the generated latents where the mask is 1 and restore the
                # input image latents (re-noised to the next timestep, except on the last step)
                # where the mask is 0.
                if num_channels_unet == 4:
                    init_latents_proper = image_latents
                    if self.do_classifier_free_guidance:
                        init_mask, _ = mask.chunk(2)
                    else:
                        init_mask = mask

                    if i < len(timesteps) - 1:
                        noise_timestep = timesteps[i + 1]
                        init_latents_proper = self.scheduler.add_noise(
                            init_latents_proper, noise, torch.tensor([noise_timestep])
                        )

                    latents = (1 - init_mask) * init_latents_proper + init_mask * latents

                if callback_on_step_end is not None:
                    callback_kwargs = {}
                    for k in callback_on_step_end_tensor_inputs:
                        callback_kwargs[k] = locals()[k]
                    callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)

                    latents = callback_outputs.pop("latents", latents)
                    prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
                    negative_prompt_embeds = callback_outputs.pop("negative_prompt_embeds", negative_prompt_embeds)

                # call the callback, if provided
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    # NOTE(review): `callback_steps` defaults to None, so passing `callback` without
                    # `callback_steps` would make this modulo fail - confirm `check_inputs` guards it.
                    if callback is not None and i % callback_steps == 0:
                        step_idx = i // getattr(self.scheduler, "order", 1)
                        callback(step_idx, t, latents)

        # Decode the latents unless the caller asked for them directly.
        if not output_type == "latent":
            image = self.vae.decode(latents / self.vae.config.scaling_factor, return_dict=False, generator=generator)[
                0
            ]
            image, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)
        else:
            image = latents
            has_nsfw_concept = None

        # Images flagged by the safety checker are not denormalized.
        if has_nsfw_concept is None:
            do_denormalize = [True] * image.shape[0]
        else:
            do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]

        image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize)

        # Offload all models
        self.maybe_free_model_hooks()

        if not return_dict:
            return (image, has_nsfw_concept)

        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)

In [None]:
# Load the SD2 inpainting checkpoint in half precision, with the safety checker disabled.
my_pipeline = myStableDiffusionImg2ImgPipeline.from_pretrained(
    "stabilityai/stable-diffusion-2-inpainting",
    torch_dtype=torch.float16,
    use_safetensors = True,
    safety_checker=None
)
# Attach the attention-processor weights stored in ../scripts/finetune_lora_pq
# (presumably a LoRA fine-tuned on PQ-encoded images, judging by the path - confirm).
my_pipeline.unet.load_attn_procs("../scripts/finetune_lora_pq")
my_pipeline.to('cuda')

# Both names refer to the same pipeline object in this configuration.
inpaint_pipeline = my_pipeline

In [None]:
# my_pipeline = myStableDiffusionImg2ImgPipeline.from_pretrained(
#     'runwayml/stable-diffusion-v1-5', torch_dtype=torch.float16, variant='fp16', use_safetensors=True, safety_checker=None
# ).to('cuda')
# my_pipeline.enable_model_cpu_offload()

# inpaint_pipeline = StableDiffusionInpaintPipeline.from_pretrained(
#     'runwayml/stable-diffusion-v1-5', torch_dtype=torch.float16, variant='fp16', use_safetensors=True, safety_checker=None
# ).to('cuda')
# inpaint_pipeline.enable_model_cpu_offload()

In [None]:
# Text conditioning shared by every pipeline call below.
prompt = 'Equirectangular environment map'
negative_prompt = 'boxes, artifacts'


def load_exr(filename):
    """Read the R, G and B channels of an OpenEXR file into a float32 array.

    Args:
        filename: Path of the EXR file to read.

    Returns:
        ``np.ndarray`` of shape ``(height, width, 3)`` and dtype ``float32``;
        the size is taken from the header's ``dataWindow``.
    """
    exr = OpenEXR.InputFile(filename)
    try:
        dw = exr.header()['dataWindow']
        width = dw.max.x - dw.min.x + 1
        height = dw.max.y - dw.min.y + 1

        img = np.zeros((height, width, 3), dtype=np.float32)
        for i, c in enumerate('RGB'):
            # Each channel is requested as 32-bit float and copied into its plane.
            buffer = exr.channel(c, PixelType(OpenEXR.FLOAT))
            img[:, :, i] = np.frombuffer(buffer, dtype=np.float32).reshape(height, width)
    finally:
        # Release the file handle even if the header or a channel cannot be read.
        exr.close()
    return img


def save_exr(img, filename):
    """Write an ``(height, width, 3)`` image to an OpenEXR file as R, G, B channels.

    Args:
        img: Array-like image whose last axis holds R, G, B. It is converted to
            a contiguous ``float32`` array before writing.
        filename: Destination path.
    """
    # The channel buffers are raw bytes, so the element size must match what the
    # file expects. `load_exr` reads 32-bit floats and the default header is
    # expected to declare FLOAT channels; a float64 input (e.g. the result of
    # dividing by a NumPy float64 scalar) would otherwise write twice the bytes.
    img = np.ascontiguousarray(img, dtype=np.float32)

    height, width, _ = img.shape
    header = OpenEXR.Header(width, height)
    exr = OpenEXR.OutputFile(filename, header)
    try:
        r, g, b = np.split(img, 3, axis=-1)
        exr.writePixels({'R': r.tobytes(),
                         'G': g.tobytes(),
                         'B': b.tobytes()})
    finally:
        # Always close so a failed write does not leave the handle open.
        exr.close()


def rgb_to_srgb(rgb):
    """Apply the piecewise sRGB encoding curve to linear RGB values.

    Values up to 0.0031308 use the linear segment, larger values the gamma
    segment. The result is not clipped or quantized.
    """
    offset = 0.055
    linear_segment = 12.92 * rgb
    gamma_segment = (1 + offset) * np.power(rgb, 1 / 2.4) - offset
    return np.where(rgb <= 0.0031308, linear_segment, gamma_segment)


def srgb_to_rgb(srgb):
    """Decode sRGB-encoded values back to linear RGB.

    Values up to 0.04045 use the linear segment, larger values the power-law
    segment.
    """
    offset = 0.055
    linear_segment = srgb / 12.92
    gamma_segment = np.power((srgb + offset) / (1 + offset), 2.4)
    return np.where(srgb <= 0.04045, linear_segment, gamma_segment)


def rgb_to_hlg(rgb):
    """Encode linear RGB with the hybrid log-gamma curve used in this notebook.

    Values up to 1.0 follow ``0.5 * sqrt(x)``; larger values follow
    ``a * log(x - b) + c``.
    """
    a, b, c = 0.17883277, 0.28466892, 0.55991073
    sqrt_segment = 0.5 * np.sqrt(rgb)
    log_segment = a * np.log(rgb - b) + c
    return np.where(rgb <= 1.0, sqrt_segment, log_segment)


def hlg_to_rgb(hlg):
    """Decode hybrid log-gamma values back to linear RGB.

    Values up to 0.5 follow ``(2 * x) ** 2``; larger values follow
    ``exp((x - c) / a) + b``.
    """
    a, b, c = 0.17883277, 0.28466892, 0.55991073
    square_segment = np.square(2.0 * hlg)
    exp_segment = np.exp((hlg - c) / a) + b
    return np.where(hlg <= 0.5, square_segment, exp_segment)


def luminance(rgb):
    """Return the weighted sum of the last axis of ``rgb`` with weights (0.2126, 0.7152, 0.0722)."""
    weights = np.asarray((0.2126, 0.7152, 0.0722), dtype=np.float32)
    return np.matmul(rgb, weights)


def rgb_to_reinhard(rgb, white_lum=100):
    """Tone-map linear RGB with the extended Reinhard operator on luminance.

    The luminance ``L`` is mapped to ``L * (1 + L / white_lum**2) / (1 + L)``
    and every pixel's RGB is scaled by the ratio of new to old luminance.

    Args:
        rgb: Array whose last axis holds R, G, B. Any number of leading axes is
            accepted, e.g. ``(H, W, 3)`` or ``(N, 3)``.
        white_lum: Luminance that maps to 1.0.

    Returns:
        Array of the same shape as ``rgb``. Pixels with zero luminance map to 0.
    """
    lum = luminance(rgb)
    new_lum = lum * (1.0 + lum / white_lum ** 2) / (1.0 + lum)

    # Zero-luminance pixels produce 0/0; the np.where below replaces them with 0,
    # so the floating-point warnings are noise.
    with np.errstate(divide='ignore', invalid='ignore'):
        scale = np.where(lum != 0, new_lum / lum, 0)

    # `...` instead of `:, :` so the function is not tied to exactly 3-D input.
    return rgb * scale[..., None]


def reinhard_to_rgb(reinhard, white_lum=100):
    """Invert ``rgb_to_reinhard``: recover linear RGB from tone-mapped RGB.

    With ``Ld`` the tone-mapped luminance, the original luminance ``L`` is the
    non-negative root of ``L**2 / white_lum**2 + (1 - Ld) * L - Ld = 0``.

    Args:
        reinhard: Array whose last axis holds R, G, B. Any number of leading
            axes is accepted, e.g. ``(H, W, 3)`` or ``(N, 3)``.
        white_lum: The white luminance used by the forward mapping.

    Returns:
        Array of the same shape as ``reinhard``. Pixels with zero luminance map to 0.
    """
    new_lum = luminance(reinhard)

    # Coefficients of the quadratic a*L^2 + b*L + c = 0.
    a = 1.0 / white_lum ** 2
    b = 1.0 - new_lum
    c = -new_lum

    with np.errstate(divide='ignore', invalid='ignore'):
        root = np.sqrt(b ** 2 - 4 * a * c)
        # For b >= 0 the textbook form (-b + root) / (2a) subtracts two nearly
        # equal numbers and then divides by the tiny 2a, which loses precision
        # in float32. -2c / (b + root) is algebraically the same root without
        # the cancellation. For b < 0 the textbook form is already stable.
        lum = np.where(b >= 0, -2 * c / (b + root), (-b + root) / (2 * a))

        # Scale the tone-mapped RGB back to the original luminance.
        scale = np.where(new_lum != 0, lum / new_lum, 0)

    # `...` instead of `:, :` so the function is not tied to exactly 3-D input.
    return reinhard * scale[..., None]

def pq_to_rgb(rgb: np.ndarray, mul: int = 10000, clip: bool = True) -> np.ndarray:
    """Decode PQ-encoded values to linear values.

    Computes ``mul * (max(E**(1/m2) - c1, 0) / (c2 - c3 * E**(1/m2))) ** (1/m1)``
    where ``E`` is the input. Non-finite results (NaN, +/-inf) are replaced by 0.

    Args:
        rgb: PQ-encoded values (array-like). Despite its name this is the
            *encoded* signal; the name is kept for backward compatibility.
        mul: Scale applied to the decoded value.
        clip: When True (the default, matching the previous behaviour) the
            result is clipped to ``[0, 1]``. Pass False to keep decoded values
            above 1.
            NOTE(review): clipping discards everything above 1.0 that the
            decode recovers - confirm this is intended where an HDR result is
            expected.

    Returns:
        Decoded values with the same shape as the input.
    """
    m1 = 0.1593017578125
    m2 = 78.84375
    c1 = 0.8359375
    c2 = 18.8515625
    c3 = 18.6875

    # Negative inputs and a vanishing denominator yield NaN/inf; silence the
    # warnings because those entries are zeroed right after.
    with np.errstate(divide='ignore', invalid='ignore'):
        E_p = np.asarray(rgb) ** (1 / m2)
        par = np.maximum(E_p - c1, 0) / (c2 - c3 * E_p)
        ret = mul * (par ** (1 / m1))

    ret = np.nan_to_num(ret, nan=0.0, posinf=0.0, neginf=0.0)
    return np.clip(ret, 0, 1) if clip else ret

def rgb_to_pq(pq: np.ndarray, div: int = 10000) -> np.ndarray:
    """Encode linear values with the PQ curve.

    Computes ``((c1 + c2 * Y**m1) / (1 + c3 * Y**m1)) ** m2`` with
    ``Y = pq / div``. Non-finite results (NaN, +/-inf) are replaced by 0.

    Args:
        pq: Linear values (array-like). Despite its name this is the *linear*
            signal; the name is kept for backward compatibility.
        div: Value that maps to an encoded 1.0.

    Returns:
        Encoded values with the same shape as the input.
    """
    m1 = 0.1593017578125
    m2 = 78.84375
    c1 = 0.8359375
    c2 = 18.8515625
    c3 = 18.6875

    # Negative inputs yield NaN; silence the warning because those entries are
    # zeroed right after.
    with np.errstate(divide='ignore', invalid='ignore'):
        Y_m1 = (np.asarray(pq) / div) ** m1
        ret = ((c1 + c2 * Y_m1) / (1 + c3 * Y_m1)) ** m2

    return np.nan_to_num(ret, nan=0.0, posinf=0.0, neginf=0.0)

#### Autoselezionare le maschere con il metodo 

In [None]:
# Set the size of the environment-map image
ENV_HEIGHT, ENV_WIDTH = 256, 512

# Load the array holding all the combinations of the Blender 3D scene
# NOTE(review): `toarray()` materialises the sparse matrix densely; T is only used below to
# average the rows selected by the strokes.
T_bsr = bsr_array(load_npz("../T_bsr.npz"))
T = T_bsr.toarray()

# Load the ORIGINAL, unmodified EXR of the image to inpaint
L = load_exr("../resources/meadow_2_90deg.exr")
# Keep the top ENV_HEIGHT // 2 rows and flatten them to one RGB triple per row.
L = L[:ENV_HEIGHT // 2, :, :].reshape(-1, 3)

# NOTE(review): B is only referenced by the commented-out code below.
B = T_bsr @ L * 1e-4

# TODO: Why does it need coeff_slices?
# B = B.reshape(IMG_HEIGHT, IMG_WIDTH, 3)
# coeffs = pywt.array_to_coeffs(B, coeff_slices, output_format='wavedec2')
# B = pywt.waverec2(coeffs, 'haar', axes=(0, 1))

# Load the stroke image, where RED marks the shadow and GREEN everything else
stroke_img = load_exr(f"../resources/stroke.exr").reshape(-1, 3)
in_stroke = (1, 0, 0) # RED marks the shadow
out_stroke = (0, 1, 0) # GREEN marks the rest

# Select the pixels that are exactly red and exactly green
X_in = np.all(stroke_img == in_stroke, axis=-1)
X_out = np.all(stroke_img == out_stroke, axis=-1)

# Mean row of T over each stroke, then the per-texel difference of their contributions.
T_in = np.mean(T[X_in, :], axis=0)
T_out = np.mean(T[X_out, :], axis=0)
delta = T_out[:, None] * L - T_in[:, None] * L

# Load the albedo of the image
p = load_exr('../resources/albedo.exr').reshape(-1, 3)

L_avg = np.mean(L, axis=0)
p_avg = np.mean(p, axis=0)

# A texel is selected (L_f) when any channel of delta exceeds 0.8 * mean(L) * mean(albedo).
L_f = np.any(delta > 0.8 * L_avg * p_avg, axis=-1)
L_b = ~L_f

# NOTE(review): `env` is not read again in the visible notebook.
env = L.copy()
env[L_f, :] = (1, 0, 0)
# NOTE(review): the size constants are redefined here, after L was sliced with the first
# values. The reshape below therefore uses (256, 256) rather than the shape L was flattened
# from, and the roll shifts by ENV_WIDTH // 2 = 128 columns of that layout - confirm intended.
ENV_HEIGHT, ENV_WIDTH = 256, 256

f = np.zeros_like(L, dtype=np.float32)
b = np.zeros_like(L, dtype=np.float32)
mask = np.zeros_like(L, dtype=np.float32)

# Split L into the selected texels (f) and the remaining ones (b).
f[L_f, :] = L[L_f, :]
b[L_b, :] = L[L_b, :]

# Shift the selected texels along axis 1 and paste them over the remaining ones.
f_edit = np.roll(f.reshape(ENV_HEIGHT, ENV_WIDTH, 3), -ENV_WIDTH // 2, axis=1).reshape(-1, 3)
L_edit = np.where(f_edit > 0, f_edit, b)

# The mask covers both the original and the shifted positions of the selected texels.
# NOTE(review): `mask` and `L_edit` are not read again in the visible notebook; the later
# cells load their inputs from files instead.
mask[L_f, :] = 1.0
mask[f_edit > 0] = 1.0

In [None]:


# Load the two masks and blur them with the pipeline's mask processor.
# NOTE(review): the next cell repeats exactly these four statements (reloading the PNGs first),
# so this cell looks redundant - confirm before removing.
mask_img2img = Image.open('../resources/mask_img2img.png')
mask_inpaint = Image.open('../resources/mask_inpaint.png')

mask_img2img = inpaint_pipeline.mask_processor.blur(mask_img2img, blur_factor=10)
mask_inpaint = inpaint_pipeline.mask_processor.blur(mask_inpaint, blur_factor=10)

In [None]:
# Load the edited environment map and the two masks used by the experiments below.
envmap = load_exr('../resources/env_edit.exr')
mask_img2img = Image.open('../resources/mask_img2img.png')
mask_inpaint = Image.open('../resources/mask_inpaint.png')

# Blur the masks with the pipeline's mask processor.
mask_img2img = inpaint_pipeline.mask_processor.blur(mask_img2img, blur_factor=10)
mask_inpaint = inpaint_pipeline.mask_processor.blur(mask_inpaint, blur_factor=10)

# Preview: sRGB-encoded, clipped 8-bit view of the environment map next to both masks.
make_image_grid([Image.fromarray(np.clip(rgb_to_srgb(envmap) * 255, 0, 255).astype(np.uint8)),
                  mask_img2img, mask_inpaint], rows=1, cols=3)

In [None]:
# PQ
init_image: np.ndarray = rgb_to_pq(envmap)

# Seed the sampler explicitly: with no generator every run draws different noise,
# so results are neither reproducible nor comparable with the other encodings.
generator = torch.Generator(device='cuda').manual_seed(0)

# Two passes: first with the img2img mask, then with the inpaint mask on the result.
image = my_pipeline(prompt, image=init_image, mask_image=mask_img2img, negative_prompt=negative_prompt,
                    height=256, width=512, strength=0.6, output_type='np',
                    generator=generator).images[0]
image = inpaint_pipeline(prompt, image=image, mask_image=mask_inpaint, negative_prompt=negative_prompt,
                    height=256, width=512, strength=0.6, output_type='np',
                    generator=generator).images[0]

# NOTE(review): unlike the other experiments this writes next to the notebook rather than to
# ../out/diffuse-res/, and pq_to_rgb clips to [0, 1] by default - confirm both are intended.
save_exr(pq_to_rgb(image), './pq.exr')

In [None]:
# Linear
init_image = envmap

# Seed the sampler explicitly: with no generator every run draws different noise,
# so results are neither reproducible nor comparable with the other encodings.
generator = torch.Generator(device='cuda').manual_seed(0)

image = my_pipeline(prompt, image=init_image, mask_image=mask_img2img, negative_prompt=negative_prompt,
                    height=256, width=512, strength=0.5, output_type='np',
                    generator=generator).images[0]
# image = inpaint_pipeline(prompt, image=image, mask_image=mask_inpaint, negative_prompt=negative_prompt,
#                     height=256, width=512, strength=0.5, output_type='np').images[0]

In [None]:
# The pipeline output is already treated as linear here, so it is saved as-is;
# the preview is a clipped 8-bit view upscaled to 1024x512.
save_exr(image, '../out/diffuse-res/linear.exr')
display(Image.fromarray(np.clip(image * 255, 0, 255).astype(np.uint8)).resize((1024, 512)))

In [None]:
# sRGB without clipping
init_image = rgb_to_srgb(envmap)

# Seed the sampler explicitly: with no generator every run draws different noise,
# so results are neither reproducible nor comparable with the other encodings.
generator = torch.Generator(device='cuda').manual_seed(0)

image = my_pipeline(prompt, image=init_image, mask_image=mask_img2img, negative_prompt=negative_prompt,
                    height=256, width=512, strength=0.5, output_type='np',
                    generator=generator).images[0]
# image = inpaint_pipeline(prompt, image=image, mask_image=mask_inpaint, negative_prompt=negative_prompt,
#                     height=256, width=512, strength=0.5, output_type='np').images[0]

In [None]:
# Decode the sRGB output back to linear before saving; the preview shows the
# sRGB-encoded output directly as a clipped 8-bit image upscaled to 1024x512.
save_exr(srgb_to_rgb(image), '../out/diffuse-res/srgb_no_clipping.exr')
display(Image.fromarray(np.clip(image * 255, 0, 255).astype(np.uint8)).resize((1024, 512)))

In [None]:
# sRGB
init_image = np.clip(rgb_to_srgb(envmap), 0, 1)

# Seed the sampler explicitly: with no generator every run draws different noise,
# so results are neither reproducible nor comparable with the other encodings.
generator = torch.Generator(device='cuda').manual_seed(0)

image = my_pipeline(prompt, image=init_image, mask_image=mask_img2img, negative_prompt=negative_prompt,
                    height=256, width=512, strength=0.5, output_type='np',
                    generator=generator).images[0]
# image = inpaint_pipeline(prompt, image=image, mask_image=mask_inpaint, negative_prompt=negative_prompt,
#                     height=256, width=512, strength=0.5, output_type='np').images[0]

In [None]:
# Decode the sRGB output back to linear before saving; the preview shows the
# sRGB-encoded output directly as a clipped 8-bit image upscaled to 1024x512.
save_exr(srgb_to_rgb(image), '../out/diffuse-res/srgb.exr')
display(Image.fromarray(np.clip(image * 255, 0, 255).astype(np.uint8)).resize((1024, 512)))

In [None]:
# sRGB with low exposure
exposure = -4
# Scale the linear values by 2**exposure before encoding and clipping.
init_image = np.clip(rgb_to_srgb(envmap * np.exp2(exposure)), 0, 1)

# Seed the sampler explicitly: with no generator every run draws different noise,
# so results are neither reproducible nor comparable with the other encodings.
generator = torch.Generator(device='cuda').manual_seed(0)

image = my_pipeline(prompt, image=init_image, mask_image=mask_img2img, negative_prompt=negative_prompt,
                    height=256, width=512, strength=0.5, output_type='np',
                    generator=generator).images[0]
# image = inpaint_pipeline(prompt, image=image, mask_image=mask_inpaint, negative_prompt=negative_prompt,
#                     height=256, width=512, strength=0.5, output_type='np').images[0]

In [None]:
# Decode to linear and undo the exposure scaling before saving; the preview shows the
# (low-exposure) sRGB output directly.
# NOTE(review): np.exp2 returns a NumPy float64 scalar; depending on NumPy's promotion rules
# the quotient may be float64 rather than float32 - confirm save_exr receives the dtype it expects.
save_exr(srgb_to_rgb(image) / np.exp2(exposure), '../out/diffuse-res/srgb_low_exposure.exr')
display(Image.fromarray(np.clip(image * 255, 0, 255).astype(np.uint8)).resize((1024, 512)))

In [None]:
# HLG
init_image = np.clip(rgb_to_hlg(envmap), 0, 1)

# Seed the sampler explicitly: with no generator every run draws different noise,
# so results are neither reproducible nor comparable with the other encodings.
generator = torch.Generator(device='cuda').manual_seed(0)

image = my_pipeline(prompt, image=init_image, mask_image=mask_img2img, negative_prompt=negative_prompt,
                    height=256, width=512, strength=0.5, output_type='np',
                    generator=generator).images[0]
# image = inpaint_pipeline(prompt, image=image, mask_image=mask_inpaint, negative_prompt=negative_prompt,
#                     height=256, width=512, strength=0.5, output_type='np').images[0]

In [None]:
# Decode the HLG output to linear RGB once and reuse it for both the file and the preview.
linear_image = hlg_to_rgb(image)
save_exr(linear_image, '../out/diffuse-res/hlg.exr')

# `image` is left untouched so that re-running this cell does not decode an
# already-decoded image a second time.
preview = rgb_to_srgb(linear_image)
display(Image.fromarray(np.clip(preview * 255, 0, 255).astype(np.uint8)).resize((1024, 512)))

In [None]:
# HLG with low exposure
exposure = -4

# Scale the linear values by 2**exposure before encoding and clipping.
init_image = np.clip(rgb_to_hlg(envmap * np.exp2(exposure)), 0, 1)

# Seed the sampler explicitly: with no generator every run draws different noise,
# so results are neither reproducible nor comparable with the other encodings.
generator = torch.Generator(device='cuda').manual_seed(0)

image = my_pipeline(prompt, image=init_image, mask_image=mask_img2img, negative_prompt=negative_prompt,
                    height=256, width=512, strength=0.5, output_type='np',
                    generator=generator).images[0]
# image = inpaint_pipeline(prompt, image=image, mask_image=mask_inpaint, negative_prompt=negative_prompt,
#                     height=256, width=512, strength=0.5, output_type='np').images[0]

In [None]:
# Decode the HLG output to linear RGB once and reuse it for both the file and the preview.
linear_image = hlg_to_rgb(image)

# The saved file undoes the exposure scaling; the preview keeps the low exposure.
save_exr(linear_image / np.exp2(exposure), '../out/diffuse-res/hlg_low_exposure.exr')

# `image` is left untouched so that re-running this cell does not decode an
# already-decoded image a second time.
preview = rgb_to_srgb(linear_image)
display(Image.fromarray(np.clip(preview * 255, 0, 255).astype(np.uint8)).resize((1024, 512)))

In [None]:
# Reinhard
init_image = np.clip(rgb_to_reinhard(envmap), 0, 1)

# Seed the sampler explicitly: with no generator every run draws different noise,
# so results are neither reproducible nor comparable with the other encodings.
generator = torch.Generator(device='cuda').manual_seed(0)

image = my_pipeline(prompt, image=init_image, mask_image=mask_img2img, negative_prompt=negative_prompt,
                    height=256, width=512, strength=0.5, output_type='np',
                    generator=generator).images[0]
# image = inpaint_pipeline(prompt, image=image, mask_image=mask_inpaint, negative_prompt=negative_prompt,
#                     height=256, width=512, strength=0.5, output_type='np').images[0]

In [None]:
# Invert the tone mapping once and reuse the result for both the file and the preview.
linear_image = reinhard_to_rgb(image)
save_exr(linear_image, '../out/diffuse-res/reinhard.exr')

# `image` is left untouched so that re-running this cell does not invert an
# already-inverted image a second time.
preview = rgb_to_srgb(linear_image)
display(Image.fromarray(np.clip(preview * 255, 0, 255).astype(np.uint8)).resize((1024, 512)))

In [None]:
# sRGB with SRiTMO
init_image = np.clip(rgb_to_srgb(envmap), 0, 1)

# Seed the sampler explicitly: with no generator every run draws different noise,
# so results are neither reproducible nor comparable with the other encodings.
generator = torch.Generator(device='cuda').manual_seed(0)

image = my_pipeline(prompt, image=init_image, mask_image=mask_img2img, negative_prompt=negative_prompt,
                    height=256, width=512, strength=0.5, output_type='np',
                    generator=generator).images[0]
# image = inpaint_pipeline(prompt, image=image, mask_image=mask_inpaint, negative_prompt=negative_prompt,
#                     height=256, width=512, strength=0.5, output_type='np').images[0]

# HWC array in [0, 1] -> 1xCxHxW tensor in [-1, 1] on the GPU, as passed to SRiTMO.
image = torch.from_numpy(image).permute(2, 0, 1).unsqueeze(0).to('cuda') * 2 - 1
ldr, hdr = SRiTMO(image, {'sritmo': '../ext/sritmo.pth', 'sr_factor': 1, 'device': 'cuda'})  # output is in BGR format

In [None]:
# Convert the SRiTMO outputs from 1xCxHxW BGR tensors to HxWx3 RGB arrays.
# New names are used instead of overwriting `hdr` / `ldr`: once those hold NumPy
# arrays, `.permute` no longer exists and the cell could not be re-run.
hdr_rgb = hdr.squeeze(0).permute(1, 2, 0)[:, :, [2, 1, 0]].cpu().numpy()
save_exr(hdr_rgb, '../out/diffuse-res/srgb_sritmo.exr')

# The LDR output is mapped from [-1, 1] back to [0, 1] for the 8-bit preview.
ldr_rgb = ldr.squeeze(0).permute(1, 2, 0)[:, :, [2, 1, 0]].cpu().numpy() / 2 + 0.5
display(Image.fromarray(np.clip(ldr_rgb * 255, 0, 255).astype(np.uint8)).resize((1024, 512)))