In [None]:
import datetime
import json
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from io import BytesIO
from pathlib import Path
from typing import Literal, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions
from dotenv import load_dotenv
from openai import AzureOpenAI
from PIL import Image, ImageDraw, ImageFont
from pydantic import BaseModel, Field
from sklearn.metrics import confusion_matrix, classification_report

In [None]:
# Load environment variables from a .env file. override=True makes values from
# .env take precedence over variables already set in the process environment.
load_dotenv(override=True)

# Azure Storage Blob configuration (os.getenv yields None when a variable is unset)
BLOB_CONNECTION_STRING = os.getenv("BLOB_CONNECTION_STRING")
BLOB_CONTAINER_NAME = os.getenv("BLOB_CONTAINER_NAME")

# Azure OpenAI configuration
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
# API version string handed to the AzureOpenAI client constructor
AOAI_API_VERSION = '2025-03-01-preview'

In [None]:
def get_blob_service_client():
    """Build a BlobServiceClient from the configured connection string."""
    service_client = BlobServiceClient.from_connection_string(BLOB_CONNECTION_STRING)
    return service_client

def upload_to_blob(file_path, blob_name):
    """
    Upload a local file to Azure Blob Storage and create a read-only SAS token for it.

    An existing blob with the same name is overwritten.

    Args:
        file_path: Path to the local file
        blob_name: Name to use in blob storage

    Returns:
        tuple: (blob_url, sas_token). The URL does not include the token;
        callers join them as f"{blob_url}?{sas_token}".
    """
    blob_service_client = get_blob_service_client()
    blob_client = blob_service_client.get_blob_client(container=BLOB_CONTAINER_NAME, blob=blob_name)

    with open(file_path, "rb") as data:
        blob_client.upload_blob(data, overwrite=True)

    now = datetime.datetime.now(datetime.timezone.utc)
    # Backdate the start time: a SAS that starts exactly "now" can be rejected
    # for the first minutes when the client and service clocks disagree.
    start_time = now - datetime.timedelta(minutes=15)
    expiry_time = now + datetime.timedelta(days=1)

    # Generate a read-only SAS token valid for one day
    sas_token = generate_blob_sas(
        account_name=blob_client.account_name,
        container_name=blob_client.container_name,
        blob_name=blob_client.blob_name,
        account_key=blob_service_client.credential.account_key,
        permission=BlobSasPermissions(read=True),
        expiry=expiry_time,
        start=start_time
    )

    blob_url = blob_client.url
    return blob_url, sas_token

def get_openai_client():
    """Build an AzureOpenAI client from the module-level endpoint, API key and API version."""
    return AzureOpenAI(
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
        api_key=AZURE_OPENAI_API_KEY,
        api_version=AOAI_API_VERSION,
    )

# Pydantic models for the staged (hierarchical) classification
class StepOneResult(BaseModel):
    """Step 1: OK/NG二値分類の結果"""
    # Structured-output schema for step 1: the binary OK / NG decision.
    # The Japanese docstring above and the Field description below are kept
    # verbatim: pydantic exposes descriptions through the model's JSON schema,
    # so they are presumably part of what the LLM receives (TODO confirm for
    # the class docstring).
    classification: Literal["OK", "NG"]
    # Free-text justification for the decision
    reasoning: str
    # Confidence as an integer 1-3 (3 = certain, 2 = general grounds,
    # 1 = no grounds / cannot judge), enforced by ge/le below
    confidence: int = Field(
        ...,
        ge=1,
        le=3,
        description="分類の確信度（1〜3の整数：3=確実、2=一般的根拠あり、1=根拠なし/判断不能）"
    )

class StepTwoResult(BaseModel):
    """Step 2: 汚れ/加工不良二値分類の結果"""
    # Structured-output schema for step 2: stain (汚れ) vs processing defect (加工不良).
    # The Japanese docstring and Field description are kept verbatim because
    # pydantic exposes descriptions through the model's JSON schema
    # (presumably seen by the LLM - TODO confirm for the class docstring).
    classification: Literal["汚れ", "加工不良"]
    # Free-text justification for the decision
    reasoning: str
    # Confidence as an integer 1-3 (3 = certain, 2 = general grounds,
    # 1 = no grounds / cannot judge), enforced by ge/le below
    confidence: int = Field(
        ...,
        ge=1,
        le=3,
        description="分類の確信度（1〜3の整数：3=確実、2=一般的根拠あり、1=根拠なし/判断不能）"
    )

class StepThreeResult(BaseModel):
    """Step 3: 欠け/削り節二値分類の結果"""
    # Structured-output schema for step 3: chipping (欠け) vs cutting shavings (削り節).
    # The Japanese docstring and Field description are kept verbatim because
    # pydantic exposes descriptions through the model's JSON schema
    # (presumably seen by the LLM - TODO confirm for the class docstring).
    classification: Literal["欠け", "削り節"]
    # Free-text justification for the decision
    reasoning: str
    # Confidence as an integer 1-3 (3 = certain, 2 = general grounds,
    # 1 = no grounds / cannot judge), enforced by ge/le below
    confidence: int = Field(
        ...,
        ge=1,
        le=3,
        description="分類の確信度（1〜3の整数：3=確実、2=一般的根拠あり、1=根拠なし/判断不能）"
    )

# Final classification result
class ClassificationResult(BaseModel):
    """最終的な分類結果"""
    # Aggregated outcome of the hierarchical classification.
    # The Japanese docstring and Field description are kept verbatim because
    # pydantic exposes descriptions through the model's JSON schema
    # (presumably seen by the LLM - TODO confirm for the class docstring).
    final_classification: Literal["OK", "汚れ", "欠け", "削り節"]
    step_one: StepOneResult
    # Steps 2 and 3 only exist for items that reached them, so None must be a
    # valid value and not merely an unvalidated default.
    step_two: Optional[StepTwoResult] = None
    step_three: Optional[StepThreeResult] = None
    # NOTE(review): the description says 1-3 while the constraint allows 0.0;
    # 0 is what the error paths in this file report, so the bound is left as is.
    overall_confidence: float = Field(
        ...,
        ge=0.0,
        le=3.0,
        description="全体的な確信度（1〜3の範囲）"
    )

In [None]:
def _load_collage_fonts(font_candidates, title_size=24, label_size=18):
    """Return (title_font, label_font) from the first loadable candidate, else Pillow's default font."""
    for font_name in font_candidates:
        try:
            return (
                ImageFont.truetype(font_name, title_size),
                ImageFont.truetype(font_name, label_size),
            )
        except OSError:
            # Pillow raises OSError when the font file cannot be found or read
            continue
    return ImageFont.load_default(), ImageFont.load_default()


def create_collage_from_urls(image_urls, labels, title="Comparison", image_size=(300, 300),
                             timeout=30, font_candidates=("arial.ttf", "DejaVuSans.ttf")):
    """
    Create a side-by-side collage from two image URLs, with a title and a label under each image.

    Args:
        image_urls: List of image URLs (expects 2 URLs)
        labels: List of labels for each image (expects 2 labels)
        title: Title drawn centred at the top of the collage
        image_size: Size to resize each image to (width, height)
        timeout: Seconds to wait for each HTTP download before giving up
        font_candidates: TrueType font names/paths tried in order; Pillow's
            default font is used when none can be loaded.
            NOTE(review): callers in this file pass Japanese labels - confirm
            the chosen font actually contains those glyphs.

    Returns:
        PIL Image object containing the collage

    Raises:
        ValueError: If there are not exactly 2 URLs and 2 labels.
        requests.RequestException: If a download fails, times out or returns
            an HTTP error status.
    """
    if len(image_urls) != 2 or len(labels) != 2:
        raise ValueError("Expected exactly 2 image URLs and 2 labels")

    # Download and normalise the images
    images = []
    for url in image_urls:
        response = requests.get(url, timeout=timeout)
        # Fail on the HTTP error itself instead of trying to decode an error page as an image
        response.raise_for_status()
        img = Image.open(BytesIO(response.content))
        img = img.convert('RGB')
        img = img.resize(image_size, Image.Resampling.LANCZOS)
        images.append(img)

    # Create collage canvas
    canvas_width = image_size[0] * 2 + 60  # Extra space for labels and margins
    canvas_height = image_size[1] + 100    # Extra space for title and labels
    collage = Image.new('RGB', (canvas_width, canvas_height), 'white')

    font_title, font_label = _load_collage_fonts(font_candidates)
    draw = ImageDraw.Draw(collage)

    # Title, horizontally centred
    title_bbox = draw.textbbox((0, 0), title, font=font_title)
    title_width = title_bbox[2] - title_bbox[0]
    title_x = (canvas_width - title_width) // 2
    draw.text((title_x, 10), title, fill='black', font=font_title)

    # Images side by side, each with its label centred underneath
    y_offset = 50
    x_positions = [20, image_size[0] + 40]

    for x_position, img, label in zip(x_positions, images, labels):
        collage.paste(img, (x_position, y_offset))

        label_bbox = draw.textbbox((0, 0), label, font=font_label)
        label_width = label_bbox[2] - label_bbox[0]
        label_x = x_position + (image_size[0] - label_width) // 2
        label_y = y_offset + image_size[1] + 10
        draw.text((label_x, label_y), label, fill='black', font=font_label)

    return collage

def upload_pil_image_to_blob(pil_image, blob_name, format='PNG'):
    """
    Upload a PIL image to Azure Blob Storage and create a read-only SAS token for it.

    An existing blob with the same name is overwritten.

    Args:
        pil_image: PIL Image object
        blob_name: Name for the blob
        format: Image format used to encode the image (default: PNG)

    Returns:
        tuple: (blob_url, sas_token). The URL does not include the token.
    """
    # Encode the image in memory
    img_buffer = BytesIO()
    pil_image.save(img_buffer, format=format)

    blob_service_client = get_blob_service_client()
    blob_client = blob_service_client.get_blob_client(
        container=BLOB_CONTAINER_NAME,
        blob=blob_name
    )

    # Upload the encoded bytes
    blob_client.upload_blob(img_buffer.getvalue(), overwrite=True)

    # Timezone-aware expiry, matching upload_to_blob; datetime.utcnow() is
    # naive and deprecated since Python 3.12.
    expiry_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=24)

    # Generate a read-only SAS token valid for 24 hours
    sas_token = generate_blob_sas(
        account_name=blob_service_client.account_name,
        container_name=BLOB_CONTAINER_NAME,
        blob_name=blob_name,
        account_key=blob_service_client.credential.account_key,
        permission=BlobSasPermissions(read=True),
        expiry=expiry_time
    )

    blob_url = blob_client.url
    return blob_url, sas_token

In [None]:
def upload_images_from_folder(folder_path):
    """
    Recursively upload every ``*.png`` file under a folder to Blob Storage.

    Args:
        folder_path: Root folder to scan

    Returns:
        dict: Maps each local file path (as a string) to the blob URL with the
        SAS token appended as its query string. Files whose upload raised are
        reported on stdout and left out of the result.
    """
    root = Path(folder_path)
    url_by_local_path = {}

    for png_path in root.rglob("*.png"):
        # The blob name mirrors the path relative to the root, using forward slashes
        blob_name = str(png_path.relative_to(root)).replace("\\", "/")

        try:
            blob_url, sas_token = upload_to_blob(str(png_path), blob_name)
            url_by_local_path[str(png_path)] = f"{blob_url}?{sas_token}"
            print(f"Uploaded: {blob_name}")
        except Exception as e:
            print(f"Error uploading {png_path}: {e}")

    return url_by_local_path

def load_sample_images(uploaded_images, sample_folder="sample"):
    """
    Collect the few-shot sample image URLs, grouped by label.

    An image counts as a sample when its path contains the sample folder as a
    path segment (or ends with it). Its label is the first of the known labels
    that appears as a folder segment in the path.

    Args:
        uploaded_images: Dictionary mapping local image paths to URLs
        sample_folder: Folder name to look for (default: "sample")

    Returns:
        dict: Dictionary with labels as keys and list of image URLs as values
    """
    samples = {"OK": [], "汚れ": [], "欠け": [], "削り節": []}
    folder_segment = f"/{sample_folder}/"
    folder_suffix = f"/{sample_folder}"

    for image_path, url in uploaded_images.items():
        # Compare on forward slashes so Windows-style paths match as well
        posix_path = image_path.replace("\\", "/")
        in_sample_folder = folder_segment in posix_path or posix_path.endswith(folder_suffix)
        if not in_sample_folder:
            continue

        matched_label = next(
            (
                name for name in samples
                if f"/{name}/" in posix_path or f"\\{name}\\" in image_path
            ),
            None,
        )
        if matched_label is None:
            continue

        samples[matched_label].append(url)
        print(f"Added {matched_label} sample: {Path(image_path).name}")

    return samples

def create_step_one_messages_with_collage(sample_images):
    """
    Build the few-shot messages for step 1 (binary OK / NG classification) using collages.

    For each example (at most three) an OK sample (left) and an NG sample
    (right) are combined into one collage, uploaded to blob storage, and added
    as a user message followed by an assistant message explaining both sides.

    Args:
        sample_images: Dict mapping each label ("OK", "汚れ", "欠け", "削り節")
            to a list of sample image URLs.

    Returns:
        list: Chat messages, starting with the system prompt.
    """
    messages = [
        {
            "role": "system",
            "content": """あなたは製造業の品質検査の専門家です。

以下のFew-shotサンプルでは、左側にOK（良品）の例、右側にNG（不良品）の例を示し、それぞれの特徴の説明を提供します。
これらの見分け方の例に基づいて、実際の画像データを1つのラベル（OKまたはNG）に分類してください。

─── 分類定義 ───
• OK（良品）: 製品の外観に異常がなく、形状・表面状態ともに規定の仕様範囲内
• NG（不良品）: 製品に何らかの異常があり、品質基準を満たさない状態

─── 確信度定義（3段階）───
• 3：確実、はっきり判断可能
• 2：一般的な根拠あり
• 1：根拠なし、判断不能、そう見えなくもない

必ず「OK」または「NG」のどちらかに分類し、その理由と確信度を提供してください。
            """
        }
    ]

    # Explanations for the OK samples, matched by position in sample_images["OK"]
    ok_reasoning = [
        "画像内の部品を詳細に観察した結果、製品の表面は滑らかで均一な仕上がりを示しており、欠け・割れ・ヒビは一切確認できません。エッジ部分も設計通りの形状を保持し、異物付着や汚れも見当たりません。全体的に規定の仕様範囲内の良好な状態です。",
        "画像内の部品を詳細に観察した結果、製品の表面は滑らかで均一な仕上がりを示していますが、部品奥方向のエッジで切削くずの付着があるようにも見えますが非常に極小なため規定の仕様範囲内の良好な状態と判断できます。",
        "画像内の部品を詳細に観察した結果、製品の表面は滑らかで均一な仕上がりを示していますが、部品奥方向と上部のエッジで切削くずの付着があるようにも見えますが非常に極小なため規定の仕様範囲内の良好な状態と判断できます。",
    ]

    # Each NG explanation describes the first sample of one specific defect
    # label. Keeping label and text together means a label without samples is
    # skipped as a unit instead of shifting the remaining explanations onto
    # images of a different defect type.
    ng_examples = [
        ("汚れ", "部品表面に基材とは異なる濃いグレーの色調の斜めの線がはっきりと確認でき、品質基準を満たさない不良品です。"),
        ("欠け", "部品上部に明確な欠落が確認でき、製品の品質基準を満たさない不良品です。"),
        ("削り節", "製品上部と奥方向のエッジ部分に大きな切削くずの付着が確認でき、品質基準を満たさない不良品です。"),
    ]
    ng_pairs = [
        (sample_images[label][0], reasoning)
        for label, reasoning in ng_examples
        if sample_images[label]
    ]

    # zip stops at the shortest input, so at most three comparisons are produced
    comparisons = zip(sample_images["OK"], ok_reasoning, ng_pairs)
    for number, (ok_url, ok_text, (ng_url, ng_text)) in enumerate(comparisons, start=1):
        # Collage with OK (left) vs NG (right)
        collage = create_collage_from_urls(
            [ok_url, ng_url],
            ["OK（良品）", "NG（不良品）"],
            f"品質検査比較例 {number}"
        )

        # Upload the collage so it can be referenced by URL in the prompt
        collage_blob_name = f"collages/step1_comparison_{number}.png"
        blob_url, sas_token = upload_pil_image_to_blob(collage, collage_blob_name)
        collage_url = f"{blob_url}?{sas_token}"

        # Assistant-side explanation of the comparison
        comparison_explanation = f"""左側（OK）の特徴: {ok_text}

右側（NG）の特徴: {ng_text}

この比較例から、良品と不良品の違いを学習し、同様の基準で新しい画像を分類してください。"""

        messages.extend([
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "この比較画像を参考に、OK/NGの見分け方を教えてください。"},
                    {"type": "image_url", "image_url": {"url": collage_url, "detail": "high"}}
                ]
            },
            {
                "role": "assistant",
                "content": comparison_explanation
            }
        ])

    return messages

# Original function name kept for backward compatibility
def create_step_one_messages(sample_images):
    """
    Build the step 1 (binary OK / NG) few-shot messages under the legacy name.

    Delegates to create_step_one_messages_with_collage.
    """
    step_one_messages = create_step_one_messages_with_collage(sample_images)
    return step_one_messages

def create_step_two_messages_with_collage(sample_images):
    """
    Build the few-shot messages for step 2 (binary 汚れ / 加工不良 classification) using collages.

    For each example (at most three) a stain sample (left) and a
    processing-defect sample (right) are combined into one collage, uploaded
    to blob storage, and added as a user message followed by an assistant
    message explaining both sides.

    Args:
        sample_images: Dict mapping each label ("OK", "汚れ", "欠け", "削り節")
            to a list of sample image URLs.

    Returns:
        list: Chat messages, starting with the system prompt.
    """
    messages = [
        {
            "role": "system",
            "content": """あなたは製造業の品質検査の専門家です。

以下のFew-shotサンプルでは、左側に汚れの例、右側に加工不良の例を示し、それぞれの特徴の説明を提供します。
これらの見分け方の例に基づいて、実際のNG部品画像データを1つのラベル（汚れまたは加工不良）に分類してください。

─── 分類定義 ───
• 汚れ: 表面に油脂・ほこり・粉じん・液体シミなどの異物が付着し、拭いても残るもの
• 加工不良: 製造工程で発生した物理的な損傷や加工ミス、切削くずの残留

─── 確信度定義（3段階）───
• 3：確実、はっきり判断可能
• 2：一般的な根拠あり  
• 1：根拠なし、判断不能、そう見えなくもない

必ず「汚れ」または「加工不良」のどちらかに分類し、その理由と確信度を提供してください。
            """
        }
    ]

    # Explanations for the stain samples, matched by position in sample_images["汚れ"]
    stain_reasoning = [
        "部品表面に基材とは異なる濃いグレーの色調の斜めの線がはっきりと確認でき、品質基準を満たさない不良品です。",
        "部品表面に基材とは異なる濃いグレーの色調の太めの線が部品上部から下部にかけて確認でき、品質基準を満たさない不良品です。",
        "部品中央付近に基材とは異なる色調の付着物が確認できます。品質基準を満たさない不良品と判断されます。"
    ]

    # (label, sample index, explanation). The texts are the ones the step 3
    # prompt uses for 欠け[0], 削り節[0] and 削り節[1], so each text is bound to
    # exactly that sample. Taking "up to two samples per label" instead pairs
    # the cutting-shavings texts with a second 欠け image whenever one exists.
    defect_examples = [
        ("欠け", 0, "部品上部に明確な欠落が確認でき、製品の品質基準を満たさない不良品です。"),
        ("削り節", 0, "製品上部と奥方向のエッジ部分に大きな切削くずの付着が確認でき、品質基準を満たさない不良品です。"),
        ("削り節", 1, "部品奥方向のエッジ部分に非常に大きな切削くずの付着が確認でき、品質基準を満たさない不良品です。"),
    ]
    defect_pairs = [
        (sample_images[label][position], reasoning)
        for label, position, reasoning in defect_examples
        if len(sample_images[label]) > position
    ]

    # zip stops at the shortest input, so at most three comparisons are produced
    comparisons = zip(sample_images["汚れ"], stain_reasoning, defect_pairs)
    for number, (stain_url, stain_text, (defect_url, defect_text)) in enumerate(comparisons, start=1):
        # Collage with 汚れ (left) vs 加工不良 (right)
        collage = create_collage_from_urls(
            [stain_url, defect_url],
            ["汚れ", "加工不良"],
            f"汚れ vs 加工不良 比較例 {number}"
        )

        # Upload the collage so it can be referenced by URL in the prompt
        collage_blob_name = f"collages/step2_comparison_{number}.png"
        blob_url, sas_token = upload_pil_image_to_blob(collage, collage_blob_name)
        collage_url = f"{blob_url}?{sas_token}"

        # Assistant-side explanation of the comparison
        comparison_explanation = f"""左側（汚れ）の特徴: {stain_text}

右側（加工不良）の特徴: {defect_text}

この比較例から、汚れと加工不良の違いを学習し、同様の基準で新しいNG部品画像を分類してください。"""

        messages.extend([
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "この比較画像を参考に、汚れ/加工不良の見分け方を教えてください。"},
                    {"type": "image_url", "image_url": {"url": collage_url, "detail": "high"}}
                ]
            },
            {
                "role": "assistant",
                "content": comparison_explanation
            }
        ])

    return messages

def create_step_three_messages_with_collage(sample_images):
    """
    Build the few-shot messages for step 3 (binary 欠け / 削り節 classification) using collages.

    The i-th 欠け sample (left) is paired with the i-th 削り節 sample (right)
    for at most three examples. Each pair is rendered as a collage, uploaded
    to blob storage, and added as a user message followed by an assistant
    message explaining both sides.

    Args:
        sample_images: Dict mapping each label to a list of sample image URLs;
            the "欠け" and "削り節" entries are read here.

    Returns:
        list: Chat messages, starting with the system prompt.
    """
    system_prompt = """あなたは製造業の品質検査の専門家です。

以下のFew-shotサンプルでは、左側に欠けの例、右側に削り節の例を示し、それぞれの特徴の説明を提供します。
これらの見分け方の例に基づいて、実際の加工不良部品画像データを1つのラベル（欠けまたは削り節）に分類してください。

─── 分類定義 ───
• 欠け: 表面またはエッジの一部が物理的に欠落し、凹状の傷・穴があるもの
• 削り節: 製造工程で発生した切削くずが製品上部やエッジに残留・飛び出しているもの

─── 確信度定義（3段階）───
• 3：確実、はっきり判断可能
• 2：一般的な根拠あり
• 1：根拠なし、判断不能、そう見えなくもない

必ず「欠け」または「削り節」のどちらかに分類し、その理由と確信度を提供してください。
            """
    messages = [{"role": "system", "content": system_prompt}]

    # Explanations matched by position to sample_images["欠け"]
    crack_reasoning = [
        "部品上部に明確な欠落が確認でき、製品の品質基準を満たさない不良品です。",
        "部品上部に明確な欠落が複数個所確認でき、製品の品質基準を満たさない不良品です。欠けた部分が飛び出して削り節のように見える部分もありますが、欠けが確認できるため欠けと判断されます。",
        "部品上部中央付近に明確な欠落が確認でき、製品の品質基準を満たさない不良品です。"
    ]

    # Explanations matched by position to sample_images["削り節"]
    chips_reasoning = [
        "製品上部と奥方向のエッジ部分に大きな切削くずの付着が確認でき、品質基準を満たさない不良品です。",
        "部品奥方向のエッジ部分に非常に大きな切削くずの付着が確認でき、品質基準を満たさない不良品です。",
        "部品中央付近に切削くずの付着が確認でき、品質基準を満たさない不良品です。"
    ]

    # zip stops at the shortest input; the three-item explanation lists cap it at three
    paired_examples = zip(
        sample_images["欠け"],
        sample_images["削り節"],
        crack_reasoning,
        chips_reasoning,
    )
    for number, (crack_url, chips_url, crack_text, chips_text) in enumerate(paired_examples, start=1):
        # Collage with 欠け (left) vs 削り節 (right)
        collage = create_collage_from_urls(
            [crack_url, chips_url],
            ["欠け", "削り節"],
            f"欠け vs 削り節 比較例 {number}"
        )

        # Upload the collage so it can be referenced by URL in the prompt
        blob_url, sas_token = upload_pil_image_to_blob(
            collage, f"collages/step3_comparison_{number}.png"
        )
        collage_url = f"{blob_url}?{sas_token}"

        # Assistant-side explanation of the comparison
        comparison_explanation = f"""左側（欠け）の特徴: {crack_text}

右側（削り節）の特徴: {chips_text}

この比較例から、欠けと削り節の違いを学習し、同様の基準で新しい加工不良部品画像を分類してください。"""

        user_turn = {
            "role": "user",
            "content": [
                {"type": "text", "text": "この比較画像を参考に、欠け/削り節の見分け方を教えてください。"},
                {"type": "image_url", "image_url": {"url": collage_url, "detail": "high"}}
            ]
        }
        assistant_turn = {"role": "assistant", "content": comparison_explanation}
        messages.extend([user_turn, assistant_turn])

    return messages

def classify_image(client, image_url, few_shot_messages):
    """
    Classify a single image using Azure OpenAI with Pydantic structured output.

    The response is parsed as a ClassificationResult and flattened into the
    classification / reasoning / confidence dict returned here.

    Args:
        client: Azure OpenAI client
        image_url: Image URL to classify
        few_shot_messages: Few-shot messages placed before the query; the list
            is copied and not modified.

    Returns:
        dict: "classification" (the final label), "reasoning" (the reasoning of
        every step present in the result, joined with " / ") and "confidence"
        (the overall confidence). On any failure the classification is "Error"
        and the confidence is 0.
    """
    messages = few_shot_messages.copy()
    messages.append({
        "role": "user",
        "content": [
            {"type": "text", "text": "上記の比較例を参考にして、この部品の画像を分類してください。Few-shotの見分け方の例に基づいて判定してください。"},
            {"type": "image_url", "image_url": {"url": image_url, "detail": "high"}}
        ]
    })

    try:
        completion = client.beta.chat.completions.parse(
            model="gpt-4.1",  # Replace with your actual model deployment name
            messages=messages,
            response_format=ClassificationResult,
            max_tokens=1000,
            temperature=0  # Lower temperature for more consistent results
        )

        result = completion.choices[0].message.parsed
        if result is None:
            raise ValueError("Model returned no parsed result")

        # ClassificationResult has no top-level classification / reasoning /
        # confidence attributes, so map the fields it does define.
        steps = (result.step_one, result.step_two, result.step_three)
        return {
            "classification": result.final_classification,
            "reasoning": " / ".join(step.reasoning for step in steps if step is not None),
            "confidence": result.overall_confidence
        }
    except Exception as e:
        print(f"Error classifying image: {e}")
        return {
            "classification": "Error",
            "reasoning": f"Classification failed: {str(e)}",
            "confidence": 0
        }

In [None]:
# Short-name entry points that route to the collage-based builders
def create_step_two_messages(sample_images):
    """
    Build the step 2 (binary 汚れ / 加工不良) few-shot messages.

    Delegates to create_step_two_messages_with_collage.
    """
    step_two_messages = create_step_two_messages_with_collage(sample_images)
    return step_two_messages

def create_step_three_messages(sample_images):
    """
    Build the step 3 (binary 欠け / 削り節) few-shot messages.

    Delegates to create_step_three_messages_with_collage.
    """
    step_three_messages = create_step_three_messages_with_collage(sample_images)
    return step_three_messages

In [None]:
def classify_image_binary_step(client, image_url, messages, result_class, model="gpt-4.1"):
    """
    Execute a single step of binary classification.

    Args:
        client: Azure OpenAI client
        image_url: Image URL to classify
        messages: Few-shot messages for this step; the list is copied and not
            modified.
        result_class: Pydantic model class used as the structured response format
        model: Azure OpenAI model deployment name to call

    Returns:
        dict: "classification", "reasoning" and "confidence" taken from the
        parsed response. On any failure (including a response without a parsed
        result) the classification is "ERROR" and the confidence is 0.
    """
    step_messages = messages.copy()
    step_messages.append({
        "role": "user",
        "content": [
            {"type": "text", "text": "上記の比較例を参考にして、この部品の画像を分類してください。Few-shotの見分け方の例に基づいて判定してください。"},
            {"type": "image_url", "image_url": {"url": image_url, "detail": "high"}}
        ]
    })

    try:
        completion = client.beta.chat.completions.parse(
            model=model,
            messages=step_messages,
            response_format=result_class,
            max_tokens=1000,
            temperature=0  # Lower temperature for more consistent results
        )

        message = completion.choices[0].message
        result = message.parsed
        if result is None:
            # Report the missing result explicitly instead of an opaque
            # AttributeError on None.
            refusal = getattr(message, "refusal", None)
            raise ValueError(f"Model returned no parsed result (refusal: {refusal})")

        return {
            "classification": result.classification,
            "reasoning": result.reasoning,
            "confidence": result.confidence
        }

    except Exception as e:
        print(f"Error in classification step: {e}")
        return {"classification": "ERROR", "reasoning": f"Error: {e}", "confidence": 0}

def classify_image_hierarchical(client, image_url, step_messages):
    """
    Perform hierarchical binary classification: OK/NG → 汚れ/加工不良 → 欠け/削り節

    Later steps run only for images the previous step sent down the defect
    branch. A step that reports "ERROR" ends the cascade: the final
    classification is "ERROR" with an overall confidence of 0, rather than
    the failure being treated as NG / 加工不良.

    Args:
        client: Azure OpenAI client
        image_url: Image URL to classify
        step_messages: Dict containing the few-shot messages for each step
            under the keys "step_one", "step_two" and "step_three"

    Returns:
        dict: "step_one", "step_two", "step_three" (per-step result dicts, None
        for steps that did not run), "final_classification" and
        "overall_confidence" (mean confidence of the steps that ran).
    """
    # Step 1: OK/NG classification
    step_one_result = classify_image_binary_step(
        client, image_url, step_messages["step_one"], StepOneResult
    )

    result = {
        "step_one": step_one_result,
        "step_two": None,
        "step_three": None,
        "final_classification": step_one_result["classification"],
        "overall_confidence": step_one_result["confidence"]
    }

    # OK is a final answer; a failed step 1 already carries "ERROR" and
    # confidence 0, so both end here.
    if step_one_result["classification"] in ("OK", "ERROR"):
        return result

    # Step 2: 汚れ/加工不良 classification (for NG items)
    step_two_result = classify_image_binary_step(
        client, image_url, step_messages["step_two"], StepTwoResult
    )
    result["step_two"] = step_two_result

    # A failed step 2 must not fall through to step 3 as if it were 加工不良
    if step_two_result["classification"] == "ERROR":
        result["final_classification"] = "ERROR"
        result["overall_confidence"] = 0
        return result

    # If 汚れ, stop here
    if step_two_result["classification"] == "汚れ":
        result["final_classification"] = "汚れ"
        result["overall_confidence"] = (step_one_result["confidence"] + step_two_result["confidence"]) / 2
        return result

    # Step 3: 欠け/削り節 classification (for 加工不良 items)
    step_three_result = classify_image_binary_step(
        client, image_url, step_messages["step_three"], StepThreeResult
    )
    result["step_three"] = step_three_result
    result["final_classification"] = step_three_result["classification"]

    if step_three_result["classification"] == "ERROR":
        result["overall_confidence"] = 0
        return result

    result["overall_confidence"] = (
        step_one_result["confidence"] +
        step_two_result["confidence"] +
        step_three_result["confidence"]
    ) / 3

    return result

def classify_image_hierarchical_wrapper(args):
    """
    Run the hierarchical classification for one image and flatten the outcome into one record.

    Args:
        args: Tuple of (image_url, true_label, image_path, client,
            step_messages, index, total), packed so the function can be
            submitted to an executor with a single argument.

    Returns:
        dict: Flat record with the image identity, the predicted label, the
        per-step classification / confidence / reasoning (None for steps that
        did not run), the overall confidence, a 'status' of 'success' or
        'error', and the original index. Exceptions are caught and reported
        as an 'error' record.
    """
    image_url, true_label, image_path, client, step_messages, index, total = args
    step_fields = ('classification', 'confidence', 'reasoning')

    print(f"[{index+1}/{total}] Hierarchical classification: {Path(image_path).name}")

    def flatten_optional_step(prefix, step):
        # Steps that did not run are None and yield None for every column
        return {f'{prefix}_{field}': (step[field] if step else None) for field in step_fields}

    try:
        outcome = classify_image_hierarchical(client, image_url, step_messages)

        return {
            'image_path': image_path,
            'image_url': image_url,
            'true_label': true_label,
            'predicted': outcome['final_classification'],
            **{f'step_one_{field}': outcome['step_one'][field] for field in step_fields},
            **flatten_optional_step('step_two', outcome['step_two']),
            **flatten_optional_step('step_three', outcome['step_three']),
            'overall_confidence': outcome['overall_confidence'],
            'status': 'success',
            'index': index,
        }
    except Exception as e:
        print(f"Error in hierarchical classification {Path(image_path).name}: {e}")
        failure = {
            'image_path': image_path,
            'image_url': image_url,
            'true_label': true_label,
            'predicted': 'ERROR',
            'step_one_classification': 'ERROR',
            'step_one_confidence': 0,
            'step_one_reasoning': f'Classification failed: {str(e)}',
        }
        # Steps 2 and 3 never produced anything on the failure path
        failure.update(flatten_optional_step('step_two', None))
        failure.update(flatten_optional_step('step_three', None))
        failure.update({'overall_confidence': 0, 'status': 'error', 'index': index})
        return failure

def classify_images_hierarchical_parallel(test_data, client, step_messages, max_workers=3):
    """
    Classify images using hierarchical binary classification in parallel.

    Args:
        test_data: Sequence of (image_url, true_label, image_path) tuples
        client: Azure OpenAI client, passed to every worker
        step_messages: Dict containing the few-shot messages for each step
        max_workers: Number of worker threads

    Returns:
        list: One result record per completed image, sorted by original index.
        Empty when test_data is empty.
    """
    results = []
    total = len(test_data)

    # Nothing to do; also avoids dividing by zero in the timing summary below
    if total == 0:
        print("No images to classify.")
        return results

    # Prepare arguments for parallel processing
    args_list = [
        (image_url, true_label, image_path, client, step_messages, i, total)
        for i, (image_url, true_label, image_path) in enumerate(test_data)
    ]

    print(f"Starting hierarchical classification with {max_workers} workers...")
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_args = {executor.submit(classify_image_hierarchical_wrapper, args): args for args in args_list}

        # Collect results as they complete
        for future in as_completed(future_to_args):
            try:
                result = future.result()
                results.append(result)

                # Progress and a simple ETA based on the mean time per finished image
                completed = len(results)
                elapsed = time.time() - start_time
                avg_time = elapsed / completed
                eta = avg_time * (total - completed)

                if completed % 5 == 0 or completed == total:
                    print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%) - "
                          f"ETA: {eta:.1f}s")

            except Exception as e:
                print(f"Future failed: {e}")

    # Sort results by index to maintain order
    results.sort(key=lambda x: x['index'])

    total_time = time.time() - start_time
    print(f"\nHierarchical classification completed in {total_time:.1f}s")
    print(f"Average time per image: {total_time/total:.2f}s")

    return results

In [None]:
def load_test_images(uploaded_images, test_folder="test"):
    """
    Collect the test images together with their ground-truth labels.

    An image is a test image when its path contains the test folder as a path
    segment. Its label is the first known label that appears as a folder
    segment in the path; images without a known label are skipped.

    Args:
        uploaded_images: Dictionary mapping local image paths to URLs
        test_folder: Folder name to look for (default: "test")

    Returns:
        list: List of tuples (image_url, true_label, image_path)
    """
    known_labels = ("OK", "汚れ", "欠け", "削り節")
    folder_segment = f"/{test_folder}/"
    collected = []

    for image_path, url in uploaded_images.items():
        # Compare on forward slashes so Windows-style paths match as well
        posix_path = image_path.replace("\\", "/")
        if folder_segment not in posix_path:
            continue

        true_label = next(
            (
                name for name in known_labels
                if f"/{name}/" in posix_path or f"\\{name}\\" in image_path
            ),
            None,
        )
        if true_label is not None:
            collected.append((url, true_label, image_path))

    return collected

def get_openai_client():
    """
    Create the Azure OpenAI client from the module-level endpoint, API key and API version.
    """
    openai_client = AzureOpenAI(
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
        api_key=AZURE_OPENAI_API_KEY,
        api_version=AOAI_API_VERSION,
    )
    return openai_client

def evaluate_results(results_df):
    """
    Print accuracy and a classification report, and plot the confusion matrix.

    Args:
        results_df: DataFrame with 'true_label' and 'predicted' columns

    Returns:
        tuple: (accuracy, cm) where cm is the confusion matrix computed over
        the labels OK / 汚れ / 欠け / 削り節, in that order.
    """
    y_true = results_df['true_label']
    y_pred = results_df['predicted']

    # Fraction of rows whose prediction equals the true label
    accuracy = (y_pred == y_true).mean()
    print(f"Accuracy: {accuracy:.3f}")

    print("\nClassification Report:")
    print(classification_report(y_true, y_pred))

    # Rows are true labels, columns are predictions, both in this fixed order
    labels = ["OK", "汚れ", "欠け", "削り節"]
    cm = confusion_matrix(y_true, y_pred, labels=labels)

    heatmap_options = dict(
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=labels,
        yticklabels=labels,
    )
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, **heatmap_options)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()

    return accuracy, cm

In [None]:
def classify_image_hierarchical(client, image_url, step_messages):
    """
    Execute hierarchical binary classification (Step 1 -> Step 2 -> Step 3).

    NOTE: this redefines a function of the same name defined earlier in the
    file; being later, this is the definition in effect afterwards.

    Later steps run only for images the previous step sent down the defect
    branch. A step that reports "ERROR" ends the cascade: the final
    classification is "ERROR" with an overall confidence of 0, rather than
    the failure being treated as NG / 加工不良.

    Args:
        client: Azure OpenAI client
        image_url: Image URL to classify
        step_messages: Dictionary containing the few-shot messages for each
            step under the keys "step_one", "step_two" and "step_three"

    Returns:
        dict: "step_one", "step_two", "step_three" (per-step result dicts, None
        for steps that did not run), "final_classification" and
        "overall_confidence" (mean confidence of the steps that ran).
    """
    # Step 1: OK/NG classification
    step_one_result = classify_image_binary_step(
        client, image_url, step_messages["step_one"], StepOneResult
    )

    result = {
        "step_one": step_one_result,
        "step_two": None,
        "step_three": None,
        "final_classification": step_one_result["classification"],
        "overall_confidence": step_one_result["confidence"]
    }

    # OK is a final answer; a failed step 1 already carries "ERROR" and
    # confidence 0, so both end here.
    if step_one_result["classification"] in ("OK", "ERROR"):
        return result

    # Step 2: 汚れ/加工不良 classification (for NG items)
    step_two_result = classify_image_binary_step(
        client, image_url, step_messages["step_two"], StepTwoResult
    )
    result["step_two"] = step_two_result

    # A failed step 2 must not fall through to step 3 as if it were 加工不良
    if step_two_result["classification"] == "ERROR":
        result["final_classification"] = "ERROR"
        result["overall_confidence"] = 0
        return result

    # If 汚れ, stop here
    if step_two_result["classification"] == "汚れ":
        result["final_classification"] = "汚れ"
        result["overall_confidence"] = (step_one_result["confidence"] + step_two_result["confidence"]) / 2
        return result

    # Step 3: 欠け/削り節 classification (for 加工不良 items)
    step_three_result = classify_image_binary_step(
        client, image_url, step_messages["step_three"], StepThreeResult
    )
    result["step_three"] = step_three_result
    result["final_classification"] = step_three_result["classification"]

    if step_three_result["classification"] == "ERROR":
        result["overall_confidence"] = 0
        return result

    result["overall_confidence"] = (
        step_one_result["confidence"] +
        step_two_result["confidence"] +
        step_three_result["confidence"]
    ) / 3

    return result

def classify_image_hierarchical_wrapper(args):
    """
    Run the hierarchical classification for one image and flatten the outcome into one record.

    NOTE: this redefines a function of the same name defined earlier in the file.

    Args:
        args: Tuple of (image_url, true_label, image_path, client,
            step_messages, index, total), packed so the function can be
            submitted to an executor with a single argument.

    Returns:
        dict: Flat record with the image identity, the predicted label, the
        per-step classification / confidence / reasoning (None for steps that
        did not run), the overall confidence, a 'status' of 'success' or
        'error', and the original index. Exceptions are caught and reported
        as an 'error' record.
    """
    image_url, true_label, image_path, client, step_messages, index, total = args
    step_fields = ('classification', 'confidence', 'reasoning')

    print(f"[{index+1}/{total}] Hierarchical classification: {Path(image_path).name}")

    def flatten_optional_step(prefix, step):
        # Steps that did not run are None and yield None for every column
        return {f'{prefix}_{field}': (step[field] if step else None) for field in step_fields}

    try:
        outcome = classify_image_hierarchical(client, image_url, step_messages)

        return {
            'image_path': image_path,
            'image_url': image_url,
            'true_label': true_label,
            'predicted': outcome['final_classification'],
            **{f'step_one_{field}': outcome['step_one'][field] for field in step_fields},
            **flatten_optional_step('step_two', outcome['step_two']),
            **flatten_optional_step('step_three', outcome['step_three']),
            'overall_confidence': outcome['overall_confidence'],
            'status': 'success',
            'index': index,
        }
    except Exception as e:
        print(f"Error in hierarchical classification {Path(image_path).name}: {e}")
        failure = {
            'image_path': image_path,
            'image_url': image_url,
            'true_label': true_label,
            'predicted': 'ERROR',
            'step_one_classification': 'ERROR',
            'step_one_confidence': 0,
            'step_one_reasoning': f'Classification failed: {str(e)}',
        }
        # Steps 2 and 3 never produced anything on the failure path
        failure.update(flatten_optional_step('step_two', None))
        failure.update(flatten_optional_step('step_three', None))
        failure.update({'overall_confidence': 0, 'status': 'error', 'index': index})
        return failure

def classify_images_hierarchical_parallel(test_data, client, step_messages, max_workers=3):
    """
    Classify images using hierarchical binary classification in parallel.

    Every item is submitted to ``classify_image_hierarchical_wrapper`` on a
    thread pool. Result dicts are gathered as the futures complete, and a
    progress line is printed on every fifth completion and on the last one.

    Args:
        test_data: Sized collection of ``(image_url, true_label, image_path)``
            tuples.
        client: Passed through unchanged to the wrapper.
        step_messages: Passed through unchanged to the wrapper.
        max_workers: Number of worker threads in the pool.

    Returns:
        list: The wrapper's result dicts, sorted by their ``'index'`` entry so
        they follow the order of ``test_data``. A future that raises is
        reported on stdout and its item is left out of the list. An empty
        ``test_data`` returns an empty list.
    """
    total = len(test_data)
    if total == 0:
        # Without this guard the per-image average printed at the end would
        # divide by zero.
        print("No images to classify.")
        return []

    results = []

    # Prepare arguments for parallel processing
    args_list = [
        (image_url, true_label, image_path, client, step_messages, i, total)
        for i, (image_url, true_label, image_path) in enumerate(test_data)
    ]

    print(f"Starting hierarchical classification with {max_workers} workers...")
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_args = {executor.submit(classify_image_hierarchical_wrapper, args): args for args in args_list}

        # Collect results as they complete
        for future in as_completed(future_to_args):
            try:
                result = future.result()
                results.append(result)

                # Progress is based on successfully collected results only
                completed = len(results)
                elapsed = time.time() - start_time
                avg_time = elapsed / completed
                eta = avg_time * (total - completed)

                if completed % 5 == 0 or completed == total:
                    print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%) - "
                          f"ETA: {eta:.1f}s")

            except Exception as e:
                # Best effort: one failed future must not abort the whole batch
                print(f"Future failed: {e}")

    # Sort results by index to maintain order
    results.sort(key=lambda x: x['index'])

    total_time = time.time() - start_time
    print(f"\nHierarchical classification completed in {total_time:.1f}s")
    print(f"Average time per image: {total_time/total:.2f}s")

    return results

In [None]:
# Upload all images from input/背景カットなし folder only.
# The value returned by upload_images_from_folder is kept in `uploaded_images`
# because the following cells pass it to load_sample_images / load_test_images.
print("Uploading images from 背景カットなし folder to Blob Storage...")
uploaded_images = upload_images_from_folder("input/背景カットなし")
print(f"Uploaded {len(uploaded_images)} images")

In [None]:
# Load sample images for few-shot learning.
# `sample_images` is iterated below as a mapping of label -> collection of URLs.
sample_images = load_sample_images(uploaded_images)
print("Sample images loaded:")
for label, urls in sample_images.items():
    print(f"{label}: {len(urls)} images")

print("\n🖼️ Creating collage-based few-shot learning messages...")

# Create step-wise messages for hierarchical classification using collages.
# One entry per stage; the whole dict is handed to
# classify_images_hierarchical_parallel in the next cell.
step_messages = {
    "step_one": create_step_one_messages_with_collage(sample_images),
    "step_two": create_step_two_messages_with_collage(sample_images),
    "step_three": create_step_three_messages_with_collage(sample_images)
}

# Report how many messages were built for each stage
print(f"\n✅ Created hierarchical classification messages with collages:")
print(f"Step 1 (OK vs NG): {len(step_messages['step_one'])} messages")
print(f"Step 2 (汚れ vs 加工不良): {len(step_messages['step_two'])} messages")
print(f"Step 3 (欠け vs 削り節): {len(step_messages['step_three'])} messages")

In [None]:
# Load test images
test_data = load_test_images(uploaded_images)
print(f"Loaded {len(test_data)} test images")

# Initialize OpenAI client
client = get_openai_client()

# Configure parallel processing
max_workers = 3  # Conservative setting for hierarchical classification
print(f"Using hierarchical binary classification with {max_workers} workers...")

# Execute hierarchical classification
results = classify_images_hierarchical_parallel(test_data, client, step_messages, max_workers)

# Convert to DataFrame (one row per result dict)
results_df = pd.DataFrame(results)

# Remove the index and status columns if not needed for evaluation.
# errors='ignore' keeps this from raising when either column is absent.
results_df = results_df.drop(['index', 'status'], axis=1, errors='ignore')

print("\nHierarchical classification completed!")
print(results_df[['image_path', 'true_label', 'predicted', 'step_one_classification']].head())

# Show error summary.
# Failed images carry predicted == 'ERROR', and the failure message is stored
# in step_one_reasoning by the wrapper's except branch.
error_count = len(results_df[results_df['predicted'] == 'ERROR'])
if error_count > 0:
    print(f"\nWarning: {error_count} images failed to classify")
    print("Error details:")
    error_df = results_df[results_df['predicted'] == 'ERROR']
    for _, row in error_df.iterrows():
        print(f"  {Path(row['image_path']).name}: {row['step_one_reasoning']}")

# Show step-wise classification statistics.
# Error rows are still part of results_df here, so 'ERROR' can appear in the
# step 1 distribution.
print(f"\n=== Step-wise Classification Statistics ===")
print(f"Step 1 (OK/NG) distribution:")
step_one_dist = results_df['step_one_classification'].value_counts()
print(step_one_dist)

# Step 2 only exists for rows that step 1 labelled NG
ng_df = results_df[results_df['step_one_classification'] == 'NG']
if len(ng_df) > 0:
    print(f"\nStep 2 (汚れ/加工不良) distribution for NG items:")
    step_two_dist = ng_df['step_two_classification'].value_counts()
    print(step_two_dist)
    
    # Step 3 only exists for rows that step 2 labelled 加工不良
    defect_df = ng_df[ng_df['step_two_classification'] == '加工不良']
    if len(defect_df) > 0:
        print(f"\nStep 3 (欠け/削り節) distribution for 加工不良 items:")
        step_three_dist = defect_df['step_three_classification'].value_counts()
        print(step_three_dist)

In [None]:
# Evaluate hierarchical classification results (excluding error cases).
# The names defined here (valid_results_df, overall_accuracy, ng_items,
# defect_items, ...) are reused by the analysis cells further down.
valid_results_df = results_df[results_df['predicted'] != 'ERROR'].copy()

print(f"=== Hierarchical Classification Evaluation ===")
# Guarded so that an empty result set reports 0.0% instead of raising
# ZeroDivisionError.
valid_ratio = len(valid_results_df) / len(results_df) * 100 if len(results_df) > 0 else 0.0
print(f"Valid results: {len(valid_results_df)}/{len(results_df)} ({valid_ratio:.1f}%)")

# Overall accuracy
overall_accuracy = (valid_results_df['predicted'] == valid_results_df['true_label']).mean()
print(f"\nOverall Accuracy: {overall_accuracy:.3f}")

# Step-wise accuracy analysis
print(f"\n=== Step-wise Accuracy Analysis ===")

# Step 1: OK/NG accuracy (every non-OK true label collapses to NG)
step1_true = valid_results_df['true_label'].apply(lambda x: 'OK' if x == 'OK' else 'NG')
step1_pred = valid_results_df['step_one_classification']
step1_accuracy = (step1_true == step1_pred).mean()
print(f"Step 1 (OK/NG) Accuracy: {step1_accuracy:.3f}")

# Step 2: 汚れ/加工不良 accuracy (for items predicted NG at step 1 only)
# NOTE(review): the selection is by *predicted* NG, so rows whose true label is
# 'OK' are included and get mapped to '加工不良' by the lambda below. Confirm
# whether such rows should be excluded from the step 2 score.
ng_items = valid_results_df[valid_results_df['step_one_classification'] == 'NG']
if len(ng_items) > 0:
    step2_true = ng_items['true_label'].apply(lambda x: '汚れ' if x == '汚れ' else '加工不良')
    step2_pred = ng_items['step_two_classification']
    step2_accuracy = (step2_true == step2_pred).mean()
    print(f"Step 2 (汚れ/加工不良) Accuracy: {step2_accuracy:.3f} (on {len(ng_items)} NG items)")

# Step 3: 欠け/削り節 accuracy (for items predicted 加工不良 at step 2 only)
defect_items = ng_items[ng_items['step_two_classification'] == '加工不良'] if len(ng_items) > 0 else pd.DataFrame()
if len(defect_items) > 0:
    step3_true = defect_items['true_label']
    step3_pred = defect_items['step_three_classification']
    step3_accuracy = (step3_true == step3_pred).mean()
    print(f"Step 3 (欠け/削り節) Accuracy: {step3_accuracy:.3f} (on {len(defect_items)} 加工不良 items)")

# Detailed classification report and confusion matrix
print(f"\n=== Final Classification Report ===")
labels = ["OK", "汚れ", "欠け", "削り節"]
print(classification_report(valid_results_df['true_label'], valid_results_df['predicted'], labels=labels))

# Confusion matrix
cm = confusion_matrix(valid_results_df['true_label'], valid_results_df['predicted'], labels=labels)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=labels, yticklabels=labels)
plt.title('Hierarchical Classification - Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

# Save detailed results with step information.
# exist_ok=True replaces the separate existence check, which was racy, and
# keeps the cell safe to re-run.
output_dir = 'analysis-results'
os.makedirs(output_dir, exist_ok=True)

# The timestamp has minute resolution, so two runs within the same minute
# write to the same file name.
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
results_filename = f'hierarchical_classification_results_{timestamp}.csv'
# The full frame, including ERROR rows, is what gets saved.
results_df.to_csv(f"{output_dir}/{results_filename}", index=False, encoding='utf-8-sig')
print(f"\nDetailed results saved to '{output_dir}/{results_filename}'")

# Display confidence statistics
print(f"\n=== Confidence Statistics ===")
print(f"Overall confidence: {valid_results_df['overall_confidence'].describe()}")
print(f"Step 1 confidence: {valid_results_df['step_one_confidence'].describe()}")

if len(ng_items) > 0:
    print(f"Step 2 confidence: {ng_items['step_two_confidence'].describe()}")
if len(defect_items) > 0:
    print(f"Step 3 confidence: {defect_items['step_three_confidence'].describe()}")

# Show some example predictions with step details
print(f"\n=== Example Hierarchical Predictions ===")
for idx, row in valid_results_df.head(3).iterrows():
    print(f"\nImage: {Path(row['image_path']).name}")
    print(f"True: {row['true_label']}, Final Predicted: {row['predicted']}")
    print(f"Step 1: {row['step_one_classification']} (confidence: {row['step_one_confidence']})")
    # Later steps are None when the hierarchy stopped earlier
    if row['step_two_classification']:
        print(f"Step 2: {row['step_two_classification']} (confidence: {row['step_two_confidence']})")
    if row['step_three_classification']:
        print(f"Step 3: {row['step_three_classification']} (confidence: {row['step_three_confidence']})")
    print(f"Overall confidence: {row['overall_confidence']:.1f}")

# Error analysis
print(f"\n=== Error Analysis ===")
incorrect_predictions = valid_results_df[valid_results_df['predicted'] != valid_results_df['true_label']]
if len(incorrect_predictions) > 0:
    print(f"Incorrect predictions: {len(incorrect_predictions)}")
    print("Error breakdown by true label:")
    error_breakdown = incorrect_predictions.groupby('true_label')['predicted'].value_counts()
    print(error_breakdown)

In [None]:
# 段階的二値分類の実行完了
# 
# 実行手順:
# 1. OK/NG の二値分類
# 2. NG の場合 → 汚れ/加工不良 の二値分類  
# 3. 加工不良の場合 → 欠け/削り節 の二値分類
#
# この段階的アプローチにより、各段階で特化した判定を行い、
# 全体的な分類精度の向上を図っています。

In [None]:
# =============================================================================
# 詳細誤り分析 - 各ステップの分類誤りを画像付きで表示
# =============================================================================

import matplotlib.pyplot as plt
import requests
from PIL import Image
from io import BytesIO

def display_image_from_url(image_url, title=""):
    """Download an image over HTTP and show it in a matplotlib figure.

    Args:
        image_url: URL the image is fetched from with ``requests.get``.
        title: Text placed above the figure.

    Returns:
        None. Any failure (network, HTTP status, decoding, plotting) is
        caught and reported on stdout so a batch of displays keeps going.
    """
    try:
        # The timeout keeps a stalled download from hanging the notebook, and
        # raise_for_status surfaces HTTP errors (e.g. an expired or
        # unauthorized URL) instead of passing an error body to PIL.
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()
        img = Image.open(BytesIO(response.content))

        if img.mode == 'RGBA':
            # Composite onto a white background, using the alpha channel as
            # the paste mask, so transparent areas render white.
            background = Image.new('RGB', img.size, (255, 255, 255))
            background.paste(img, mask=img.split()[-1])
            img = background
        elif img.mode not in ('RGB', 'L'):
            # CMYK and every other remaining mode are converted to RGB
            img = img.convert('RGB')

        plt.figure(figsize=(6, 6))
        # cmap/vmin/vmax only apply to single-channel ('L') images;
        # matplotlib ignores them for RGB data.
        plt.imshow(img, cmap='gray', vmin=0, vmax=255)
        plt.title(title)
        plt.axis('off')
        plt.show()
    except Exception as e:
        print(f"画像表示エラー: {e}")

def analyze_step_errors(results_df, step_name, true_col, pred_col, reasoning_col, max_samples=5):
    """Print a misclassification report for one stage of the hierarchy.

    Rows whose ``pred_col`` is missing are dropped first. The report gives the
    number of scored rows, the number of mismatches between ``true_col`` and
    ``pred_col``, the resulting accuracy, a count per (true, predicted) pair,
    and up to ``max_samples`` individual mismatches together with the text in
    ``reasoning_col`` and the image itself.

    Returns:
        None. Everything is written to stdout / shown as figures.
    """
    print(f"\n{'='*60}")
    print(f"📊 {step_name} 分類誤り分析")
    print(f"{'='*60}")

    # Only rows that actually reached this stage have a prediction
    scored = results_df.dropna(subset=[pred_col])
    n_scored = len(scored)
    if n_scored == 0:
        print(f"❌ {step_name} のデータがありません")
        return

    # Rows where the stage's prediction disagrees with the expected label
    is_wrong = scored[true_col] != scored[pred_col]
    mismatches = scored[is_wrong]
    n_wrong = len(mismatches)

    print(f"📈 分析対象: {n_scored} 件")
    print(f"❌ 誤分類: {n_wrong} 件")
    print(f"✅ 精度: {((n_scored - n_wrong) / n_scored * 100):.1f}%")

    if n_wrong == 0:
        print("🎉 すべて正解です！")
        return

    # Frequency of every (expected, predicted) confusion, most common first
    print(f"\n📋 誤分類パターン:")
    pair_counts = mismatches.groupby([true_col, pred_col]).size()
    pair_counts = pair_counts.sort_values(ascending=False)
    for (expected, predicted), occurrences in pair_counts.items():
        print(f"  {expected} → {predicted}: {occurrences} 件")

    # Walk through the first few mismatches one by one
    print(f"\n🔍 詳細分析 (最大{max_samples}件):")
    shown = mismatches.head(max_samples)
    for number, (_, record) in enumerate(shown.iterrows(), start=1):
        print(f"\n--- 誤り {number} ---")
        print(f"画像: {Path(record['image_path']).name}")
        print(f"正解: {record[true_col]}")
        print(f"予測: {record[pred_col]}")
        print(f"理由: {record[reasoning_col]}")

        # Show the offending image under a "true → predicted" title
        display_image_from_url(
            record['image_url'],
            f"誤り{number}: {record[true_col]} → {record[pred_col]}",
        )

# Runs the per-step and final misclassification analysis. Relies on
# valid_results_df, ng_items, defect_items and overall_accuracy from the
# evaluation cell above.
print("🔍 段階的分類誤り分析を開始します...")

# Step 1: OK/NG misclassification analysis
step1_true = valid_results_df['true_label'].apply(lambda x: 'OK' if x == 'OK' else 'NG')
step1_df = valid_results_df.copy()
step1_df['step1_true'] = step1_true

analyze_step_errors(
    step1_df, 
    "Step 1 (OK/NG)", 
    'step1_true', 
    'step_one_classification', 
    'step_one_reasoning'
)

# Step 2: 汚れ/加工不良 misclassification analysis (items predicted NG at step 1 only)
if len(ng_items) > 0:
    step2_true = ng_items['true_label'].apply(lambda x: '汚れ' if x == '汚れ' else '加工不良')
    step2_df = ng_items.copy()
    step2_df['step2_true'] = step2_true
    
    analyze_step_errors(
        step2_df,
        "Step 2 (汚れ/加工不良)",
        'step2_true',
        'step_two_classification',
        'step_two_reasoning'
    )

# Step 3: 欠け/削り節 misclassification analysis (items predicted 加工不良 at step 2 only)
if len(defect_items) > 0:
    analyze_step_errors(
        defect_items,
        "Step 3 (欠け/削り節)",
        'true_label',
        'step_three_classification',
        'step_three_reasoning'
    )

# Overall error analysis of the final classification
print(f"\n{'='*60}")
print(f"📊 最終分類 全体誤り分析")
print(f"{'='*60}")

final_errors = valid_results_df[valid_results_df['predicted'] != valid_results_df['true_label']]
print(f"📈 分析対象: {len(valid_results_df)} 件")
print(f"❌ 最終誤分類: {len(final_errors)} 件")
print(f"✅ 最終精度: {overall_accuracy:.3f}")

if len(final_errors) > 0:
    # Count of each (true label, final prediction) pair, most frequent first
    print(f"\n📋 最終誤分類パターン:")
    final_error_patterns = final_errors.groupby(['true_label', 'predicted']).size().sort_values(ascending=False)
    for (true_label, pred_label), count in final_error_patterns.items():
        print(f"  {true_label} → {pred_label}: {count} 件")
    
    # Details for at most five final misclassifications
    print(f"\n🔍 最終分類誤りの詳細 (最大5件):")
    sample_final_errors = final_errors.head(5)
    
    for idx, (_, row) in enumerate(sample_final_errors.iterrows(), 1):
        print(f"\n--- 最終誤り {idx} ---")
        print(f"画像: {Path(row['image_path']).name}")
        print(f"正解: {row['true_label']}")
        print(f"最終予測: {row['predicted']}")
        print(f"Step1: {row['step_one_classification']} (確信度: {row['step_one_confidence']})")
        # Steps 2 and 3 are printed only when that step produced a classification
        if row['step_two_classification']:
            print(f"Step2: {row['step_two_classification']} (確信度: {row['step_two_confidence']})")
        if row['step_three_classification']:
            print(f"Step3: {row['step_three_classification']} (確信度: {row['step_three_confidence']})")
        print(f"全体確信度: {row['overall_confidence']:.1f}")
        
        # Show the image
        title = f"最終誤り{idx}: {row['true_label']} → {row['predicted']}"
        display_image_from_url(row['image_url'], title)

print("\n✅ 誤り分析が完了しました。")

In [None]:
# =============================================================================
# 低確信度予測の分析 - 判定に迷いがある画像を特定
# =============================================================================

def analyze_low_confidence_predictions(results_df, confidence_threshold=2, max_samples=5):
    """Report predictions whose overall confidence is at or below a threshold.

    Prints how many rows fall at or below ``confidence_threshold``, summary
    statistics of their ``overall_confidence``, how many of them were
    classified correctly, their distribution over true labels, and finally up
    to ``max_samples`` of the lowest-confidence rows together with the image.

    Args:
        results_df: Frame with at least ``overall_confidence``, ``predicted``
            and ``true_label``; the per-step and image columns are read only
            when sample rows are displayed.
        confidence_threshold: Inclusive upper bound for "low confidence".
        max_samples: Maximum number of rows shown in detail.

    Returns:
        None. An empty ``results_df`` is reported as having no low-confidence
        predictions.
    """
    print(f"\n{'='*60}")
    print(f"🤔 低確信度予測分析 (確信度 ≤ {confidence_threshold})")
    print(f"{'='*60}")

    # Select the low-confidence predictions
    low_confidence = results_df[results_df['overall_confidence'] <= confidence_threshold]

    total = len(results_df)
    # Guarded: an empty frame used to raise ZeroDivisionError here
    low_share = len(low_confidence) / total * 100 if total > 0 else 0.0

    print(f"📈 全体予測: {total} 件")
    print(f"⚠️ 低確信度: {len(low_confidence)} 件 ({low_share:.1f}%)")

    if len(low_confidence) == 0:
        print("✅ 低確信度の予測はありません")
        return

    # Confidence statistics within the low-confidence subset
    print(f"\n📊 確信度統計:")
    print(f"平均確信度: {low_confidence['overall_confidence'].mean():.1f}")
    print(f"最低確信度: {low_confidence['overall_confidence'].min():.1f}")
    print(f"最高確信度: {low_confidence['overall_confidence'].max():.1f}")

    # Split into correct and incorrect final predictions
    correct_low_conf = low_confidence[low_confidence['predicted'] == low_confidence['true_label']]
    incorrect_low_conf = low_confidence[low_confidence['predicted'] != low_confidence['true_label']]

    print(f"\n📋 低確信度予測の正解率:")
    print(f"  正解: {len(correct_low_conf)} 件")
    print(f"  不正解: {len(incorrect_low_conf)} 件")
    print(f"  正解率: {len(correct_low_conf)/len(low_confidence)*100:.1f}%")

    # Distribution of low-confidence rows over the true labels
    print(f"\n📈 カテゴリ別低確信度分布:")
    category_dist = low_confidence['true_label'].value_counts()
    for category, count in category_dist.items():
        print(f"  {category}: {count} 件")

    # Show the rows with the lowest overall confidence
    print(f"\n🔍 最も確信度の低い予測 (最大{max_samples}件):")
    lowest_conf = low_confidence.nsmallest(max_samples, 'overall_confidence')

    for idx, (_, row) in enumerate(lowest_conf.iterrows(), 1):
        print(f"\n--- 低確信度 {idx} (確信度: {row['overall_confidence']:.1f}) ---")
        print(f"画像: {Path(row['image_path']).name}")
        print(f"正解: {row['true_label']}")
        print(f"予測: {row['predicted']} {'✅' if row['predicted'] == row['true_label'] else '❌'}")
        print(f"Step1: {row['step_one_classification']} (確信度: {row['step_one_confidence']})")
        # Steps 2 and 3 are printed only when that step produced a classification
        if row['step_two_classification']:
            print(f"Step2: {row['step_two_classification']} (確信度: {row['step_two_confidence']})")
        if row['step_three_classification']:
            print(f"Step3: {row['step_three_classification']} (確信度: {row['step_three_confidence']})")

        # Show the image
        title = f"低確信度{idx}: {row['true_label']} → {row['predicted']} (確信度: {row['overall_confidence']:.1f})"
        display_image_from_url(row['image_url'], title)

# Run the low-confidence analysis on the successfully classified rows
# (overall confidence <= 2, at most five examples shown).
analyze_low_confidence_predictions(valid_results_df, confidence_threshold=2, max_samples=5)

In [None]:
# =============================================================================
# ステップ間予測変化分析 - 段階的分類でどのように判定が変わったかを追跡
# =============================================================================

def analyze_prediction_flow(results_df, max_samples=3):
    """Trace where the staged classifier went wrong, one stage at a time.

    Stage 1: counts rows labelled OK and reports those whose true label is not
    OK. Stage 2: among rows labelled NG, counts those labelled 汚れ and reports
    the ones whose true label is 欠け or 削り節. Stage 3: among rows labelled
    加工不良, reports those whose step-three label differs from the true label.
    Each report shows up to ``max_samples`` rows with reasoning, confidence
    and the image.

    Returns:
        None. Everything is written to stdout / shown as figures.
    """
    print(f"\n{'='*60}")
    print(f"🔄 段階的分類フロー分析")
    print(f"{'='*60}")

    # ---- Stage 1: rows accepted as OK ----
    accepted = results_df[results_df['step_one_classification'] == 'OK']
    print(f"📊 Step1でOK判定: {len(accepted)} 件")

    # Defective items that slipped through as OK
    missed = accepted[accepted['true_label'] != 'OK']
    if len(missed) > 0:
        print(f"❌ Step1でOK誤判定: {len(missed)} 件")
        print("詳細:")
        for defect_name in ('汚れ', '欠け', '削り節'):
            hits = int((missed['true_label'] == defect_name).sum())
            if hits > 0:
                print(f"  {defect_name}をOKと誤判定: {hits} 件")

        print(f"\n🔍 Step1 OK誤判定サンプル (最大{max_samples}件):")
        shown = missed.head(max_samples)
        for number, (_, record) in enumerate(shown.iterrows(), start=1):
            print(f"\n--- OK誤判定 {number} ---")
            print(f"画像: {Path(record['image_path']).name}")
            print(f"正解: {record['true_label']} → Step1予測: OK")
            print(f"Step1理由: {record['step_one_reasoning']}")
            print(f"Step1確信度: {record['step_one_confidence']}")

            display_image_from_url(
                record['image_url'],
                f"OK誤判定{number}: {record['true_label']} → OK",
            )

    # ---- Stage 2: rows rejected as NG ----
    rejected = results_df[results_df['step_one_classification'] == 'NG']
    if len(rejected) == 0:
        # Nothing reached stage 2, so stage 3 is empty as well
        return

    print(f"\n📊 Step1でNG判定: {len(rejected)} 件")

    stained = rejected[rejected['step_two_classification'] == '汚れ']
    print(f"  Step2で汚れ判定: {len(stained)} 件")

    # Labelled 汚れ although the true label is a processing defect
    stained_wrong = stained[stained['true_label'].isin(['欠け', '削り節'])]
    if len(stained_wrong) > 0:
        print(f"  ❌ Step2で汚れ誤判定: {len(stained_wrong)} 件")

        print(f"\n🔍 Step2 汚れ誤判定サンプル (最大{max_samples}件):")
        shown = stained_wrong.head(max_samples)
        for number, (_, record) in enumerate(shown.iterrows(), start=1):
            print(f"\n--- 汚れ誤判定 {number} ---")
            print(f"画像: {Path(record['image_path']).name}")
            print(f"正解: {record['true_label']} → Step2予測: 汚れ")
            print(f"Step2理由: {record['step_two_reasoning']}")
            print(f"Step2確信度: {record['step_two_confidence']}")

            display_image_from_url(
                record['image_url'],
                f"汚れ誤判定{number}: {record['true_label']} → 汚れ",
            )

    # ---- Stage 3: rows labelled 加工不良 ----
    defective = rejected[rejected['step_two_classification'] == '加工不良']
    if len(defective) == 0:
        return

    print(f"\n📊 Step2で加工不良判定: {len(defective)} 件")

    # Step-three label disagrees with the true label
    disagree = defective['step_three_classification'] != defective['true_label']
    stage3_wrong = defective[disagree]
    if len(stage3_wrong) == 0:
        return

    print(f"  ❌ Step3で誤判定: {len(stage3_wrong)} 件")

    print(f"\n🔍 Step3 誤判定サンプル (最大{max_samples}件):")
    shown = stage3_wrong.head(max_samples)
    for number, (_, record) in enumerate(shown.iterrows(), start=1):
        print(f"\n--- Step3誤判定 {number} ---")
        print(f"画像: {Path(record['image_path']).name}")
        print(f"正解: {record['true_label']} → Step3予測: {record['step_three_classification']}")
        print(f"Step3理由: {record['step_three_reasoning']}")
        print(f"Step3確信度: {record['step_three_confidence']}")

        display_image_from_url(
            record['image_url'],
            f"Step3誤判定{number}: {record['true_label']} → {record['step_three_classification']}",
        )

# Run the staged-classification flow analysis on the successfully classified
# rows, showing at most three examples per error type.
analyze_prediction_flow(valid_results_df, max_samples=3)

In [None]:
# =============================================================================
# 段階的分類サマリーレポート
# =============================================================================

def generate_summary_report(results_df, low_confidence_threshold=5):
    """Print a comprehensive summary of the staged binary classification.

    Every figure is derived from ``results_df`` itself, so the report no longer
    depends on notebook globals and gives the same output as before when it is
    passed the frame of successfully classified rows.

    Args:
        results_df: Classification results. Rows with ``predicted == 'ERROR'``
            count towards the total but are excluded from all metrics.
        low_confidence_threshold: Inclusive upper bound on
            ``overall_confidence`` for a prediction to be listed as low
            confidence.

    Returns:
        None. The report is written to stdout. When there are no valid rows
        only the basic counts are printed.
    """
    print(f"\n{'='*80}")
    print(f"📊 段階的二値分類 包括サマリーレポート")
    print(f"{'='*80}")

    valid_df = results_df[results_df['predicted'] != 'ERROR']
    total_images = len(results_df)
    valid_images = len(valid_df)
    # Guarded so an empty frame reports 0.0% instead of raising
    valid_share = valid_images / total_images * 100 if total_images > 0 else 0.0

    print(f"🔢 基本統計:")
    print(f"  総画像数: {total_images}")
    print(f"  有効分類: {valid_images} ({valid_share:.1f}%)")

    if valid_images == 0:
        # Every metric below would be an average over nothing
        print(f"  有効な分類結果がありません")
        return

    final_accuracy = (valid_df['predicted'] == valid_df['true_label']).mean()
    print(f"  最終精度: {final_accuracy:.3f}")

    # Rows that reached step 2 (predicted NG) and step 3 (predicted 加工不良)
    ng_rows = valid_df[valid_df['step_one_classification'] == 'NG']
    defect_rows = ng_rows[ng_rows['step_two_classification'] == '加工不良']

    # Accuracy of each step
    step1_true = valid_df['true_label'].apply(lambda x: 'OK' if x == 'OK' else 'NG')
    step1_accuracy = (step1_true == valid_df['step_one_classification']).mean()

    print(f"\n📈 ステップ別精度:")
    print(f"  Step 1 (OK/NG): {step1_accuracy:.3f}")

    if len(ng_rows) > 0:
        # NOTE(review): a true 'OK' row that was predicted NG is mapped to
        # '加工不良' here; confirm whether it should be excluded instead.
        step2_true = ng_rows['true_label'].apply(lambda x: '汚れ' if x == '汚れ' else '加工不良')
        step2_accuracy = (step2_true == ng_rows['step_two_classification']).mean()
        print(f"  Step 2 (汚れ/加工不良): {step2_accuracy:.3f}")

    if len(defect_rows) > 0:
        step3_accuracy = (defect_rows['true_label'] == defect_rows['step_three_classification']).mean()
        print(f"  Step 3 (欠け/削り節): {step3_accuracy:.3f}")

    # Confidence statistics
    print(f"\n🎯 確信度統計:")
    print(f"  全体確信度平均: {valid_df['overall_confidence'].mean():.1f}")
    print(f"  Step1確信度平均: {valid_df['step_one_confidence'].mean():.1f}")
    if len(ng_rows) > 0:
        print(f"  Step2確信度平均: {ng_rows['step_two_confidence'].mean():.1f}")
    if len(defect_rows) > 0:
        print(f"  Step3確信度平均: {defect_rows['step_three_confidence'].mean():.1f}")

    # Per-category performance (share of each true label predicted correctly)
    print(f"\n📊 カテゴリ別性能:")
    for label in ['OK', '汚れ', '欠け', '削り節']:
        is_label = valid_df['true_label'] == label
        true_count = int(is_label.sum())
        correct_count = int((is_label & (valid_df['predicted'] == label)).sum())
        if true_count > 0:
            accuracy = correct_count / true_count
            print(f"  {label}: {correct_count}/{true_count} ({accuracy:.3f})")

    # Main problems
    print(f"\n⚠️ 主要な問題点:")

    # Defective items accepted as OK at step 1
    ok_errors = valid_df[
        (valid_df['step_one_classification'] == 'OK') &
        (valid_df['true_label'] != 'OK')
    ]
    if len(ok_errors) > 0:
        print(f"  • 不良品をOKと誤判定: {len(ok_errors)} 件")

    # Processing defects labelled 汚れ at step 2
    dirt_errors = ng_rows[
        (ng_rows['step_two_classification'] == '汚れ') &
        (ng_rows['true_label'].isin(['欠け', '削り節']))
    ]
    if len(dirt_errors) > 0:
        print(f"  • 加工不良を汚れと誤判定: {len(dirt_errors)} 件")

    # Low-confidence predictions
    low_conf = valid_df[valid_df['overall_confidence'] <= low_confidence_threshold]
    if len(low_conf) > 0:
        print(f"  • 低確信度予測 (≤{low_confidence_threshold}): {len(low_conf)} 件 ({len(low_conf)/len(valid_df)*100:.1f}%)")

    # Improvement suggestions, each triggered by a fixed count/share threshold
    print(f"\n💡 改善提案:")

    if len(ok_errors) > 5:
        print(f"  • Step1のNG検出感度向上が必要")
        print(f"    - より多様なNG例をサンプルに追加")
        print(f"    - プロンプトでの異常検出基準を強化")

    if len(dirt_errors) > 3:
        print(f"  • Step2の汚れ/加工不良判別精度向上が必要")
        print(f"    - 汚れと物理的損傷の特徴差を明確化")
        print(f"    - より具体的な判定基準を提供")

    if len(low_conf) > len(valid_df) * 0.1:
        print(f"  • 確信度向上が必要")
        print(f"    - サンプル数の増加")
        print(f"    - より明確な判定基準の設定")

    print(f"\n✅ レポート生成完了")

# Generate the summary report for the successfully classified rows
generate_summary_report(valid_results_df)