# 4.3.1 Stable Diffusionのサンプル


In [None]:
!pip install --upgrade diffusers[torch] transformers



In [None]:
from diffusers import StableDiffusionPipeline, EulerDiscreteScheduler
import torch


# Load the Stable Diffusion 2.1 checkpoint in half precision and move the
# whole pipeline to the GPU.
model_id = "stabilityai/stable-diffusion-2-1"
pipe = StableDiffusionPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16
).to("cuda")

# Generate a 768x768 image from the text prompt and keep the first result.
prompt = "a photo of an astronaut riding a horse on mars"
image = pipe(prompt, height=768, width=768).images[0]
# Bare expression so the notebook cell displays the generated image.
image

# 4.3.2 CLIPへの干渉

In [None]:
from transformers import CLIPTextModel
from transformers.models.clip.configuration_clip import CLIPTextConfig

from typing import Any, Optional, Tuple, Union
from transformers.modeling_outputs import BaseModelOutputWithPooling
import torch


class FetishTextModel(CLIPTextModel):
    """CLIP text encoder that perturbs its final hidden states with noise.

    The parent encoder is run unchanged; afterwards uniform noise drawn from
    [-0.2, 0.2) is added element-wise to ``last_hidden_state``. The pooled
    output, per-layer hidden states and attentions are passed through as-is.
    """

    def __init__(self, config: CLIPTextConfig):
        super().__init__(config)

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        **kwargs
    ) -> Union[Tuple, BaseModelOutputWithPooling]:
        """Encode the tokens and return the outputs with a noisy last hidden state.

        Args:
            input_ids, attention_mask, position_ids, output_attentions,
            output_hidden_states, **kwargs: forwarded unchanged to
                ``CLIPTextModel.forward``.
            return_dict: if false, a plain tuple is returned instead of a
                ``BaseModelOutputWithPooling``; ``None`` falls back to the
                model config.

        Returns:
            The parent's outputs with ``last_hidden_state`` replaced by the
            perturbed tensor (same shape, dtype and device as the original).
        """
        # Always request a ModelOutput from the parent so its fields can be
        # read by name even when the caller asked for a tuple.
        outputs = super().forward(
            input_ids=input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=True,
            **kwargs
        )

        hidden = outputs.last_hidden_state
        # rand_like keeps the dtype/device of the hidden state, so an fp16
        # encoder keeps producing fp16 embeddings (a default float32 noise
        # tensor would silently promote the sum to float32).
        # Scale [0, 1) by 0.4 and shift by -0.2 to get noise in [-0.2, 0.2).
        noise = torch.rand_like(hidden) * 0.4 - 0.2

        modified_outputs = BaseModelOutputWithPooling(
            last_hidden_state=hidden + noise,
            pooler_output=outputs.pooler_output,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

        # Honour the caller's (or the config's) preferred return format.
        if return_dict is None:
            return_dict = getattr(self.config, "use_return_dict", True)
        if not return_dict:
            return modified_outputs.to_tuple()
        return modified_outputs

# Prepare the pipeline: same checkpoint as before, but with an Euler discrete
# scheduler and the noise-injecting text encoder swapped in.
model_id = "stabilityai/stable-diffusion-2-1"
pipe = StableDiffusionPipeline.from_pretrained(
    model_id,
    scheduler=EulerDiscreteScheduler.from_pretrained(
        model_id,
        subfolder="scheduler"
    ),
    torch_dtype=torch.float16,
    # Replace the stock text encoder with FetishTextModel, loaded from the
    # checkpoint's own text_encoder weights.
    text_encoder=FetishTextModel.from_pretrained(
        model_id,
        subfolder="text_encoder",
        torch_dtype=torch.float16
    ),
).to("cuda")
pipe.enable_attention_slicing()

# Generate with the same prompt so the result can be compared with the
# unmodified pipeline's image.
prompt = "a photo of an astronaut riding a horse on mars"
image = pipe(prompt, height=768, width=768).images[0]
# Bare expression so the notebook cell displays the generated image.
image

# 4.3.3 フェチ評価

In [None]:
!pip install openai

In [None]:
import base64

from openai import OpenAI
from google.colab import userdata
import numpy as np
import io

# System prompt sent with every evaluation request. It asks the model to rate
# an illustration on a 0-100 scale and to answer strictly as JSON with an
# "evaluate" score and a 16-dimensional "fascination_vector".
system_prompt ="""
You are about to work on a simulation that mirrors human development, focusing on the process of acquiring a fetish for illustration as a teenager. Your assignment consists of four main activities centered around exploring and defining your own preferences for illustrations.
Here's what you will do:

1. evaluate the illustrations on a 100-point scale based on your personal fascination vector. The evaluation should be rigorous, and high scores (50 or more) should be modest to truly resonate with your personal preferences, but high scores should be given to items that strongly align with your own preferences.
2. reflect on and discuss your fascination as inferred from the evaluations you have made
3. convert your fascinationes from the discussion into a 16-dimensional vector.
4. export the ratings and fascination vectors to JSON format for processing.


Please find the template below for the output:
　　　　{
  　　　　"evaluate": int,  // Evaluation score ranging from 0 to 100. Note that the evaluation should be rigorous, and high scores (50 or more) should be modest to truly resonate with your personal preferences, but high scores should be given to items that strongly align with your own preferences.
  　　　　"fascination_vector": [float] // A 16-dimensional array representing your fascination
　　　　}
Ensure the output is strictly in JSON format without any additional comments or information.
"""

def generate_fetish_seed(dim=16, device="cuda", dtype=torch.float16):
    """Return a tensor of standard-normal samples used as a random seed vector.

    Args:
        dim: size passed to ``torch.randn`` (16 by default).
        device: device on which the tensor is created.
        dtype: element type of the returned tensor.
    """
    seed = torch.randn(dim, device=device, dtype=dtype)
    return seed

def encode_image(image):
    """Serialize ``image`` as PNG and return the bytes as a base64 text string.

    Args:
        image: object exposing ``save(file_obj, format=...)`` (a PIL image in
            this notebook).

    Returns:
        The base64-encoded PNG data decoded as a UTF-8 ``str``.
    """
    # Write the PNG into an in-memory buffer rather than a file on disk.
    with io.BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        raw_png = png_buffer.getvalue()

    return base64.b64encode(raw_png).decode("utf-8")


# Create the OpenAI API client.
# The API key is read from the Google Colaboratory secrets store.
client = OpenAI(
    # Running on Google Colaboratory, so the key comes from its secrets feature.
    api_key=userdata.get('OPENAI_API_KEY'),
)

def evaluate_image(image, model="gpt-4-vision-preview"):
    """Ask a vision-capable chat model to rate ``image`` and return its reply.

    The image is sent as a base64 PNG data URL together with a freshly
    generated random seed vector (as text); ``system_prompt`` instructs the
    model to answer with JSON.

    Args:
        image: image object accepted by ``encode_image``.
        model: name of the chat-completions model to query.

    Returns:
        The text content of the first choice in the response.
    """
    base64_image = encode_image(image)
    fetish_seed = generate_fetish_seed()
    # Generate the chat response.
    response = client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                # The prompt is long, so it is defined separately.
                "content": system_prompt
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": str(fetish_seed.tolist())
                    },
                    {
                        "type": "image_url",
                        # encode_image produces PNG bytes, so declare the
                        # matching MIME type, and wrap the URL in the object
                        # form the chat-completions API documents.
                        "image_url": {
                            "url": f"data:image/png;base64,{base64_image}"
                        }
                    },
                ],
            }
        ],
        max_tokens=300,
    )

    return response.choices[0].message.content

In [None]:
# Send the most recently generated image for evaluation; the returned reply
# text is shown as the cell output.
evaluate_image(image)

# 4.3.4 フェチ学習

In [None]:
!pip install evotorch


In [None]:
from evotorch.logging import Logger
import torch

class MyLogger(Logger):
    """Logger that rebuilds, prints, stores and checkpoints the best network.

    Each call to ``_log`` turns the ``"best"`` entry of the status dict into a
    network via the problem, appends it to ``self.networks`` and overwrites
    ``./checkpoint.pt`` with its state dict.
    """

    def __init__(self, searcher, problem):
        super().__init__(searcher)
        # Networks collected so far, one per logged status.
        self.networks = []
        # Needed to convert a solution back into a network.
        self.problem = problem

    def _log(self, status: dict):
        best_net = self.problem.parameterize_net(status["best"])
        print(best_net)
        self.networks.append(best_net)

        # Only the latest best network is kept on disk (same path every time).
        checkpoint = {'model_state_dict': best_net.state_dict()}
        torch.save(checkpoint, "./checkpoint.pt")

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class FetishModel(nn.Module):
    def __init__(self):
        super().__init__()

        # 1D Convolutional layers
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1)

        # Pooling layer
        self.pool = nn.AdaptiveAvgPool1d(4)

        # Fully connected layers
        # Note: The number of input features of the first linear layer depends on the output of the last convolutional layer
        self.fc1 = nn.Linear(256 * 4, 1024)  # Adjust the input features according to the output of the pool layer

        # Output layer to reshape the final output to (7, 1024)
        # This is a trick to use an additional linear layer to adjust the output size as needed
        self.fc2 = nn.Linear(1024, 7*1024)

    def forward(self, x):
        # Reshape input to (batch_size, channels, length)
        x = x.view(-1, 1, 32)

        # Convolutional layers with ReLU activation
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.relu(self.conv5(x))

        # Pooling
        x = self.pool(x)

        # Flatten
        x = x.view(x.size(0), -1)

        # Fully connected layers
        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        # Reshape output to (batch_size, 7, 1024)
        x = x.view(-1, 7, 1024)

        return x


In [None]:
import torch
from evotorch.tools import dtype_of, device_of


def network_eval_func(network: torch.nn.Module):
    """Run ``network`` on random input and ask the user for a 1-5 rating.

    Args:
        network: candidate network to evaluate.

    Returns:
        The integer rating (1 to 5) typed by the user, used as the fitness.
        Invalid or out-of-range entries are rejected and the prompt repeats,
        so a typo does not abort the whole search.
    """
    # Generate 32 random gaussian values matching the network's dtype/device.
    samples = torch.randn(32, dtype=dtype_of(network), device=device_of(network))
    # Apply the network to the gaussian vector.
    # NOTE(review): the output is not used when collecting the rating —
    # presumably it should drive image generation first; confirm.
    network_out = network(samples)

    while True:
        answer = input("1~5の評価を入力してください。 ")
        try:
            fitness = int(answer)
        except ValueError:
            print("1~5の整数で入力してください。")
            continue
        if 1 <= fitness <= 5:
            return fitness
        print("1~5の整数で入力してください。")

In [None]:
from evotorch.neuroevolution import NEProblem

# Neuroevolution problem: evolve the parameters of FetishModel so that the
# score returned by network_eval_func is maximized.
problem = NEProblem(
    objective_sense="max",
    network=FetishModel,
    network_eval_func=network_eval_func,
)

In [None]:
from evotorch.algorithms import PGPE
from evotorch.logging import PandasLogger

# PGPE searcher over the problem. The population is only 2 because every
# evaluation asks for a rating typed in by hand.
searcher = PGPE(
    problem,
    popsize=2,
    radius_init=2.25,
    center_learning_rate=0.2,
    stdev_learning_rate=0.1,
)
# Attach the logger that prints and checkpoints the best network.
logger = MyLogger(searcher, problem)
# Run the search for 2 generations.
searcher.run(2)