In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install pyngrok streamlit

Collecting pyngrok
  Downloading pyngrok-7.2.11-py3-none-any.whl.metadata (9.4 kB)
Collecting streamlit
  Downloading streamlit-1.46.0-py3-none-any.whl.metadata (9.0 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading pyngrok-7.2.11-py3-none-any.whl (25 kB)
Downloading streamlit-1.46.0-py3-none-any.whl (10.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m45.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m63.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.wh

In [None]:
import os
from google.colab import userdata

ngrok_token = userdata.get("NGROK_API_KEY")
!ngrok config add-authtoken {ngrok_token}

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
import json
import subprocess
import os
from google.colab import userdata
import signal
import time
import streamlit as st
import torch
import numpy as np
from torch import optim
from PIL import Image
import torch.nn.functional as F
import cv2
import requests
import base64
import uuid
import re
import glob
from collections import defaultdict
from transformers import CLIPProcessor, CLIPModel
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, classification_report
from torchvision import transforms
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

### CLIP 모델 파인튜닝

In [None]:
disposal_map = {
    "알약_블리스터팩": "겉 포장지(종이 또는 플라스틱 필름)를 제거하고, 플라스틱 블리스터팩은 그대로 의약품 수거함에 버리세요.",
    "알약_블리스터팩_박스": "종이 박스는 분리하여 종이 재활용함에 버리고, 겉 포장지(종이 또는 플라스틱 필름)를 제거한 후 플라스틱 블리스터팩은 의약품 수거함에 버리세요.",
    "안약": "용기 그대로 의약품 수거함에 버리세요. 남은 약액은 그대로 두셔도 됩니다.",
    "연고": "튜브를 완전히 비우고 의약품 수거함에 버리세요.",
    "유리병": "약은 폐의약품 수거함에 버리고, 유리병은 내용물을 완전히 비운 후 깨끗이 헹궈 병류로 분리배출하세요.",
}

labels = ["알약_블리스터팩", "알약_블리스터팩_박스", "안약", "연고", "유리병"]

text_descriptions = [
    "pill blister pack with plastic packaging",
    "pill blister pack in cardboard box",
    "small eye drop bottle",
    "ointment tube for skin",
    "glass bottle for medicine"
]

class SimpleMedicineDataset(Dataset):
    def __init__(self, data_list, processor):
        self.processor = processor
        self.data = data_list
        print(f"데이터셋 크기: {len(self.data)}")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        try:
            item = self.data[idx]
            image = Image.open(item['image_path']).convert('RGB')

            return {
                'image': image,
                'text': item['text'],
                'label': item['label']
            }

        except Exception as e:
            print(f"데이터 로드 오류: {e}")
            return self.__getitem__((idx + 1) % len(self.data))

def load_data(data_dir):
    all_data = []
    existing_labels = []
    existing_texts = []
    label_mapping = {}

    new_label_idx = 0
    for original_idx, label_name in enumerate(labels):
        folder_path = os.path.join(data_dir, label_name)
        if os.path.exists(folder_path):
            img_files = [f for f in os.listdir(folder_path) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp'))]
            if len(img_files) > 0:
                existing_labels.append(label_name)
                existing_texts.append(text_descriptions[original_idx])
                label_mapping[original_idx] = new_label_idx

                # 데이터 로드
                for img_file in img_files:
                    all_data.append({
                        'image_path': os.path.join(folder_path, img_file),
                        'label': new_label_idx,
                        'text': text_descriptions[original_idx]
                    })

                new_label_idx += 1

    print(f"총 {len(all_data)}개 데이터 로드됨")
    print(f"실제 존재하는 클래스: {len(existing_labels)}개")

    for i, label_name in enumerate(existing_labels):
        count = sum(1 for item in all_data if item['label'] == i)
        print(f"{label_name}: {count}개")

    if len(all_data) == 0:
        return [], [], existing_labels, existing_texts

    train_data, test_data = train_test_split(
        all_data,
        test_size=0.2,
        random_state=42,
        stratify=[item['label'] for item in all_data]
    )

    print(f"학습 데이터: {len(train_data)}개")
    print(f"테스트 데이터: {len(test_data)}개")

    return train_data, test_data, existing_labels, existing_texts

def collate_fn(batch):
    images = [item['image'] for item in batch]
    texts = [item['text'] for item in batch]
    labels = [item['label'] for item in batch]

    return {
        'images': images,
        'texts': texts,
        'labels': torch.tensor(labels, dtype=torch.long)
    }

def evaluate_model(model, processor, test_data, existing_labels, existing_texts, device, batch_size=4):
    print("\n=== 모델 성능 평가 ===")

    test_dataset = SimpleMedicineDataset(test_data, processor)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0, collate_fn=collate_fn)

    model.eval()
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for batch in test_loader:
            try:
                images = batch['images']
                texts = batch['texts']
                labels = batch['labels']

                inputs = processor(
                    images=images,
                    text=existing_texts,
                    return_tensors="pt",
                    padding=True,
                    truncation=True
                )

                pixel_values = inputs['pixel_values'].to(device)
                input_ids = inputs['input_ids'].to(device)
                attention_mask = inputs['attention_mask'].to(device)

                outputs = model(
                    pixel_values=pixel_values,
                    input_ids=input_ids,
                    attention_mask=attention_mask
                )

                # 유사도 계산
                logits_per_image = outputs.logits_per_image
                predictions = torch.argmax(logits_per_image, dim=1)

                all_predictions.extend(predictions.cpu().numpy())
                all_labels.extend(labels.numpy())

            except Exception as e:
                print(f"평가 중 오류: {e}")
                continue

    accuracy = accuracy_score(all_labels, all_predictions)

    print(f"전체 정확도: {accuracy:.4f} ({accuracy*100:.2f}%)")

    return accuracy, all_predictions, all_labels

def train_model(data_dir, save_dir="/content/drive/MyDrive/trash_ai/clip_medication_test", epochs=5, batch_size=2, learning_rate=1e-5):
    print("=== CLIP 모델 학습 시작 ===")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    train_data, test_data, existing_labels, existing_texts = load_data(data_dir)

    if len(train_data) == 0:
        print("❌ 학습 데이터가 없습니다.")
        return None, None

    print(f"실제 사용될 클래스들: {existing_labels}")

    model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    model.to(device)

    for param in model.parameters():
        param.requires_grad = True

    train_dataset = SimpleMedicineDataset(train_data, processor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, collate_fn=collate_fn)
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)

    os.makedirs(save_dir, exist_ok=True)

    if len(test_data) > 0:
        initial_accuracy, _, _ = evaluate_model(model, processor, test_data, existing_labels, existing_texts, device)

    model.train()
    best_accuracy = 0

    for epoch in range(epochs):
        total_loss = 0
        batch_count = 0

        for batch_idx, batch in enumerate(train_loader):
            try:
                images = batch['images']
                texts = batch['texts']
                labels = batch['labels'].to(device)

                inputs = processor(
                    images=images,
                    text=texts,
                    return_tensors="pt",
                    padding=True,
                    truncation=True
                )

                pixel_values = inputs['pixel_values'].to(device)
                input_ids = inputs['input_ids'].to(device)
                attention_mask = inputs['attention_mask'].to(device)

                outputs = model(
                    pixel_values=pixel_values,
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    return_loss=True
                )

                loss = outputs.loss

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                batch_count += 1

            except Exception as e:
                print(f"배치 처리 오류: {e}")
                continue

        if batch_count > 0:
            avg_loss = total_loss / batch_count

            if len(test_data) > 0:
                accuracy, _, _ = evaluate_model(model, processor, test_data, existing_labels, existing_texts, device)

                # 최고 성능 모델 저장
                if accuracy > best_accuracy:
                    best_accuracy = accuracy
                    try:
                        best_dir = os.path.join(save_dir, "best")
                        os.makedirs(best_dir, exist_ok=True)
                        model.save_pretrained(best_dir)
                        processor.save_pretrained(best_dir)

                        import json
                        label_info = {
                            'labels': existing_labels,
                            'texts': existing_texts
                        }
                        with open(os.path.join(best_dir, 'label_info.json'), 'w', encoding='utf-8') as f:
                            json.dump(label_info, f, ensure_ascii=False, indent=2)

                    except Exception as e:
                        print(f"❌ 최고 성능 모델 저장 실패: {e}")
                model.train()

    # 최종 모델 저장
    try:
        final_dir = os.path.join(save_dir, "final")
        os.makedirs(final_dir, exist_ok=True)
        model.save_pretrained(final_dir)
        processor.save_pretrained(final_dir)

        import json
        label_info = {
            'labels': existing_labels,
            'texts': existing_texts
        }
        with open(os.path.join(final_dir, 'label_info.json'), 'w', encoding='utf-8') as f:
            json.dump(label_info, f, ensure_ascii=False, indent=2)

        print("✅ 최종 모델 저장 완료")
    except Exception as e:
        print(f"❌ 최종 모델 저장 실패: {e}")

    if len(test_data) > 0:
        print(f"\n=== 최종 성능 평가 ===")
        final_accuracy, predictions, true_labels = evaluate_model(model, processor, test_data, existing_labels, existing_texts, device)
        print(f"최고 달성 정확도: {best_accuracy:.4f} ({best_accuracy*100:.2f}%)")

    print("=== 학습 완료 ===")
    return model, processor

if __name__ == "__main__":
    model, processor = train_model("/content/drive/MyDrive/trash_ai/Medicine_data")

=== CLIP 모델 학습 시작 ===
총 376개 데이터 로드됨
실제 존재하는 클래스: 5개
알약_블리스터팩: 99개
알약_블리스터팩_박스: 162개
안약: 37개
연고: 59개
유리병: 19개
학습 데이터: 300개
테스트 데이터: 76개
실제 사용될 클래스들: ['알약_블리스터팩', '알약_블리스터팩_박스', '안약', '연고', '유리병']
데이터셋 크기: 300

=== 모델 성능 평가 ===
데이터셋 크기: 76
전체 정확도: 0.5921 (59.21%)

=== 모델 성능 평가 ===
데이터셋 크기: 76
전체 정확도: 0.9211 (92.11%)

=== 모델 성능 평가 ===
데이터셋 크기: 76
전체 정확도: 0.8158 (81.58%)

=== 모델 성능 평가 ===
데이터셋 크기: 76
전체 정확도: 0.9342 (93.42%)

=== 모델 성능 평가 ===
데이터셋 크기: 76
전체 정확도: 0.9342 (93.42%)

=== 모델 성능 평가 ===
데이터셋 크기: 76
전체 정확도: 0.9211 (92.11%)
✅ 최종 모델 저장 완료

=== 최종 성능 평가 ===

=== 모델 성능 평가 ===
데이터셋 크기: 76
전체 정확도: 0.9211 (92.11%)
최고 달성 정확도: 0.9342 (93.42%)
=== 학습 완료 ===


###CLIP 모델 성능 평가

In [None]:
def detailed_evaluate_model(model, processor, test_data, existing_labels, existing_texts, device, batch_size=4):
    print("최종 모델 성능 평가")
    print("="*50)

    test_dataset = SimpleMedicineDataset(test_data, processor)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0, collate_fn=collate_fn)

    model.eval()
    all_predictions = []
    all_labels = []
    class_correct = [0] * len(existing_labels)
    class_total = [0] * len(existing_labels)

    with torch.no_grad():
        for batch in test_loader:
            try:
                images = batch['images']
                texts = batch['texts']
                labels = batch['labels']

                inputs = processor(
                    images=images,
                    text=existing_texts,
                    return_tensors="pt",
                    padding=True,
                    truncation=True
                )

                pixel_values = inputs['pixel_values'].to(device)
                input_ids = inputs['input_ids'].to(device)
                attention_mask = inputs['attention_mask'].to(device)

                outputs = model(
                    pixel_values=pixel_values,
                    input_ids=input_ids,
                    attention_mask=attention_mask
                )

                logits_per_image = outputs.logits_per_image
                predictions = torch.argmax(logits_per_image, dim=1)

                all_predictions.extend(predictions.cpu().numpy())
                all_labels.extend(labels.numpy())

                for i, (pred, true) in enumerate(zip(predictions.cpu().numpy(), labels.numpy())):
                    class_total[true] += 1
                    if pred == true:
                        class_correct[true] += 1

            except Exception as e:
                print(f"평가 중 오류: {e}")
                continue

    overall_accuracy = accuracy_score(all_labels, all_predictions)

    precision, recall, f1, support = precision_recall_fscore_support(
        all_labels, all_predictions, average=None, zero_division=0
    )

    avg_precision = np.mean(precision)
    avg_recall = np.mean(recall)
    avg_f1 = np.mean(f1)

    cm = confusion_matrix(all_labels, all_predictions)

    print(f"\n📊 전체 성능 요약")
    print(f"{'='*30}")
    print(f"전체 정확도: {overall_accuracy:.4f} ({overall_accuracy*100:.2f}%)")
    print(f"평균 Precision: {avg_precision:.4f} ({avg_precision*100:.2f}%)")
    print(f"평균 Recall: {avg_recall:.4f} ({avg_recall*100:.2f}%)")
    print(f"평균 F1-Score: {avg_f1:.4f} ({avg_f1*100:.2f}%)")
    print(f"총 테스트 샘플 수: {len(all_labels)}개")

    return {
        'overall_accuracy': overall_accuracy,
        'class_accuracies': [class_correct[i] / max(class_total[i], 1) for i in range(len(existing_labels))],
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'support': support,
        'confusion_matrix': cm,
        'class_correct': class_correct,
        'class_total': class_total,
        'predictions': all_predictions,
        'true_labels': all_labels
    }

def load_best_model_and_evaluate(model_dir, test_data_dir):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    best_model_path = os.path.join(model_dir, "best")

    try:
        model = CLIPModel.from_pretrained(best_model_path)
        processor = CLIPProcessor.from_pretrained(best_model_path)
        model.to(device)
        print("✅ Best 모델 로드 완료")
    except Exception as e:
        print(f"❌ 모델 로드 실패: {e}")
        return None

    try:
        with open(os.path.join(best_model_path, 'label_info.json'), 'r', encoding='utf-8') as f:
            label_info = json.load(f)
        existing_labels = label_info['labels']
        existing_texts = label_info['texts']
        print(f"✅ 라벨 정보 로드 완료: {len(existing_labels)}개 클래스")
    except Exception as e:
        print(f"❌ 라벨 정보 로드 실패: {e}")
        return None

    train_data, test_data, _, _ = load_data(test_data_dir)

    if len(test_data) == 0:
        print("❌ 테스트 데이터가 없습니다.")
        return None

    results = detailed_evaluate_model(
        model, processor, test_data, existing_labels, existing_texts, device
    )

    return results

# 사용 예시
if __name__ == "__main__":
    # best 모델로 상세 평가 수행
    model_dir = "/content/drive/MyDrive/trash_ai/clip_medication_test"
    test_data_dir = "/content/drive/MyDrive/trash_ai/Medicine_data"

    results = load_best_model_and_evaluate(model_dir, test_data_dir)

✅ Best 모델 로드 완료
✅ 라벨 정보 로드 완료: 5개 클래스
총 376개 데이터 로드됨
실제 존재하는 클래스: 5개
알약_블리스터팩: 99개
알약_블리스터팩_박스: 162개
안약: 37개
연고: 59개
유리병: 19개
학습 데이터: 300개
테스트 데이터: 76개
최종 모델 성능 평가
데이터셋 크기: 76

📊 전체 성능 요약
전체 정확도: 0.9342 (93.42%)
평균 Precision: 0.9239 (92.39%)
평균 Recall: 0.8339 (83.39%)
평균 F1-Score: 0.8286 (82.86%)
총 테스트 샘플 수: 76개


###OCR 인식 및 결과 보정

In [None]:
folder_to_label = {
    "알약_블리스터팩": 0,
    "알약_블리스터팩_박스": 1,
    "안약": 2,
    "연고": 3,
    "유리병": 4,
}

medication_keywords = {
    "pill_related": [
        "캡슐", "mg", "정", "캡", "알약", "환", "tablet", "capsule", "cap", "pill",
        "Tablet", "Capsule", "Cap", "Pill", "TABLET", "CAPSULE", "CAP", "PILL",
        "MG", "mg", "Mg", "mcg", "µg", "IU", "unit", "Units", "UNITS"
    ],
    "ointment_related": [
        "연고", "크림", "젤", "로션", "밤", "연고제", "%",
        "ointment", "cream", "gel", "lotion", "balm",
        "Ointment", "Cream", "Gel", "Lotion", "Balm",
        "OINTMENT", "CREAM", "GEL", "LOTION", "BALM"
    ],
    "eye_drop_related": [
        "안약", "점안액", "점안제", "eye", "drop", "drops",
        "Eye", "Drop", "Drops", "EYE", "DROP", "DROPS",
        "ophthalmic", "Ophthalmic", "OPHTHALMIC"
    ]
}

def load_model(model_path="/content/drive/MyDrive/trash_ai/clip_medication_test/best"):
    """모델과 프로세서를 로드하는 함수"""
    print("🔄 모델 로딩 중...")

    try:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = CLIPModel.from_pretrained(model_path)
        processor = CLIPProcessor.from_pretrained(model_path)
        model.to(device)

        with open(os.path.join(model_path, 'label_info.json'), 'r', encoding='utf-8') as f:
            label_info = json.load(f)

        print("✅ 모델 로딩 완료!")
        print(f"📋 로드된 클래스: {label_info['labels']}")
        return model, processor, label_info

    except Exception as e:
        print(f"❌ 모델 로드 실패: {e}")
        return None, None, None

#OCR 텍스트에서 의약품 관련 키워드 분석
def analyze_medication_keywords(ocr_text):
    if not ocr_text:
        return {"pill": 0, "ointment": 0, "eye_drop": 0}, {}

    scores = {
        "pill": 0,
        "ointment": 0,
        "eye_drop": 0
    }

    found_keywords = {
        "pill": [],
        "ointment": [],
        "eye_drop": []
    }

    for keyword in medication_keywords["pill_related"]:
        if keyword in ocr_text:
            scores["pill"] += 1
            found_keywords["pill"].append(keyword)

    for keyword in medication_keywords["ointment_related"]:
        if keyword in ocr_text:
            scores["ointment"] += 1
            found_keywords["ointment"].append(keyword)

    for keyword in medication_keywords["eye_drop_related"]:
        if keyword in ocr_text:
            scores["eye_drop"] += 1
            found_keywords["eye_drop"].append(keyword)
    return scores, found_keywords

#키워드 점수를 기반으로 CLIP 예측 조정
def adjust_clip_predictions(logits, keyword_scores, boost_factor=0.15):

    adjusted_logits = logits.clone()

    label_keyword_mapping = {
        0: "pill",      # 알약_블리스터팩
        1: "pill",      # 알약_블리스터팩_박스
        2: "eye_drop",  # 안약
        3: "ointment",  # 연고
        4: "pill",
    }

    adjustments = {}
    for label_idx, keyword_type in label_keyword_mapping.items():
        if keyword_scores[keyword_type] > 0:
            # 키워드 점수에 비례해서 logit 값 증가
            boost = boost_factor * keyword_scores[keyword_type]
            adjusted_logits[0][label_idx] += boost
            adjustments[labels[label_idx]] = boost
            print(f"라벨 {labels[label_idx]} 점수 증가: +{boost:.3f} (키워드: {keyword_type}, 개수: {keyword_scores[keyword_type]})")

    return adjusted_logits, adjustments

#OCR을 위한 이미지 전처리 개선
def preprocess_image_for_ocr(image):
    try:
        img_array = np.array(image)

        if len(img_array.shape) == 3:
            img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
        else:
            img_bgr = img_array

        gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
        denoised = cv2.medianBlur(gray, 3)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
        enhanced = clahe.apply(denoised)
        blurred = cv2.GaussianBlur(enhanced, (1, 1), 0)
        binary_inv = cv2.adaptiveThreshold(
            blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2
        )
        kernel = np.ones((1, 1), np.uint8)
        processed = cv2.morphologyEx(binary_inv, cv2.MORPH_CLOSE, kernel)

        result_image = Image.fromarray(processed)

        return result_image

    except Exception as e:
        print(f"이미지 전처리 오류: {e}")
        return image

def extract_text_from_image_with_clova(image_path: str):
    secret_key = userdata.get("OCR_CLOVA")
    api_url = "https://s79a9gvq9a.apigw.ntruss.com/custom/v1/42492/9274a7c96357207aa7e436c070ceba3fecf0b7f91679a2a8f72283f165493853/general"

    file_size = os.path.getsize(image_path)
    if file_size > 5 * 1024 * 1024:  # 5MB
        print("⚠️ 파일 크기가 5MB를 초과합니다.")

    with open(image_path, "rb") as f:
        image_data = base64.b64encode(f.read()).decode()

    headers = {
        "X-OCR-SECRET": secret_key,
        "Content-Type": "application/json"
    }

    payload = {
        "version": "V2",
        "requestId": str(uuid.uuid4()),
        "timestamp": int(time.time() * 1000),
        "images": [{
            "name": "temp",
            "format": "jpg",
            "data": image_data
        }]
    }

    try:
        response = requests.post(api_url, headers=headers, data=json.dumps(payload), timeout=30)
        result = response.json()

        first_image = result["images"][0]
        fields = first_image["fields"]
        texts = []

        for i, field in enumerate(fields):
            if "inferText" in field and field["inferText"]:
                text = field["inferText"]
                texts.append(text)
                confidence = field.get("inferConfidence", "N/A")

        if not texts:
            return "", 0.0

        extracted_text = " ".join(texts)
        return extracted_text, confidence

    except Exception as e:
        print(f"❌ OCR API 호출 오류: {e}")
        return "", 0.0

def classify_and_dispose(image_path: str, model_clip, processor_clip):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    image = Image.open(image_path).convert("RGB")

    processed_image = preprocess_image_for_ocr(image)
    temp_path = "temp_processed.png"
    processed_image.save(temp_path)

    # OCR로 텍스트 추출
    ocr_text, ocr_confidence = extract_text_from_image_with_clova(temp_path)
    if ocr_text is None:
        ocr_text = ""

    # 키워드 분석
    keyword_scores, found_keywords = analyze_medication_keywords(ocr_text)

    # CLIP 모델로 분류
    inputs = processor_clip(text=text_descriptions, images=image, return_tensors="pt", padding=True, truncation=True)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model_clip(**inputs)

    original_logits = outputs.logits_per_image
    adjusted_logits, logit_adjustments = adjust_clip_predictions(original_logits, keyword_scores)

    original_probs = original_logits.softmax(dim=1)
    adjusted_probs = adjusted_logits.softmax(dim=1)

    original_predicted_idx = torch.argmax(original_probs, dim=1).item()
    original_predicted_label = labels[original_predicted_idx]
    original_confidence = torch.max(original_probs).item()

    # 조정된 예측
    predicted_idx = torch.argmax(adjusted_probs, dim=1).item()
    predicted_label = labels[predicted_idx]
    confidence = torch.max(adjusted_probs).item()

    # OCR 기반 추가 보정
    final_label = predicted_label
    ocr_correction_applied = False

    if keyword_scores["ointment"] >= 1 and predicted_label != "연고":
        if confidence < 0.7:
            final_label = "연고"
            confidence = min(confidence + 0.15, 0.95)
            ocr_correction_applied = True

    if keyword_scores["eye_drop"] >= 1 and predicted_label != "안약":
        if confidence < 0.7:
            final_label = "안약"
            confidence = min(confidence + 0.15, 0.95)
            ocr_correction_applied = True

    context = disposal_map.get(final_label, "종류에 따라 적절한 방법으로 의약품 수거함에 버리세요.")

    return {
        "final_label": final_label,
        "disposal_context": context,
        "final_confidence": confidence,
        "ocr_text": ocr_text,
        "ocr_confidence" : ocr_confidence,
        "original_clip_label": original_predicted_label,
        "original_clip_confidence": original_confidence,
        "adjusted_clip_label": predicted_label,
        "adjusted_clip_confidence": torch.max(adjusted_probs).item(),
        "keyword_scores": keyword_scores,
        "found_keywords": found_keywords,
        "logit_adjustments": logit_adjustments,
        "ocr_correction_applied": ocr_correction_applied
    }

def process_test_dataset(test_data_dir, model, processor, label_info):
    """테스트 데이터셋 전체 처리"""
    print(f"\n🔍 테스트 데이터셋 처리 시작: {test_data_dir}")
    print("="*60)
    image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp']

    total_processed = 0
    total_correct = 0
    class_results = defaultdict(lambda: {'correct': 0, 'total': 0, 'predictions': []})

    # 각 클래스 폴더 처리
    for class_name in disposal_map.keys():
        class_dir = os.path.join(test_data_dir, class_name)

        if not os.path.exists(class_dir):
            print(f"⚠️  클래스 폴더가 없습니다: {class_name}")
            continue

        print(f"\n📂 처리 중: {class_name}")
        image_files = []
        for ext in image_extensions:
            image_files.extend(glob.glob(os.path.join(class_dir, ext)))

        if not image_files:
            print(f"   ℹ️  이미지 파일이 없습니다.")
            continue

        print(f"   📊 발견된 이미지: {len(image_files)}개")

        # 각 이미지 처리
        for i, image_path in enumerate(image_files, 1):
            try:
                result = classify_and_dispose(image_path, model, processor)

                if result:
                    predicted_label = result['final_label']
                    confidence = result['final_confidence']

                    # 결과 기록
                    class_results[class_name]['total'] += 1
                    class_results[class_name]['predictions'].append({
                        'file': os.path.basename(image_path),
                        'predicted': predicted_label,
                        'confidence': confidence,
                        'correct': predicted_label == class_name
                    })

                    total_processed += 1

                    if predicted_label == class_name:
                        class_results[class_name]['correct'] += 1
                        total_correct += 1
                        print(f"      ✅ 정답: {predicted_label} (신뢰도: {confidence:.1%})")
                    else:
                        print(f"      ❌ 오답: {predicted_label} (신뢰도: {confidence:.1%}) - 정답: {class_name}")
                else:
                    print(f"      ⚠️  처리 실패")

            except Exception as e:
                print(f"      ❌ 오류: {e}")
                continue

    return class_results, total_processed, total_correct

def print_detailed_results(class_results, total_processed, total_correct):
    print("\n" + "="*60)
    print("📊 최종 평가 결과")
    print("="*60)

    overall_accuracy = (total_correct / total_processed * 100) if total_processed > 0 else 0
    print(f"\n🎯 전체 성능")
    print(f"   • 전체 정확도: {total_correct}/{total_processed} = {overall_accuracy:.2f}%")
    print(f"   • 처리된 총 이미지: {total_processed}개")

    print(f"\n📋 클래스별 상세 결과")
    print("-" * 60)
    print(f"{'클래스명':<20} {'정확도':<15} {'정답/전체':<12} {'성능'}")
    print("-" * 60)

    for class_name, results in class_results.items():
        if results['total'] > 0:
            accuracy = results['correct'] / results['total'] * 100
            ratio = f"{results['correct']}/{results['total']}"

            if accuracy >= 90:
                performance = "🟢 우수"
            elif accuracy >= 70:
                performance = "🟡 보통"
            else:
                performance = "🔴 개선필요"

            print(f"{class_name:<20} {accuracy:<15.2f} {ratio:<12} {performance}")

    print(f"\n🔍 오분류 상세 분석")
    print("-" * 60)

    for class_name, results in class_results.items():
        wrong_predictions = [p for p in results['predictions'] if not p['correct']]

        if wrong_predictions:
            print(f"\n❌ {class_name} 오분류 케이스:")
            for pred in wrong_predictions:
                print(f"   • {pred['file']} → {pred['predicted']} (신뢰도: {pred['confidence']:.1%})")

    # 성능 개선 제안
    print(f"\n💡 성능 개선 제안")
    print("-" * 60)

    low_performance_classes = []
    for class_name, results in class_results.items():
        if results['total'] > 0:
            accuracy = results['correct'] / results['total'] * 100
            if accuracy < 70:
                low_performance_classes.append((class_name, accuracy, results['total']))

    if low_performance_classes:
        print("📉 성능이 낮은 클래스:")
        for class_name, accuracy, total in low_performance_classes:
            print(f"   • {class_name}: {accuracy:.1f}% (샘플 수: {total}개)")

    else:
        print("✅ 모든 클래스가 양호한 성능을 보입니다!")

def main():
    print("🏥 의약품 분류 및 분리배출 도우미 (콘솔 버전)")
    print("="*60)

    model, processor, label_info = load_model()

    if model is None:
        print("❌ 모델 로드 실패. 프로그램을 종료합니다.")
        return

    test_data_dir = "/content/drive/MyDrive/trash_ai/final_test"
    class_results, total_processed, total_correct = process_test_dataset(
        test_data_dir, model, processor, label_info
    )

    print_detailed_results(class_results, total_processed, total_correct)

if __name__ == "__main__":
    main()

🏥 의약품 분류 및 분리배출 도우미 (콘솔 버전)
🔄 모델 로딩 중...
✅ 모델 로딩 완료!
📋 로드된 클래스: ['알약_블리스터팩', '알약_블리스터팩_박스', '안약', '연고', '유리병']

🔍 테스트 데이터셋 처리 시작: /content/drive/MyDrive/trash_ai/final_test

📂 처리 중: 알약_블리스터팩
   📊 발견된 이미지: 6개
      ✅ 정답: 알약_블리스터팩 (신뢰도: 91.8%)
      ✅ 정답: 알약_블리스터팩 (신뢰도: 53.7%)
      ✅ 정답: 알약_블리스터팩 (신뢰도: 98.2%)
      ✅ 정답: 알약_블리스터팩 (신뢰도: 81.5%)
      ✅ 정답: 알약_블리스터팩 (신뢰도: 95.5%)
      ✅ 정답: 알약_블리스터팩 (신뢰도: 94.8%)

📂 처리 중: 알약_블리스터팩_박스
   📊 발견된 이미지: 7개
      ✅ 정답: 알약_블리스터팩_박스 (신뢰도: 99.4%)
      ✅ 정답: 알약_블리스터팩_박스 (신뢰도: 97.8%)
      ✅ 정답: 알약_블리스터팩_박스 (신뢰도: 99.3%)
      ✅ 정답: 알약_블리스터팩_박스 (신뢰도: 84.1%)
      ✅ 정답: 알약_블리스터팩_박스 (신뢰도: 99.1%)
라벨 알약_블리스터팩 점수 증가: +0.150 (키워드: pill, 개수: 1)
라벨 알약_블리스터팩_박스 점수 증가: +0.150 (키워드: pill, 개수: 1)
라벨 유리병 점수 증가: +0.150 (키워드: pill, 개수: 1)
      ✅ 정답: 알약_블리스터팩_박스 (신뢰도: 99.5%)
라벨 알약_블리스터팩 점수 증가: +0.300 (키워드: pill, 개수: 2)
라벨 알약_블리스터팩_박스 점수 증가: +0.300 (키워드: pill, 개수: 2)
라벨 연고 점수 증가: +0.150 (키워드: ointment, 개수: 1)
라벨 유리병 점수 증가: +0.300 (키워드: pill, 개수: 2)
      ✅ 정답: 알약_블리스터

### streamlit_code

In [2]:
streamlit_code = '''
import json
import streamlit as st
import torch
from PIL import Image
from transformers import CLIPProcessor, CLIPModel
import os
from google.colab import userdata
import cv2
import numpy as np
import requests
import base64
import uuid
import time
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch.optim as optim
import torch.nn.functional as F

st.set_page_config(
    page_title="의약품 분리배출 AI",
    page_icon="🏥",
    layout="wide"
)

disposal_map = {
    "알약_블리스터팩": "겉 포장지(종이 또는 플라스틱 필름)를 제거하고, 플라스틱 블리스터팩은 그대로 의약품 수거함에 버리세요.",
    "알약_블리스터팩_박스": "종이 박스는 분리하여 종이 재활용함에 버리고, 겉 포장지(종이 또는 플라스틱 필름)를 제거한 후 플라스틱 블리스터팩은 의약품 수거함에 버리세요.",
    "유리병": "약은 폐의약품 수거함에 버리고, 유리병은 내용물을 완전히 비운 후 깨끗이 헹궈 병류로 분리배출하세요.",
    "안약": "용기 그대로 의약품 수거함에 버리세요. 남은 약액은 그대로 두셔도 됩니다.",
    "연고": "튜브를 완전히 비우고 의약품 수거함에 버리세요."
}

text_descriptions = [
    "pill blister pack with plastic packaging",
    "pill blister pack in cardboard box",
    "glass bottle for medicine",
    "small eye drop bottle",
    "ointment tube for skin"
]

labels = ["알약_블리스터팩", "알약_블리스터팩_박스", "유리병", "안약", "연고"]

folder_to_label = {
    "알약_블리스터팩": 0,
    "알약_블리스터팩_박스": 1,
    "유리병": 2,
    "안약": 3,
    "연고": 4
}

medication_keywords = {
    "pill_related": [
        "캡슐", "mg", "정", "캡", "알약", "환", "tablet", "capsule", "cap", "pill",
        "Tablet", "Capsule", "Cap", "Pill", "TABLET", "CAPSULE", "CAP", "PILL",
        "MG", "mg", "Mg", "mcg", "µg", "IU", "unit", "Units", "UNITS"
    ],
    "ointment_related": [
        "연고", "크림", "젤", "로션", "밤", "연고제", "%",
        "ointment", "cream", "gel", "lotion", "balm",
        "Ointment", "Cream", "Gel", "Lotion", "Balm",
        "OINTMENT", "CREAM", "GEL", "LOTION", "BALM"
    ],
    "eye_drop_related": [
        "안약", "점안액", "점안제", "eye", "drop", "drops",
        "Eye", "Drop", "Drops", "EYE", "DROP", "DROPS",
        "ophthalmic", "Ophthalmic", "OPHTHALMIC"
    ]
}

def analyze_medication_keywords(ocr_text):
    if not ocr_text:
        return {"pill": 0, "ointment": 0, "eye_drop": 0}, {}

    scores = {
        "pill": 0,
        "ointment": 0,
        "eye_drop": 0
    }

    found_keywords = {
        "pill": [],
        "ointment": [],
        "eye_drop": []
    }

    for keyword in medication_keywords["pill_related"]:
        if keyword in ocr_text:
            scores["pill"] += 1
            found_keywords["pill"].append(keyword)

    for keyword in medication_keywords["ointment_related"]:
        if keyword in ocr_text:
            scores["ointment"] += 1
            found_keywords["ointment"].append(keyword)

    for keyword in medication_keywords["eye_drop_related"]:
        if keyword in ocr_text:
            scores["eye_drop"] += 1
            found_keywords["eye_drop"].append(keyword)
    return scores, found_keywords

def adjust_clip_predictions(logits, keyword_scores, boost_factor=0.15):

    adjusted_logits = logits.clone()

    label_keyword_mapping = {
        0: "pill",      # 알약_블리스터팩
        1: "pill",      # 알약_블리스터팩_박스
        2: "pill",      # 유리병
        3: "eye_drop",  # 안약
        4: "ointment"   # 연고
    }

    adjustments = {}
    for label_idx, keyword_type in label_keyword_mapping.items():
        if keyword_scores[keyword_type] > 0:
            boost = boost_factor * keyword_scores[keyword_type]
            adjusted_logits[0][label_idx] += boost
            adjustments[labels[label_idx]] = boost
            print(f"라벨 {labels[label_idx]} 점수 증가: +{boost:.3f} (키워드: {keyword_type}, 개수: {keyword_scores[keyword_type]})")

    return adjusted_logits, adjustments

def preprocess_image_for_ocr(image):
    try:
        img_array = np.array(image)

        if len(img_array.shape) == 3:
            img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
        else:
            img_bgr = img_array

        gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
        denoised = cv2.medianBlur(gray, 3)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
        enhanced = clahe.apply(denoised)
        blurred = cv2.GaussianBlur(enhanced, (1, 1), 0)
        binary_inv = cv2.adaptiveThreshold(
            blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2
        )
        kernel = np.ones((1, 1), np.uint8)
        processed = cv2.morphologyEx(binary_inv, cv2.MORPH_CLOSE, kernel)

        result_image = Image.fromarray(processed)

        return result_image

    except Exception as e:
        print(f"이미지 전처리 오류: {e}")
        return image

def extract_text_from_image_with_clova(image_path: str):
    secret_key = userdata.get("OCR_CLOVA")
    api_url = "https://s79a9gvq9a.apigw.ntruss.com/custom/v1/42492/9274a7c96357207aa7e436c070ceba3fecf0b7f91679a2a8f72283f165493853/general"

    file_size = os.path.getsize(image_path)
    if file_size > 5 * 1024 * 1024:  # 5MB
        print("⚠️ 파일 크기가 5MB를 초과합니다.")

    with open(image_path, "rb") as f:
        image_data = base64.b64encode(f.read()).decode()

    headers = {
        "X-OCR-SECRET": secret_key,
        "Content-Type": "application/json"
    }

    payload = {
        "version": "V2",
        "requestId": str(uuid.uuid4()),
        "timestamp": int(time.time() * 1000),
        "images": [{
            "name": "temp",
            "format": "jpg",
            "data": image_data
        }]
    }

    try:
        response = requests.post(api_url, headers=headers, data=json.dumps(payload), timeout=30)
        result = response.json()

        first_image = result["images"][0]
        fields = first_image["fields"]
        texts = []

        for i, field in enumerate(fields):
            if "inferText" in field and field["inferText"]:
                text = field["inferText"]
                texts.append(text)
                confidence = field.get("inferConfidence", "N/A")
                print(f"텍스트 {i+1}: '{text}' (신뢰도: {confidence})")

        if not texts:
            return "", 0.0

        extracted_text = " ".join(texts)
        return extracted_text, confidence

    except Exception as e:
        print(f"❌ OCR API 호출 오류: {e}")
        return "", 0.0

def classify_and_dispose(image_path: str, model_clip, processor_clip):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    image = Image.open(image_path).convert("RGB")

    processed_image = preprocess_image_for_ocr(image)
    temp_path = "temp_processed.png"
    processed_image.save(temp_path)

    ocr_text, ocr_confidence = extract_text_from_image_with_clova(temp_path)
    if ocr_text is None:
        ocr_text = ""

    keyword_scores, found_keywords = analyze_medication_keywords(ocr_text)

    inputs = processor_clip(text=text_descriptions, images=image, return_tensors="pt", padding=True, truncation=True)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model_clip(**inputs)

    original_logits = outputs.logits_per_image
    adjusted_logits, logit_adjustments = adjust_clip_predictions(original_logits, keyword_scores)

    original_probs = original_logits.softmax(dim=1)
    adjusted_probs = adjusted_logits.softmax(dim=1)

    original_predicted_idx = torch.argmax(original_probs, dim=1).item()
    original_predicted_label = labels[original_predicted_idx]
    original_confidence = torch.max(original_probs).item()

    predicted_idx = torch.argmax(adjusted_probs, dim=1).item()
    predicted_label = labels[predicted_idx]
    confidence = torch.max(adjusted_probs).item()

    final_label = predicted_label
    ocr_correction_applied = False

    if keyword_scores["ointment"] >= 1 and predicted_label != "연고":
        if confidence < 0.7:
            final_label = "연고"
            confidence = min(confidence + 0.15, 0.95)
            ocr_correction_applied = True

    if keyword_scores["eye_drop"] >= 1 and predicted_label != "안약":
        if confidence < 0.7:
            final_label = "안약"
            confidence = min(confidence + 0.15, 0.95)
            ocr_correction_applied = True

    context = disposal_map.get(final_label, "종류에 따라 적절한 방법으로 의약품 수거함에 버리세요.")

    return {
        "final_label": final_label,
        "disposal_context": context,
        "final_confidence": confidence,
        "ocr_text": ocr_text,
        "ocr_confidence" : ocr_confidence,
        "original_clip_label": original_predicted_label,
        "original_clip_confidence": original_confidence,
        "adjusted_clip_label": predicted_label,
        "adjusted_clip_confidence": torch.max(adjusted_probs).item(),
        "keyword_scores": keyword_scores,
        "found_keywords": found_keywords,
        "logit_adjustments": logit_adjustments,
        "ocr_correction_applied": ocr_correction_applied
    }

@st.cache_resource
def load_model():
    try:
        model_path = "/content/drive/MyDrive/trash_ai/clip_medication_test/best"
        if os.path.exists(model_path):
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model = CLIPModel.from_pretrained(model_path).to(device)
            processor = CLIPProcessor.from_pretrained(model_path)
            return model, processor
        else:
            st.error(f"❌ 모델 경로를 찾을 수 없습니다: {model_path}")
            return None, None
    except Exception as e:
        st.error(f"❌ 모델 로딩 오류: {str(e)}")
        return None, None

def main():
    st.title("🏥 의약품 분리배출 AI")
    st.markdown("---")
    st.write("의약품 이미지를 업로드하면 종류를 분류하고 올바른 분리배출 방법을 알려드립니다.")

    with st.sidebar:
        st.header("📋 분류 가능한 의약품")
        st.write("• 💊 알약 블리스터팩")
        st.write("• 📦 알약 블리스터팩 (박스 포함)")
        st.write("• 🍼 유리병")
        st.write("• 👁️ 안약")
        st.write("• 🧴 연고")

        st.header("📖 사용 방법")
        st.write("1. 의약품 사진 업로드")
        st.write("2. AI 자동 분석 대기")
        st.write("3. 분리배출 방법 확인")
        st.warning("⚠️ 텍스트가 선명하게 보이도록 촬영하면 더 정확한 분류가 가능합니다.")

        st.header("🔧 시스템 정보")
        device = "GPU" if torch.cuda.is_available() else "CPU"
        st.write(f"디바이스: {device}")

    with st.spinner("모델 로딩 중... 잠시만 기다려주세요."):
        model_clip, processor_clip = load_model()

    if model_clip is None or processor_clip is None:
        st.error("🚫 모델을 로드할 수 없습니다. 모델 경로와 파일을 확인해주세요.")
        st.info("💡 모델 학습이 완료되었는지 확인하세요.")
        return

    st.success("✅ 모델 로딩 완료!")

    uploaded_file = st.file_uploader(
        "📸 의약품 이미지를 업로드하세요",
        type=["jpg", "jpeg", "png"],
        help="JPG, JPEG, PNG 형식의 이미지를 업로드할 수 있습니다. 텍스트가 선명하게 보이는 이미지일수록 정확도가 높아집니다."
    )

    if uploaded_file is not None:
        try:
            col1, col2 = st.columns([1, 1])

            with col1:
                st.subheader("📷 업로드된 이미지")
                st.image(uploaded_file, caption="분석할 이미지", use_container_width=True)

            temp_path = f"temp_{uploaded_file.name}"
            with open(temp_path, "wb") as f:
                f.write(uploaded_file.getbuffer())

            if st.button("🔍 분석 시작", type="primary"):
                with col2:
                    st.subheader("📊 분석 결과")

                    # 프로그레스 바
                    progress_bar = st.progress(0)
                    status_text = st.empty()

                    status_text.text("이미지 전처리 중...")
                    progress_bar.progress(20)

                    status_text.text("OCR 텍스트 추출 중...")
                    progress_bar.progress(40)

                    status_text.text("키워드 분석 중...")
                    progress_bar.progress(50)

                    status_text.text("AI 모델 실행 중...")
                    progress_bar.progress(70)

                    result = classify_and_dispose(temp_path, model_clip, processor_clip)

                    progress_bar.progress(90)
                    status_text.text("결과 생성 중...")

                    progress_bar.progress(100)
                    status_text.text("분석 완료!")

                    st.success("✅ 분석 완료!")

                    st.markdown(f"""
                    <div style="padding: 1rem; border-radius: 0.5rem; background-color: #e8f4fd; margin: 1rem 0; border-left: 4px solid #1f77b4;">
                        <h3>🔍 최종 분류 결과: {result['final_label']}</h3>
                    </div>
                    """, unsafe_allow_html=True)

                    if result['final_confidence'] > 0.8:
                        st.success(f"📊 신뢰도: {result['final_confidence']:.1%} (높음)")
                    elif result['final_confidence'] > 0.6:
                        st.warning(f"📊 신뢰도: {result['final_confidence']:.1%} (보통)")
                    else:
                        st.error(f"📊 신뢰도: {result['final_confidence']:.1%} (낮음)")
                        st.warning("신뢰도가 낮습니다. 더 선명한 이미지를 사용해보세요.")

                    st.info(f"""♻️ **분리배출 방법**: {result['disposal_context']}""")

                    with st.expander("🔍 분석 과정 상세 정보", expanded=True):
                        st.markdown("### 1️⃣ CLIP 모델 원본 예측")
                        st.write(f"- 예측: **{result['original_clip_label']}**")
                        st.write(f"- 신뢰도: {result['original_clip_confidence']:.1%}")

                        st.markdown("### 2️⃣ OCR 키워드 분석")
                        if result['ocr_text'] and result['ocr_confidence']:
                            st.write("**추출된 텍스트:**")
                            st.code(result['ocr_text'])
                            st.write("**OCR 결과 신뢰도:**")
                            st.code(result['ocr_confidence'])

                            col_k1, col_k2 = st.columns(2)
                            with col_k1:
                                st.write("**키워드 점수:**")
                                for category, score in result['keyword_scores'].items():
                                    emoji = {"pill": "💊", "liquid": "🧴", "ointment": "🧴", "eye_drop": "👁️"}
                                    st.write(f"{emoji.get(category, '•')} {category}: {score}개")

                            with col_k2:
                                st.write("**발견된 키워드:**")
                                for category, keywords in result['found_keywords'].items():
                                    if keywords:
                                        st.write(f"**{category}:** {', '.join(keywords)}")
                        else:
                            st.warning("OCR로 텍스트를 추출하지 못했습니다.")

                        st.markdown("### 3️⃣ 키워드 기반 점수 조정")
                        if result['logit_adjustments']:
                            st.write("**점수 조정 내역:**")
                            for label, boost in result['logit_adjustments'].items():
                                st.write(f"- {label}: +{boost:.3f}")
                            st.write(f"- 조정 후 예측: **{result['adjusted_clip_label']}**")
                            st.write(f"- 조정 후 신뢰도: {result['adjusted_clip_confidence']:.1%}")
                        else:
                            st.write("키워드 기반 점수 조정이 적용되지 않았습니다.")

                        st.markdown("### 4️⃣ OCR 기반 최종 보정")
                        if result['ocr_correction_applied']:
                            st.success(f"✅ OCR 키워드 기반으로 '{result['final_label']}'로 최종 보정되었습니다.")
                        else:
                            st.info("OCR 기반 추가 보정이 적용되지 않았습니다.")

                        st.markdown("### 📋 변경 사항 요약")
                        changes = []
                        if result['original_clip_label'] != result['adjusted_clip_label']:
                            changes.append(f"CLIP 키워드 조정: {result['original_clip_label']} → {result['adjusted_clip_label']}")
                        if result['ocr_correction_applied']:
                            changes.append(f"OCR 기반 보정: {result['adjusted_clip_label']} → {result['final_label']}")

                        if changes:
                            for change in changes:
                                st.write(f"• {change}")
                        else:
                            st.write("• 원본 CLIP 예측이 그대로 유지되었습니다.")

            if os.path.exists(temp_path):
                os.remove(temp_path)

        except Exception as e:
            st.error(f"❌ 분석 중 오류가 발생했습니다: {str(e)}")
            st.write("오류 상세 정보:")
            st.code(str(e))

    st.markdown("---")
    st.markdown(
        "<div style='text-align: center; color: #666;'>"
        "<p>🌱 환경을 생각하는 AI 기반 의약품 분리배출 도우미 (키워드 분석 시각화 버전)</p>"
        "</div>",
        unsafe_allow_html=True
    )

if __name__ == "__main__":
    main()
'''

with open("/content/drive/MyDrive/trash_ai/medicine_app.py", "w", encoding="utf-8") as f:
    f.write(streamlit_code)

In [None]:
import threading
import subprocess
import time
from google.colab import output

def start_streamlit():
    subprocess.run([
        "streamlit", "run", "/content/drive/MyDrive/trash_ai/medicine_app.py",
        "--server.port", "8502",
        "--server.address", "0.0.0.0",
        "--server.headless", "true",
        "--server.fileWatcherType", "none",
        "--server.enableXsrfProtection", "false",
        "--browser.gatherUsageStats", "false",
        "--server.enableWebsocketCompression", "false",
        "--server.enableCORS", "false"
    ])

thread = threading.Thread(target=start_streamlit)
thread.daemon = True
thread.start()

print("⏳ 의약품 분리배출 AI 시작 중...")
print("브라우저에서 아래 URL로 접속하세요:")

# Colab의 권장 방법 사용 - iframe
try:
    output.serve_kernel_port_as_iframe(8502, height=800)
except:
    print("http://localhost:8502")
    print("또는 포트 포워딩 URL을 사용하세요.")

⏳ 의약품 분리배출 AI 시작 중...
브라우저에서 아래 URL로 접속하세요:


<IPython.core.display.Javascript object>

In [None]:
def kill_existing_streamlit():
    try:
        result = subprocess.run(['pgrep', '-f', 'streamlit'], capture_output=True, text=True)
        if result.stdout:
            pids = result.stdout.strip().split('\n')
            for pid in pids:
                if pid:
                    try:
                        os.kill(int(pid), signal.SIGTERM)
                        print(f"종료된 PID: {pid}")
                    except:
                        pass
        subprocess.run(['fuser', '-k', '8501/tcp'], capture_output=True)
        print("기존 프로세스 정리 완료")
    except Exception as e:
        print(f"프로세스 정리 중 오류: {e}")

kill_existing_streamlit()

기존 프로세스 정리 완료
