In [None]:
# This notebook 'tests' the current codebase to see if it can take a series of prompts, convert them to embeddings, save them, generate images from them, and run CLIP to get back the same prompts.

In [None]:
# Environment switch: any value other than "TEST" clones the repository and
# installs its requirements; "TEST" only changes the working directory.
ENV_TYPE = "TEST"

if(ENV_TYPE != "TEST"):
  # Fresh environment: fetch the code, enter the checkout and install its dependencies.
  !git clone "https://github.com/kk-digital/kcg-ml-sd1p4.git"
  %cd kcg-ml-sd1p4
  !pip3 install -r requirements.txt
  # NOTE(review): exit() ends the session right after the install — presumably to
  # force a restart so the freshly installed packages are picked up; confirm.
  exit()
else:
  # Move the working directory one level up (presumably to the repository root,
  # since later cells run ./download_models.py from there — confirm).
  %cd ..

In [None]:
# Run the repository's download_models.py script (presumably fetches the model files used below — confirm).
!python3 ./download_models.py

In [None]:
# Run the repository's process_models.py script (presumably prepares the downloaded models for loading — confirm).
!python3 ./process_models.py

In [None]:
import torch
import configparser
from typing import List
import os
import sys
import hashlib
import json
import math

from os.path import join

base_dir = "./"
sys.path.insert(0, base_dir)


from stable_diffusion.model.clip_text_embedder import CLIPTextEmbedder
from stable_diffusion.model.clip_image_encoder import CLIPImageEncoder
from stable_diffusion import StableDiffusion
from stable_diffusion.constants import IODirectoryTree
from stable_diffusion.utils_backend import (
    get_device,
    get_memory_status,
)
from stable_diffusion.utils_image import (
    to_pil,
    save_image_grid,
    show_image_grid,
)

# --- Paths --------------------------------------------------------------------
# Every location is resolved to an absolute path starting from `base_dir`.
EMBEDDED_PROMPTS_DIR = os.path.abspath(join(base_dir, "./input/embedded_prompts/"))
print(EMBEDDED_PROMPTS_DIR)
OUTPUT_DIR = os.path.abspath(
    join(base_dir, "./output/sd2-notebook/disturbed_embeddings/")
)
IMAGES_DIR = os.path.abspath(join(OUTPUT_DIR, "images/"))
FEATURES_DIR = os.path.abspath(join(OUTPUT_DIR, "features/"))
print(OUTPUT_DIR)
print(IMAGES_DIR)
print(FEATURES_DIR)

# --- Experiment parameters ----------------------------------------------------
NULL_PROMPT = ""
PROMPT = (
    "A woman with flowers in her hair in a courtyard, in the style of Frank Frazetta"
)
NUM_ITERATIONS = 16
SEED = 2982
NOISE_MULTIPLIER = 0.01
BATCH_SIZE = 1

# Device used for the generator and the Stable Diffusion model in later cells.
DEVICE = get_device()

# --- Project configuration ----------------------------------------------------
config_path = os.path.join(base_dir, "config.ini")
config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
# ConfigParser.read() silently skips unreadable files and returns the list of
# files it actually parsed, so fail here with a clear message instead of a
# bare KeyError on the 'BASE' lookup below.
if not config.read(config_path):
    raise FileNotFoundError(f"Could not read configuration file: {config_path}")
config['BASE']['BASE_DIRECTORY'] = base_dir

# `pt` exposes the model locations consumed by later cells
# (pt.embedder_submodels, pt.autoencoder, pt.decoder, pt.unet).
pt = IODirectoryTree(base_io_directory_prefix = config["BASE"].get('base_io_directory_prefix'), base_directory=base_dir)
pt.create_directory_tree_folders()

# Make sure every directory this notebook writes to exists.
for directory in (EMBEDDED_PROMPTS_DIR, OUTPUT_DIR, IMAGES_DIR, FEATURES_DIR):
    os.makedirs(directory, exist_ok=True)

In [2]:
def calculate_sha256(tensor):
    """Return the SHA-256 hex digest of a tensor's raw element bytes.

    Args:
        tensor: A ``torch.Tensor`` on any device; it may require grad.

    Returns:
        str: Hexadecimal SHA-256 digest of the tensor data as produced by
        ``numpy.ndarray.tobytes()``.
    """
    # detach() lets tensors that require grad be converted to NumPy; cpu()
    # returns the tensor unchanged when it already lives on the CPU, so no
    # device check is needed.
    tensor_bytes = tensor.detach().cpu().numpy().tobytes()
    return hashlib.sha256(tensor_bytes).hexdigest()


def embed_and_save_prompts(prompts: list, null_prompt=NULL_PROMPT):
    """Embed prompts with the CLIP text embedder and save the results to disk.

    Two files are written into ``EMBEDDED_PROMPTS_DIR``: ``null_cond.pt`` (the
    embedding of ``null_prompt``) and ``embedded_prompts.pt`` (the embedding of
    ``prompts``). The embedder is released again before returning.

    Args:
        prompts: Passed unchanged to the embedder. The notebook calls this
            function with a single prompt string.
        null_prompt: Prompt used for the null embedding; defaults to
            ``NULL_PROMPT``.

    Returns:
        tuple: ``(embedded_prompts, null_cond)`` as returned by the embedder.
    """
    null_cond_path = join(EMBEDDED_PROMPTS_DIR, "null_cond.pt")
    embedded_prompts_path = join(EMBEDDED_PROMPTS_DIR, "embedded_prompts.pt")

    clip_text_embedder = CLIPTextEmbedder(device=get_device())
    clip_text_embedder.load_submodels(**pt.embedder_submodels)

    null_cond = clip_text_embedder(null_prompt)
    torch.save(null_cond, null_cond_path)
    print("Null prompt embedding saved at: ", null_cond_path)

    embedded_prompts = clip_text_embedder(prompts)
    torch.save(embedded_prompts, embedded_prompts_path)
    print("Prompts embeddings saved at: ", embedded_prompts_path)

    # Release the embedder; memory status is reported before and after.
    get_memory_status()
    clip_text_embedder.to("cpu")
    del clip_text_embedder
    torch.cuda.empty_cache()
    get_memory_status()

    return embedded_prompts, null_cond

In [None]:
# Embed the test prompt. The second return value is the embedder's output for
# NULL_PROMPT, even though it is bound to the name `null_prompt`.
embedded_prompts, null_prompt = embed_and_save_prompts(PROMPT)
# Alias of the undisturbed embedding; written to features.json in the final cell.
embedding = embedded_prompts

In [None]:
import numpy as np
import numpy
from numpy.linalg import norm

# Work on a detached CPU copy of the embedding as a NumPy array.
ep_tensor = embedded_prompts.cpu().detach()
ep = ep_tensor.numpy()

# One vector per position of the first prompt; the count comes from the data
# instead of a hard-coded 77.
plist = list(ep[0])

print("Clip Embedding Vector:")
print("Norm of Difference of Successive Vectors")
print("")

for i in range(len(plist) - 1):
    v1 = plist[i]
    v2 = plist[i+1]

    # Statistics of the difference between neighbouring vectors.
    tmp_v = v2 - v1
    l2 = numpy.linalg.norm(tmp_v, ord=2)
    l1 = numpy.linalg.norm(tmp_v, ord=1)
    std = numpy.std(tmp_v)

    # Cosine similarity needs the norm of BOTH vectors in the denominator
    # (it previously divided by norm(v2) twice).
    cos = np.dot(v1,v2)/(norm(v1)*norm(v2))
    print("i=", i, " l1= ",l1, " l2= ", l2, " cos= ", cos, " std= ", std)

In [None]:
import numpy as np
import numpy
from numpy.linalg import norm

# Work on a detached CPU copy of the embedding as a NumPy array.
ep_tensor = embedded_prompts.cpu().detach()
ep = ep_tensor.numpy()

# One vector per position of the first prompt; the count comes from the data
# instead of a hard-coded 77.
plist = list(ep[0])

print("Clip Embedding Vector:")
print("Size and Norm of Vectors")
print("")

# L1 norm, L2 norm and standard deviation of every vector.
for i, v1 in enumerate(plist):
    l1 = numpy.linalg.norm(v1, ord=1)
    l2 = numpy.linalg.norm(v1, ord=2)
    std = numpy.std(v1)

    print("i=", i, " l1= ",l1, " l2= ", l2, " std= ", std)

In [None]:
# check the shape of the embedded prompts
# Shape of the embedding as a plain tuple; the bare expression displays it as the cell output.
embedding_shape = tuple(embedded_prompts.shape)
embedding_shape

In [None]:
# Check the overall mean and std of the embedding so the generated noise can use the same statistics.
# One idea is to use one distribution per position (one for each of the 77 positions);
# in that case we would check the mean and std along dimension 2 instead:
# embedded_prompts.mean(dim=2), embedded_prompts.std(dim=2)
embedding_mean, embedding_std = embedded_prompts.mean(), embedded_prompts.std()
embedding_mean, embedding_std

In [7]:
# NOTE(review): the tuple below is computed and then discarded — it is not the
# last expression of the cell, so it is never displayed.
embedded_prompts.mean(dim=2), embedded_prompts.std(dim=2)
# Draw noise using the statistics taken along dimension 2 as mean and std.
# NOTE(review): `noise` is reassigned by a later cell before it is used — confirm this cell is still needed.
noise = torch.normal(mean=embedded_prompts.mean(dim=2), std=embedded_prompts.std(dim=2))

In [None]:
# Normal distribution parameterised by the statistics taken along dimension 2.
position_mean = embedded_prompts.mean(dim=2)
position_std = embedded_prompts.std(dim=2)
dist = torch.distributions.normal.Normal(loc=position_mean, scale=position_std)

# The 768 draws are stacked on a new leading axis. permute(1, 2, 0) moves that
# axis to the end, exactly like permute(1, 0, 2) followed by permute(0, 2, 1).
raw_samples = dist.sample(sample_shape=torch.Size([768]))
noise = raw_samples.permute(1, 2, 0)
noise.shape

In [9]:
# Compare every row of the embedding with its first row.
rows = embedded_prompts.squeeze()
first_row = rows[0]  # loop-invariant, so computed once

differences = []
differences_means = []
differences_stds = []
dot_products = []
for row in rows[1:]:  # the first row is the reference and is skipped
    diff = row - first_row
    differences.append(diff)
    differences_means.append(diff.mean())
    differences_stds.append(diff.std())
    dot_products.append(torch.dot(row, first_row))

In [10]:
# Turn the per-row Python lists into single tensors.
# NOTE(review): the names are rebound in place, so this cell is meant to run
# once, right after the cell that builds the lists.
differences = torch.stack(differences)
differences_means = torch.stack(differences_means)
differences_stds = torch.stack(differences_stds)
dot_products = torch.stack(dot_products)

In [None]:
# Mean of each row's difference from the first row.
differences_means

In [None]:
# Standard deviation of each row's difference from the first row.
differences_stds

In [None]:
# Dot product of each row with the first row.
dot_products

In [None]:
# Generate the noise that the next cell adds to the embedded prompt.
# NOTE(review): `generator` is seeded here but never handed to the sampler
# below (`Normal.sample` takes no generator argument), so the noise is drawn
# from the global RNG state and is not controlled by SEED.
generator = torch.Generator(device=DEVICE).manual_seed(SEED)

# One normal distribution per position, using the statistics along dimension 2.
dist = torch.distributions.normal.Normal(
    loc=embedded_prompts.mean(dim=2), scale=embedded_prompts.std(dim=2)
)
# Draw one sample per feature (size of dimension 2 instead of a hard-coded 768).
# The draws land on a new leading axis; permute(1, 2, 0) moves it to the end so
# that the noise lines up with the embedding.
feature_dim = embedded_prompts.shape[2]
noise = dist.sample(sample_shape=torch.Size([feature_dim])).permute(1, 2, 0)
noise.shape

In [15]:
# Perturb the embedding with the sampled noise.
# NOTE(review): the 0.1 factor is hard-coded and differs from NOISE_MULTIPLIER
# (0.01) defined in the configuration cell — confirm which one is intended.
embedding_e = embedded_prompts + 0.1 * noise

In [None]:
# Call the backend's memory-status helper before the model is loaded (presumably reports current memory usage — confirm).
get_memory_status()

In [17]:
%%capture
# Build the Stable Diffusion wrapper on the selected device, then load the
# autoencoder, decoder and U-Net with the keyword arguments provided by `pt`.
sd = StableDiffusion(device=DEVICE)
sd.quick_initialize().load_autoencoder(**pt.autoencoder).load_decoder(**pt.decoder)
sd.model.load_unet(**pt.unet)

In [None]:
# Call the memory-status helper again now that the model has been loaded, for comparison with the earlier call.
get_memory_status()

In [None]:
# Baseline: generate from the unmodified prompt embedding. `null_prompt` holds
# the embedding of the empty prompt returned by embed_and_save_prompts.
image = sd.generate_images_from_embeddings(
    seed=SEED, embedded_prompt=embedded_prompts, null_prompt=null_prompt
)

In [None]:
# Convert the first baseline result with to_pil so it is displayed as the cell output.
to_pil(image[0])

In [None]:
# Same seed as the baseline, but using the noise-perturbed embedding, so the
# two results can be compared.
image_e = sd.generate_images_from_embeddings(
    seed=SEED, embedded_prompt=embedding_e, null_prompt=null_prompt
)

In [None]:
# Convert the first perturbed result with to_pil so it is displayed as the cell output.
to_pil(image_e[0])

In [22]:
def generate_images_from_disturbed_embeddings(
    sd: StableDiffusion,
    embedded_prompt: torch.Tensor,
    device=DEVICE,
    seed=SEED,
    num_iterations=NUM_ITERATIONS,
    noise_multiplier=NOISE_MULTIPLIER,
    batch_size=BATCH_SIZE,
):
    """Yield images generated from noise-perturbed copies of ``embedded_prompt``.

    Each iteration ``i`` draws two independent noise tensors from a normal
    distribution whose mean and std are taken along dimension 2 of
    ``embedded_prompt``. They are blended with weights ``i`` and
    ``num_iterations - i`` (both scaled by ``noise_multiplier``), the blend is
    divided by ``2 * num_iterations`` and added to the embedding.

    Args:
        sd: Model wrapper providing ``generate_images_from_embeddings``.
        embedded_prompt: Embedding to perturb; must have at least three
            dimensions because statistics are taken along dimension 2.
        device: Currently unused; kept so existing callers keep working.
        seed: Seed forwarded to ``sd.generate_images_from_embeddings``.
        num_iterations: Number of (image, embedding) pairs to produce.
        noise_multiplier: Scale applied to both noise terms.
        batch_size: Forwarded to ``sd.generate_images_from_embeddings``.

    Yields:
        tuple: ``(image_e, embedding_e)`` — the generation result and the
        perturbed embedding that produced it.

    Notes:
        * ``null_prompt`` is read from the notebook's global scope, so the
          cell that defines it must have been run.
        * NOTE(review): the noise is drawn from the global RNG state and is
          not controlled by ``seed``; the previously created
          ``torch.Generator`` was never passed to the sampler and has been
          removed.
    """
    dist = torch.distributions.normal.Normal(
        loc=embedded_prompt.mean(dim=2), scale=embedded_prompt.std(dim=2)
    )
    # Size of the reduced dimension (replaces the hard-coded 768).
    feature_dim = embedded_prompt.shape[2]

    def _sample_noise():
        # Draws land on a new leading axis; permute(1, 2, 0) moves that axis to
        # the end, same as permute(1, 0, 2) followed by permute(0, 2, 1).
        return dist.sample(sample_shape=torch.Size([feature_dim])).permute(1, 2, 0)

    for i in range(num_iterations):
        j = num_iterations - i

        noise_i = _sample_noise()
        noise_j = _sample_noise()
        embedding_e = embedded_prompt + (
            (i * noise_multiplier) * noise_i + (j * noise_multiplier) * noise_j
        ) / (2 * num_iterations)

        image_e = sd.generate_images_from_embeddings(
            seed=seed,
            embedded_prompt=embedding_e,
            null_prompt=null_prompt,
            batch_size=batch_size,
        )

        yield (image_e, embedding_e)

In [23]:
# Create the generator object; nothing is computed until it is iterated.
image_generator = generate_images_from_disturbed_embeddings(
    sd, embedded_prompts)

In [None]:
# Exhaust the generator: runs every iteration and keeps the (image, embedding) pairs.
yielded = list(image_generator)

In [25]:
# Split the (image, embedding) pairs into parallel lists and hash every image tensor.
images = [generated for generated, _ in yielded]
embeddings = [disturbed for _, disturbed in yielded]
hashs = [calculate_sha256(generated) for generated in images]

In [None]:
# Concatenate all generated images and show them in one grid.
# NOTE(review): nrow is int(log2(NUM_ITERATIONS)), i.e. 4 for NUM_ITERATIONS = 16 — confirm this is the intended layout.
show_image_grid(torch.cat(images), nrow=int(math.log(NUM_ITERATIONS, 2)))

In [27]:
# Drop the size-1 dimensions of every image tensor, then convert each one with to_pil.
pil_images = list(map(to_pil, map(torch.Tensor.squeeze, images)))

In [None]:
from torch import nn
import clip


class AestheticPredictor(nn.Module):
    """Stack of linear layers that maps a feature vector to a single score.

    Layer widths are ``input_size -> 1024 -> 128 -> 64 -> 16 -> 1``; dropout
    follows the first three linear layers (p = 0.2, 0.2, 0.1). The modules are
    stored in ``self.layers`` in that order, which fixes the parameter names
    (``layers.0``, ``layers.2``, ...) used when loading a checkpoint.
    """

    def __init__(self, input_size):
        super().__init__()
        self.input_size = input_size

        # (output width, dropout probability or None) for each linear layer.
        layer_spec = ((1024, 0.2), (128, 0.2), (64, 0.1), (16, None), (1, None))

        modules = []
        in_features = self.input_size
        for out_features, dropout_p in layer_spec:
            modules.append(nn.Linear(in_features, out_features))
            if dropout_p is not None:
                modules.append(nn.Dropout(dropout_p))
            in_features = out_features

        self.layers = nn.Sequential(*modules)

    def forward(self, x):
        """Return the output of the layer stack for input ``x``."""
        return self.layers(x)


# Location of the aesthetic scorer weights, relative to the current working directory.
chadscorer_path = join("./input/model/aesthetic_scorer", "chadscorer.pth")
# NOTE(review): this lowercase `device` is chosen independently of the DEVICE
# constant obtained from get_device() earlier — confirm that both agree.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the weights onto the CPU first; the predictor is moved to `device` below.
# NOTE(review): torch.load unpickles the file, so only use it on trusted checkpoints.
pt_state = torch.load(chadscorer_path, map_location=torch.device("cpu"))

# CLIP embedding dim is 768 for CLIP ViT L 14
predictor = AestheticPredictor(768)
predictor.load_state_dict(pt_state)
predictor.to(device)
# Evaluation mode, so the Dropout layers are inactive during scoring.
predictor.eval()

# CLIP model and its matching image preprocessing, used to extract image features.
clip_model, clip_preprocess = clip.load("ViT-L/14", device=device)


def get_image_features(
    image, device=device, model=clip_model, preprocess=clip_preprocess
):
    """Encode an image with the CLIP model and return L2-normalised features.

    Args:
        image: Input accepted by ``preprocess`` (the notebook passes PIL images).
        device: Device the preprocessed batch is moved to.
        model: Model providing ``encode_image``.
        preprocess: Callable that turns ``image`` into a tensor.

    Returns:
        numpy.ndarray: Normalised features for a batch of one image.
    """
    # Add a leading batch dimension before moving the input to the device.
    batch = preprocess(image).unsqueeze(0).to(device)
    with torch.no_grad():
        encoded = model.encode_image(batch)
        # l2 normalize
        unit_features = encoded / encoded.norm(dim=-1, keepdim=True)
    return unit_features.cpu().detach().numpy()


def get_score(image):
    """Return the aesthetic score of ``image`` as a Python float.

    The image is encoded with ``get_image_features`` and the resulting
    features are passed through the module-level ``predictor``.

    Args:
        image: Input accepted by ``get_image_features``.

    Returns:
        float: The single value produced by the predictor.
    """
    image_features = get_image_features(image)
    features = torch.from_numpy(image_features).to(device).float()
    # Scoring is inference only, so skip building an autograd graph.
    with torch.no_grad():
        score = predictor(features)
    return score.item()

In [None]:
# Save every generated image and collect its metadata. `manifest` holds the
# light-weight entries; `images_data` additionally carries the embedding and
# the CLIP feature vector of each image.
images_data = []
manifest = []

# NOTE(review): `embedding` is the undisturbed prompt embedding, so every entry
# stores the same tensor. The per-image disturbed embeddings are available in
# `embeddings[i]` — confirm which one should be recorded.
# The conversion is loop-invariant, so it is done once.
embedding_as_list = embedding.tolist()

for i, image in enumerate(pil_images):
    image_tensor = images[i]
    image_name = f"image_{i}.png"
    image_path = os.path.abspath(join(IMAGES_DIR, image_name))
    image_hash = calculate_sha256(image_tensor)
    image_features = get_image_features(image)
    image_score = get_score(image)
    manifest_i = {
        "file-name": image_name,
        "file-hash": image_hash,
        "file-path": image_path,
        "aesthetic-score": image_score,
        "initial-prompt": PROMPT,
    }
    manifest.append(manifest_i)

    # The copy already contains "initial-prompt"; only the large fields are added.
    json_output_i = manifest_i.copy()
    json_output_i["embedding-tensor"] = embedding_as_list
    json_output_i["clip-vector"] = image_features.tolist()
    images_data.append(json_output_i)
    image.save(image_path)

# Write both JSON files through context managers so the handles are closed.
features_path = join(FEATURES_DIR, "features.json")
with open(features_path, "w") as features_file:
    json.dump(images_data, features_file, indent=4)
print("Features saved at: ", features_path)

manifest_path = join(OUTPUT_DIR, "manifest.json")
with open(manifest_path, "w") as manifest_file:
    json.dump(manifest, manifest_file, indent=4)
print("Manifest saved at: ", manifest_path)