# Synergy-General-MultimodalPairs

## Initialization

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import *
import re
import random

### LLaVA

In [None]:
# Fetch the LLaVA sources; the next cell changes into /content/LLaVA and installs from there.
!git clone https://github.com/haotian-liu/LLaVA.git

In [None]:
# Install LLaVA in editable mode from the cloned checkout.
%cd /content/LLaVA
!pip install --upgrade pip  # enable PEP 660 support
!pip install -e .

# Extra build/runtime packages installed after LLaVA itself.
!pip install ninja
!pip install flash-attn --no-build-isolation

### StableDiffusion

In [None]:
# SDXL
!pip install safetensors omegaconf invisible-watermark>=0.2.0

In [None]:
# Packages used by the Stable Diffusion cells below.
!pip install diffusers
!pip install scipy ftfy accelerate

In [None]:
import torch
import os
from diffusers import StableDiffusionPipeline
from PIL import Image

### Vicuna

In [None]:
from transformers import pipeline
from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers import set_seed
import torch

## Test Models (Optional)

### Vicuna

In [None]:
# model
# Optional smoke test: load the Vicuna 13B tokenizer and model directly.
# No device or dtype is passed here, so the library defaults apply.
from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("lmsys/vicuna-13b-v1.5", use_fast=False)
model = AutoModelForCausalLM.from_pretrained("lmsys/vicuna-13b-v1.5")

In [None]:
# Tokenize a plain-text prompt for the test model loaded in the previous cell.
prompt = 'Please generate 3 random sentences that describe a scene'
inputs = tokenizer(prompt, return_tensors="pt")

# Generate
generate_ids = model.generate(inputs.input_ids, max_length=300)
# Decode the first (only) sequence in the batch back to text.
output = tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]

print(output)

In [None]:
# pipeline
# Same prompt through the high-level text-generation pipeline.
# Note this uses the 7B checkpoint, not the 13B one loaded above.
pipe = pipeline("text-generation", model="lmsys/vicuna-7b-v1.5")
output = pipe("Please generate 3 random sentences that describe a scene")
output = output[0]["generated_text"]
# Drop the first line of the generated text and keep the remaining lines as a list.
output = output.split("\n")[1:]
print(output)

### LLaVA - Model Loading

### run llava file

In [None]:
import argparse
import torch

from llava.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN, DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN
from llava.conversation import conv_templates, SeparatorStyle
from llava.model.builder import load_pretrained_model
from llava.utils import disable_torch_init
from llava.mm_utils import tokenizer_image_token, get_model_name_from_path, KeywordsStoppingCriteria

from PIL import Image

import requests
from io import BytesIO


def load_image(image_file):
    """Load an image from an HTTP(S) URL or a local file path as an RGB PIL image.

    Args:
        image_file: Either a URL starting with ``http://`` / ``https://`` or a
            path on the local filesystem.

    Returns:
        The decoded image converted to RGB mode.

    Raises:
        requests.RequestException: If the download fails, times out, or the
            server answers with an error status.
    """
    # Imported locally under a private alias: other cells in this notebook run
    # `from IPython.display import Image`, which rebinds the global `Image` to
    # a class without `open` and would otherwise break this helper.
    from PIL import Image as _PILImage

    # Match the URL scheme explicitly so a local path that merely starts with
    # "http" (e.g. "http_cache/x.png") is still read from disk.
    if image_file.startswith(('http://', 'https://')):
        response = requests.get(image_file, timeout=60)
        # Fail on HTTP errors instead of handing an error page to the decoder.
        response.raise_for_status()
        image = _PILImage.open(BytesIO(response.content))
    else:
        image = _PILImage.open(image_file)

    return image.convert('RGB')


def eval_model(args):
    """Run one LLaVA query against one image and return the generated text.

    Args:
        args: Object with the attributes ``model_path``, ``model_base``,
            ``query``, ``conv_mode`` and ``image_file``. When ``conv_mode`` is
            None (or equals the inferred mode) it is overwritten in place with
            the inferred conversation mode.

    Returns:
        The decoded model answer with surrounding whitespace and the trailing
        stop string removed.

    Note:
        The checkpoint is loaded inside this function, i.e. on every call.
    """
    # Model
    disable_torch_init()

    model_name = get_model_name_from_path(args.model_path)
    tokenizer, model, image_processor, context_len = load_pretrained_model(args.model_path, args.model_base, model_name)

    # Prepend the image placeholder token(s) to the user query.
    qs = args.query
    if model.config.mm_use_im_start_end:
        qs = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN + '\n' + qs
    else:
        qs = DEFAULT_IMAGE_TOKEN + '\n' + qs

    # Infer the conversation template from the model name; the first matching
    # substring wins, so "v1" is checked before "mpt".
    if 'llama-2' in model_name.lower():
        conv_mode = "llava_llama_2"
    elif "v1" in model_name.lower():
        conv_mode = "llava_v1"
    elif "mpt" in model_name.lower():
        conv_mode = "mpt"
    else:
        conv_mode = "llava_v0"

    # An explicitly requested conv_mode takes precedence over the inferred one.
    if args.conv_mode is not None and conv_mode != args.conv_mode:
        print('[WARNING] the auto inferred conversation mode is {}, while `--conv-mode` is {}, using {}'.format(conv_mode, args.conv_mode, args.conv_mode))
    else:
        args.conv_mode = conv_mode

    # Build the prompt: one user turn followed by an empty assistant turn.
    conv = conv_templates[args.conv_mode].copy()
    conv.append_message(conv.roles[0], qs)
    conv.append_message(conv.roles[1], None)
    prompt = conv.get_prompt()

    # Preprocess the image to a half-precision tensor on the GPU.
    image = load_image(args.image_file)
    image_tensor = image_processor.preprocess(image, return_tensors='pt')['pixel_values'].half().cuda()

    # unsqueeze(0) adds the batch dimension expected by generate().
    input_ids = tokenizer_image_token(prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt').unsqueeze(0).cuda()

    # Stop generation once the template's separator string appears.
    stop_str = conv.sep if conv.sep_style != SeparatorStyle.TWO else conv.sep2
    keywords = [stop_str]
    stopping_criteria = KeywordsStoppingCriteria(keywords, tokenizer, input_ids)

    with torch.inference_mode():
        output_ids = model.generate(
            input_ids,
            images=image_tensor,
            do_sample=True,
            temperature=0.2,
            max_new_tokens=1024,
            use_cache=True,
            stopping_criteria=[stopping_criteria])

    # The output starts with the prompt tokens; warn if that prefix was altered
    # and decode only the tokens that follow it.
    input_token_len = input_ids.shape[1]
    n_diff_input_output = (input_ids != output_ids[:, :input_token_len]).sum().item()
    if n_diff_input_output > 0:
        print(f'[Warning] {n_diff_input_output} output_ids are not the same as the input_ids')
    outputs = tokenizer.batch_decode(output_ids[:, input_token_len:], skip_special_tokens=True)[0]
    outputs = outputs.strip()
    # Remove the stop string if the answer ends with it.
    if outputs.endswith(stop_str):
        outputs = outputs[:-len(stop_str)]
    outputs = outputs.strip()
    return outputs

### eval

In [None]:
# eval (image + text)
# Optional smoke test: ask LLaVA for a short description of a sample image.
model_path = "liuhaotian/LLaVA-Lightning-MPT-7B-preview"
model_name = get_model_name_from_path(model_path)
model_base = None
prompt = "Give me a short description of this image."
imageFile = "https://llava-vl.github.io/static/images/view.jpg"

# Build an ad-hoc object whose attributes stand in for the CLI arguments read
# by `eval_model`. `conv_mode` is None so the mode is inferred from the name.
args = type('Args', (), {
    "model_path": model_path,
    "model_base": model_base,
    "model_name": model_name,
    "query": prompt,
    "conv_mode": None,
    "image_file": imageFile
})()

output = eval_model(args)
print(output)

In [None]:
# Reuse the same args object with a different query on the same image.
args.query = "Give me a random description that is not related to this image"
output = eval_model(args)
print(output)

### StableDiffusion

In [None]:
import torch
from diffusers import StableDiffusionPipeline

# high level pipeline
# Optional smoke test: load Stable Diffusion v1.4 with half-precision weights.
# Note that this rebinds `pipe`, which an earlier test cell used for the Vicuna pipeline.
pipe = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4", torch_dtype=torch.float16)

In [None]:
# Move the pipeline to the GPU before generating.
pipe = pipe.to("cuda")

prompt = "an astronaut rides a horse"
image = pipe(prompt).images[0]  # image here is in [PIL format](https://pillow.readthedocs.io/en/stable/)

# Now to display an image you can either save it such as:
image.save(f"astronaut_rides_horse.png")

# or if you're in a google colab you can directly display it with
image

## Experiment Pipeline
1. LLM(Vicuna v1.5) => generate D_init and save to a file or matrix "S"
2. Text-to-Image model => generate M_1 based on D_init
3. MLLM => generate D_1 based on M_1 and I
4. Store (D_1, M_1) into S
5. Repeat step 2~4

### Model Loading

#### Vicuna

In [None]:
# Seed used for this run: 1314.
torch.manual_seed(1314)

# Keyword arguments forwarded to `vicuna_model.generate` in
# `generate_initial_description`.
vicuna_output_parameters = {
    "temperature": 1.2, # above 1 to encourage more creative, less duplicated sentences
    # "max_length": 2000,
    # "repetition_penalty": 1.2,
    "top_k": 50, # sample only from the 50 most likely next tokens
    "top_p": 0.8, # nucleus sampling: keep the smallest token set whose cumulative probability reaches 0.8
    "do_sample": True # sample from the token distribution instead of greedy decoding
}

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Load the model and tokenizer
# These globals are used by `generate_initial_description`. No device or dtype
# is passed here, so the library defaults apply.
vicuna_tokenizer = AutoTokenizer.from_pretrained("lmsys/vicuna-7b-v1.5")
vicuna_model = AutoModelForCausalLM.from_pretrained("lmsys/vicuna-7b-v1.5")

#### Stable Diffusion

In [None]:
# Stable Diffusion pipe
# Candidate checkpoints; the one at index 2 is the one actually loaded below.
sd_model_paths = [
    "CompVis/stable-diffusion-v1-4",
    "stabilityai/stable-diffusion-2",
    "runwayml/stable-diffusion-v1-5" # best quality of the three, per the author's note
]
# `sd_pipe` is the global used by `text_to_image` when `is_sdxl` is False.
sd_pipe = StableDiffusionPipeline.from_pretrained(sd_model_paths[2], torch_dtype=torch.float16)
sd_pipe = sd_pipe.to('cuda')

#### LLaVA

In [None]:
# LLaVA model
# Candidate checkpoints; `llava_model_path` is the global read by
# `generate_description_of_image`.
llava_model_paths = [
    "liuhaotian/LLaVA-Lightning-MPT-7B-preview",
    "liuhaotian/llava-v1-0719-336px-lora-merge-vicuna-13b-v1.3" # a little bit too big
]
llava_model_path = llava_model_paths[1]

#### SDXL

In [None]:
from diffusers import DiffusionPipeline
import torch

# SDXL base pipeline, loaded in float32 and placed on the second CUDA device
# ("cuda:1"), so this cell needs at least two GPUs.
# NOTE(review): confirm that an "fp32" weight variant is actually published for
# these repositories; if not, the `variant` argument may need to be dropped.
sdxl_base = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0", torch_dtype=torch.float32, variant="fp32", use_safetensors=True
).to("cuda:1")

# The refiner reuses the base pipeline's second text encoder and VAE rather
# than loading its own copies.
sdxl_refiner = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-refiner-1.0",
    text_encoder_2=sdxl_base.text_encoder_2,
    vae=sdxl_base.vae,
    torch_dtype=torch.float32,
    use_safetensors=True,
    variant="fp32",
).to("cuda:1")

### LLaVA helper

In [None]:
# Work from the LLaVA checkout before running the `llava` imports in the next cell.
%cd /content/LLaVA

In [None]:
import argparse
import torch

from llava.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN, DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN
from llava.conversation import conv_templates, SeparatorStyle
from llava.model.builder import load_pretrained_model
from llava.utils import disable_torch_init
from llava.mm_utils import tokenizer_image_token, get_model_name_from_path, KeywordsStoppingCriteria

from PIL import Image

import requests
from io import BytesIO


def load_image(image_file):
    """Load an image from an HTTP(S) URL or a local file path as an RGB PIL image.

    Args:
        image_file: Either a URL starting with ``http://`` / ``https://`` or a
            path on the local filesystem.

    Returns:
        The decoded image converted to RGB mode.

    Raises:
        requests.RequestException: If the download fails, times out, or the
            server answers with an error status.
    """
    # Imported locally under a private alias: other cells in this notebook run
    # `from IPython.display import Image`, which rebinds the global `Image` to
    # a class without `open` and would otherwise break this helper.
    from PIL import Image as _PILImage

    # Match the URL scheme explicitly so a local path that merely starts with
    # "http" (e.g. "http_cache/x.png") is still read from disk.
    if image_file.startswith(('http://', 'https://')):
        response = requests.get(image_file, timeout=60)
        # Fail on HTTP errors instead of handing an error page to the decoder.
        response.raise_for_status()
        image = _PILImage.open(BytesIO(response.content))
    else:
        image = _PILImage.open(image_file)

    return image.convert('RGB')


# Loaded LLaVA checkpoints keyed by (model_path, model_base). The pipeline calls
# `eval_model` once per generated image, so reloading the weights on every call
# would dominate the runtime.
_LLAVA_MODEL_CACHE = {}


def _get_llava_model(model_path, model_base):
    """Return (model_name, tokenizer, model, image_processor), loading on first use."""
    key = (model_path, model_base)
    if key not in _LLAVA_MODEL_CACHE:
        disable_torch_init()
        model_name = get_model_name_from_path(model_path)
        tokenizer, model, image_processor, _context_len = load_pretrained_model(model_path, model_base, model_name)
        _LLAVA_MODEL_CACHE[key] = (model_name, tokenizer, model, image_processor)
    return _LLAVA_MODEL_CACHE[key]


def _infer_conv_mode(model_name):
    """Pick the conversation template name from the model name (first match wins)."""
    lowered = model_name.lower()
    if 'llama-2' in lowered:
        return "llava_llama_2"
    if "v1" in lowered:
        return "llava_v1"
    if "mpt" in lowered:
        return "mpt"
    return "llava_v0"


def eval_model(args):
    """Run one LLaVA query against one image and return the generated text.

    Args:
        args: Object with the attributes ``model_path``, ``model_base``,
            ``query``, ``conv_mode`` and ``image_file``. When ``conv_mode`` is
            None (or equals the inferred mode) it is overwritten in place with
            the inferred conversation mode.

    Returns:
        The decoded model answer with surrounding whitespace and the trailing
        stop string removed.
    """
    # Model (loaded once per checkpoint, then reused across calls)
    model_name, tokenizer, model, image_processor = _get_llava_model(args.model_path, args.model_base)

    # Prepend the image placeholder token(s) to the user query.
    qs = args.query
    if model.config.mm_use_im_start_end:
        qs = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN + '\n' + qs
    else:
        qs = DEFAULT_IMAGE_TOKEN + '\n' + qs

    conv_mode = _infer_conv_mode(model_name)

    # An explicitly requested conv_mode takes precedence over the inferred one.
    if args.conv_mode is not None and conv_mode != args.conv_mode:
        print('[WARNING] the auto inferred conversation mode is {}, while `--conv-mode` is {}, using {}'.format(conv_mode, args.conv_mode, args.conv_mode))
    else:
        args.conv_mode = conv_mode

    # Build the prompt: one user turn followed by an empty assistant turn.
    conv = conv_templates[args.conv_mode].copy()
    conv.append_message(conv.roles[0], qs)
    conv.append_message(conv.roles[1], None)
    prompt = conv.get_prompt()

    # Preprocess the image to a half-precision tensor on the GPU.
    image = load_image(args.image_file)
    image_tensor = image_processor.preprocess(image, return_tensors='pt')['pixel_values'].half().cuda()

    # unsqueeze(0) adds the batch dimension expected by generate().
    input_ids = tokenizer_image_token(prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt').unsqueeze(0).cuda()

    # Stop generation once the template's separator string appears.
    stop_str = conv.sep if conv.sep_style != SeparatorStyle.TWO else conv.sep2
    keywords = [stop_str]
    stopping_criteria = KeywordsStoppingCriteria(keywords, tokenizer, input_ids)

    with torch.inference_mode():
        output_ids = model.generate(
            input_ids,
            images=image_tensor,
            do_sample=True,
            temperature=0.2,
            max_new_tokens=1024,
            use_cache=True,
            stopping_criteria=[stopping_criteria])

    # The output starts with the prompt tokens; warn if that prefix was altered
    # and decode only the tokens that follow it.
    input_token_len = input_ids.shape[1]
    n_diff_input_output = (input_ids != output_ids[:, :input_token_len]).sum().item()
    if n_diff_input_output > 0:
        print(f'[Warning] {n_diff_input_output} output_ids are not the same as the input_ids')
    outputs = tokenizer.batch_decode(output_ids[:, input_token_len:], skip_special_tokens=True)[0]
    outputs = outputs.strip()
    # Remove the stop string if the answer ends with it.
    if outputs.endswith(stop_str):
        outputs = outputs[:-len(stop_str)]
    return outputs.strip()

### Helper methods

In [None]:
def clean_sentence(sent: str) -> str:
    """Trim the sentence, then drop every character outside the allowed set.

    Allowed characters are ASCII letters and digits, whitespace, and the
    punctuation marks . , ? ! ; : " ' -
    """
    disallowed = re.compile(r'[^a-zA-Z0-9\s.,?!;:"\'-]')
    trimmed = sent.strip()
    return disallowed.sub('', trimmed)

In [None]:
def build_vicuna_prompt(human_q):
  """Wrap a request in the Human/Assistant template used to prompt Vicuna.

  The request is suffixed with " and list them as follow:" and every template
  line carries a two-space indent; the prompt starts with a newline and ends
  with a newline followed by two spaces.
  """
  template_lines = [
      "",
      "  ### Human:",
      f"  {human_q} and list them as follow:",
      "  ### Assistant:",
      "  ",
  ]
  return "\n".join(template_lines)

In [None]:
def _split_generated_sentences(generated_text: str) -> List[str]:
  """Split raw model output into cleaned, non-empty sentences (one per line)."""
  sentences = []
  for line in generated_text.split("\n"):
    # Strip only a leading list marker such as "1. " or "10. "; a number that
    # ends a sentence in the middle of the line is left alone.
    line = re.sub(r'^\s*\d+\.\s*', '', line)
    line = clean_sentence(line)
    # Blank lines would otherwise become empty prompts for the image model.
    if line.strip():
      sentences.append(line)
  return sentences


def generate_initial_description(count: int, use_model: bool=True) -> List[str]:
  """Ask Vicuna for `count` scene descriptions and return them as a list.

  Args:
    count: Number of sentences requested in the prompt. The model is free to
      return more or fewer lines, so the result length is not guaranteed.
    use_model: If True, call `vicuna_model.generate` directly with
      `vicuna_output_parameters`; otherwise use the `vicuna_pipe` pipeline.

  Returns:
    The generated lines with list numbering removed, sanitized by
    `clean_sentence`, and with empty lines dropped.
  """
  prompts = [
      # random description
      f"Please generate {count} random sentences that describe a scene or a situation",

      # a question is not a good description, so we focus on "context"
      f"Please generate {count} random 'context narratives' that describe a situation.",

      # ScienceQA oriented (not good)
      f"Please generate {count} concise and short narratives related to the visual ScienceQA questions.",

      # see https://huggingface.co/datasets/derek-thomas/ScienceQA/viewer/default/train
      # these are the real ScienceQA questions and images
      # generate different domain descriptions and combine all at last
      f"Please generate {count} 'knowledge narratives' in geography.",
  ]

  prompt = build_vicuna_prompt(prompts[0])

  if use_model:
    # Encode the prompt and place it on the same device as the model.
    input_ids = vicuna_tokenizer.encode(prompt, return_tensors="pt").to(vicuna_model.device)

    # Generate text from the model using the configured sampling parameters
    output = vicuna_model.generate(input_ids, **vicuna_output_parameters)

    # The returned sequence starts with the prompt tokens. Slice them off
    # rather than string-replacing the prompt afterwards: the decoded prompt
    # is not guaranteed to match the original text character for character,
    # in which case the template lines would leak into the results.
    new_tokens = output[0][input_ids.shape[1]:]
    generated_text = vicuna_tokenizer.decode(new_tokens, skip_special_tokens=True)
  else:
    # NOTE(review): `vicuna_pipe` is not defined in any cell visible here; it
    # must be created (e.g. a text-generation pipeline) before using this path.
    output = vicuna_pipe(prompt)
    generated_text = output[0]["generated_text"].replace(prompt, "")

  return _split_generated_sentences(generated_text)

In [None]:
def store_image(image: "Image.Image", file_path: str) -> str:
  """Save `image` to `file_path`, creating the parent folder when needed.

  Args:
    image: Object exposing a PIL-style `save(path)` method.
    file_path: Destination path, including the file name.

  Returns:
    `file_path`, unchanged.
  """
  folder_path = os.path.dirname(file_path)
  print(folder_path)
  # A bare file name has no parent folder; os.makedirs("") would raise.
  if folder_path:
    os.makedirs(folder_path, exist_ok=True)

  image.save(file_path)
  return file_path

In [None]:
def generate_image_sdxl(prompt, inf_steps, den_ratio):
  """Generate one image with the SDXL base pipeline followed by the refiner.

  The base pipeline runs with `denoising_end=den_ratio` and returns latents;
  the refiner then continues from `denoising_start=den_ratio` on those
  latents. Both stages use `inf_steps` inference steps and the same prompt.
  Returns the first image produced by the refiner.
  """
  base_result = sdxl_base(
      prompt=prompt,
      num_inference_steps=inf_steps,
      denoising_end=den_ratio,
      output_type="latent",
  )
  refined_result = sdxl_refiner(
      prompt=prompt,
      num_inference_steps=inf_steps,
      denoising_start=den_ratio,
      image=base_result.images,
  )
  return refined_result.images[0]

In [None]:
def text_to_image(description: str, save_file_path: str, is_sdxl: bool=False) -> str:
  """Render `description` to an image, save it, and return the save path.

  Uses the SDXL base+refiner helper (100 steps, 0.8 hand-off ratio) when
  `is_sdxl` is True, otherwise the plain Stable Diffusion pipeline `sd_pipe`.
  """
  if is_sdxl:
    rendered = generate_image_sdxl(description, 100, 0.8)
  else:
    # the pipeline returns PIL images; take the first one
    rendered = sd_pipe(description).images[0]

  store_image(rendered, save_file_path)
  return save_file_path

In [None]:
def generate_description_of_image(instruction: str, img_path: str) -> str:
  """Ask the configured LLaVA model (`llava_model_path`) about one image.

  Builds the argument object expected by `eval_model` with `instruction` as
  the query and `img_path` as the image, then returns the model's answer.
  """
  arg_fields = {
      "model_path": llava_model_path,
      "model_base": None,
      "model_name": get_model_name_from_path(llava_model_path),
      "query": instruction,
      "conv_mode": None,
      "image_file": img_path
  }
  # ad-hoc class whose attributes mirror the fields read by eval_model
  llava_args = type('Args', (), arg_fields)()
  return eval_model(llava_args)

In [None]:
def store_df(res, file_path: str = '/content/S.csv'):
  """Write the pipeline result matrix to a CSV file and return the DataFrame.

  Args:
    res: List of rows; each row holds the initial description followed by one
      entry per refinement round.
    file_path: Destination CSV path. Defaults to '/content/S.csv'.

  Returns:
    The DataFrame that was written, with columns 'D_init', 'refined_1', ...
  """
  # Derive the column count from the data so any number of refinement rounds
  # works; fall back to the original six columns when there are no rows.
  width = max((len(row) for row in res), default=6)
  columns = ['D_init'] + [f'refined_{i}' for i in range(1, max(width, 1))]
  df = pd.DataFrame(res, columns=columns)

  df.to_csv(file_path, index=False)
  return df

### Test each helper method (optional)

In [None]:
# test random seed
# Sample output noted by the author: "Once upon a time, in a small village nestled in a valley,"
from transformers import set_seed
set_seed(1234)
generate_initial_description(5)

In [None]:
# Vicuna
# Generate a few descriptions; `output` is reused by the image test cell below.
output = generate_initial_description(5)
# print(output)

# for i in range(2):
#   output = generate_initial_description(10)

# Print each returned line with its index.
for i,o in enumerate(output):
  print(f"{i}: {o}")

In [None]:
# SD
# Render the first five entries of `output` (assumes at least five were generated).
for i in range(5):
  # Plain Stable Diffusion variant, currently disabled; it would write to test/0.
  # text_to_image(output[i], f"/content/images/test/0/{i}.png", False)

  # SDXL
  text_to_image(output[i], f"/content/images/test/1/{i}.png", True)



In [None]:
from PIL import Image as PILImage
# Aliased on purpose: importing IPython's `Image` under its own name would
# shadow the PIL `Image` module that `load_image` and `store_image` rely on.
from IPython.display import Image as IPyImage, display, HTML

fig, axes = plt.subplots(2, 5, figsize=(12, 6))

# Row 0 shows the images under test/0, row 1 the images under test/1.
for row, folder in enumerate(("0", "1")):
    for i in range(5):
        cell = axes[row][i]
        cell.axis('off')

        path = f"/content/images/test/{folder}/{i}.png"
        # The test/0 generator call is commented out in the previous cell, so
        # those files may not exist; leave the cell blank instead of crashing.
        if not os.path.exists(path):
            continue

        cell.imshow(PILImage.open(path))
        cell.set_title(f'Image {i + 1}')

plt.tight_layout()

plt.show()

In [None]:
# LLaVA
# `I` is the fixed instruction sent to LLaVA together with the image.
I = "Please describe this image in detail."
# output = generate_description_of_image(I, "/content/images/test/0/1.png")
# print(output)

# Describe one of the SDXL test images written to test/1.
output = generate_description_of_image(I, "/content/images/test/1/1.png")
print(output)

### Final Pipeline

In [None]:
# Release cached GPU memory left over from the optional test cells.
torch.cuda.empty_cache()

In [None]:
def start_pipeline(
    D_init_count: int,
    refined_round: int,
    fixed_instruction: str,
    pre_D_init_count: int = 0,
) -> List[List[Tuple[str, ...]]]:
  """Generate initial descriptions and, for each, `refined_round` image/description pairs.

  Args:
    D_init_count: Number of initial descriptions to request and, at most, to keep.
    refined_round: Number of (image, description) pairs produced per description.
    fixed_instruction: Instruction passed to LLaVA for every image.
    pre_D_init_count: Offset added to the row index when naming image folders,
      so that successive batches write to distinct folders.

  Returns:
    One row per description: `[(D_init,), (image_path, description), ...]`.
  """
  # Drop blank lines and cap the list at D_init_count. The language model may
  # return more lines than requested, and callers offset batches by
  # D_init_count, so extra rows would overwrite the next batch's image folders.
  D_inits = [D for D in generate_initial_description(D_init_count) if D.strip()]
  D_inits = D_inits[:D_init_count]
  for D in D_inits:
    print(D)

  S = []
  for r, D_init in enumerate(D_inits):
    row = [(D_init,)]
    image_dir = f"/content/images/{r+pre_D_init_count}"

    for i in range(refined_round):
      # NOTE(review): every round renders from D_init, not from the previous
      # round's D_refined — confirm this matches the intended experiment.
      M = text_to_image(D_init, f"{image_dir}/{i}.png", True)
      D_refined = generate_description_of_image(fixed_instruction, M)
      row.append((M, D_refined))

    S.append(row)

  return S

In [None]:
# Target: about 10k data points, produced in several smaller runs.
# A single run of 100 descriptions takes too long (more than 6 hours), so
# generate a smaller number at a time and repeat; this can also improve data quality.
D_init_count = 10
refined_round = 5
fixed_instruction = "Please describe this image in detail."

# Number of `start_pipeline` calls made by the batch loop below.
batch_count = 20

# Set a different seed for each run of the notebook to get different output.
# Seeds used so far: 423, 520, 1314, 5201, 3742, 3, 21
set_seed(21)

In [None]:
import torch

# Release cached GPU memory before starting the long-running batch loop.
torch.cuda.empty_cache()

In [None]:
# Accumulates the rows returned by every batch.
res_output = []

# Run the pipeline in batches. The seed is not changed inside this loop; it is
# only set once in the configuration cell.
for i in range(batch_count):
  # The last argument offsets the image folder index by the rows of earlier batches.
  output = start_pipeline(D_init_count, refined_round, fixed_instruction, i*D_init_count)
  res_output += output
  print(f"{len(res_output)} rows. =========================")

  # Save after every batch so partial results survive an interrupted session.
  store_df(res_output)

### Upload to Google Drive (optional)

In [None]:
import shutil
# Pack the /content/images folder into /content/images.zip.
shutil.make_archive('/content/images', 'zip', '/content/images')

In [None]:
import shutil

# upload to drive and download via google drive
destination_path = '/content/drive/MyDrive/datas/'

# Fail early with a clear message instead of silently writing to a local
# folder when Drive has not been mounted.
if not os.path.isdir('/content/drive/MyDrive'):
  raise FileNotFoundError("Google Drive is not mounted at /content/drive")
os.makedirs(destination_path, exist_ok=True)

# Copy the zip archive, not the image folder: `shutil.copyfile` cannot copy a
# directory and needs a full file path as destination, whereas `shutil.copy`
# accepts a destination directory.
shutil.copy('/content/images.zip', destination_path)

# Use the rows accumulated over all batches; `output` only holds the last batch.
df = pd.DataFrame(res_output, columns=['D_init', 'refined_1', 'refined_2', 'refined_3', 'refined_4', 'refined_5'])
file_path = '/content/S_100_5.csv'
df.to_csv(file_path, index=False)
shutil.copy(file_path, destination_path)

### Download Files (optional)
* If you want to load from generated files

In [None]:
import shutil
# Pack the /content/images folder into /content/images.zip for download.
shutil.make_archive('/content/images', 'zip', '/content/images')

In [None]:
# Write the accumulated rows to the CSV file used by the download cell below.
store_df(res_output)

In [None]:
from google.colab import files
# Download the zipped images through the browser.
files.download('/content/images.zip')

In [None]:
# Download the CSV written by `store_df`.
files.download('/content/S.csv')

In [None]:
# NOTE(review): `df` is only assigned at notebook level by the Google Drive
# upload cell; this cell fails with NameError if that cell was skipped.
df

## Show results (optional)

### S matrix
* go to site: https://codebeautify.org/csv-to-html-converter#
* upload S matrix csv file

### Images

In [None]:
from google.colab import files
# Upload a previously downloaded archive through the browser.
uploaded = files.upload()

# Use the name of the first uploaded file.
file_name = list(uploaded.keys())[0]

In [None]:
# Remove only the final extension; splitting on the first "." would truncate
# names that contain additional dots (e.g. "images.v2.zip").
file_name = os.path.splitext(file_name)[0]

In [None]:
import zipfile
# Extract the uploaded archive into /content/<file_name>.
with zipfile.ZipFile(f'{file_name}.zip', 'r') as zip_ref:
    zip_ref.extractall(f'/content/{file_name}')

In [None]:
from PIL import Image as PILImage
# Aliased on purpose: importing IPython's `Image` under its own name would
# shadow the PIL `Image` module that `load_image` and `store_image` rely on.
from IPython.display import Image as IPyImage, display, HTML

# One grid row per initial description, one column per refinement round.
n_rows = D_init_count*batch_count
# squeeze=False keeps `ax` two-dimensional even when there is a single row or
# column, so the `ax[i][j]` indexing below always works.
fig, ax = plt.subplots(n_rows, refined_round, figsize=(10, 50), squeeze=False)

for i in range(n_rows):
  for j in range(refined_round):
    # Alternative location when reading from an extracted upload:
    # path = f"/content/images/{file_name}/{i}/{j}.png"
    path = f"/content/images/{i}/{j}.png"
    img = PILImage.open(path)
    ax[i][j].imshow(img)
    ax[i][j].axis('off')

plt.subplots_adjust(wspace=0.05, hspace=0.05)
plt.show()