In [1]:

import warnings
warnings.filterwarnings("ignore")

import os
os.environ['PYTHONWARNINGS'] = 'ignore'

import math
import random
from pathlib import Path
from typing import List, Optional, Dict
import pandas as pd
from PIL import Image
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl
import clip  

In [2]:
# 시드 설정 함수
def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

In [3]:
# CLIP Contrastive Loss
def clip_contrastive_loss(logits_per_image: torch.Tensor, logits_per_text: torch.Tensor) -> torch.Tensor:
    b = logits_per_image.size(0)
    target = torch.arange(b, device=logits_per_image.device)
    return 0.5 * (F.cross_entropy(logits_per_image, target) +
                  F.cross_entropy(logits_per_text,  target))

In [4]:
class CLIPBackbone(nn.Module):
    def __init__(self, model_name: str = "ViT-B/32", device: Optional[str] = None, finetune: bool = False):
        super().__init__()
        dev = device or ("cuda" if torch.cuda.is_available() else "cpu")
        # clip.load은 모델과 preprocess(transform)를 함께 반환
        model, preprocess = clip.load(model_name, device=dev, jit=False)
        self.model = model
        self.preprocess = preprocess  # PIL->Tensor 변환(논문 통계/해상도 포함)

        if not finetune:
            for p in self.model.parameters():
                p.requires_grad = False

    @property
    def context_length(self) -> int:
        # CLIP 기본 컨텍스트 길이: 77
        return self.model.context_length

    @property
    def logit_scale(self) -> torch.Tensor:
        return self.model.logit_scale

    @property
    def temperature(self) -> torch.Tensor:
        return self.model.logit_scale.exp()

    def tokenize(self, texts: List[str]) -> torch.LongTensor:
        # 공식 BPE 토크나이저 (77 토큰, 필요시 truncate)
        return clip.tokenize(texts, context_length=self.context_length, truncate=True)

    def encode_image(self, images: torch.Tensor) -> torch.Tensor:
        # 이미 L2 normalize된 임베딩 반환
        return self.model.encode_image(images)

    def encode_text(self, text_tokens: torch.LongTensor) -> torch.Tensor:
        return self.model.encode_text(text_tokens)

    def forward(self, images: torch.Tensor, text_tokens: torch.LongTensor):
        img_emb = self.model.encode_image(images)          # (B, D), L2 norm
        txt_emb = self.model.encode_text(text_tokens)      # (B, D), L2 norm
        # 논문식 로짓: s * (I @ T^T)
        logits = self.temperature * img_emb @ txt_emb.t()
        return {
            "image_embeds": img_emb,
            "text_embeds": txt_emb,
            "logits_per_image": logits,
            "logits_per_text": logits.t(),
        }

In [5]:
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}

class ImageTextCsv(Dataset):
    def __init__(self, csv_path: str, img_root: str, preprocess):
        self.df = pd.read_csv(csv_path)
        self.root = Path(img_root)
        self.preprocess = preprocess

    def __len__(self): 
        return len(self.df)

    def __getitem__(self, i) -> Dict[str, object]:
        row = self.df.iloc[i]
        img = Image.open(self.root / row["image_path"]).convert("RGB")
        return {"image": self.preprocess(img), "text": str(row["caption"])}

def build_collate_clip(tokenize, context_length: int = 77):
    def _fn(batch: List[dict]):
        images = torch.stack([b["image"] for b in batch])
        texts  = [b["text"] for b in batch]
        tokens = tokenize(texts)  # (B, 77)
        return {"images": images, "text_tokens": tokens, "raw_texts": texts}
    return _fn

In [6]:
class CLIPLightning(pl.LightningModule):
    def __init__(self, model_name: str = "ViT-B/32", lr: float = 1e-4, weight_decay: float = 0.01,
                 finetune: bool = False):
        super().__init__()
        self.save_hyperparameters()
        self.backbone = CLIPBackbone(model_name=model_name, finetune=finetune)

    @property
    def temperature(self):
        return self.backbone.temperature

    def training_step(self, batch, _):
        out = self.backbone(images=batch["images"], text_tokens=batch["text_tokens"])
        loss = clip_contrastive_loss(out["logits_per_image"], out["logits_per_text"])
        self.log("train/loss", loss, prog_bar=True, on_step=True, on_epoch=True)
        self.log("train/logit_scale", self.temperature, on_step=True, prog_bar=False)
        return loss

    def validation_step(self, batch, _):
        out = self.backbone(images=batch["images"], text_tokens=batch["text_tokens"])
        loss = clip_contrastive_loss(out["logits_per_image"], out["logits_per_text"])
        self.log("val/loss", loss, prog_bar=True, on_epoch=True)

    def on_before_optimizer_step(self, optimizer):
        # 공식 구현 관행: logit_scale 상한 제한(안정화)
        with torch.no_grad():
            self.backbone.logit_scale.clamp_(max=math.log(100.0))

    def configure_optimizers(self):
        # 인코더를 finetune=False로 두면 학습되는 것은 logit_scale뿐. finetune=True면 전체 학습.
        opt = torch.optim.AdamW(self.parameters(), lr=self.hparams.lr, weight_decay=self.hparams.weight_decay, betas=(0.9, 0.98))
        sch = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=self.trainer.max_steps or 1000)
        return {"optimizer": opt, "lr_scheduler": {"scheduler": sch, "interval": "step"}}

In [7]:
class CLIPDataModule(pl.LightningDataModule):
    def __init__(self, train_csv: str, img_root: str, val_csv: str = None, batch_size: int = 128,
                 num_workers: int = 4, model_name: str = "ViT-B/32"):
        super().__init__()
        # preprocess/tokenize는 모델에서 가져오되, 여기서는 별도 로딩 방지를 위해 한 번만 생성
        bb = CLIPBackbone(model_name=model_name, finetune=False)  # 데이터 준비 용도이므로 freeze OK
        self.preprocess = bb.preprocess
        self.tokenize = bb.tokenize
        self.context_length = bb.context_length

        self.train_csv, self.val_csv = train_csv, val_csv
        self.img_root = img_root
        self.batch_size, self.num_workers = batch_size, num_workers

    def setup(self, stage=None):
        self.ds_train = ImageTextCsv(self.train_csv, self.img_root, self.preprocess)
        self.ds_val   = ImageTextCsv(self.val_csv,   self.img_root, self.preprocess) if self.val_csv else None

    def train_dataloader(self):
        return DataLoader(self.ds_train, batch_size=self.batch_size, shuffle=True,
                          num_workers=self.num_workers, pin_memory=True,
                          persistent_workers=self.num_workers>0, prefetch_factor=2 if self.num_workers>0 else None,
                          drop_last=True, collate_fn=build_collate_clip(self.tokenize, self.context_length))

    def val_dataloader(self):
        if self.ds_val is None: return None
        return DataLoader(self.ds_val, batch_size=self.batch_size, shuffle=False,
                          num_workers=self.num_workers, pin_memory=True,
                          persistent_workers=self.num_workers>0, prefetch_factor=2 if self.num_workers>0 else None,
                          drop_last=False, collate_fn=build_collate_clip(self.tokenize, self.context_length))

In [8]:
# 학습 하이퍼파라미터 설정
model_name = "ViT-B/32"  # "ViT-B/32","ViT-B/16","ViT-L/14","RN50" 등
train_csv = "data/train.csv"
val_csv = None  # 검증 데이터가 있으면 경로 지정
img_root = "data/images"
batch_size = 128
num_workers = 0  # 윈도우에서는 0 권장
epochs = 3
lr = 1e-4
weight_decay = 0.01
finetune = False  # True면 전체 미세조정, False면 logit_scale만 학습
seed = 42

In [13]:
project_root = Path("C:/python/clip-demo")  # 실제 프로젝트 경로
if project_root.exists():
    os.chdir(project_root)
    print(f"작업 디렉토리를 {project_root}로 변경했습니다")
else:
    print("프로젝트 디렉토리를 찾을 수 없습니다. 경로를 확인해주세요.")

print(f"현재 작업 디렉토리: {os.getcwd()}")

# 시드 설정
set_seed(seed)

# 데이터 모듈 생성
dm = CLIPDataModule(train_csv, img_root, val_csv,
                    batch_size=batch_size, num_workers=num_workers,
                    model_name=model_name)

# 모델 생성
model = CLIPLightning(model_name=model_name, lr=lr, weight_decay=weight_decay, finetune=finetune)

# 트레이너 설정
trainer = pl.Trainer(
    max_epochs=epochs,
    precision="32",  # FP32로 변경하여 mixed precision 문제 해결
    gradient_clip_val=1.0,
    log_every_n_steps=10,
    enable_checkpointing=False,
    accelerator="gpu" if torch.cuda.is_available() else "cpu",
    devices=1,
    num_sanity_val_steps=0,  # validation sanity check 비활성화
    limit_val_batches=0,  # validation 배치 수를 0으로 설정
)

# 학습 시작
trainer.fit(model, dm)

작업 디렉토리를 C:\python\clip-demo로 변경했습니다
현재 작업 디렉토리: C:\python\clip-demo


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name     | Type         | Params | Mode 
--------------------------------------------------
0 | backbone | CLIPBackbone | 151 M  | train
--------------------------------------------------
0         Trainable params
151 M     Non-trainable params
151 M     Total params
605.109   Total estimated model params size (MB)
1         Modules in train mode
227       Modules in eval mode
`Trainer.fit` stopped: No training batches.


In [16]:
# 학습된 모델로 간단한 테스트
model.eval()

# 디바이스 확인 및 설정
device = next(model.parameters()).device
print(f"모델이 위치한 디바이스: {device}")

with torch.no_grad():
    # 텍스트 임베딩 생성 (정규화된 임베딩 직접 사용)
    texts = ["a photo of a cat", "a photo of a dog"]
    text_tokens = model.backbone.tokenize(texts)
    
    # 토큰을 모델과 같은 디바이스로 이동
    text_tokens = text_tokens.to(device)
    print(f"텍스트 토큰 디바이스: {text_tokens.device}")
    
    # 정규화된 텍스트 임베딩 얻기 (temperature 곱하기 전)
    text_embeds = model.backbone.encode_text(text_tokens)
    
    print(f"텍스트 임베딩 크기: {text_embeds.shape}")
    print(f"텍스트 임베딩 디바이스: {text_embeds.device}")
    print(f"임베딩 norm: {text_embeds.norm(dim=-1)}")  # L2 정규화 확인
    
    # 코사인 유사도 계산 (정규화된 벡터 간의 내적)
    cosine_similarity = (text_embeds[0] @ text_embeds[1]).item()
    print(f"텍스트 간 코사인 유사도: {cosine_similarity:.4f}")
    
    # temperature가 적용된 유사도 (CLIP에서 실제 사용되는 값)
    temperature = model.backbone.temperature.item()
    scaled_similarity = temperature * cosine_similarity
    print(f"Temperature: {temperature:.4f}")
    print(f"Temperature 적용된 유사도: {scaled_similarity:.4f}")
    
    # 추가 테스트: 다른 텍스트들과의 유사도
    more_texts = ["a photo of a cat", "a photo of a dog", "a photo of a car", "a photo of a tree"]
    more_tokens = model.backbone.tokenize(more_texts)
    more_tokens = more_tokens.to(device)  # 디바이스 맞추기
    
    more_embeds = model.backbone.encode_text(more_tokens)
    similarity_matrix = more_embeds @ more_embeds.t()
    
    print("\n텍스트 간 유사도 매트릭스 (코사인 유사도):")
    for i, text1 in enumerate(more_texts):
        for j, text2 in enumerate(more_texts):
            sim = similarity_matrix[i, j].item()
            print(f"{text1[:15]:15} vs {text2[:15]:15}: {sim:.4f}")

모델이 위치한 디바이스: cpu
텍스트 토큰 디바이스: cpu
텍스트 임베딩 크기: torch.Size([2, 512])
텍스트 임베딩 디바이스: cpu
임베딩 norm: tensor([10.4766, 10.6953], dtype=torch.float16)
텍스트 간 코사인 유사도: 104.3125
Temperature: 100.0000
Temperature 적용된 유사도: 10431.2508

텍스트 간 유사도 매트릭스 (코사인 유사도):
a photo of a ca vs a photo of a ca: 109.6875
a photo of a ca vs a photo of a do: 104.3125
a photo of a ca vs a photo of a ca: 95.2500
a photo of a ca vs a photo of a tr: 81.3125
a photo of a do vs a photo of a ca: 104.3125
a photo of a do vs a photo of a do: 114.4375
a photo of a do vs a photo of a ca: 98.2500
a photo of a do vs a photo of a tr: 83.3750
a photo of a ca vs a photo of a ca: 95.2500
a photo of a ca vs a photo of a do: 98.2500
a photo of a ca vs a photo of a ca: 109.4375
a photo of a ca vs a photo of a tr: 81.9375
a photo of a tr vs a photo of a ca: 81.3125
a photo of a tr vs a photo of a do: 83.3750
a photo of a tr vs a photo of a ca: 81.9375
a photo of a tr vs a photo of a tr: 89.6875
