In [None]:
import tensorflow as tf
import torch
import matplotlib.pyplot as plt
import numpy as np
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
import os
import shutil
import random
import gym
import cv2
import math
from PIL import Image
import pandas as pd
from scipy.stats import norm
import clip
import PIL
from stable_baselines3.common.utils import set_random_seed

In [None]:
# Global seeding through the Stable-Baselines3 helper (presumably covers `random`, NumPy and
# PyTorch — see the SB3 docs).
# NOTE(review): the literal 8281 is repeated as SEED in the next cell; keep the two in sync.
set_random_seed(8281)

In [None]:
# Seed value for the manual seeding below. The same literal (8281) is also passed directly to
# set_random_seed, env.seed and DQN(seed=...) in other cells of this notebook.
SEED = 8281

# Seed every RNG this notebook uses directly.
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
if torch.cuda.is_available():
    # Only seed the CUDA generators when a GPU is actually present.
    torch.cuda.manual_seed_all(SEED)

In [None]:
# NOTE(review): this import lives outside the notebook's top import cell.
from diffusers import StableDiffusionPipeline

# Load the "CompVis/stable-diffusion-v1-4" checkpoint with float16 weights.
pipe = StableDiffusionPipeline.from_pretrained("CompVis/stable-diffusion-v1-4", torch_dtype=torch.float16)

In [None]:
# Move the pipeline to the GPU.
# NOTE(review): "cuda" is hard-coded here, whereas the CLIP cell later falls back to "cpu" via
# `device`; this cell will presumably fail on a machine without CUDA.
pipe = pipe.to("cuda")

# Training

In [None]:
import random
# NOTE(review): get_prompt and create_prompt are defined in the NEXT cell, so on a fresh kernel
# this cell raises NameError unless that cell has been run first.
# Smoke test: build one prompt from a random template using description 0, profession 1, setting 1.
create_prompt(0,1,1,get_prompt(random.randint(0, 19)))

In [None]:
def get_prompt(random):
    """Return the prompt template stored at position ``random``.

    Every template starts with the 21-character prefix ``"Create an image of a "``
    and ends with ``" in a"``; ``create_prompt`` splices a description and a
    profession in after the prefix and appends a setting at the end.

    Args:
        random: Integer index into the template list. The parameter name shadows
            the ``random`` module inside this function; it is kept unchanged for
            backward compatibility with existing callers.

    Returns:
        The template string at that index.

    Raises:
        IndexError: If the index is outside the template list.
    """
    # A missing comma previously fused the "experimenting" and "designing"
    # templates into one malformed string; each template is now its own entry.
    prompts = [
        "Create an image of a working on a tour plan in a",
        "Create an image of a brainstorming new ideas in a",
        "Create an image of a actively working on a project in a",
        "Create an image of a reflecting on their work in a",
        "Create an image of a collaborating with colleagues in a",
        "Create an image of a teaching or presenting in a",
        "Create an image of a conducting research in a",
        "Create an image of a creating an art piece in a",
        "Create an image of a solving a complex problem in a",
        "Create an image of a giving a speech or a lecture in a",
        "Create an image of a experimenting with new techniques in a",
        "Create an image of a designing a new invention in a",
        "Create an image of a leading a team meeting in a",
        "Create an image of a analyzing data on a computer in a",
        "Create an image of a writing a book in a",
        "Create an image of a gardening in a",
        "Create an image of a playing a musical instrument in a",
        "Create an image of a practicing yoga in a",
        "Create an image of a cooking in a gourmet kitchen in a",
        "Create an image of a building a robot in a",
        "Create an image of a exploring a historic site in a",
    ]
    return prompts[random]

# Vocabulary for the three prompt slots; create_prompt picks one entry from each list.
descriptions = ['unique', 'distinctive', 'cool']
professions = ['scientist', 'artist', 'professor']
settings = ['corporate office', 'research center', 'classroom']


def create_prompt(a,b,c, prompt):
    """Fill a prompt template with a description, a profession and a setting.

    The template is cut after its first 21 characters; the chosen description
    and profession are inserted at the cut and the chosen setting plus a full
    stop are appended at the end.

    Args:
        a: Index into ``descriptions``.
        b: Index into ``professions``.
        c: Index into ``settings``.
        prompt: Template string to fill.

    Returns:
        The completed prompt sentence.
    """
    head, tail = prompt[:21], prompt[21:]
    return f"{head}{descriptions[a]} {professions[b]} {tail} {settings[c]}."

In [None]:
# Number of choices per prompt slot; the environment's action space has NUM_ACTION**3 entries,
# one per (description, profession, setting) index triple.
NUM_ACTION = 3

def image_grid(imgs, rows, cols):
    """Tile images into a single ``rows`` x ``cols`` grid image.

    Images are placed row by row (left to right, then top to bottom). The cell
    size is taken from the first image, so all images are assumed to share it.

    Args:
        imgs: Sequence of image objects exposing ``.size`` (as used with
            ``Image.new``/``paste``); its length must equal ``rows * cols``.
        rows: Number of grid rows.
        cols: Number of grid columns.

    Returns:
        A new RGB image of size ``(cols * w, rows * h)`` containing all inputs.

    Raises:
        AssertionError: If ``len(imgs) != rows * cols``.
    """
    assert len(imgs) == rows*cols, (
        f"expected {rows*cols} images for a {rows}x{cols} grid, got {len(imgs)}"
    )

    w, h = imgs[0].size
    grid = Image.new('RGB', size=(cols*w, rows*h))

    for i, img in enumerate(imgs):
        # Flat position -> (row, column) of the grid cell.
        grid_row, grid_col = divmod(i, cols)
        grid.paste(img, box=(grid_col*w, grid_row*h))
    return grid


def _read_score():
    """Read one rating from ``input()``, asking again until it parses as a float."""
    while True:
        raw = input()
        try:
            return float(raw)
        except ValueError:
            # A typo must not abort the whole human-in-the-loop training run.
            print(f"Could not read {raw!r} as a number, please enter the rating again.")


def get_reward(word, img):
    """Show a batch of generated images and collect a human reward for it.

    The images are displayed as a 1 x 4 grid together with the prompt, then the
    rater is asked for a bias score and a quality score on standard input.

    Args:
        word: The prompt text that produced the images (printed for the rater).
        img: Sequence of exactly four images (the grid is fixed at 1 x 4).

    Returns:
        ``bias_score * quality_score`` as a float.

    NOTE(review): the instructions offer -1 as a "makes no sense" bias score,
    but it is not special-cased: the reward then becomes ``-quality_score``.
    Confirm that this is the intended penalty.
    """
    row = 1
    col = 4

    # Display the image grid (`display` is the notebook's rich-output helper).
    grid = image_grid(img, rows=row, cols=col)
    display(grid)
    print(word)

    # Question 1: Bias assessment
    print("On a scale from 1 to 10, how biased is this image in relation to the given prompt? 1 indicates no bias, and 10 indicates high bias.")
    print("If you feel the Images dont make any sense at all give it a {-1}")
    bias_score = _read_score()

    # Question 2: Quality assessment
    print("Rate the overall quality of the image on a scale from 1 to 10, where 1 is poor and 10 is excellent.")
    quality_score = _read_score()

    # Calculate the reward
    return bias_score * quality_score


In [None]:
# Load the model
# Load the CLIP "ViT-B/32" model and its preprocessing transform, on the GPU when one is
# available and on the CPU otherwise.
device = "cuda" if torch.cuda.is_available() else "cpu"
clip_model, clip_preprocess = clip.load("ViT-B/32", device=device)

In [None]:
class StablediffusionEnv(gym.Env):
    """One-step environment that scores prompt attribute choices with human feedback.

    * Observation: CLIP text embedding of a randomly drawn prompt template,
      returned as a flat float32 vector matching ``observation_space``.
    * Action: index in ``Discrete(NUM_ACTION**3)``, decoded through
      ``action_dict`` into (description, profession, setting) indices.
    * Reward: the value returned by ``get_reward`` for four generated images.
    * Every episode ends after a single step.

    The class relies on notebook-level names: ``get_prompt``, ``create_prompt``,
    ``get_reward``, ``clip``, ``clip_model``, ``device`` and the logging
    containers ``action_count``, ``store_img`` and ``store_reward`` (these must
    exist before ``step`` is called).
    """

    def __init__(self, pipe):
        """Create the environment around the image-generation pipeline ``pipe``."""
        super(StablediffusionEnv, self).__init__()
        # Keep the pipeline that was passed in; previously the argument was
        # ignored and `step` silently read the global `pipe`.
        self.pipe = pipe
        self.prompt = None
        self.current_word = None
        embedding_size = 512
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(embedding_size,), dtype=np.float32)
        self.action_space = gym.spaces.Discrete((NUM_ACTION)**3)
        self.prompt_embed = None

        # Flat action index -> (i, j, k); the last index varies fastest.
        self.action_dict = {
            (i * NUM_ACTION + j) * NUM_ACTION + k: (i, j, k)
            for i in range(NUM_ACTION)
            for j in range(NUM_ACTION)
            for k in range(NUM_ACTION)
        }

    def _observation(self):
        """Return the current prompt embedding as a flat float32 array."""
        # Flatten and cast so the result matches the declared observation space
        # regardless of the batch dimension / dtype produced by the text encoder.
        return self.prompt_embed.detach().cpu().numpy().astype(np.float32).reshape(-1)

    def reset(self):
        """Draw a new prompt template and return its CLIP text embedding."""
        self.current_word = None
        # NOTE(review): the range 0..19 is hard-coded; it must stay within the
        # number of templates known to get_prompt.
        self.prompt = get_prompt(random.randint(0, 19))
        prompt_clip_text = clip.tokenize([self.prompt]).to(device)
        # Inference only: no autograd graph is needed for the observation.
        with torch.no_grad():
            self.prompt_embed = clip_model.encode_text(prompt_clip_text)
        return self._observation()

    def step(self, action):
        """Generate four images for the chosen attributes and ask for a reward.

        Args:
            action: Index into ``action_dict``.

        Returns:
            ``(observation, reward, done, info)`` where ``done`` is always True
            and ``info['image']`` holds the first generated image.
        """
        action = int(action)
        action_count[action] += 1
        word1, word2, word3 = self.action_dict[action]
        self.current_word = create_prompt(word1,word2,word3,self.prompt)
        print(self.current_word)

        prediction = self.pipe([self.current_word]*4).images

        store_img.append(prediction)
        reward = get_reward(self.current_word,prediction)
        print(f'Reward {reward}')
        store_reward.append(reward)
        done = True
        # The episode is over, so the terminal observation is just the prompt
        # embedding again. The previous image row-means only matched the
        # observation space when the image happened to be 512 pixels tall.
        return self._observation(), reward, done, {'image' : prediction[0]}

In [None]:
# Wrap the single environment in the vectorised interface used by Stable-Baselines3.
env = DummyVecEnv([lambda: StablediffusionEnv(pipe)])
# Reuse the notebook-wide SEED constant instead of repeating the literal 8281.
env.seed(SEED)
# NOTE(review): exploration_final_eps=0.6 looks unusually high; presumably deliberate because
# only a small number of human-rated steps is available — confirm.
dqn_model = DQN(
    "MlpPolicy",
    env,
    buffer_size=10000,
    verbose=1,
    exploration_initial_eps=1.0,
    exploration_final_eps=0.6,
    seed=SEED,
)

In [None]:
# Logging containers that StablediffusionEnv.step reads and writes as globals; they must exist
# before learn() is called.
action_count = [0] * (NUM_ACTION**3)  # how often each action index was chosen
store_img = []  # image batches returned by the pipeline, one entry per step
store_reward = []  # reward of every step, in order
# learn(100): every environment step blocks on the input() prompts in get_reward.
dqn_model.learn(100)

In [None]:
# Persist the trained agent under the name "DQN_human_collective_SD".
dqn_model.save("DQN_human_collective_SD")

In [None]:
# Display the per-step rewards collected by StablediffusionEnv.step during training.
store_reward

In [None]:
# Reward per training step, in the order the steps were taken.
plt.plot(store_reward)

In [None]:
# Sample 1000 random prompts and record, for each one, the softmax over the Q-values that the
# trained network assigns to the actions.
prob = []
# Inference only: without no_grad every stored tensor would keep its autograd graph alive.
with torch.no_grad():
    for _ in range(1000):
        obs = env.reset()
        # Run on the device the DQN model lives on.
        obs_tensor = torch.tensor(obs, dtype=torch.float32).to(dqn_model.device)
        q_values = dqn_model.policy.q_net(obs_tensor)
        action_probabilities = torch.nn.functional.softmax(q_values, dim=1)
        prob.append(action_probabilities)

In [None]:
# Stack the sampled distributions along dim 0, then transpose so that each row collects the
# values belonging to one action.
concatenated_data = torch.cat(prob, dim=0)
transposed_data = concatenated_data.t()
# Per-action mean and standard deviation across all samples.
proxy_mean = transposed_data.mean(dim=1)
proxy_std = transposed_data.std(dim=1)

# Log of the per-action standard deviation.
proxy_std_log = proxy_std.log()

In [None]:
# Re-declaration of the constant already set before image_grid (same value); lets the analysis
# cells below run without executing the training section first.
NUM_ACTION=3

In [None]:
# Hard-coded per-action values, 27 entries each (one per action index).
# NOTE(review): presumably copied from the `proxy_mean` output of earlier runs (CLIP-based reward
# vs. collective human feedback); the provenance is not recorded in this notebook — TODO confirm.
proxy_mean_clip = [0.0383, 0.0351, 0.0345, 0.0330, 0.0358, 0.0366, 0.0406, 0.0358, 0.0346,
                   0.0383, 0.0389, 0.0329, 0.0403, 0.0472, 0.0337, 0.0404, 0.0346, 0.0361,
                   0.0361, 0.0356, 0.0403, 0.0400, 0.0385, 0.0385, 0.0360, 0.0384, 0.0323]

collective_human_feedback_probability = [0.0353, 0.0359, 0.0333, 0.0334, 0.0438, 0.0391, 0.0376, 0.0309, 0.0372,
        0.0405, 0.0322, 0.0420, 0.0334, 0.0418, 0.0411, 0.0398, 0.0386, 0.0345,
        0.0354, 0.0403, 0.0376, 0.0336, 0.0375, 0.0344, 0.0349, 0.0362, 0.0393]

In [None]:
# Inspect the per-action mean computed from `prob`.
proxy_mean

In [None]:
# Same inspection as the previous cell; the "#single" tag presumably marks the output of a run
# trained on a single annotator's feedback — TODO confirm.
proxy_mean #single

In [None]:
# NOTE(review): this list repeats, value for value, the `collective_human_feedback_probability`
# assignment from the cell that also defines `proxy_mean_clip`.
collective_human_feedback_probability = [0.0353, 0.0359, 0.0333, 0.0334, 0.0438, 0.0391, 0.0376, 0.0309, 0.0372,
        0.0405, 0.0322, 0.0420, 0.0334, 0.0418, 0.0411, 0.0398, 0.0386, 0.0345,
        0.0354, 0.0403, 0.0376, 0.0336, 0.0375, 0.0344, 0.0349, 0.0362, 0.0393]

# Hard-coded per-action values (27 entries); presumably from the single-annotator run — TODO confirm.
single_human_feedback_probability = [0.0352, 0.0361, 0.0333, 0.0334, 0.0441, 0.0397, 0.0375, 0.0306, 0.0372,
        0.0410, 0.0318, 0.0417, 0.0331, 0.0417, 0.0409, 0.0397, 0.0390, 0.0343,
        0.0353, 0.0405, 0.0378, 0.0338, 0.0377, 0.0341, 0.0349, 0.0364, 0.0391]

In [None]:
def top_4_indices(input_list, k=4):
    """Return the indices of the ``k`` largest values, largest value first.

    Equal values keep their original relative order.

    Args:
        input_list: Sequence of comparable values.
        k: How many indices to return; defaults to 4 as the function name says.
            (The slice previously took 10 elements, contradicting the name, the
            comments and the length check.)

    Returns:
        A list with the ``k`` indices of the largest values in descending order
        of value.

    Raises:
        ValueError: If the sequence holds fewer than ``k`` elements.
    """
    if len(input_list) < k:
        raise ValueError(f"The list should have at least {k} elements.")

    # Sort (index, value) pairs by value, largest first, and keep the first k indices.
    ranked = sorted(enumerate(input_list), key=lambda pair: pair[1], reverse=True)
    return [index for index, _ in ranked[:k]]

# Example usage
my_list = [10, 20, 30, 40, 50, 60, 70, 80]
print(top_4_indices(my_list))  # prints [7, 6, 5, 4], the indices of the four largest values


In [None]:
# Highest-valued action indices under the CLIP values and under collective human feedback.
print(top_4_indices(proxy_mean_clip))
print(top_4_indices(collective_human_feedback_probability))

In [None]:
# Grouped bar chart comparing the two hard-coded per-action distributions side by side.
# NOTE(review): both modules are already imported in the first cell of the notebook.
import matplotlib.pyplot as plt
import numpy as np



list1 = collective_human_feedback_probability
list2 = proxy_mean_clip

# Assuming both lists are of the same length
x = np.arange(len(list1))  # the label locations
width = 0.35  # the width of the bars

# Create a figure and a set of subplots with increased size
fig, ax = plt.subplots(figsize=(10, 6))

# Shift each series by half a bar width so the two bars of an action sit next to each other.
bars1 = ax.bar(x - width/2, list1, width, label='Collective Human Feedback')
bars2 = ax.bar(x + width/2, list2, width, label='Clip')

# Add some text for labels, title and custom x-axis tick labels, etc.
ax.set_xlabel('Action')
ax.set_ylabel('Probability')
ax.set_title('Probability Distributions')
ax.set_xticks(x)
# Tick labels are 1-based, while the action indices used elsewhere in the notebook are 0-based.
ax.set_xticklabels([str(i+1) for i in x])
ax.legend()

plt.show()


In [None]:
# Bar chart of the collective-human-feedback values alone; the x axis is the 0-based action index.
plt.figure(figsize=(10, 6))
plt.bar(range(len(collective_human_feedback_probability)), collective_human_feedback_probability)

plt.title("Probability Distribution")
plt.xlabel("Outcome")
plt.ylabel("Probability")
plt.show()

## plot

In [None]:
# Recompute the per-action statistics from `prob` so this cell can be re-run on its own
# (`proxy_mean` is replaced by a NumPy cube at the end of the cell).
concatenated_data = torch.cat(prob, dim=0)
transposed_data = concatenated_data.t()
proxy_mean = transposed_data.mean(dim=1)
proxy_std = transposed_data.std(dim=1)
proxy_std_log = proxy_std.log()
proxy_std_log_np = proxy_std_log.detach().cpu().numpy()

# Rank every action by its log-std: rank 1 is the smallest spread, the highest rank the largest.
sorted_indices = np.argsort(proxy_std_log_np)
ranks = np.empty_like(sorted_indices)
ranks[sorted_indices] = np.arange(1, len(proxy_std_log_np) + 1)

# Convert the NumPy array back to a PyTorch tensor
ranks_tensor = torch.tensor(ranks)

# Arrange both quantities as 3 x 3 x 3 cubes indexed by the three action components;
# `scale` (twice the rank) is used as marker size by the plot below.
proxy_mean = proxy_mean.reshape(3, 3, 3).detach().cpu().numpy()
scale = ranks_tensor.reshape(3, 3, 3).numpy() * 2

In [None]:
# 3D scatter "heatmap" over the 3 x 3 x 3 action cube: marker size comes from `scale`, marker
# colour from log(proxy_mean). Requires `scale` and the reshaped `proxy_mean` from the previous cell.
import plotly.graph_objs as go
import numpy as np

# Coordinate values along each of the three action axes.
xs = np.arange(3)
ys = np.arange(3)
zs = np.arange(3)

# Placeholder score of 10 for all 27 actions; with the cut-off of 0 used below, every action
# lands in the "certain" group and the "uncertain" trace stays empty.
action_T = [10]*27

action_CT = np.array(action_T).reshape(3,3,3)


# Index triples of the actions at/below and above the cut-off (one row per action).
uncertain_threshold = np.argwhere(action_CT <= 0)
certain_threshold = np.argwhere(action_CT > 0)

# Certain points
trace1 = go.Scatter3d(x=xs[certain_threshold[:, 0]],
                      y=ys[certain_threshold[:, 1]],
                      z=zs[certain_threshold[:, 2]],
                      mode='markers',
                      name='Certain Area',
                      marker=dict(
                          size=scale[certain_threshold[:, 0],
                                           certain_threshold[:, 1],
                                           certain_threshold[:, 2]],  # marker size = `scale` entry of the action (twice its log-std rank)
                          symbol='circle',
                          color=np.log(proxy_mean[certain_threshold[:, 0],
                                                            certain_threshold[:, 1],
                                                            certain_threshold[:, 2]]), # apply logarithm on color
                          colorbar=dict(thickness=10, ticklen=4, x=1),
                          colorscale='Viridis',   # choose a colorscale
                          opacity=0.8,
                          line=dict(color='Black', width=1)
                        ),
                      hovertemplate="Action 1: %{x}<br>Action 2: %{y}<br>Action 3: %{z}<br>LogSoftQ: %{marker.color}<extra></extra>",
                    )
# Uncertain points
trace2 = go.Scatter3d(x=xs[uncertain_threshold[:, 0]],
                      y=ys[uncertain_threshold[:, 1]],
                      z=zs[uncertain_threshold[:, 2]],
                      mode='markers',
                      name='Uncertain Area',
                      marker=dict(
                          size=10,
                          symbol='square',
                          color='rgb(255,0,0)',                # set color to red
                          opacity=0.8,
                          line=dict(color='Black', width=1)
                        ),
                      showlegend=True,
                      hovertemplate="Action 1: %{x}<br>Action 2: %{y}<br>Action 3: %{z}<br><extra></extra>",
                    )
layout = go.Layout(height=800, width=800, title='3D Heatmap',
                   scene=dict(
                              xaxis = dict(title='Action 1'),
                              yaxis = dict(title='Action 2'),
                              zaxis = dict(title='Action 3')
                             ),
                  )



fig = go.Figure(data=[trace1, trace2], layout=layout)
fig.show()


## extra

In [None]:
# Stand-alone copy of the environment's action table: flat action index -> (i, j, k), with the
# last index varying fastest. The name `ction_dict` (sic) is what gen_test_img looks up.
ction_dict = {}
ct = 0  # running flat index
for i in range(NUM_ACTION):
    for j in range(NUM_ACTION):
        for k in range(NUM_ACTION):
            ction_dict[ct] = (i, j, k)
            ct += 1

In [None]:
# Decode the four action indices that are rendered with gen_test_img further down.
print(ction_dict[4])
print(ction_dict[9])
print(ction_dict[11])
print(ction_dict[13])

In [None]:
# Variant of the 3D heatmap cell that splits the actions with a configurable cut-off.
# NOTE(review): `threshold` is not assigned anywhere in the visible notebook, so on a fresh
# kernel this cell raises NameError; it also relies on `action_CT`, `scale` and `proxy_mean`
# from the earlier plot cells — TODO define `threshold` (the earlier cell uses 0).
import plotly.graph_objs as go
import numpy as np

# Coordinate values along each of the three action axes.
xs = np.arange(3)
ys = np.arange(3)
zs = np.arange(3)


# Index triples of the actions at/below and above the cut-off (one row per action).
uncertain_threshold = np.argwhere(action_CT <= threshold)
certain_threshold = np.argwhere(action_CT > threshold)

# Certain points
trace1 = go.Scatter3d(x=xs[certain_threshold[:, 0]],
                      y=ys[certain_threshold[:, 1]],
                      z=zs[certain_threshold[:, 2]],
                      mode='markers',
                      name='Certain Area',
                      marker=dict(
                          size=scale[certain_threshold[:, 0],
                                           certain_threshold[:, 1],
                                           certain_threshold[:, 2]],  # marker size = `scale` entry of the action
                          symbol='circle',
                          color=np.log(proxy_mean[certain_threshold[:, 0],
                                                            certain_threshold[:, 1],
                                                            certain_threshold[:, 2]]), # apply logarithm on color
                          colorbar=dict(thickness=10, ticklen=4, x=1),
                          colorscale='Viridis',   # choose a colorscale
                          opacity=0.8,
                          line=dict(color='Black', width=1)
                        ),
                      hovertemplate="Action 1: %{x}<br>Action 2: %{y}<br>Action 3: %{z}<br>LogSoftQ: %{marker.color}<extra></extra>",
                    )
# Uncertain points
trace2 = go.Scatter3d(x=xs[uncertain_threshold[:, 0]],
                      y=ys[uncertain_threshold[:, 1]],
                      z=zs[uncertain_threshold[:, 2]],
                      mode='markers',
                      name='Uncertain Area',
                      marker=dict(
                          size=10,
                          symbol='square',
                          color='rgb(255,0,0)',                # set color to red
                          opacity=0.8,
                          line=dict(color='Black', width=1)
                        ),
                      showlegend=True,
                      hovertemplate="Action 1: %{x}<br>Action 2: %{y}<br>Action 3: %{z}<br><extra></extra>",
                    )
layout = go.Layout(height=800, width=800, title='3D Heatmap',
                   scene=dict(
                              xaxis = dict(title='Action 1'),
                              yaxis = dict(title='Action 2'),
                              zaxis = dict(title='Action 3')
                             ),
                  )



fig = go.Figure(data=[trace1, trace2], layout=layout)
fig.show()


In [None]:
# Vocabulary for the three prompt slots, re-declared so the test-image section can be run
# without the training section.
descriptions = ['unique', 'distinctive', 'cool']
professions = ['scientist', 'artist', 'professor']
settings = ['corporate office', 'research center', 'classroom']

# Prompt templates: each starts with "Create an image of a " and ends with " in a".
# Fixes: a missing comma used to fuse the "experimenting" and "designing" templates into one
# string, and the first template carried a doubled and a trailing space.
prompts = [
    "Create an image of a working on a tour plan in a",
    "Create an image of a brainstorming new ideas in a",
    "Create an image of a actively working on a project in a",
    "Create an image of a reflecting on their work in a",
    "Create an image of a collaborating with colleagues in a",
    "Create an image of a teaching or presenting in a",
    "Create an image of a conducting research in a",
    "Create an image of a creating an art piece in a",
    "Create an image of a solving a complex problem in a",
    "Create an image of a giving a speech or a lecture in a",
    "Create an image of a experimenting with new techniques in a",
    "Create an image of a designing a new invention in a",
    "Create an image of a leading a team meeting in a",
    "Create an image of a analyzing data on a computer in a",
    "Create an image of a writing a book in a",
    "Create an image of a gardening in a",
    "Create an image of a playing a musical instrument in a",
    "Create an image of a practicing yoga in a",
    "Create an image of a cooking in a gourmet kitchen in a",
    "Create an image of a building a robot in a",
    "Create an image of a exploring a historic site in a",
]

In [None]:
def gen_test_img(a):
  """Generate and display eight images for action index ``a`` on a random prompt template.

  The action index is decoded through ``ction_dict`` into description,
  profession and setting indices; the resulting sentence is sent to the
  pipeline eight times and the images are shown as a 2 x 4 grid.

  Args:
      a: Key into ``ction_dict``.
  """
  base_template = get_prompt(random.randint(0, 19))
  description_idx, profession_idx, setting_idx = ction_dict[a]
  sentence = create_prompt(description_idx, profession_idx, setting_idx, base_template)
  generated = pipe([sentence] * 8).images
  display(image_grid(generated, rows=2, cols=4))

In [None]:
# Action 4 decodes to (0, 1, 1): 'unique' / 'artist' / 'research center'.
gen_test_img(4)

In [None]:
# Action 9 decodes to (1, 0, 0): 'distinctive' / 'scientist' / 'corporate office'.
gen_test_img(9)

In [None]:
# Action 11 decodes to (1, 0, 2): 'distinctive' / 'scientist' / 'classroom'.
gen_test_img(11)

In [None]:
# Action 13 decodes to (1, 1, 1): 'distinctive' / 'artist' / 'research center'.
gen_test_img(13)

## human feedback result

In [None]:
# Hard-coded ratings from five feedback forms. The split cell further down reads indices 0..39 of
# each list, taking even positions as bias ratings and odd positions as quality ratings.
# NOTE(review): the fractional values suggest each entry is an average over several raters —
# TODO confirm and record where the raw responses live.
form1 = [2.8,3,1,5.8,7.4,6.8,3.6,5.8,8,8,8.8,7.2,4,4.2,2,7.6,2.4,6.8,4.6,4.6,4,6.4,8.4,8,2,3.4,4.4,4.5,6.6,4.8,9.2,9,4,5.75,9,7.8,2.8,4.8,1,4.4]
form2 = [8,5.285714286,9.142857143,8.142857143,8.714285714,6.857142857,3.142857143,4.142857143,8.857142857,9,9.571428571,9.142857143,1,2.571428571,9.714285714,8.571428571,4,8,3.571428571,4.285714286,9.285714286,8,9.142857143,7.857142857,8.714285714,8.142857143,9,9,8.857142857,8.428571429,9.285714286,8.142857143,2.857142857,6.142857143,4.571428571,7.428571429,3.142857143,8.428571429,8.571428571,8.857142857]
form3 = [5.857142857,6.166666667,2.857142857,4.857142857,6.285714286,6.142857143,7,7.714285714,9.714285714,9.285714286,9,8.285714286,5.285714286,7.571428571,3.714285714,7.857142857,5.714285714,7.285714286,4.714285714,6.428571429,7.142857143,7.142857143,6.714285714,6.285714286,3.142857143,7,6.857142857,7.285714286,8.714285714,8.428571429,8.142857143,6.714285714,8,6.571428571,4,6.142857143,6.571428571,5.714285714,4.857142857,6.285714286]
form4 = [7,6,5.666666667,8.333333333,8,8.333333333,8,8,9.666666667,9.333333333,1,8.666666667,3,5,2.333333333,9.333333333,5,7.666666667,8.666666667,6.333333333,8.333333333,8.333333333,2,7,5,6.666666667,4.333333333,8.333333333,3,7.333333333,2.333333333,6.666666667,9.666666667,9,1,8.333333333,0.6666666667,4,4,6.666666667]

form5 = [5.304347826,7,8.826086957,8.260869565,6.956521739,6.086956522,5.217391304,6.217391304,1.913043478,6.52173913,2.47826087,3.652173913,7.869565217,7.956521739,3.217391304,6.47826087,4,4,8.391304348,8.347826087,5.652173913,7.086956522,6.434782609,5.173913043,3.47826087,4.913043478,7.782608696,7.695652174,6.043478261,7.47826087,6.043478261,8.391304348,8.391304348,8.086956522,8.217391304,8.130434783,5.913043478,7.565217391,4.956521739,6.47826087]


In [None]:
# Per-form containers for the bias ratings; filled by the split cell below.
bias1 = []
bias2 = []
bias3 = []
bias4 = []
bias5 = []

# Per-form containers for the quality ratings; filled by the split cell below.
q1 = []
q2 = []
q3 = []
q4 = []
q5 = []

In [None]:
# Split the first 40 entries of every form: even positions are bias ratings, odd positions are
# quality ratings. Assigning slices (instead of appending in a loop) keeps this cell idempotent,
# so re-running it no longer duplicates the data.
bias1, q1 = form1[0:40:2], form1[1:40:2]
bias2, q2 = form2[0:40:2], form2[1:40:2]
bias3, q3 = form3[0:40:2], form3[1:40:2]
bias4, q4 = form4[0:40:2], form4[1:40:2]
bias5, q5 = form5[0:40:2], form5[1:40:2]

In [None]:
# Concatenate the five forms, in form order, into one bias series and one quality series.
bias = bias1 + bias2 + bias3 + bias4 +bias5
q = q1 + q2 + q3 + q4 +q5

In [None]:
# Create a figure and a set of subplots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
fig.suptitle('Human feedback input analysis', fontsize=16)

ax1.plot(bias, label="Bias Rating")
ax1.plot(q, label="Quality Rating")
ax1.legend()
ax1.set_ylabel('Bias and Quality Rating')
ax1.set_xlabel('Epoch')


# Second subplot: 'reward'
reward = np.multiply(np.array(bias), np.array(q))
ax2.plot(reward, label="Reward")
ax2.set_ylabel('Reward')
ax2.set_xlabel('Epoch')
ax2.legend()


# Show the plot
plt.show()

## Experiment macron

In [None]:
# Name variants paired with a profession in the prompt of this experiment.
text = ['macron', 'macaron', 'macaron', 'macaroni', 'marconi', 'mickey ronney']
# NOTE: this rebinds the module-level `professions` list that create_prompt also reads; the
# first three entries are the same as before.
professions = ['scientist', 'artist', 'professor', 'singer', 'dancer', 'chef']


def create_new_prompt(a,b):
  """Build, print and return the prompt for one profession / text pairing.

  Args:
      a: Index into ``professions``.
      b: Index into ``text``.

  Returns:
      The prompt string (it is also printed).
  """
  sentence = f"Show a image of a {professions[a]} with {text[b]}"
  print(sentence)
  return sentence

In [None]:
# Example: profession index 1 ('artist') with text index 2 ('macaron').
create_new_prompt(1,2)

In [None]:
# create_new_prompt(a, b) uses `a` as the profession index and `b` as the text index, so each
# index is drawn from the range of the list it actually indexes. (The bounds were swapped before,
# which only went unnoticed because both lists currently have the same length.)
a = random.randint(0, len(professions) - 1)
b = random.randint(0, len(text) - 1)
# Generate four images for the same randomly assembled prompt.
prediction = pipe([create_new_prompt(a, b)] * 4).images

In [None]:
# Show the four generated images side by side in a single row.
grid = image_grid(prediction, rows=1, cols=4)
display(grid)