## Install detectron 2

In [1]:
!python -m pip install pyyaml==5.1 > /dev/null 2>&1
import sys, os, distutils.core
# Note: This is a faster way to install detectron2 in Colab, but it does not include all functionalities (e.g. compiled operators).
# See https://detectron2.readthedocs.io/tutorials/install.html for full installation instructions
!git clone 'https://github.com/facebookresearch/detectron2' > /dev/null 2>&1
dist = distutils.core.run_setup("./detectron2/setup.py")
!python -m pip install {' '.join([f"'{x}'" for x in dist.install_requires])} > /dev/null 2>&1
sys.path.insert(0, os.path.abspath('./detectron2'))

# Properly install detectron2. (Please do not install twice in both ways)
# !python -m pip install 'git+https://github.com/facebookresearch/detectron2.git'

## Install GPT and Stable diffusion

In [2]:
!pip install diffusers transformers accelerate > /dev/null 2>&1

In [3]:
import os

import cv2
import detectron2
import numpy as np
import torch

# import some common detectron2 utilities
from detectron2 import model_zoo
from detectron2.config import get_cfg
from detectron2.data import DatasetCatalog, MetadataCatalog
from detectron2.engine import DefaultPredictor
from detectron2.utils.visualizer import Visualizer, _create_text_labels
from diffusers import DiffusionPipeline

from PIL import Image
from sklearn.metrics.pairwise import cosine_similarity
from torch import nn
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments
import pathlib

from skimage.metrics import structural_similarity as ssim
from skimage.metrics import mean_squared_error

# Import the necessary libraries for partitioning the dataset into train and test sets
from sklearn.model_selection import train_test_split


class DetectronGPTDiffusion(nn.Module):
    def __init__(self):
        super(DetectronGPTDiffusion, self).__init__()
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Detectron2 parts
        # Note that the "items" we could detect here are limited to the 80 classes
        # of the COCO dataset, but we can add more classes if we want to.
        self.cfg = get_cfg()
        # add project-specific config (e.g., TensorMask) here if you're not running
        # a model in detectron2's core library
        self.cfg.merge_from_file(
            model_zoo.get_config_file(
                "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"
            )
        )
        # Detection threshold for this model
        self.cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5
        # Find a model from detectron2's model zoo.
        self.cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url(
            "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"
        )
        self.metadata = MetadataCatalog.get(self.cfg.DATASETS.TRAIN[0])
        # This is the main detectron module
        # This has no .parameters() function so we assume it's already frozen
        self.detectron = DefaultPredictor(self.cfg)

        # GPT-J parts
        self.tokenizer = AutoTokenizer.from_pretrained("gpt2-large")
        self.gpt = AutoModelForCausalLM.from_pretrained(
            "gpt2-large", cache_dir=pathlib.Path("cache").resolve()
        ).to(self.device)

        # Diffusion parts
        # This has no .parameters() function so we assume it's already frozen
        self.diffusion_model = DiffusionPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16
        ).to(self.device)

    def partition_image(self, image_dir: str = "input.jpg"):
        im = cv2.imread(image_dir)
        # Round down the image's height and width to the nearest multiple of 8
        # (the largest multiple of 8 that is smaller than the image's height and width)
        im_height = im.shape[0]
        im_width = im.shape[1]
        im_height = im_height - (im_height % 8)
        im_width = im_width - (im_width % 8)
        im = cv2.resize(im, (im_width, im_height))

        outputs = self.detectron(im)
        predictions = outputs["instances"]

        boxes = predictions.pred_boxes if predictions.has("pred_boxes") else None
        scores = predictions.scores if predictions.has("scores") else None
        classes = (
            predictions.pred_classes.tolist()
            if predictions.has("pred_classes")
            else None
        )
        labels = _create_text_labels(classes, self.metadata.get("thing_classes", None))
        keypoints = (
            predictions.pred_keypoints if predictions.has("pred_keypoints") else None
        )

        return im, [boxes, scores, classes, labels, keypoints]

    def gpt_post_process(self, output_sequences, input_prompt):
        predictions = []
        generated_sequences = []

        max_repeat = 2

        # decode prediction
        for generated_sequence_idx, generated_sequence in enumerate(output_sequences):
            generated_sequence = generated_sequence.tolist()
            text = self.tokenizer.decode(
                generated_sequence,
                clean_up_tokenization_spaces=True,
                skip_special_tokens=True,
            )
            # Take out the prompt given to it by us
            text = text[len(input_prompt) :]
            generated_sequences.append(text.strip())

        for i, g in enumerate(generated_sequences):
            res = str(g).replace("\n\n\n", "\n").replace("\n\n", "\n")
            lines = res.split("\n")
            # # print(lines)
            # i = max_repeat
            # while i != len(lines):
            #   remove_count = 0
            #   for index in range(0, max_repeat):
            #     # print(i - index - 1, i - index)
            #     if lines[i - index - 1] == lines[i - index]:
            #       remove_count += 1
            #   if remove_count == max_repeat:
            #     lines.pop(i)
            #     i -= 1
            #   else:
            #     i += 1
            predictions.append("\n".join(lines))

        return predictions

    # temp = min:0, max:3, step:0.01
    # top_p = # min:0, max:1, step:0.01
    # repetition penalty is very critical for GPT-2
    # TODO: Move these gpt generation parameters to the __init__ function
    def gpt_generate(
        self,
        prompt,
        num_sequences=1,
        min_length=128,
        max_length=256,
        temperature=1,
        top_p=0.95,
        top_k=50,
        repetition_penalty=1.0,
    ):

        prompt_full = self.tokenizer(
            prompt, add_special_tokens=False, return_tensors="pt"
        ).to(self.device)
        encoded_prompt = prompt_full.input_ids
        encoded_prompt = encoded_prompt.to(self.device)

        output_sequences = self.gpt.generate(
            input_ids=encoded_prompt,
            max_new_tokens=max_length,
            attention_mask=prompt_full.attention_mask,
            min_length=min_length,
            temperature=float(temperature),
            top_p=float(top_p),
            top_k=int(top_k),
            do_sample=True,
            repetition_penalty=repetition_penalty,
            num_return_sequences=num_sequences,
        )

        return self.gpt_post_process(output_sequences, prompt)

    def generate_image(self, prompt, height=256, width=256):
        max_length = self.diffusion_model.tokenizer.model_max_length
        input_ids = self.diffusion_model.tokenizer(
            prompt, return_tensors="pt"
        ).input_ids
        input_ids = input_ids.to(self.device)

        negative_ids = self.diffusion_model.tokenizer(
            "",
            truncation=False,
            padding="max_length",
            max_length=input_ids.shape[-1],
            return_tensors="pt",
        ).input_ids
        negative_ids = negative_ids.to(self.device)

        concat_embeds = []
        neg_embeds = []
        for i in range(0, input_ids.shape[-1], max_length):
            concat_embeds.append(
                self.diffusion_model.text_encoder(input_ids[:, i : i + max_length])[0]
            )
            neg_embeds.append(
                self.diffusion_model.text_encoder(negative_ids[:, i : i + max_length])[
                    0
                ]
            )

        prompt_embeds = torch.cat(concat_embeds, dim=1)
        negative_prompt_embeds = torch.cat(neg_embeds, dim=1)

        return self.diffusion_model(
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            height=height,
            width=width,
        ).images

    def generate_description(self, boxes, labels):
        full_str = ""
        for i in range(len(boxes)):
            box = boxes[i]
            label = labels[i]
            box_array = box.tensor.flatten().tolist()
            # Just the pixel is enough, no floating points
            box_array = [int(x) for x in box_array]
            full_str += f"a {label} at {box_array}, "
        return full_str

    def forward(self, image_dir: str):
        input_image, [
            boxes,
            scores,
            classes,
            labels,
            keypoints,
        ] = self.partition_image(image_dir)
        height = input_image.shape[0]
        width = input_image.shape[1]
        description = self.generate_description(boxes, labels)
        prompt = (
            "We would like to generate a text prompt that will be used to generate an image. In this image, [a, b, c, d] represents the position of the object in the image. In the prompt, describe the objects to be included as well as their positions."  # noqa: E501
            + description
        )
        engineered_prompt = self.gpt_generate(prompt)[0]
        output_image = self.generate_image(engineered_prompt, height, width)[0]

        return {
            "manual_prompt": prompt,
            "output_prompt": engineered_prompt,
            "input_image": input_image,
            "output_image": output_image,
        }


def _create_text_labels(classes, class_names, is_crowd=None):
    """
    Args:
        classes (list[int] or None):
        scores (list[float] or None):
        class_names (list[str] or None):
        is_crowd (list[bool] or None):

    Returns:
        list[str] or None
    """
    labels = None
    if classes is not None:
        if class_names is not None and len(class_names) > 0:
            labels = [class_names[i] for i in classes]
        else:
            labels = [str(i) for i in classes]

    if labels is not None and is_crowd is not None:
        labels = [l + ("|crowd" if crowd else "") for l, crowd in zip(labels, is_crowd)]
    return labels

In [39]:
import locale
locale.getpreferredencoding = lambda: "UTF-8"

time: 4.72 ms (started: 2023-06-04 04:14:47 +00:00)


In [40]:
!pip install torchmetrics

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchmetrics
  Downloading torchmetrics-0.11.4-py3-none-any.whl (519 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m519.2/519.2 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: torchmetrics
Successfully installed torchmetrics-0.11.4
time: 6.85 s (started: 2023-06-04 04:14:49 +00:00)


In [41]:
import torchvision.transforms as transforms
from torchmetrics.functional import structural_similarity_index_measure as ssim
im_transform = transforms.ToTensor()

def ssim_loss(input_image, output_image):
    # Calculate the score between the input and output images
    input_torch = im_transform(input_image)
    output_torch = im_transform(output_image)
    score = ssim(input_torch, output_torch)
    return 1 - score


# A function which returns the filenames of all the images in a directory
def get_image_filenames(directory):
    # Get all the filenames
    filenames = os.listdir(directory)
    # Filter out the non-image files
    filenames = [f for f in filenames if f.endswith(".jpg")]
    # Add the directory to the filenames
    filenames = [os.path.join(directory, f) for f in filenames]
    return filenames


# Optimize the parameters of the gpt part of the model
def optimize_gpt(
    model: DetectronGPTDiffusion,
    num_steps=10,
    learning_rate=0.01,
):
    # Get the dataset
    dataset = get_image_filenames(
        "/content/drive/Shareddrives/COM SCI 263/Final Project/Data/COCO/val2017"
    )[:num_steps]
    # We use run the model one time for each image in the dataset
    # (Filtered to only n_steps images)
    test, train = train_test_split(dataset, test_size=0.2, random_state=42)

    # Define the optimizer
    optimizer = torch.optim.Adam(
        [
            {
                "params": model.gpt.parameters(),
                "lr": learning_rate,
            },
        ]
    )

    # Optimize the parameters
    for i in range(num_steps):
        # Reset the gradients
        optimizer.zero_grad()

        model_output = model.forward(train[i])

        # Calculate the loss
        loss = ssim_loss(model_output["input_image"], model_output["output_image"])

        # Print the loss
        print(f"Step: {i}, Loss: {loss}")

        # Backpropagate the loss
        loss.backward()

        # Update the parameters
        optimizer.step()

time: 1.71 ms (started: 2023-06-04 04:15:43 +00:00)


In [None]:
del pipeline

NameError: ignored

In [4]:
pipeline = DetectronGPTDiffusion()

`text_config_dict` is provided which will be used to initialize `CLIPTextConfig`. The value `text_config["id2label"]` will be overriden.


In [None]:
from google.colab.patches import cv2_imshow

!wget http://images.cocodataset.org/val2017/000000439715.jpg -q -O 'input.jpg'
im = cv2.imread("./input.jpg")
cv2_imshow(im)

In [25]:
im = cv2.imread("./input.jpg")
type(im)

numpy.ndarray

time: 51.8 ms (started: 2023-06-04 04:02:07 +00:00)


In [30]:
im_transform(im).shape

torch.Size([3, 480, 640])

time: 12.4 ms (started: 2023-06-04 04:05:48 +00:00)


In [5]:
!pip install ipython-autotime > /dev/null 2>&1

Do a single run of the whole model and time it.

In [31]:
%load_ext autotime
out_dict = pipeline.forward("./input.jpg")

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


The autotime extension is already loaded. To reload it, use:
  %reload_ext autotime


  0%|          | 0/50 [00:00<?, ?it/s]

time: 22.2 s (started: 2023-06-04 04:06:49 +00:00)


Show the output image

In [34]:
type(out_dict["output_image"])

PIL.Image.Image

time: 3.74 ms (started: 2023-06-04 04:08:17 +00:00)


In [None]:
im_transform(out_dict["input_image"])

In [None]:
im_transform(out_dict["output_image"])

In [42]:
ssim_loss(out_dict["input_image"], out_dict["output_image"])

time: 139 ms (started: 2023-06-04 04:16:05 +00:00)


Mount google drive

In [6]:
from google.colab import drive 
drive.mount('/content/drive',force_remount=True)

Mounted at /content/drive


Download dataset and unzip it

In [15]:
!wget -P '/content/drive/Shareddrives/COM SCI 263/Final Project/Data/COCO' http://images.cocodataset.org/zips/val2017.zip
!cd '/content/drive/Shareddrives/COM SCI 263/Final Project/Data/COCO' && unzip val2017.zip > /dev/null 2>&1

--2023-06-04 03:30:56--  http://images.cocodataset.org/zips/val2017.zip
Resolving images.cocodataset.org (images.cocodataset.org)... 3.5.25.154, 54.231.235.209, 52.216.41.25, ...
Connecting to images.cocodataset.org (images.cocodataset.org)|3.5.25.154|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 815585330 (778M) [application/zip]
Saving to: ‘/content/drive/Shareddrives/COM SCI 263/Final Project/Data/COCO/val2017.zip’


2023-06-04 03:31:12 (47.3 MB/s) - ‘/content/drive/Shareddrives/COM SCI 263/Final Project/Data/COCO/val2017.zip’ saved [815585330/815585330]



Finally, optimize the GPT!

In [37]:
%load_ext autotime
optimize_gpt(pipeline)

The autotime extension is already loaded. To reload it, use:
  %reload_ext autotime


Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


  0%|          | 0/50 [00:00<?, ?it/s]

time: 22.3 s (started: 2023-06-04 04:09:09 +00:00)
