In [1]:
import tqdm
import random
import warnings

warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

import torch
from diffusers import StableDiffusionPipeline, LMSDiscreteScheduler
from transformers import CLIPProcessor, CLIPModel

from PIL import Image

In [2]:
@torch.no_grad()
def eraseModel(model, prevPrompts, newPrompts, retainPrompts=None, 
               lamb=0.1, eraseScale=0.1, preserveScale=0.1, withKey=True):
    
    """
    prevPrompts에 해당하는 개념을 newPrompts로 편집한 모델 반환.

    Parameters:
        model (StableDiffusionPipeline): 편집할 모델.
        prevPrompts (List[str]): 원래 개념이 저장된 string 리스트.
        newPrompts (List[str]): 대체할 개념이 저장된 string 리스트.
        retainPrompts (List[str] | None): 보존할 개념이 저장된 string 리스트. Default: None
        lamb (float): 정규화 강도. Default: 0.1
        eraseScale (float): 개념 편집 강도. Default: 0.1
        preserveScale (float): 개념 보존 강도. Default: 0.1
        withKey (bool): key 가중치 업데이트 여부. Default: True

    Returns:
        model (StableDiffusionPipeline)
    """

    device = model.device

    # 주어진 모델 unet의 cross-attention layer를 caLayers에 저장.
    caLayers = []
    for name, module in model.unet.named_modules():
        # attn2로 끝나는 모듈이 cross-attention layer.
        if name[-5:] != "attn2": continue
        caLayers.append(module)

    # value projection layer를 targetLayers에 저장. 논문의 W^old에 해당하는 부분.
    # (value projection layer): Linear(in_features=1024, out_features=320, bias=False)
    # "stabilityai/stable-diffusion-2-1-base" 모델이 기준. "CompVis/stable-diffusion-v1-4" 모델은 1024 대신 768을 사용.
    valueLayers = [layer.to_v for layer in caLayers]
    targetLayers = valueLayers

    # withKey=True라면 key projection layer도 추가.
    if withKey: 
        # (key projection layer): Linear(in_features=1024, out_features=320, bias=False)
        keyLayers = [layer.to_k for layer in caLayers]
        targetLayers += keyLayers
    
    # 텍스트 prevPrompts를 텍스트 임베딩 prevEmbds으로 변환. prevEmbds의 원소는 논문의 c_i에 대응됨.
    # (prevPrompts): (N,)
    # ex) N = 2; prevPrompts = ["red rose", "blue rose"]
    # (prevEmbds): (N, 1024, 77)
    prevInputs = model.tokenizer(prevPrompts, padding="max_length", max_length=77, truncation=True, return_tensors="pt").input_ids.to(device)
    prevEmbds = model.text_encoder(prevInputs)[0].permute(0, 2, 1)

    # 마찬가지로 newEmbds 생성. newEmbds의 원소는 논문의 c*_i에 대응됨.
    # (newPrompts): (N,)
    # (newEmbds): (N, 1024, 77)
    newInputs = model.tokenizer(newPrompts, padding="max_length", max_length=77, truncation=True, return_tensors="pt").input_ids.to(device)
    newEmbds = model.text_encoder(newInputs)[0].permute(0, 2, 1)

    # 논문에서 제시한 closed-form solution은 W = [sum{(W^old)(c*_i)(c_i)^T}+lambda*(W^old)][sum{(c_i)(c_i)^T)}+lambda*(I)]^(-1).
    # W의 첫번째 대괄호 부분이 m1, 두번째 대괄호가 m2. 즉 W = [m1][m2]^(-1).

    # m1 = (W^old)[sum{(c*_i)(c_i)^T}+lambda*(I)].
    # m1의 대괄호 부분이 m3. 즉 m1 = (W^old)[m3].

    # m2 = [sum{(c_i)(c_i)^T)}+lambda*(I)].
    # (m2): (1024, 1024)
    m2 = (prevEmbds @ prevEmbds.permute(0, 2, 1)).sum(0) * eraseScale
    m2 += lamb * torch.eye(m2.shape[0], device=device)

    # m3 = [sum{(c*_i)(c_i)^T}+lambda*(I)].
    # (m3): (1024, 1024)
    m3 = (newEmbds @ prevEmbds.permute(0, 2, 1)).sum(0) * eraseScale
    m3 += lamb * torch.eye(m3.shape[0], device=device)

    # retainPrompts가 있다면 m2와 m3에 sum{(c_j)(c_j)^T} 추가
    if retainPrompts:
        # retainEmbds 생성. retainEmbds의 원소는 논문의 c*_j에 대응됨.
        # (retainPrompts): (M,)
        # (retainEmbds): (M, 1024, 77)
        retainInputs = model.tokenizer(retainPrompts, padding="max_length", max_length=77, truncation=True, return_tensors="pt").input_ids.to(device)
        retainEmbds = model.text_encoder(retainInputs)[0].permute(0, 2, 1)

        m2 += (retainEmbds @ retainEmbds.permute(0, 2, 1)).sum(0) * preserveScale
        m3 += (retainEmbds @ retainEmbds.permute(0, 2, 1)).sum(0) * preserveScale

    for targetLayer in targetLayers:
        # (m1): (320, 1024)
        # (targetLayer.weight): (320, 1024)
        m1 = targetLayer.weight @ m3
        targetLayer.weight = torch.nn.Parameter((m1 @ torch.inverse(m2)).detach())

    return model

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

In [None]:
#erase
if __name__ == "__main__":

    concept_type = "art"
    concepts = "Kelly Mckernan, Sarah Anderson"
    guided_concepts = "art"
  
    concepts = [c.strip() for c in concepts.split(',')]

    if concept_type == "art":
        prompts = ["painting by ", "art by ", "artwork by ", "picture by ", "style of ", ""]
    elif concept_type == "object":
        prompts = ["image of ", "photo of ", "portrait of ", "picture of ", "painting of ", ""]
    else:
        prompts = [""]

    prevPrompts = []
    for concept in concepts:
        for prompt in prompts:
            prevPrompts.append(prompt + concept)

    newPrompts = []
    if guided_concepts:
        for concept in [guided_concepts] * len(concepts):
            for prompt in prompts:
                newPrompts.append(prompt + concept)
    else:
        newPrompts = [' '] * len(prevPrompts)
    
    lamb = 0.5
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    model = StableDiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2-1-base")
    model = model.to(device)
    model = eraseModel(model=model, prevPrompts=prevPrompts, newPrompts=newPrompts, lamb=lamb,)

    torch.save(model.unet.state_dict(), "model/erase.pt")

In [3]:
#moderate
if __name__ == "__main__":

    concept_type = "unsafe"
    concepts = "violence, nudity, harm"
    guided_concepts = None
  
    concepts = [c.strip() for c in concepts.split(',')]

    if concept_type == "art":
        prompts = ["painting by ", "art by ", "artwork by ", "picture by ", "style of ", ""]
    elif concept_type == "object":
        prompts = ["image of ", "photo of ", "portrait of ", "picture of ", "painting of ", ""]
    else:
        prompts = [""]

    prevPrompts = []
    for concept in concepts:
        for prompt in prompts:
            prevPrompts.append(prompt + concept)

    newPrompts = []
    if guided_concepts:
        for concept in [guided_concepts] * len(concepts):
            for prompt in prompts:
                newPrompts.append(prompt + concept)
    else:
        newPrompts = [' '] * len(prevPrompts)
    
    lamb = 0.5
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    model = StableDiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2-1-base")
    model = model.to(device)
    model = eraseModel(model=model, prevPrompts=prevPrompts, newPrompts=newPrompts, lamb=lamb,)

    torch.save(model.unet.state_dict(), "model/moderate.pt")

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

In [2]:
@torch.no_grad()
def debiasModel(model, prevPrompts, newPrompts, lamb=0.1, scale=0.1, withKey=True):
    
    """
    prevPrompts에 해당하는 concept이 newPrompts에 해당하는 attribute에 대하여 debiasing된 모델 반환.

    Parameters:
        model (StableDiffusionPipeline): debias할 모델.
        prevPrompts (List[str]): debias할 concept을 저장하고 있는 텍스트 리스트.
        newPrompts (List[List[str]]): debias 대상 attribute를 저장하고 있는 텍스트 리스트의 리스트
        lamb (float): 정규화 강도. Default: 0.1
        scale (float): debiasing 강도. Default: 0.1
        withKey (bool): key weight 업데이트 여부. Default: True

    Returns:
        model (StableDiffusionPipeline)
    """

    device = model.device

    # SD 모델 unet의 모든 cross-attention layer를 caLayers에 저장.
    caLayers = []
    for name, module in model.unet.named_modules():
        # attn2가 cross-attention를 의미함.
        if name[-5:] != "attn2": continue
        caLayers.append(module)

    # cross-attention layer의 value 부분을 valueLayers에 저장.
    valueLayers = [layer.to_v for layer in caLayers]
    # withKey 옵션이 켜져 있다면 key 부분도 추가함.
    if withKey: valueLayers + [layer.to_k for layer in caLayers]

    # 텍스트 리스트인 prevPrompts를 텍스트 임베딩인 prevEmbds으로 변환.
    prevInputs = model.tokenizer(prevPrompts, padding="max_length", max_length=77, truncation=True, return_tensors="pt").input_ids.to(device)
    # prevEmbds는 (N, 768, 77) 형태이며 각 임베딩은 논문의 c_i에 대응됨. (N은 prevPrompts의 길이)
    prevEmbds = model.text_encoder(prevInputs)[0].permute(0, 2, 1)

    # eraseModel과 거의 같음.
    # 다만 m3 = sum{(c_i^*)(c_i)^T}+lambda*(I)의 c_i^* 부분 대신 [(c_i)+sum{alpha/|(W_old)(a)|*|(W_old)(c_i)|*(a)}]이 사용됨.
    # (a는 newPrompts의 원소인 newPrompt의 각 임배딩, alpha는 각 임베딩의 가중치)

    # m2 = sum((c_i)(c_i)^T)}+lambda*(I)
    m2 = (prevEmbds @ prevEmbds.permute(0, 2, 1)).sum(0) * scale
    m2 += lamb * torch.eye(m2.shape[1], device=device)

    clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
    generator = torch.Generator(device=device)

    # ratio 차이가 alpha에 반영되는 비율.
    eta = 0.1
    threshold = 0.05
    alphas = [torch.zeros(len(t)) for t in newPrompts]
    targetRatios = [torch.ones(len(t)) / len(t) for t in newPrompts]
    # 각 prevPrompt에 해당하는 alpha의 업데이트가 필요한지 나타냄.
    check = [0] * len(prevPrompts)
    for _ in range(30):
        for idx in range(len(prevPrompts)):

            # alpha를 업데이트하지 않고 모두 0으로 바꾸어 value weight 업데이트가 일어나지 않도록 함.
            if check[idx]: 
                alphas[idx] *= 0
                continue
        
            prevPrompt = prevPrompts[idx]
            newPrompt = newPrompts[idx]

            # SD model로 prevPrompt에 해당하는 이미지 50개 생성
            images = model(prevPrompt, num_images_per_prompt=50, num_inference_steps=20, generator=generator).images
            
            # score는 생성된 이미지 50개와 텍스트 리스트인 newPrompt 사이의 유사도 점수로 (50, M) 형태. (M은 newPrompt의 길이)
            score = clip_model(**clip_processor(text=newPrompt, images=images, return_tensors="pt", padding=True)).logits_per_image

            # 각 이미지에 대해 가장 높은 유사도를 가진 점수를 1. 나머지를 0.으로 변환하여 각 이미지에 대해 평균을 냄.
            # 즉 ratio는 prevPrompt로 생성된 이미지가 newPrompt의 각 prompt에 해당할 확률을 나타냄.
            ratio = score.ge(score.max(1)[0].view(-1,1)).float().mean(0)
            # 각 prompt에 해당할 확률이 동일하기를 원하기 때문에 targetRatio와의 차이가 반영 정도가 됨.
            alpha = (eta * (targetRatios[idx] - ratio)).to(device)
            alphas[idx] = alpha

            # ratio와 targetratio와의 차이가 threshold 보다 작다면 더 이상 업데이트가 필요하지 않음.
            if ratio.abs().max() < threshold: check[idx] = 1

        # 모든 prompt에 대해서 업데이트가 필요하지 않다면 루프를 종료함.
        if sum(check) == len(prevPrompts): break

        # [(c_i)+sum{alpha/|(W_old)(a)|*|(W_old)(c_i)|*(a)}]
        for valueLayer in valueLayers:
            reEmbds = []
            for idx in range(len(prevPrompts)):
        
                alpha = alphas[idx]
                newPrompt = newPrompts[idx]
                prevEmbd = prevEmbds[idx]

                newInput = model.tokenizer(newPrompt, padding="max_length", max_length=77, truncation=True, return_tensors="pt").input_ids.to(device)
                # newEmbd은 (M, 768, 77) 형태의 텍스트 임베딩.
                newEmbd = model.text_encoder(newInput)[0].permute(0, 2, 1)
                # norm은 (M,) 형태의 벡터로 alpha/|(W_old)(a)|*|(W_old)(c_i)|를 나타냄.
                norm = alpha / (valueLayer.weight @ newEmbd).norm(dim=[1,2]) * (valueLayer.weight @ prevEmbd).norm()
                
                # reEmbd은 (768, 77) 형태의 텍스트 임베딩으로 c_i + sum{alpha/|(W_old)(a)|*|(W_old)(c_i)|*(a)}를 나타냄.
                reEmbd = prevEmbd + (norm.view(-1, 1, 1) * newEmbd).sum(0)
                reEmbds.append(reEmbd.unsqueeze(0))
            reEmbds = torch.concat(reEmbds, 0)

            m3 = (reEmbds @ prevEmbds.permute(0, 2, 1)).sum(0) * scale
            m3 += lamb * torch.eye(m3.shape[1], device=device)
            # m1 = (W_old)[m3]
            m1 = valueLayer.weight @ m3
            valueLayer.weight = torch.nn.Parameter((m1 @ torch.inverse(m2)).detach())
        
    return model

# debias
if __name__ == "__main__":

    concepts = "Doctor, Nurse, Carpenter"
    attributes = "male, female"

    concepts = [c.strip() for c in concepts.split(',')]
    attributes = [a.strip() for a in attributes.split(',')]

    prompts = ["image of ", "photo of ", "portrait of ", "picture of ", ""]
    
    prevPrompts = []
    newPrompts = []
    for prompt in prompts:
        for concept in concepts:
            prevPrompts.append(prompt + concept)
            newPrompt = []
            for attribute in attributes:
                newPrompt.append(prompt + attribute)
            newPrompts.append(newPrompt)

    lamb = 0.5
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    model = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4")
    model = model.to(device)
    model = debiasModel(model=model, prevPrompts=prevPrompts, newPrompts=newPrompts, lamb=lamb,)

    torch.save(model.unet.state_dict(), "model/debias.pt")

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

  0%|          | 0/20 [00:00<?, ?it/s]

In [7]:
@torch.no_grad()
def generateImage(promptPath, modelVersion="1.4", sampleCount=10, imageSize=(512, 512), stepCount=100, guidanceScale=7.5):
    
    """
    주어진 prompt 파일에 해당하는 이미지를 생성하여 "image" 폴더에 저장.

    Parameters:
        promptPath (str): prompt 파일이 저장된 경로.
        modelVersion (str): 이미지 생성시 사용되는 모델의 버전. Default: "1.4"
        sampleCount (int): 각 prompt마다 생성할 이미지 개수. Default: 10
        imageSize (tuple): 생성할 이미지 크기. Default: (512, 512)
        stepCount (int): sampling 과정의 inference step 수. Default: 100
        guidanceScale (float): classifier-free guidance 강도. Default: 7.5

    Returns:
        None
    """

    if modelVersion == "1.4": modelVersion = "CompVis/stable-diffusion-v1-4"
    elif modelVersion == "2.1": modelVersion = "stabilityai/stable-diffusion-2-1-base"
    
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    # modelVersion에 따라 모델 선택.
    model = StableDiffusionPipeline.from_pretrained(modelVersion).to(device)
    vae, tokenizer, textEncoder, unet = model.vae, model.tokenizer, model.text_encoder, model.unet

    scheduler = LMSDiscreteScheduler(beta_start=0.00085, beta_end=0.012, beta_schedule="scaled_linear", num_train_timesteps=1000)

    promptDf = pd.read_csv(promptPath, index_col=0)
    # promptDf의 각 행 불러오기.
    for _, row in promptDf.iterrows():

        # B는 생성할 이미지 개수.
        B = sampleCount
        H, W = imageSize

        # classifier-free guidance를 위해 conditional과 uncoditional prompt가 필요함.
        prompts = [row.prompt] * B + [""] * B
        inputs = tokenizer(prompts, padding="max_length", max_length=77, truncation=True, return_tensors="pt").input_ids.to(device)
        # embds는 (2*B, 768, 77) 형태를 가지는 텍스트 임베딩.
        embds = textEncoder(inputs)[0]

        seed = row.evaluation_seed
        generator = torch.Generator()
        generator.manual_seed(seed)
        # 주어진 시드를 가지는 B개의 latent 생성.
        latents = torch.randn((B, unet.in_channels, H//8, W//8), generator=generator).to(device)
        latents *= scheduler.init_noise_sigma
        
        # inference step 수를 설정.
        scheduler.set_timesteps(stepCount)
        for t in tqdm.tqdm(scheduler.timesteps):

            # classifier-free guidance를 위해 uncoditional latent를 추가함.
            latentInputs = torch.cat([latents]*2)
            # latentInputs를 현재 timestep에 맞게 조정.
            latentInputs = scheduler.scale_model_input(latentInputs, timestep=t)

            # latent와 텍스트 임베딩을 사용하여 노이즈 생성.
            noises = unet(latentInputs, t, encoder_hidden_states=embds).sample
            # 앞쪽 B개의 노이즈는 conditional과, 뒤쪽 B개의 노이즈는 uncoditional.
            condNoises, uncondNoises = noises.chunk(2)
            # classifier-free guidance 적용.
            noises = uncondNoises + guidanceScale * (condNoises - uncondNoises)

            # 다음 latent 생성.
            latents = scheduler.step(noises, t, latents).prev_sample

        latents /= 0.18215
        # 최종 latent를 decoding하여 이미지 생성.
        images = vae.decode(latents).sample

        # images의 값을 [0,1]로 제한, 형태를 (B, C, H, W)에서 (B, H, W, C)로 변환.
        images = ((images + 1) / 2).clamp(0, 1).permute(0, 2, 3, 1)
        # numpy array 사용을 위해 cpu로 이동.
        images = images.detach().cpu().numpy()
        # [0,1] 범위의 float를 [0,255] 범위의 uint8로 변환.
        images = (images * 255).round().astype("uint8")
        # numpy array를 PIL Image로 변환.
        images = [Image.fromarray(image) for image in images]
        # images에 저장된 B개의 image 저장
        for idx, image in enumerate(images):
            image.save(f"image/{row.case_number}_{idx}.png")

if __name__ == "__main__":
    promptPath = "data/test_prompts.csv"
    generateImage(promptPath, sampleCount=2)

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

100%|██████████| 100/100 [01:03<00:00,  1.57it/s]
100%|██████████| 100/100 [01:04<00:00,  1.55it/s]
