In [1]:
import copy
import json
import os

import torch
import wandb
from PIL import Image
from torch.nn.utils.rnn import pad_sequence
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader
from transformers import CLIPProcessor, CLIPModel, Trainer, TrainingArguments

In [None]:
# Load the input processor and the pretrained CLIP weights from the same checkpoint.
CLIP_CHECKPOINT = "openai/clip-vit-base-patch32"
processor = CLIPProcessor.from_pretrained(CLIP_CHECKPOINT)
model = CLIPModel.from_pretrained(CLIP_CHECKPOINT)


In [None]:
# 1. Custom dataset: loads image/caption pairs listed in a JSON annotation file
class CustomCLIPDataset(Dataset):
    """Image-caption dataset that yields tensors ready to be fed to CLIP.

    Args:
        json_path: Path to a JSON file that decodes to an indexable collection
            of records, each providing an ``image_id`` and a ``caption`` entry.
        image_dir: Directory holding the images, stored as
            ``<image_id zero-padded to 12 characters>.jpg``.
        processor: Callable that preprocesses one image/caption pair and
            returns ``input_ids``, ``attention_mask`` and ``pixel_values``
            (a ``CLIPProcessor`` in this notebook).
    """

    def __init__(self, json_path, image_dir, processor):
        # Decode explicitly as UTF-8 so reading the annotations does not depend
        # on the platform's default text encoding.
        with open(json_path, 'r', encoding='utf-8') as f:
            self.data = json.load(f)
        self.processor = processor
        self.image_dir = image_dir

    def __len__(self):
        """Return the number of image/caption records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the preprocessed tensors for record ``idx``.

        Returns:
            Dict with ``input_ids``, ``attention_mask`` and ``pixel_values``,
            each with the processor's leading batch dimension removed.
        """
        item = self.data[idx]
        image_id = str(item['image_id']).zfill(12)
        image_path = os.path.join(self.image_dir, f"{image_id}.jpg")
        caption = item['caption']

        # Load the image inside a context manager so the file handle is
        # released deterministically; convert() returns an independent copy.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        # truncation=True keeps over-long captions within the tokenizer's
        # maximum length instead of failing later inside the text encoder.
        inputs = self.processor(
            images=image,
            text=[caption],
            return_tensors="pt",
            padding=True,
            truncation=True,
        )

        # Drop only the batch dimension (dim 0); a bare squeeze() would also
        # remove any other dimension that happens to have size 1.
        return {
            "input_ids": inputs["input_ids"].squeeze(0),
            "attention_mask": inputs["attention_mask"].squeeze(0),
            "pixel_values": inputs["pixel_values"].squeeze(0)
        }

In [None]:
def collate_fn(batch):
    """Collate dataset items into one padded batch.

    Args:
        batch: List of dicts, each holding ``input_ids`` and ``pixel_values``
            tensors (a leading singleton batch dimension is tolerated) and,
            optionally, an ``attention_mask`` aligned with ``input_ids``.

    Returns:
        Dict with
        ``input_ids``: token ids right-padded with 0 to the longest sequence,
        ``attention_mask``: same shape, 1 on real tokens and 0 on padding,
        ``pixel_values``: per-item image tensors stacked along a new first dim.
    """
    input_ids = [item['input_ids'].squeeze(0) for item in batch]
    # Reuse the mask produced by the dataset when it is there; otherwise every
    # position of the still-unpadded sequence counts as a real token.
    attention_masks = [
        item['attention_mask'].squeeze(0) if 'attention_mask' in item else torch.ones_like(ids)
        for item, ids in zip(batch, input_ids)
    ]
    pixel_values = [item['pixel_values'].squeeze(0) for item in batch]

    return {
        'input_ids': pad_sequence(input_ids, batch_first=True),
        # Padding the mask with 0 marks the padded positions as "ignore".
        'attention_mask': pad_sequence(attention_masks, batch_first=True),
        'pixel_values': torch.stack(pixel_values),
    }

In [None]:
# # Custom Trainer subclass (disabled: kept only as commented-out reference code)
# # NOTE(review): `ContrastiveLoss` is not defined or imported in the visible notebook.
# class CLIPTrainer(Trainer):
#     def compute_loss(self, model, inputs, return_outputs=False):
#         # Get the model outputs
#         outputs = model(input_ids=inputs["input_ids"], 
#                         attention_mask=inputs["attention_mask"], 
#                         pixel_values=inputs["pixel_values"])
        
#         # Extract the text and image embeddings
#         image_features = outputs.image_embeds
#         text_features = outputs.text_embeds
        
#         # Compute the contrastive loss
#         loss_fn = ContrastiveLoss()
#         loss = loss_fn(image_features, text_features)
        
#         return (loss, outputs) if return_outputs else loss

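## 캐글: Add-ons → Secrets에 본인의 wandb API 키를 `wandbkey`라는 이름으로 등록하기
## 로컬: `kaggle_secrets`를 쓸 수 없으므로 `WANDB_API_KEY` 환경 변수를 직접 설정하거나 `wandb login`으로 로그인하기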

In [None]:
# W&B authentication.
# On Kaggle the API key is read from the "wandbkey" secret. When `kaggle_secrets`
# cannot be imported (e.g. when running outside Kaggle) the key is left to
# whatever credentials `wandb.login()` finds on its own, such as an already
# exported WANDB_API_KEY environment variable.
try:
    from kaggle_secrets import UserSecretsClient
except ImportError:
    UserSecretsClient = None

if UserSecretsClient is not None:
    # Write the key straight into the environment instead of keeping it in a
    # notebook-level variable where it could be displayed by accident.
    os.environ["WANDB_API_KEY"] = UserSecretsClient().get_secret("wandbkey")

wandb.login()

## 파라미터

In [None]:
# Hyperparameter sweep definition handed to W&B.
sweep_config = dict(
    # Search strategy: one of 'grid', 'random' or 'bayes'.
    method='random',
    # Metric the sweep optimises and the direction ('maximize' or 'minimize').
    metric=dict(name='loss', goal='minimize'),
    parameters=dict(
        # Candidate batch sizes.
        batch_size=dict(values=[16, 32, 64]),
        # Learning rate sampled from a uniform distribution over [min, max].
        learning_rate=dict(distribution='uniform', min=0.0001, max=0.001),
        # Candidate epoch counts.
        epochs=dict(values=[10, 20, 30]),
    ),
)

In [None]:
# Register the sweep described by `sweep_config` under the "clip_experiment" project;
# the returned id is what the sweep agent is started with later in the notebook.
sweep_id = wandb.sweep(sweep_config, project="clip_experiment")


## 아래 셀의 데이터셋 경로(이미지 폴더, 캡션 JSON)를 본인 환경에 맞게 바꿔 주어야 합니다

In [None]:
# Root folder of the dataset; the image folders and caption files all live beneath it.
DATASET_ROOT = "/kaggle/input/1000-coco-final/realrealreal_coco_dataset"

train_image_dir = f"{DATASET_ROOT}/realreal_train"
val_image_dir = f"{DATASET_ROOT}/realreal_val"
train_json_file = f"{DATASET_ROOT}/realreal_train_captions.json"
val_json_file = f"{DATASET_ROOT}/realreal_val_captions.json"

# 3. Build the train / validation datasets (captions file, image folder, shared processor).
train_dataset, val_dataset = [
    CustomCLIPDataset(captions_file, images_dir, processor)
    for captions_file, images_dir in (
        (train_json_file, train_image_dir),
        (val_json_file, val_image_dir),
    )
]



## TRAIN!!

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Best validation accuracy seen across all sweep runs executed since this cell
# was last run. "best_model.pth" is only overwritten when a run beats this
# value, so the file on disk is the best checkpoint of the whole sweep rather
# than the best epoch of whichever run happened to execute last.
_sweep_best = {"val_accuracy": 0.0}


def _clip_forward(clip_model, batch):
    """Move one collated batch to `device` and run a CLIP forward pass.

    The attention mask is forwarded when the batch carries one; otherwise the
    model is called without it.
    """
    attention_mask = batch.get('attention_mask')
    if attention_mask is not None:
        attention_mask = attention_mask.to(device)
    return clip_model(
        input_ids=batch['input_ids'].to(device),
        attention_mask=attention_mask,
        pixel_values=batch['pixel_values'].to(device),
    )


def train(eval_batch_size=64):
    """Fine-tune CLIP for one W&B sweep run and log its metrics.

    ``batch_size``, ``learning_rate`` and ``epochs`` are read from
    ``wandb.config``. Each call trains its own copy of the global ``model``,
    logs the per-epoch training loss and validation accuracy, and writes
    "best_model.pth" (also logged as a W&B artifact) whenever the validation
    accuracy beats the best value seen in any run so far.

    Args:
        eval_batch_size: Batch size of the validation loader. Validation
            accuracy is in-batch retrieval accuracy (each image has to pick its
            own caption among the captions of the same batch), so it is only
            comparable between runs when every run validates with the same
            batch size; hence it is decoupled from the swept training batch size.
    """
    # The context manager closes the W&B run even when training raises.
    with wandb.init() as run:
        batch_size = wandb.config.batch_size
        learning_rate = wandb.config.learning_rate
        epochs = wandb.config.epochs

        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
        val_dataloader = DataLoader(val_dataset, batch_size=eval_batch_size, shuffle=False, collate_fn=collate_fn)

        # Train a private copy: updating the shared global `model` in place would
        # make every sweep run start from the weights left by the previous run.
        run_model = copy.deepcopy(model).to(device)
        optimizer = Adam(run_model.parameters(), lr=learning_rate)
        loss_fn = torch.nn.CrossEntropyLoss()

        best_val_accuracy = 0.0  # best validation accuracy of this run

        for epoch in range(epochs):
            run_model.train()
            total_loss = 0.0

            for batch in train_dataloader:
                optimizer.zero_grad()

                outputs = _clip_forward(run_model, batch)
                logits_per_image = outputs.logits_per_image
                # The i-th image matches the i-th caption of the batch.
                labels = torch.arange(logits_per_image.size(0), device=logits_per_image.device)

                # Symmetric contrastive loss: average the image->text and
                # text->image cross-entropies so both directions are trained.
                loss = (loss_fn(logits_per_image, labels) + loss_fn(outputs.logits_per_text, labels)) / 2
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

            # max(..., 1) avoids a ZeroDivisionError when the loader is empty.
            avg_train_loss = total_loss / max(len(train_dataloader), 1)
            print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_train_loss}")

            # Validation loop
            run_model.eval()
            correct = 0
            total = 0

            with torch.no_grad():
                for batch in val_dataloader:
                    outputs = _clip_forward(run_model, batch)
                    predictions = torch.argmax(outputs.logits_per_image, dim=1)
                    labels = torch.arange(predictions.size(0), device=predictions.device)

                    correct += (predictions == labels).sum().item()
                    total += labels.size(0)

            val_accuracy = correct / total if total else 0.0
            print(f"Validation Accuracy: {val_accuracy}")

            # Log metrics to W&B
            wandb.log({"epoch": epoch, "loss": avg_train_loss, "val_accuracy": val_accuracy})

            if val_accuracy > best_val_accuracy:
                best_val_accuracy = val_accuracy
                run.summary["best_val_accuracy"] = best_val_accuracy

            # Only replace the checkpoint on disk when this epoch beats every
            # earlier run as well, not merely the earlier epochs of this run.
            if val_accuracy > _sweep_best["val_accuracy"]:
                _sweep_best["val_accuracy"] = val_accuracy
                save_path = "best_model.pth"
                torch.save(run_model.state_dict(), save_path)
                print(f"Best model saved with accuracy: {val_accuracy}")

                # Save model to W&B
                artifact = wandb.Artifact('best_model', type='model')
                artifact.add_file(save_path)
                wandb.log_artifact(artifact)

# Start the W&B sweep agent.
# A sweep using the 'random' method does not run out of configurations on its
# own, so the number of runs is capped; without `count` this cell would never
# return and the cells below it would never execute on "Run All".
SWEEP_RUN_COUNT = 10  # number of sweep runs to execute; adjust to the available compute budget
wandb.agent(sweep_id, train, count=SWEEP_RUN_COUNT)

## 코사인 유사도 비교

In [None]:
# Fine-tuned model: pretrained architecture plus the weights stored in "best_model.pth".
trained_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
# map_location lets a checkpoint that was saved from a GPU run be loaded on a
# machine without CUDA (tensors are remapped onto `device` while loading).
# NOTE(review): torch.load unpickles the file, so only load checkpoints you
# produced yourself; consider weights_only=True if the installed PyTorch supports it.
trained_model.load_state_dict(torch.load("best_model.pth", map_location=device))
trained_model.to(device)
trained_model.eval()

# Baseline: the untouched pretrained checkpoint, for comparison.
original_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
original_model.to(device)
original_model.eval()

processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")


## 원하는 텍스트 데이터 입력
1. “A black SUV parked next to a ‘Pay Here’ parking meter on a busy street.”
2. “Cars lined up along the street near a parking sign, with a view of distant hills.”
3. “A parking area with vehicles and a visible parking payment station in the foreground.”
4. “An urban street scene with parked cars and a ‘Pay Here’ sign for parking fees.”
5. “A black vehicle with a license plate parked beside a meter that says ‘Pay Here’.”

In [None]:
# NOTE(review): absolute path on a local machine — point it at your own test image before running.
image_path = "/Users/psjj/Downloads/coco2017/realrealreal_coco_dataset/realreal_test/000000092212.jpg"  # path of the image to test
# Placeholder caption: replace it with the text to compare against the image.
text = "Your descriptive text here" 

# Preprocess the single image/text pair and move the resulting tensors to `device`.
image = Image.open(image_path).convert("RGB")
inputs = processor(images=image, text=[text], return_tensors="pt", padding=True)
pixel_values = inputs["pixel_values"].to(device)
input_ids = inputs["input_ids"].to(device)
attention_mask = inputs["attention_mask"].to(device)



In [None]:
# Feed the identical image/text pair through both models without tracking
# gradients: the fine-tuned model first, then the original one.
with torch.no_grad():
    trained_outputs, original_outputs = [
        clip(input_ids=input_ids, pixel_values=pixel_values)
        for clip in (trained_model, original_model)
    ]

# Embeddings of the fine-tuned model
trained_image_embedding = trained_outputs.image_embeds
trained_text_embedding = trained_outputs.text_embeds

# Embeddings of the original model
original_image_embedding = original_outputs.image_embeds
original_text_embedding = original_outputs.text_embeds

In [None]:
import torch.nn.functional as F

# Image-text cosine similarity, computed separately for each model.
trained_cos_sim = F.cosine_similarity(trained_image_embedding, trained_text_embedding)
original_cos_sim = F.cosine_similarity(original_image_embedding, original_text_embedding)

for report_line in (
    f"Trained Model Cosine Similarity: {trained_cos_sim.item()}",
    f"Original Model Cosine Similarity: {original_cos_sim.item()}",
):
    print(report_line)

# Comparison: report which model scores the pair higher (ties go to the original).
print(
    "The trained model has a higher cosine similarity."
    if trained_cos_sim > original_cos_sim
    else "The original model has a higher or equal cosine similarity."
)