# CLIP 기반 임베딩 생성 모델

## 목표
이미지를 512차원 벡터로 변환하여 유사도 비교에 사용

## 모델
- **백본**: CLIP (openai/clip-vit-base-patch32)
- **출력**: 512차원 L2 정규화 벡터
- **용도**: 다회용기 이미지 간 유사도 계산

## 특징
- 사전학습된 모델 사용 (학습 불필요)
- 코사인 유사도로 이미지 비교
- 빠른 추론 속도

## 1. 환경 설정

In [None]:
import torch
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from transformers import CLIPProcessor, CLIPModel
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.manifold import TSNE
import os
from tqdm import tqdm

# GPU 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## 2. CLIP 모델 로드

In [None]:
# CLIP 모델 및 프로세서 로드
model_name = "openai/clip-vit-base-patch32"

print(f"Loading CLIP model: {model_name}")
model = CLIPModel.from_pretrained(model_name)
processor = CLIPProcessor.from_pretrained(model_name)

model = model.to(device)
model.eval()

print(f"✓ Model loaded successfully")
print(f"Embedding dimension: 512")

## 3. 임베딩 생성 함수

In [None]:
def generate_embedding(image_path, model, processor, device):
    """
    이미지에서 512차원 임베딩 벡터 생성
    
    Args:
        image_path: 이미지 파일 경로
        model: CLIP 모델
        processor: CLIP 프로세서
        device: 디바이스 (cuda/cpu)
    
    Returns:
        numpy array: L2 정규화된 512차원 벡터
    """
    # 이미지 로딩
    image = Image.open(image_path).convert('RGB')
    
    # 전처리
    inputs = processor(images=image, return_tensors="pt").to(device)
    
    # 임베딩 생성
    with torch.no_grad():
        image_features = model.get_image_features(**inputs)
    
    # L2 정규화 (코사인 유사도 계산 최적화)
    image_features = image_features / image_features.norm(p=2, dim=-1, keepdim=True)
    
    # numpy 배열로 변환
    embedding = image_features.cpu().numpy().flatten()
    
    return embedding

def generate_embedding_batch(image_paths, model, processor, device, batch_size=32):
    """
    여러 이미지의 임베딩을 배치로 생성
    
    Args:
        image_paths: 이미지 파일 경로 리스트
        model: CLIP 모델
        processor: CLIP 프로세서
        device: 디바이스
        batch_size: 배치 크기
    
    Returns:
        numpy array: (N, 512) 형태의 임베딩 행렬
    """
    embeddings = []
    
    for i in tqdm(range(0, len(image_paths), batch_size), desc='Generating embeddings'):
        batch_paths = image_paths[i:i+batch_size]
        
        # 이미지 로딩
        images = [Image.open(path).convert('RGB') for path in batch_paths]
        
        # 전처리
        inputs = processor(images=images, return_tensors="pt", padding=True).to(device)
        
        # 임베딩 생성
        with torch.no_grad():
            image_features = model.get_image_features(**inputs)
        
        # L2 정규화
        image_features = image_features / image_features.norm(p=2, dim=-1, keepdim=True)
        
        embeddings.append(image_features.cpu().numpy())
    
    return np.vstack(embeddings)

## 4. 유사도 계산 함수

In [None]:
def calculate_cosine_similarity(embedding1, embedding2):
    """
    두 임베딩 벡터 간 코사인 유사도 계산
    
    Args:
        embedding1: 첫 번째 임베딩 벡터
        embedding2: 두 번째 임베딩 벡터
    
    Returns:
        float: 코사인 유사도 (0.0 ~ 1.0)
    """
    # L2 정규화되어 있으면 내적이 곧 코사인 유사도
    similarity = np.dot(embedding1, embedding2)
    return float(similarity)

def find_most_similar(query_embedding, database_embeddings, threshold=0.7):
    """
    데이터베이스에서 가장 유사한 임베딩 찾기
    
    Args:
        query_embedding: 쿼리 임베딩 (512,)
        database_embeddings: DB 임베딩 행렬 (N, 512)
        threshold: 최소 유사도 임계값
    
    Returns:
        tuple: (가장 유사한 인덱스, 유사도) 또는 (None, 0.0)
    """
    if len(database_embeddings) == 0:
        return None, 0.0
    
    # 배치 코사인 유사도 계산
    similarities = np.dot(database_embeddings, query_embedding)
    
    # 최대값 찾기
    max_idx = np.argmax(similarities)
    max_similarity = similarities[max_idx]
    
    # 임계값 체크
    if max_similarity >= threshold:
        return int(max_idx), float(max_similarity)
    else:
        return None, float(max_similarity)

## 5. 테스트: 단일 이미지 임베딩 생성

In [None]:
# 테스트 이미지 경로 (직접 지정)
test_image_path = '../data/test_images/tumbler1.jpg'

# 임베딩 생성
embedding = generate_embedding(test_image_path, model, processor, device)

print(f"Embedding shape: {embedding.shape}")
print(f"Embedding norm (should be ~1.0): {np.linalg.norm(embedding):.4f}")
print(f"First 10 values: {embedding[:10]}")

# 이미지 표시
img = Image.open(test_image_path)
plt.figure(figsize=(6, 6))
plt.imshow(img)
plt.axis('off')
plt.title('Test Image')
plt.show()

## 6. 유사도 테스트: 같은 물체 vs 다른 물체

In [None]:
# 테스트 이미지 3개 준비
# - tumbler1.jpg: 기준 텀블러
# - tumbler2.jpg: 같은 텀블러 (다른 각도)
# - cup.jpg: 다른 컵

image1_path = '../data/test_images/tumbler1.jpg'
image2_path = '../data/test_images/tumbler2.jpg'  # 같은 물체
image3_path = '../data/test_images/cup.jpg'       # 다른 물체

# 임베딩 생성
emb1 = generate_embedding(image1_path, model, processor, device)
emb2 = generate_embedding(image2_path, model, processor, device)
emb3 = generate_embedding(image3_path, model, processor, device)

# 유사도 계산
sim_same = calculate_cosine_similarity(emb1, emb2)
sim_diff = calculate_cosine_similarity(emb1, emb3)

print(f"Similarity (same object, different angle): {sim_same:.4f}")
print(f"Similarity (different object): {sim_diff:.4f}")
print(f"\nDifference: {sim_same - sim_diff:.4f}")

# 시각화
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

axes[0].imshow(Image.open(image1_path))
axes[0].set_title('Reference Image')
axes[0].axis('off')

axes[1].imshow(Image.open(image2_path))
axes[1].set_title(f'Same Object\nSimilarity: {sim_same:.3f}')
axes[1].axis('off')

axes[2].imshow(Image.open(image3_path))
axes[2].set_title(f'Different Object\nSimilarity: {sim_diff:.3f}')
axes[2].axis('off')

plt.tight_layout()
plt.show()

## 7. 배치 임베딩 생성 테스트

In [None]:
# 여러 이미지 경로 준비
test_dir = '../data/test_images'
image_paths = [os.path.join(test_dir, f) for f in os.listdir(test_dir) if f.endswith(('.jpg', '.png'))]

print(f"Found {len(image_paths)} images")

# 배치 임베딩 생성
embeddings = generate_embedding_batch(image_paths, model, processor, device, batch_size=8)

print(f"\nGenerated embeddings shape: {embeddings.shape}")
print(f"All norms ≈ 1.0: {np.allclose(np.linalg.norm(embeddings, axis=1), 1.0)}")

## 8. 유사도 행렬 계산 및 시각화

In [None]:
import seaborn as sns

# 유사도 행렬 계산
similarity_matrix = np.dot(embeddings, embeddings.T)

# 시각화
plt.figure(figsize=(10, 8))
sns.heatmap(similarity_matrix, annot=True, fmt='.2f', cmap='coolwarm', 
            xticklabels=[os.path.basename(p) for p in image_paths],
            yticklabels=[os.path.basename(p) for p in image_paths])
plt.title('Cosine Similarity Matrix')
plt.tight_layout()
plt.show()

## 9. t-SNE 시각화

In [None]:
# t-SNE로 2D 투영
if len(embeddings) >= 3:
    tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(embeddings)-1))
    embeddings_2d = tsne.fit_transform(embeddings)
    
    # 시각화
    plt.figure(figsize=(12, 8))
    plt.scatter(embeddings_2d[:, 0], embeddings_2d[:, 1], s=100)
    
    for i, path in enumerate(image_paths):
        plt.annotate(os.path.basename(path), 
                    (embeddings_2d[i, 0], embeddings_2d[i, 1]),
                    fontsize=8)
    
    plt.title('t-SNE Visualization of Image Embeddings')
    plt.xlabel('t-SNE Dimension 1')
    plt.ylabel('t-SNE Dimension 2')
    plt.grid(True, alpha=0.3)
    plt.show()
else:
    print("Need at least 3 images for t-SNE")

## 10. 실전 시나리오: 다회용기 매칭

In [None]:
# 시나리오:
# 1. 사용자가 다회용기 3개를 등록 (데이터베이스)
# 2. 사용 인증 시 촬영한 이미지가 어느 다회용기인지 매칭

# 데이터베이스: 사용자 등록 다회용기 3개
registered_paths = [
    '../data/registered/tumbler_a.jpg',
    '../data/registered/tumbler_b.jpg',
    '../data/registered/tumbler_c.jpg',
]

# 등록 다회용기 임베딩 생성
registered_embeddings = []
for path in registered_paths:
    if os.path.exists(path):
        emb = generate_embedding(path, model, processor, device)
        registered_embeddings.append(emb)

if len(registered_embeddings) > 0:
    registered_embeddings = np.array(registered_embeddings)
    
    # 쿼리: 사용 인증 시 촬영한 이미지
    query_path = '../data/usage/query_tumbler.jpg'
    
    if os.path.exists(query_path):
        query_embedding = generate_embedding(query_path, model, processor, device)
        
        # 매칭
        matched_idx, similarity = find_most_similar(query_embedding, registered_embeddings, threshold=0.7)
        
        if matched_idx is not None:
            print(f"✓ Matched with registered tumbler #{matched_idx}")
            print(f"  Similarity: {similarity:.4f}")
            print(f"  Path: {registered_paths[matched_idx]}")
        else:
            print(f"✗ No match found")
            print(f"  Highest similarity: {similarity:.4f} (below threshold 0.7)")
    else:
        print(f"Query image not found: {query_path}")
else:
    print("No registered images found")

## 11. 임계값 분석

In [None]:
# 다양한 임계값에서 매칭 결과 분석
thresholds = np.arange(0.5, 1.0, 0.05)

if len(registered_embeddings) > 0 and os.path.exists(query_path):
    results = []
    
    for threshold in thresholds:
        matched_idx, similarity = find_most_similar(query_embedding, registered_embeddings, threshold=threshold)
        results.append({
            'threshold': threshold,
            'matched': matched_idx is not None,
            'similarity': similarity
        })
    
    # 시각화
    plt.figure(figsize=(10, 6))
    plt.plot(thresholds, [r['similarity'] for r in results], 'b-', linewidth=2, label='Similarity Score')
    plt.axhline(y=0.7, color='r', linestyle='--', label='User Threshold (0.7)')
    plt.axhline(y=0.75, color='orange', linestyle='--', label='Admin Threshold (0.75)')
    plt.xlabel('Threshold')
    plt.ylabel('Similarity')
    plt.title('Threshold Analysis')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    
    print("\nRecommended thresholds:")
    print(f"  - User registered: 0.70 (덜 엄격)")
    print(f"  - Admin standard: 0.75 (더 엄격)")

## 12. 성능 벤치마크

In [None]:
import time

if os.path.exists(test_image_path):
    # 단일 이미지 추론 속도
    num_runs = 100
    
    start_time = time.time()
    for _ in range(num_runs):
        _ = generate_embedding(test_image_path, model, processor, device)
    end_time = time.time()
    
    avg_time = (end_time - start_time) / num_runs * 1000  # ms
    
    print(f"Single image embedding generation:")
    print(f"  Average time: {avg_time:.2f} ms")
    print(f"  Throughput: {1000/avg_time:.1f} images/sec")
    
    # 유사도 계산 속도
    emb1 = generate_embedding(test_image_path, model, processor, device)
    emb2 = generate_embedding(test_image_path, model, processor, device)
    
    start_time = time.time()
    for _ in range(10000):
        _ = calculate_cosine_similarity(emb1, emb2)
    end_time = time.time()
    
    avg_time_sim = (end_time - start_time) / 10000 * 1000000  # μs
    
    print(f"\nCosine similarity calculation:")
    print(f"  Average time: {avg_time_sim:.2f} μs")
    print(f"  Throughput: {1000000/avg_time_sim:.0f} comparisons/sec")

## 13. 저장 및 로드 테스트

In [None]:
# 임베딩을 파일로 저장 (numpy)
save_path = '../models/weights/sample_embeddings.npy'

if len(embeddings) > 0:
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    np.save(save_path, embeddings)
    print(f"Embeddings saved to: {save_path}")
    
    # 로드 테스트
    loaded_embeddings = np.load(save_path)
    print(f"Loaded embeddings shape: {loaded_embeddings.shape}")
    print(f"Data match: {np.allclose(embeddings, loaded_embeddings)}")

## 14. FastAPI 통합용 함수

In [None]:
def generate_embedding_from_bytes(image_bytes, model, processor, device):
    """
    바이트 데이터에서 직접 임베딩 생성 (FastAPI 통합용)
    
    Args:
        image_bytes: 이미지 바이트 데이터
        model: CLIP 모델
        processor: CLIP 프로세서
        device: 디바이스
    
    Returns:
        list: 512차원 임베딩 벡터 (리스트 형태)
    """
    from io import BytesIO
    
    # 바이트 → PIL Image
    image = Image.open(BytesIO(image_bytes)).convert('RGB')
    
    # 전처리
    inputs = processor(images=image, return_tensors="pt").to(device)
    
    # 임베딩 생성
    with torch.no_grad():
        image_features = model.get_image_features(**inputs)
    
    # L2 정규화
    image_features = image_features / image_features.norm(p=2, dim=-1, keepdim=True)
    
    # 리스트로 변환 (JSON 직렬화 가능)
    embedding = image_features.cpu().numpy().flatten().tolist()
    
    return embedding

# 테스트
if os.path.exists(test_image_path):
    with open(test_image_path, 'rb') as f:
        image_bytes = f.read()
    
    embedding_list = generate_embedding_from_bytes(image_bytes, model, processor, device)
    
    print(f"Embedding type: {type(embedding_list)}")
    print(f"Embedding length: {len(embedding_list)}")
    print(f"First 5 values: {embedding_list[:5]}")

## 요약

### CLIP 임베딩 모델 특징
- **차원**: 512
- **정규화**: L2 norm = 1.0
- **유사도**: 코사인 유사도 (내적 계산)
- **추론 속도**: ~300ms/image (GPU)

### 권장 임계값
- **사용자 등록 다회용기**: 0.70 (덜 엄격)
- **관리자 표준 DB**: 0.75 (더 엄격)

### 다음 단계
1. FastAPI 서버에 통합 (`ai-server/models/embedding.py`)
2. 실전 데이터로 임계값 조정
3. 데이터베이스에 임베딩 저장 및 검색 최적화