In [3]:
import torch
import torch.nn as nn
from transformers import CLIPProcessor, CLIPModel
from diffusers import StableDiffusionPipeline
from diffusers import UNet2DConditionModel, DDPMScheduler

device = torch.device("cpu")
#device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1. CLIPモデルをロード
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# 2. Diffusion Modelを構築
#unet = UNet2DConditionModel.from_pretrained('CompVis/stable-diffusion-v1-5', subfolder='unet')
model_id = "stable-diffusion-v1-5/stable-diffusion-v1-5"
unet = UNet2DConditionModel.from_pretrained(
    model_id,subfolder='unet'
).to(device)
#scheduler = DDPMScheduler.from_pretrained('CompVis/stable-diffusion-v1-5', subfolder='unet')
scheduler = DDPMScheduler.from_pretrained(
    model_id,subfolder='scheduler'
)
print(unet)

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


UNet2DConditionModel(
  (conv_in): Conv2d(4, 320, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (time_proj): Timesteps()
  (time_embedding): TimestepEmbedding(
    (linear_1): Linear(in_features=320, out_features=1280, bias=True)
    (act): SiLU()
    (linear_2): Linear(in_features=1280, out_features=1280, bias=True)
  )
  (down_blocks): ModuleList(
    (0): CrossAttnDownBlock2D(
      (attentions): ModuleList(
        (0-1): 2 x Transformer2DModel(
          (norm): GroupNorm(32, 320, eps=1e-06, affine=True)
          (proj_in): Conv2d(320, 320, kernel_size=(1, 1), stride=(1, 1))
          (transformer_blocks): ModuleList(
            (0): BasicTransformerBlock(
              (norm1): LayerNorm((320,), eps=1e-05, elementwise_affine=True)
              (attn1): Attention(
                (to_q): Linear(in_features=320, out_features=320, bias=False)
                (to_k): Linear(in_features=320, out_features=320, bias=False)
                (to_v): Linear(in_features=320, out_fe

In [2]:
import torch
import torch.nn as nn
from transformers import CLIPProcessor, CLIPModel
from diffusers import StableDiffusionPipeline
from diffusers import UNet2DConditionModel, DDPMScheduler

device = torch.device("cpu")
#device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1. CLIPモデルをロード
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# 2. Diffusion Modelを構築
#unet = UNet2DConditionModel.from_pretrained('CompVis/stable-diffusion-v1-5', subfolder='unet')
model_id = "stable-diffusion-v1-5/stable-diffusion-v1-5"
unet = UNet2DConditionModel.from_pretrained(
    model_id,subfolder='unet'
).to(device)
#scheduler = DDPMScheduler.from_pretrained('CompVis/stable-diffusion-v1-5', subfolder='unet')
scheduler = DDPMScheduler.from_pretrained(
    model_id,subfolder='scheduler'
)
# 3. トレーニングデータの準備
# MS COCO Captions を例にデータセットを作成
from torchvision import transforms
from torch.utils.data import DataLoader
from datasets import load_dataset

# dataset = load_dataset("sentence-transformers/coco-captions", split="train")
dataset = load_dataset("lambdalabs/naruto-blip-captions", split="train")

# 前処理
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5]),
])

#def preprocess_data(batch):
    # テキストをCLIPでエンコード
    #inputs = clip_processor(text=batch['caption'], return_tensors="pt", padding=True, truncation=True)
    #text_embeddings = clip_model.get_text_features(**inputs)
    # 画像を前処理
    #images = [transform(image.convert("RGB")) for image in batch['image']]
    #return text_embeddings, torch.stack(images)
def preprocess_data(batch):
    # リストからテキストと画像を取得
    captions = [item['text'] for item in batch]
    images = [item['image'] for item in batch]

    # テキストをCLIPでエンコード
    print("Processing captions for CLIP embeddings...")
    inputs = clip_processor(text=captions, return_tensors="pt", padding=True, truncation=True).to(device)
    text_embeddings = clip_model.get_text_features(**inputs)
    print(f"Text embeddings shape: {text_embeddings.shape}")

    # 画像を前処理
    print("Processing images...")
    processed_images = torch.stack([
        transform(image.convert("RGB")) for image in images
    ]).to(device)
    print(f"Processed images shape (before adding alpha channel): {processed_images.shape}")

    # アルファチャンネルを追加
    processed_images = torch.cat(
        [processed_images, torch.ones_like(processed_images[:, :1, :, :])], dim=1
    )
    print(f"Processed images shape (after adding alpha channel): {processed_images.shape}")

    return text_embeddings, processed_images


dataloader = DataLoader(dataset, batch_size=4, collate_fn=preprocess_data)

# 4. 損失関数
class CLIPLoss(nn.Module):
    def __init__(self, clip_model):
        super().__init__()
        self.clip_model = clip_model

    def forward(self, images, text_embeddings):
        # 画像をCLIPでエンコード（デバイスを考慮）
        image_embeddings = self.clip_model.get_image_features(images.to(device))
        text_embeddings = text_embeddings.to(device)

        # テキスト・画像間のコサイン類似度を損失に
        loss = 1 - torch.nn.functional.cosine_similarity(image_embeddings, text_embeddings).mean()
        return loss

clip_loss_fn = CLIPLoss(clip_model)

# 5. トレーニングループ
optimizer = torch.optim.Adam(unet.parameters(), lr=1e-4)
num_epochs = 5

for epoch in range(num_epochs):
    for text_embeddings, images in dataloader:
        images = images.to(device)
        text_embeddings = text_embeddings.to(device)

        # Diffusion Modelの入力作成
        noise = torch.randn_like(images).to(device)
        timesteps = torch.randint(0, scheduler.num_train_timesteps, (images.size(0),)).to(device)
        noisy_images = scheduler.add_noise(images, noise, timesteps).to(device)

        # モデルによる生成
        model_output = unet(sample=noisy_images, 
        timestep=timesteps, 
        encoder_hidden_states=text_embeddings).sample.to(device)

        # 損失計算
        diffusion_loss = torch.nn.functional.mse_loss(model_output, noise)
        clip_loss = clip_loss_fn(images, text_embeddings)
        loss = diffusion_loss + clip_loss

        # 最適化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}")

# 6. モデルの保存
torch.save(unet.state_dict(), "text_to_image_model.pth")


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Repo card metadata block was not found. Setting CardData to empty.


Processing captions for CLIP embeddings...
Text embeddings shape: torch.Size([4, 512])
Processing images...
Processed images shape (before adding alpha channel): torch.Size([4, 3, 256, 256])
Processed images shape (after adding alpha channel): torch.Size([4, 4, 256, 256])


ValueError: not enough values to unpack (expected 3, got 2)

In [2]:
print(dataset[1])

{'image': <PIL.PngImagePlugin.PngImageFile image mode=RGB size=937x937 at 0x7F3CB12AF130>, 'text': 'a man in a hoodie with a fire in the background'}


In [10]:
import torch
from diffusers import UNet2DConditionModel, DDPMScheduler
from diffusers import StableDiffusionPipeline
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import matplotlib.pyplot as plt

# 1. トレーニング済みモデルとCLIPのロード
device = "cuda" if torch.cuda.is_available() else "cpu"

# Diffusion Model のロード
unet = UNet2DConditionModel.from_pretrained("CompVis/ldm-text2img-large-256")
unet.load_state_dict(torch.load("text_to_image_model.pth", map_location=device))
unet = unet.to(device)

# Scheduler（DDPM）をロード
scheduler = DDPMScheduler.from_pretrained("CompVis/ldm-text2img-large-256")
scheduler.set_timesteps(num_inference_steps=50)  # 推論のステップ数

# CLIPモデルのロード
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# 2. テキストをエンコードしてCLIPのベクトルを取得
def encode_text(text):
    inputs = clip_processor(text=[text], return_tensors="pt", padding=True, truncation=True)
    text_embeddings = clip_model.get_text_features(**inputs)
    return text_embeddings.to(device)

# 3. ノイズから画像を生成
def generate_image_from_text(text, num_inference_steps=50):
    # テキストのエンコード
    text_embeddings = encode_text(text)

    # ランダムなノイズ画像を初期化
    image_size = (1, 3, 256, 256)  # バッチサイズ=1, チャンネル数=3, 高さ=256, 幅=256
    noisy_image = torch.randn(image_size).to(device)

    # スケジューラのステップを設定
    scheduler.set_timesteps(num_inference_steps)
    for t in scheduler.timesteps:
        # ノイズ除去
        with torch.no_grad():
            noise_pred = unet(noisy_image, t, encoder_hidden_states=text_embeddings).sample
        
        # 画像を次のステップに更新
        noisy_image = scheduler.step(noise_pred, t, noisy_image).prev_sample

    # 最終的な画像を返す
    generated_image = noisy_image.squeeze(0).detach().cpu()
    return generated_image

# 4. 画像を表示する関数
def show_image(tensor_image):
    # Tensorを[0, 1]スケールに変換
    image = (tensor_image * 0.5 + 0.5).clamp(0, 1)  # 正規化解除
    image = image.permute(1, 2, 0).numpy()  # チャンネル次元を最後に移動
    plt.imshow(image)
    plt.axis("off")
    plt.show()

# 5. テキストから画像を生成して表示
prompt = "A cat sitting on a futuristic flying car in a sunset landscape"
generated_image = generate_image_from_text(prompt)
show_image(generated_image)


ModuleNotFoundError: No module named 'matplotlib'

In [27]:
import torch
from diffusers import StableDiffusionPipeline

model_id = "CompVis/stable-diffusion-v1-4"
device = "cuda"


pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float16)
pipe = pipe.to(device)

prompt = "a photo of an astronaut riding a horse on mars"
image = pipe(prompt).images[0]  
    
image.save("astronaut_rides_horse.png")


Cannot initialize model with low cpu memory usage because `accelerate` was not found in the environment. Defaulting to `low_cpu_mem_usage=False`. It is strongly recommended to install `accelerate` for faster and less memory-intense model loading. You can do so with: 
```
pip install accelerate
```
.
Loading pipeline components...: 100%|██████████| 7/7 [00:06<00:00,  1.02it/s]
100%|██████████| 50/50 [00:02<00:00, 20.53it/s]
