# 傷検出AI開発 - 完全版サンプルコード

このノートブックでは、製造現場で使える傷検出AIアプリを開発します。

## 重要：Teachable Machineのモデルエクスポート方法
1. Teachable Machineで「モデルをエクスポート」をクリック
2. 「Tensorflow」タブを選択
3. 「Keras」を選択
4. 「モデルをダウンロード」をクリック
→ `keras_model.h5`と`labels.txt`がダウンロードされます

## 開発の流れ
1. データ収集と整理
2. Teachable Machineでモデル作成
3. Google Vision APIとの連携（オプション）
4. Gradioで実用的なWebアプリ作成

In [None]:
# 必要なライブラリのインストール
!pip install tensorflow gradio opencv-python pillow numpy matplotlib
!pip install google-cloud-vision  # Vision API用（オプション）

In [None]:
# Google Driveのマウント
from google.colab import drive
drive.mount('/content/drive')

# プロジェクトフォルダの作成
import os
project_path = '/content/drive/MyDrive/damage_detection_project'
os.makedirs(project_path, exist_ok=True)
os.makedirs(f'{project_path}/dataset/good', exist_ok=True)
os.makedirs(f'{project_path}/dataset/bad', exist_ok=True)
os.makedirs(f'{project_path}/models', exist_ok=True)
os.makedirs(f'{project_path}/results', exist_ok=True)

print("プロジェクトフォルダを作成しました")

## 1. データ収集ヘルパー関数

In [None]:
import cv2
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from google.colab import files
from datetime import datetime
import json

# 日本語フォントの設定
!apt-get -y install fonts-ipafont-gothic
plt.rcParams['font.family'] = 'IPAGothic'

class DataCollector:
    def __init__(self, project_path):
        self.project_path = project_path
        self.metadata = {
            'good': [],
            'bad': [],
            'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
    
    def upload_images(self, category='good'):
        """
        画像をアップロードしてデータセットに追加
        category: 'good' (良品) or 'bad' (不良品)
        """
        print(f"{'良品' if category == 'good' else '不良品'}の画像をアップロードしてください")
        uploaded = files.upload()
        
        for filename in uploaded.keys():
            # ファイル名を統一形式に変更
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            new_filename = f"{category}_{timestamp}_{filename}"
            
            # 画像を保存
            img = Image.open(filename)
            save_path = f"{self.project_path}/dataset/{category}/{new_filename}"
            img.save(save_path)
            
            # メタデータを記録
            self.metadata[category].append({
                'filename': new_filename,
                'original_filename': filename,
                'timestamp': timestamp,
                'size': img.size
            })
            
            print(f"✓ {filename} を {category} として保存しました")
    
    def show_dataset_summary(self):
        """
        データセットの概要を表示
        """
        good_count = len(os.listdir(f"{self.project_path}/dataset/good"))
        bad_count = len(os.listdir(f"{self.project_path}/dataset/bad"))
        
        fig, ax = plt.subplots(1, 1, figsize=(8, 6))
        categories = ['良品', '不良品']
        counts = [good_count, bad_count]
        colors = ['green', 'red']
        
        bars = ax.bar(categories, counts, color=colors)
        ax.set_ylabel('画像数', fontsize=12)
        ax.set_title('データセット概要', fontsize=14)
        
        # 数値をバーの上に表示
        for bar, count in zip(bars, counts):
            ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
                   str(count), ha='center', va='bottom')
        
        plt.show()
        
        print(f"\n合計: {good_count + bad_count}枚")
        print(f"良品: {good_count}枚")
        print(f"不良品: {bad_count}枚")
        
        if good_count < 50 or bad_count < 50:
            print("\n⚠️ 注意: 各カテゴリ50枚以上の画像を推奨します")
    
    def save_metadata(self):
        """
        メタデータを保存
        """
        with open(f"{self.project_path}/dataset/metadata.json", 'w') as f:
            json.dump(self.metadata, f, ensure_ascii=False, indent=2)
        print("メタデータを保存しました")

# データ収集オブジェクトの作成
collector = DataCollector(project_path)

In [None]:
# 良品画像のアップロード
collector.upload_images('good')

In [None]:
# 不良品画像のアップロード
collector.upload_images('bad')

In [None]:
# データセットの概要を表示
collector.show_dataset_summary()
collector.save_metadata()

## 2. Teachable Machineモデルの読み込みと改良

In [None]:
# Teachable Machineのモデルファイルをアップロード
print("Teachable Machineからエクスポートした keras_model.h5 と labels.txt をアップロードしてください")
uploaded = files.upload()

# モデルファイルをプロジェクトフォルダに移動
import shutil
for filename in uploaded.keys():
    shutil.move(filename, f"{project_path}/models/{filename}")
    print(f"✓ {filename} を保存しました")

In [None]:
import tensorflow as tf

class DamageDetector:
    def __init__(self, model_path, labels_path):
        # モデルとラベルの読み込み
        self.model = tf.keras.models.load_model(model_path, compile=False)
        
        with open(labels_path, 'r', encoding='utf-8') as f:
            self.labels = [line.strip() for line in f.readlines()]
        
        # パフォーマンス記録
        self.performance_log = []
    
    def preprocess_image(self, image):
        """
        画像の前処理
        """
        if isinstance(image, str):
            # ファイルパスの場合
            img = Image.open(image).convert('RGB')
        else:
            # numpy配列の場合
            img = Image.fromarray(image).convert('RGB')
        
        # リサイズと正規化
        img = img.resize((224, 224))
        img_array = np.array(img) / 255.0
        img_array = np.expand_dims(img_array, axis=0)
        
        return img_array, img
    
    def predict(self, image):
        """
        画像の予測
        """
        start_time = datetime.now()
        
        # 前処理
        img_array, original_img = self.preprocess_image(image)
        
        # 予測
        predictions = self.model.predict(img_array, verbose=0)
        
        # 処理時間を記録
        process_time = (datetime.now() - start_time).total_seconds()
        
        # 結果を整理
        results = {}
        for i, label in enumerate(self.labels):
            clean_label = label.split(' ', 1)[1] if ' ' in label else label
            results[clean_label] = float(predictions[0][i])
        
        # パフォーマンスログに追加
        self.performance_log.append({
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'process_time': process_time,
            'result': max(results, key=results.get)
        })
        
        return results, original_img, process_time
    
    def analyze_with_confidence(self, image):
        """
        信頼度を含む詳細な分析
        """
        results, img, process_time = self.predict(image)
        
        # 判定ロジック
        bad_prob = results.get('不良品', results.get('bad', 0))
        good_prob = results.get('良品', results.get('good', 0))
        
        # 判定基準
        if bad_prob > 0.8:
            status = "不良品"
            confidence = "高"
            action = "廃棄または再加工を推奨"
            color = "red"
        elif bad_prob > 0.5:
            status = "要確認"
            confidence = "中"
            action = "目視での再確認を推奨"
            color = "orange"
        elif bad_prob > 0.3:
            status = "要注意"
            confidence = "低"
            action = "品質管理者による確認を推奨"
            color = "yellow"
        else:
            status = "良品"
            confidence = "高"
            action = "次工程へ進めて問題ありません"
            color = "green"
        
        analysis = {
            'status': status,
            'confidence': confidence,
            'action': action,
            'color': color,
            'probabilities': results,
            'process_time': process_time,
            'image': img
        }
        
        return analysis
    
    def visualize_analysis(self, analysis):
        """
        分析結果の可視化
        """
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # 画像表示
        ax1.imshow(analysis['image'])
        ax1.axis('off')
        ax1.set_title(f"判定: {analysis['status']}", fontsize=16, 
                     color=analysis['color'], weight='bold')
        
        # 確率の可視化
        labels = list(analysis['probabilities'].keys())
        values = list(analysis['probabilities'].values())
        colors = ['red' if '不良' in l or 'bad' in l else 'green' for l in labels]
        
        bars = ax2.barh(labels, values, color=colors)
        ax2.set_xlabel('確率', fontsize=12)
        ax2.set_title('詳細分析結果', fontsize=14)
        ax2.set_xlim(0, 1)
        
        # 値をバーの上に表示
        for bar, value in zip(bars, values):
            ax2.text(value + 0.01, bar.get_y() + bar.get_height()/2, 
                    f'{value:.1%}', va='center')
        
        # 推奨アクションを表示
        plt.figtext(0.5, 0.02, f"推奨アクション: {analysis['action']}", 
                   ha='center', fontsize=12, style='italic')
        plt.figtext(0.5, -0.02, f"処理時間: {analysis['process_time']:.3f}秒", 
                   ha='center', fontsize=10)
        
        plt.tight_layout()
        plt.show()

# 検出器の初期化
detector = DamageDetector(
    model_path=f"{project_path}/models/keras_model.h5",
    labels_path=f"{project_path}/models/labels.txt"
)

## 3. Google Vision API連携（オプション）

より高度な分析のためにGoogle Vision APIを使用する場合のコードです。

In [None]:
# Vision API を使用する場合（認証ファイルが必要）
USE_VISION_API = False  # Vision APIを使う場合はTrueに変更

if USE_VISION_API:
    from google.cloud import vision
    import io
    
    class VisionAPIAnalyzer:
        def __init__(self):
            self.client = vision.ImageAnnotatorClient()
        
        def analyze_image(self, image_path):
            """
            Vision APIで画像を分析
            """
            with io.open(image_path, 'rb') as image_file:
                content = image_file.read()
            
            image = vision.Image(content=content)
            
            # 物体検出
            objects = self.client.object_localization(image=image).localized_object_annotations
            
            # ラベル検出
            labels = self.client.label_detection(image=image).label_annotations
            
            # 画像の特性
            properties = self.client.image_properties(image=image).image_properties_annotation
            
            return {
                'objects': [(obj.name, obj.score) for obj in objects],
                'labels': [(label.description, label.score) for label in labels],
                'dominant_colors': properties.dominant_colors
            }
    
    vision_analyzer = VisionAPIAnalyzer()
else:
    print("Vision APIは使用しません。Teachable Machineのモデルのみを使用します。")

## 4. Gradioを使った実用的なWebアプリ

In [None]:
import gradio as gr
import pandas as pd
from datetime import datetime

# 検査履歴を保存するリスト
inspection_history = []

def inspect_part(image):
    """
    部品検査のメイン関数
    """
    if image is None:
        return None, "画像をアップロードしてください", None, None
    
    # 分析実行
    analysis = detector.analyze_with_confidence(image)
    
    # 結果の整理
    probabilities = analysis['probabilities']
    
    # 判定メッセージの作成
    status_emoji = {
        "良品": "✅",
        "不良品": "❌",
        "要確認": "⚠️",
        "要注意": "🔍"
    }
    
    message = f"""
{status_emoji.get(analysis['status'], '')} **判定結果: {analysis['status']}**

**信頼度**: {analysis['confidence']}
**推奨アクション**: {analysis['action']}
**処理時間**: {analysis['process_time']:.3f}秒
"""
    
    # 履歴に追加
    inspection_history.append({
        '検査日時': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        '判定': analysis['status'],
        '信頼度': analysis['confidence'],
        '不良品確率': f"{probabilities.get('不良品', 0):.1%}",
        '処理時間': f"{analysis['process_time']:.3f}秒"
    })
    
    # 履歴データフレーム作成（最新10件）
    history_df = pd.DataFrame(inspection_history[-10:])
    
    # 統計情報の作成
    if len(inspection_history) > 0:
        total = len(inspection_history)
        good_count = sum(1 for h in inspection_history if h['判定'] == '良品')
        bad_count = sum(1 for h in inspection_history if h['判定'] == '不良品')
        
        stats_message = f"""
### 📊 本日の検査統計
- 総検査数: {total}件
- 良品: {good_count}件 ({good_count/total*100:.1f}%)
- 不良品: {bad_count}件 ({bad_count/total*100:.1f}%)
- 平均処理時間: {np.mean([float(h['処理時間'].replace('秒', '')) for h in inspection_history]):.3f}秒
"""
    else:
        stats_message = "統計情報はまだありません"
    
    return probabilities, message, history_df, stats_message

def export_history():
    """
    検査履歴をCSVファイルとしてエクスポート
    """
    if len(inspection_history) > 0:
        df = pd.DataFrame(inspection_history)
        filename = f"inspection_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        return f"履歴を {filename} として保存しました"
    return "エクスポートする履歴がありません"

# カスタムCSS
custom_css = """
.gradio-container {
    font-family: 'Noto Sans JP', sans-serif;
}
.output-markdown h3 {
    color: #1f77b4;
}
"""

# Gradioインターフェースの作成
with gr.Blocks(title="傷検出AI検査システム", theme=gr.themes.Soft(), css=custom_css) as app:
    gr.Markdown("""
    # 🏭 製造部品 傷検出AIシステム
    
    金属部品の表面検査をAIがサポートします。画像をアップロードして「検査開始」をクリックしてください。
    """)
    
    with gr.Row():
        with gr.Column(scale=1):
            input_image = gr.Image(
                label="検査対象の画像",
                type="numpy",
                height=300
            )
            
            with gr.Row():
                inspect_btn = gr.Button("🔍 検査開始", variant="primary", size="lg")
                clear_btn = gr.Button("🗑️ クリア", variant="secondary")
            
            # 使い方のアコーディオン
            with gr.Accordion("📖 使い方", open=False):
                gr.Markdown("""
                ### 撮影のポイント
                1. **照明**: 均一な照明下で撮影
                2. **角度**: 部品に対して正面から撮影
                3. **距離**: 部品全体が画角に収まるように
                4. **背景**: シンプルな背景を使用
                
                ### 判定基準
                - **良品**: 不良品確率 30%未満
                - **要注意**: 不良品確率 30-50%
                - **要確認**: 不良品確率 50-80%
                - **不良品**: 不良品確率 80%以上
                """)
        
        with gr.Column(scale=1):
            output_label = gr.Label(
                label="判定確率",
                num_top_classes=2
            )
            
            output_message = gr.Markdown(
                label="判定結果",
                value="画像をアップロードして検査を開始してください"
            )
            
            stats_display = gr.Markdown(
                label="統計情報"
            )
    
    # 検査履歴のタブ
    with gr.Tab("検査履歴"):
        history_table = gr.Dataframe(
            label="最近の検査結果（最新10件）",
            headers=["検査日時", "判定", "信頼度", "不良品確率", "処理時間"],
            datatype=["str", "str", "str", "str", "str"],
            row_count=10
        )
        
        export_btn = gr.Button("📥 履歴をエクスポート", variant="secondary")
        export_message = gr.Textbox(label="エクスポート状態", interactive=False)
    
    # バッチ処理タブ
    with gr.Tab("バッチ処理"):
        gr.Markdown("""
        ### 複数画像の一括検査
        複数の画像を一度にアップロードして検査できます。
        """)
        
        batch_input = gr.File(
            label="画像ファイルを選択（複数可）",
            file_count="multiple",
            file_types=["image"]
        )
        
        batch_btn = gr.Button("🚀 一括検査開始", variant="primary")
        batch_output = gr.Dataframe(
            label="バッチ処理結果",
            headers=["ファイル名", "判定", "不良品確率", "処理時間"]
        )
    
    # イベントハンドラー
    inspect_btn.click(
        fn=inspect_part,
        inputs=input_image,
        outputs=[output_label, output_message, history_table, stats_display]
    )
    
    clear_btn.click(
        fn=lambda: (None, None, "画像をアップロードして検査を開始してください", None, None),
        outputs=[input_image, output_label, output_message, history_table, stats_display]
    )
    
    export_btn.click(
        fn=export_history,
        outputs=export_message
    )
    
    # バッチ処理の実装
    def batch_process(files):
        if not files:
            return pd.DataFrame()
        
        results = []
        for file in files:
            analysis = detector.analyze_with_confidence(file.name)
            results.append({
                'ファイル名': os.path.basename(file.name),
                '判定': analysis['status'],
                '不良品確率': f"{analysis['probabilities'].get('不良品', 0):.1%}",
                '処理時間': f"{analysis['process_time']:.3f}秒"
            })
        
        return pd.DataFrame(results)
    
    batch_btn.click(
        fn=batch_process,
        inputs=batch_input,
        outputs=batch_output
    )

# アプリを起動
app.launch(share=True, debug=True)

## 5. モデルの性能評価とレポート生成

In [None]:
def evaluate_model_performance():
    """
    モデルの性能を評価してレポートを生成
    """
    test_results = []
    
    # テストデータで評価
    for category in ['good', 'bad']:
        folder_path = f"{project_path}/dataset/{category}"
        images = os.listdir(folder_path)[:10]  # 各カテゴリから10枚ずつ
        
        for img_file in images:
            img_path = os.path.join(folder_path, img_file)
            analysis = detector.analyze_with_confidence(img_path)
            
            # 正解ラベルと予測を比較
            true_label = '良品' if category == 'good' else '不良品'
            predicted_label = analysis['status']
            
            test_results.append({
                'true': true_label,
                'predicted': predicted_label,
                'confidence': analysis['probabilities'].get('不良品', 0),
                'correct': true_label == predicted_label or 
                          (true_label == '不良品' and predicted_label in ['不良品', '要確認'])
            })
    
    # 精度計算
    accuracy = sum(r['correct'] for r in test_results) / len(test_results)
    
    # 混同行列の作成
    from sklearn.metrics import confusion_matrix
    import seaborn as sns
    
    true_labels = [r['true'] for r in test_results]
    pred_labels = [r['predicted'] for r in test_results]
    
    plt.figure(figsize=(10, 8))
    
    # 混同行列
    plt.subplot(2, 2, 1)
    cm = confusion_matrix(true_labels, pred_labels, labels=['良品', '不良品', '要確認', '要注意'])
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('混同行列')
    plt.ylabel('正解ラベル')
    plt.xlabel('予測ラベル')
    
    # 精度グラフ
    plt.subplot(2, 2, 2)
    plt.bar(['精度'], [accuracy], color='green')
    plt.ylim(0, 1)
    plt.title(f'モデル精度: {accuracy:.1%}')
    
    # 処理時間の分布
    plt.subplot(2, 2, 3)
    process_times = [float(log['process_time']) for log in detector.performance_log]
    if process_times:
        plt.hist(process_times, bins=20, color='blue', alpha=0.7)
        plt.xlabel('処理時間（秒）')
        plt.ylabel('頻度')
        plt.title('処理時間の分布')
    
    plt.tight_layout()
    plt.savefig(f"{project_path}/results/performance_report.png", dpi=300, bbox_inches='tight')
    plt.show()
    
    # レポートの生成
    report = f"""
# モデル性能評価レポート

生成日時: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

## 評価結果サマリー
- テストデータ数: {len(test_results)}件
- 全体精度: {accuracy:.1%}
- 平均処理時間: {np.mean(process_times):.3f}秒

## 推奨事項
1. 現在の精度は{accuracy:.1%}です。{'良好な性能です。' if accuracy > 0.8 else 'データを追加して再学習することを推奨します。'}
2. 処理時間は平均{np.mean(process_times):.3f}秒で、リアルタイム検査に適しています。
3. 誤検出を減らすため、照明条件を統一することを推奨します。

## 次のステップ
- より多様な不良品パターンのデータ収集
- エッジデバイスでの動作検証
- 現場オペレーターからのフィードバック収集
"""
    
    with open(f"{project_path}/results/evaluation_report.md", 'w') as f:
        f.write(report)
    
    print(report)
    return accuracy, report

# 評価実行
if len(os.listdir(f"{project_path}/dataset/good")) > 0:
    accuracy, report = evaluate_model_performance()
else:
    print("評価するためのデータがありません。先にデータを収集してください。")

## まとめと今後の展開

このノートブックで実装した内容：

1. **データ収集と管理**
   - 体系的なデータセット構築
   - メタデータの記録

2. **AIモデルの実装**
   - Teachable Machineモデルの統合
   - 詳細な分析機能

3. **実用的なWebアプリ**
   - 直感的なUI
   - バッチ処理機能
   - 検査履歴の管理

4. **性能評価**
   - 精度測定
   - レポート生成

### 今後の改良案
- リアルタイムカメラ連携
- 不良箇所のヒートマップ表示
- 複数の不良パターンの分類
- エッジデバイスへの展開