# 05. 推論テスト & 角度計算ロジック検証

## 概要
1. サーバーレスエンドポイントでの推論テスト（複数画像）
2. キーポイントから針の角度を計算
3. 角度からメーター値を算出
4. Bedrockの読み取り結果と精度を比較
5. 結果のサマリーレポート

## 1. セットアップ

In [None]:
import boto3
import json
import math
import time
from decimal import Decimal
from pathlib import Path
from datetime import datetime

import sagemaker

s3 = boto3.client('s3')
runtime = boto3.client('sagemaker-runtime')
dynamodb = boto3.resource('dynamodb')
sess = sagemaker.Session()

SAGEMAKER_BUCKET = sess.default_bucket()
SOURCE_BUCKET = 'facteye-images-20251114'
ENDPOINT_NAME = 'facteye-meter-keypoint'

print(f'Endpoint: {ENDPOINT_NAME}')
print(f'Source Bucket: {SOURCE_BUCKET}')

## 2. 角度計算ロジック

### 計算の考え方

```
       scale_max
      /
     /  θ_max
    ●--------→ (基準方向: 3時方向)
   center  θ_needle
     \  
      \ 
       needle_tip
       
       scale_min
```

1. `needle_center` → `needle_tip` の角度（θ_needle）を計算
2. `needle_center` → `scale_min` の角度（θ_min）を計算
3. `needle_center` → `scale_max` の角度（θ_max）を計算
4. 針の位置をスケール範囲内での割合に変換
5. メーターの最小値・最大値から実際の値を算出

In [None]:
def calc_angle(center, point):
    """中心点から対象点への角度を計算（度数法、3時方向=0°、反時計回りが正）
    
    ※ 画像座標系ではY軸が下向きなので、atan2の符号を反転する
    """
    dx = point['x'] - center['x']
    dy = -(point['y'] - center['y'])  # Y軸反転（画像座標→数学座標）
    angle = math.degrees(math.atan2(dy, dx))
    return angle  # -180 ~ 180


def normalize_angle(angle):
    """角度を0-360度に正規化"""
    return angle % 360


def angle_between(angle, start, end):
    """角度が start → end の範囲内にあるかの割合を計算
    
    時計回りのメーターを想定（scale_min → scale_max が時計回り）
    """
    # 全て0-360に正規化
    angle = normalize_angle(angle)
    start = normalize_angle(start)
    end = normalize_angle(end)
    
    # スケール範囲の角度幅
    if end >= start:
        total_range = end - start
    else:
        total_range = (360 - start) + end
    
    if total_range == 0:
        return 0.0
    
    # 針の位置（start からの角度）
    if angle >= start:
        needle_offset = angle - start
    else:
        needle_offset = (360 - start) + angle
    
    ratio = needle_offset / total_range
    return max(0.0, min(1.0, ratio))


def calculate_meter_value(keypoints, scale_min_value, scale_max_value):
    """キーポイントからメーター値を計算する
    
    Args:
        keypoints: {"needle_tip": {"x": ..., "y": ...}, ...}
        scale_min_value: メーターの最小目盛り値（例: 0）
        scale_max_value: メーターの最大目盛り値（例: 100）
    
    Returns:
        dict: {"value": float, "ratio": float, "angles": dict}
    """
    center = keypoints['needle_center']
    tip = keypoints['needle_tip']
    s_min = keypoints['scale_min']
    s_max = keypoints['scale_max']
    
    # 各角度を計算
    angle_needle = calc_angle(center, tip)
    angle_min = calc_angle(center, s_min)
    angle_max = calc_angle(center, s_max)
    
    # 針のスケール範囲内での割合
    ratio = angle_between(angle_needle, angle_min, angle_max)
    
    # メーター値を算出
    value = scale_min_value + (scale_max_value - scale_min_value) * ratio
    
    return {
        'value': round(value, 2),
        'ratio': round(ratio, 4),
        'angles': {
            'needle': round(angle_needle, 2),
            'scale_min': round(angle_min, 2),
            'scale_max': round(angle_max, 2)
        }
    }


# 単体テスト
test_kp = {
    'needle_center': {'x': 100, 'y': 100},
    'needle_tip': {'x': 100, 'y': 50},     # 12時方向（真上）
    'scale_min': {'x': 50, 'y': 150},       # 7時方向
    'scale_max': {'x': 150, 'y': 150}       # 5時方向
}

result = calculate_meter_value(test_kp, 0, 100)
print(f'テスト結果: {result}')
print(f'  針は12時方向 → 約50%付近が期待値')

## 3. テスト画像の準備

In [None]:
def get_meter_config():
    """Metersテーブルからメーター設定を取得"""
    table = dynamodb.Table('Meters')
    configs = {}
    
    response = table.scan(
        FilterExpression='attribute_not_exists(deleted_at)'
    )
    
    for item in response.get('Items', []):
        meter_id = item['meter_id']
        meter_type = item.get('meter_type', 'unknown')
        
        # アナログメーターのスケール情報（テーブルにない場合はデフォルト値）
        # 実際のスケール値はメーター種別に依存
        scale_ranges = {
            'hygrometer': (0, 100),       # 湿度: 0-100%
            'pressure': (0, 1.0),         # 圧力: 0-1.0 MPa（仮）
            'temperature': (-20, 60),     # 温度: -20~60℃
            'voltmeter': (0, 300),        # 電圧: 0-300V
        }
        
        min_val, max_val = scale_ranges.get(meter_type, (0, 100))
        
        configs[meter_id] = {
            'meter_id': meter_id,
            'meter_type': meter_type,
            'device_id': item.get('device_id'),
            'company_id': item['company_id'],
            'unit': item.get('unit', ''),
            'scale_min_value': min_val,
            'scale_max_value': max_val
        }
    
    return configs

meter_configs = get_meter_config()
print(f'メーター設定数: {len(meter_configs)}')
for mid, cfg in list(meter_configs.items())[:5]:
    print(f'  {mid}: {cfg["meter_type"]} ({cfg["scale_min_value"]}-{cfg["scale_max_value"]} {cfg["unit"]})')

In [None]:
# テスト画像をS3から取得（各メーター種別から数枚ずつ）
TARGET_TYPES = ['hygrometer', 'pressure']
MAX_IMAGES_PER_TYPE = 10

# device_id → meter_type マッピング
device_to_meters = {}
for mid, cfg in meter_configs.items():
    did = cfg.get('device_id')
    if did and cfg['meter_type'] in TARGET_TYPES:
        if did not in device_to_meters:
            device_to_meters[did] = []
        device_to_meters[did].append(cfg)

# テスト画像を収集
test_images = []
type_counts = {t: 0 for t in TARGET_TYPES}

paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=SOURCE_BUCKET, Prefix='images/'):
    for obj in page.get('Contents', []):
        key = obj['Key']
        if not key.endswith('.jpg'):
            continue
        
        parts = key.split('/')
        if len(parts) >= 3:
            device_id = parts[2]
            if device_id in device_to_meters:
                for meter in device_to_meters[device_id]:
                    mt = meter['meter_type']
                    if type_counts[mt] < MAX_IMAGES_PER_TYPE:
                        test_images.append({
                            's3_key': key,
                            'meter': meter
                        })
                        type_counts[mt] += 1
    
    # 十分な画像が集まったら終了
    if all(c >= MAX_IMAGES_PER_TYPE for c in type_counts.values()):
        break

print(f'テスト画像数: {len(test_images)}')
for mt, count in type_counts.items():
    print(f'  {mt}: {count}枚')

## 4. バッチ推論テスト

In [None]:
def invoke_endpoint(image_bytes):
    """SageMakerエンドポイントを呼び出す"""
    response = runtime.invoke_endpoint(
        EndpointName=ENDPOINT_NAME,
        ContentType='image/jpeg',
        Accept='application/json',
        Body=image_bytes
    )
    return json.loads(response['Body'].read().decode('utf-8'))


# 全テスト画像で推論実行
inference_results = []
errors = []

for i, test_img in enumerate(test_images):
    key = test_img['s3_key']
    meter = test_img['meter']
    
    try:
        # 画像取得
        response = s3.get_object(Bucket=SOURCE_BUCKET, Key=key)
        image_bytes = response['Body'].read()
        
        # 推論
        start = time.time()
        result = invoke_endpoint(image_bytes)
        elapsed = time.time() - start
        
        # キーポイントからメーター値を算出
        detections = result.get('detections', [])
        meter_value = None
        
        if detections:
            best_detection = detections[0]  # 最も信頼度の高い検出
            kps = best_detection['keypoints']
            
            # キーポイントが全て揃っているか確認
            required_kps = ['needle_tip', 'needle_center', 'scale_min', 'scale_max']
            if all(k in kps for k in required_kps):
                meter_value = calculate_meter_value(
                    kps,
                    meter['scale_min_value'],
                    meter['scale_max_value']
                )
        
        inference_results.append({
            's3_key': key,
            'meter_id': meter['meter_id'],
            'meter_type': meter['meter_type'],
            'unit': meter['unit'],
            'detection_count': len(detections),
            'detection_confidence': detections[0]['confidence'] if detections else 0,
            'meter_value': meter_value,
            'elapsed_sec': elapsed
        })
        
        status = f'value={meter_value["value"]}' if meter_value else 'no detection'
        print(f'[{i+1}/{len(test_images)}] {meter["meter_type"]}: {status} ({elapsed:.2f}s)')
        
    except Exception as e:
        errors.append({'s3_key': key, 'error': str(e)})
        print(f'[{i+1}/{len(test_images)}] ERROR: {e}')

print(f'\n完了: 成功={len(inference_results)}, エラー={len(errors)}')

## 5. Bedrock読み取り結果との比較

In [None]:
def get_bedrock_readings(meter_id, s3_key):
    """MeterReadingテーブルから既存のBedrock読み取り結果を取得"""
    table = dynamodb.Table('MeterReading')
    
    # s3_keyが一致するレコードを探す
    # MeterReadingテーブルのPKはmeter_id, SKはtimestamp
    response = table.query(
        KeyConditionExpression='meter_id = :mid',
        FilterExpression='s3_key = :key',
        ExpressionAttributeValues={
            ':mid': meter_id,
            ':key': s3_key
        },
        ScanIndexForward=False,
        Limit=1
    )
    
    items = response.get('Items', [])
    if items:
        item = items[0]
        return {
            'value': float(item.get('value', 0)),
            'confidence_score': float(item.get('confidence_score', 0)),
            'status': item.get('status', 'unknown')
        }
    return None


# Bedrock結果との比較
comparisons = []

for result in inference_results:
    if not result['meter_value']:
        continue
    
    bedrock_reading = get_bedrock_readings(result['meter_id'], result['s3_key'])
    
    if bedrock_reading and bedrock_reading['status'] == 'success':
        yolo_value = result['meter_value']['value']
        bedrock_value = bedrock_reading['value']
        
        # 差分計算
        abs_diff = abs(yolo_value - bedrock_value)
        
        comparisons.append({
            's3_key': result['s3_key'],
            'meter_type': result['meter_type'],
            'yolo_value': yolo_value,
            'bedrock_value': bedrock_value,
            'abs_diff': abs_diff,
            'yolo_confidence': result['detection_confidence'],
            'bedrock_confidence': bedrock_reading['confidence_score']
        })

print(f'比較可能データ数: {len(comparisons)}')

In [None]:
# 比較結果テーブル
if comparisons:
    print(f'{"Meter Type":<15} {"YOLO":<10} {"Bedrock":<10} {"Diff":<10} {"YOLO Conf":<10} {"BR Conf":<10}')
    print('-' * 65)
    
    for c in comparisons:
        print(f'{c["meter_type"]:<15} {c["yolo_value"]:<10.2f} {c["bedrock_value"]:<10.2f} '
              f'{c["abs_diff"]:<10.2f} {c["yolo_confidence"]:<10.3f} {c["bedrock_confidence"]:<10.3f}')
    
    # サマリー統計
    avg_diff = sum(c['abs_diff'] for c in comparisons) / len(comparisons)
    max_diff = max(c['abs_diff'] for c in comparisons)
    avg_yolo_conf = sum(c['yolo_confidence'] for c in comparisons) / len(comparisons)
    avg_br_conf = sum(c['bedrock_confidence'] for c in comparisons) / len(comparisons)
    
    print(f'\n=== サマリー ===')
    print(f'平均絶対差: {avg_diff:.3f}')
    print(f'最大絶対差: {max_diff:.3f}')
    print(f'YOLO平均信頼度: {avg_yolo_conf:.3f}')
    print(f'Bedrock平均信頼度: {avg_br_conf:.3f}')
else:
    print('比較データがありません。Bedrockの読み取り結果がMeterReadingテーブルに存在するか確認してください。')

## 6. レイテンシ分析

In [None]:
if inference_results:
    latencies = [r['elapsed_sec'] for r in inference_results]
    
    print('=== レイテンシ分析 ===')
    print(f'サンプル数: {len(latencies)}')
    print(f'平均: {sum(latencies)/len(latencies):.2f}秒')
    print(f'最小: {min(latencies):.2f}秒')
    print(f'最大: {max(latencies):.2f}秒')
    print(f'中央値: {sorted(latencies)[len(latencies)//2]:.2f}秒')
    
    # コールドスタートの影響（最初のリクエスト）
    print(f'\n初回リクエスト: {latencies[0]:.2f}秒')
    if len(latencies) > 1:
        warm_latencies = latencies[1:]
        print(f'2回目以降の平均: {sum(warm_latencies)/len(warm_latencies):.2f}秒')

## 7. 検出率分析

In [None]:
if inference_results:
    total = len(inference_results)
    detected = sum(1 for r in inference_results if r['detection_count'] > 0)
    valued = sum(1 for r in inference_results if r['meter_value'] is not None)
    
    print('=== 検出率分析 ===')
    print(f'総画像数: {total}')
    print(f'メーター検出: {detected} ({detected/total*100:.1f}%)')
    print(f'値算出成功: {valued} ({valued/total*100:.1f}%)')
    
    # メーター種別ごと
    for mt in TARGET_TYPES:
        mt_results = [r for r in inference_results if r['meter_type'] == mt]
        if mt_results:
            mt_detected = sum(1 for r in mt_results if r['detection_count'] > 0)
            mt_valued = sum(1 for r in mt_results if r['meter_value'] is not None)
            print(f'\n{mt}:')
            print(f'  画像数: {len(mt_results)}')
            print(f'  検出率: {mt_detected/len(mt_results)*100:.1f}%')
            print(f'  値算出率: {mt_valued/len(mt_results)*100:.1f}%')

## 8. 結果の保存

In [None]:
# テスト結果をJSON形式で保存
test_report = {
    'test_date': datetime.now().isoformat(),
    'endpoint_name': ENDPOINT_NAME,
    'total_images': len(test_images),
    'successful_inferences': len(inference_results),
    'errors': len(errors),
    'inference_results': inference_results,
    'comparisons': comparisons,
    'error_details': errors
}

# ローカル保存
report_path = Path('/tmp/inference_test_report.json')
report_path.write_text(json.dumps(test_report, indent=2, default=str))

# S3に保存
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
s3.put_object(
    Bucket=SAGEMAKER_BUCKET,
    Key=f'yolo-pose/test-reports/{timestamp}/report.json',
    Body=json.dumps(test_report, indent=2, default=str).encode('utf-8')
)

print(f'テストレポート保存完了:')
print(f'  ローカル: {report_path}')
print(f'  S3: s3://{SAGEMAKER_BUCKET}/yolo-pose/test-reports/{timestamp}/report.json')

## 9. サマリー

In [None]:
print('=' * 60)
print('推論テスト サマリー')
print('=' * 60)
print(f'エンドポイント: {ENDPOINT_NAME}')
print(f'テスト画像数: {len(test_images)}')
print(f'推論成功: {len(inference_results)}')
print(f'推論エラー: {len(errors)}')
if inference_results:
    valued = sum(1 for r in inference_results if r['meter_value'] is not None)
    print(f'値算出成功: {valued}')
if comparisons:
    avg_diff = sum(c['abs_diff'] for c in comparisons) / len(comparisons)
    print(f'Bedrock比較可能: {len(comparisons)}')
    print(f'平均絶対差: {avg_diff:.3f}')
print('=' * 60)
print()
print('次のアクション:')
print('  1. 精度が十分 → Lambda統合を進める（meter-integrationの改修）')
print('  2. 精度不足 → アノテーションデータの見直し・学習データ追加')
print('  3. レイテンシ問題 → メモリサイズ調整・モデル最適化')

## エンドポイント削除（テスト完了後）

不要なコストを避けるため、テスト完了後にエンドポイントを削除する場合は以下を実行してください。

**注意**: サーバーレスエンドポイントはリクエストがない間は課金されませんが、不要であれば削除してください。

In [None]:
# エンドポイント削除（必要な場合のみ実行）

# sm_client = boto3.client('sagemaker')
# sm_client.delete_endpoint(EndpointName=ENDPOINT_NAME)
# sm_client.delete_endpoint_config(EndpointConfigName='facteye-meter-keypoint-config')
# sm_client.delete_model(ModelName='facteye-meter-keypoint-model')
# print('エンドポイント削除完了')