<a href="https://colab.research.google.com/github/lapinss/git-practice/blob/main/%E6%B8%85%E6%8E%83%E6%A4%9C%E7%9F%A5%E7%B5%90%E6%9E%9C%E3%82%92%E5%87%A6%E7%90%86%E3%81%99%E3%82%8BLambda%E9%96%A2%E6%95%B0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import json
import logging
import datetime
from decimal import Decimal
import boto3
import psycopg2
import psycopg2.extras
import pytz

# =============================================================================
# 初期設定
# =============================================================================

# ロガー設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# タイムゾーン設定
JST = pytz.timezone('Asia/Tokyo')

# 環境変数から設定値を取得
# Lambdaの環境変数に以下のキーで設定してください
DB_HOST = os.environ.get('DB_HOST')
DB_PORT = os.environ.get('DB_PORT', '5432')
DB_NAME = os.environ.get('DB_NAME')
DB_USER = os.environ.get('DB_USER')
DB_PASSWORD = os.environ.get('DB_PASSWORD')
DETECT_TABLE_NAME = os.environ.get('DETECT_TABLE_NAME', 'monitor_detect')
PHOTO_GSI_NAME = os.environ.get('PHOTO_GSI_NAME', 'PhotoIndex') # photoによる重複チェック用のGSI名

# =============================================================================
# データベース接続 (グローバルスコープで定義し接続を再利用)
# =============================================================================

# Aurora (PostgreSQL) への接続
db_connection = None

def get_db_connection():
    """Aurora DBへの接続を取得または作成する"""
    global db_connection
    try:
        if db_connection is None or db_connection.closed:
            logger.info("Aurora DBへの新しい接続を初期化します。")
            db_connection = psycopg2.connect(
                host=DB_HOST,
                port=DB_PORT,
                dbname=DB_NAME,
                user=DB_USER,
                password=DB_PASSWORD
            )
        return db_connection
    except psycopg2.Error as e:
        logger.error(f"データベース接続に失敗しました: {e}")
        raise

# DynamoDBへの接続
try:
    dynamodb = boto3.resource('dynamodb')
    detect_table = dynamodb.Table(DETECT_TABLE_NAME)
except Exception as e:
    logger.error(f"DynamoDBへの接続に失敗しました: {e}")
    raise

# =============================================================================
# ヘルパー関数 (DBからのデータ取得)
# =============================================================================

def get_terminal_and_shop_info(cursor, serial_no):
    """
    シリアル番号から端末と店舗の情報を取得する。
    Djangoの: Terminal.objects.filter(serial_no=serial_no).select_related("shop").first() に相当。
    """
    sql = """
        SELECT
            t.terminal_id, t.is_active AS terminal_is_active,
            s.shop_id, s.is_active AS shop_is_active
        FROM shop_terminal t
        JOIN shop_shop s ON t.shop_id = s.shop_id
        WHERE t.serial_no = %s;
    """
    cursor.execute(sql, (serial_no,))
    return cursor.fetchone()

def get_camera_info(cursor, terminal_id, camera_no):
    """
    端末IDとカメラ番号からカメラ情報を取得する。
    Djangoの: Camera.objects.filter(terminal=terminal, camera_no=camera_no).first() に相当。
    """
    sql = "SELECT camera_id, is_active FROM shop_camera WHERE terminal_id = %s AND camera_no = %s;"
    cursor.execute(sql, (terminal_id, camera_no))
    return cursor.fetchone()

def get_ai_model_info(cursor, model_name, model_version):
    """
    AIモデル名とバージョンからAIモデル情報を取得する。
    Djangoの: AIModel.objects.filter(...).first() に相当。
    """
    # AIModelTypeテーブルの存在を仮定
    sql = """
        SELECT m.ai_model_id
        FROM device_ai_model m
        JOIN device_aimodeltype mt ON m.ai_model_type_id = mt.ai_model_type_id
        WHERE mt.ai_model_type = %s AND m.ai_model_version = %s;
    """
    cursor.execute(sql, (model_name, model_version))
    return cursor.fetchone()

def get_or_create_cleaning_status(cursor, cleaning_status_name):
    """
    清掃ステータスを取得または作成する。
    Djangoの: CleaningStatus.objects.get_or_create(...) に相当。
    """
    # まず存在確認
    sql_select = "SELECT cleaning_id FROM monitor_cleaning_status WHERE cleaning_status = %s;"
    cursor.execute(sql_select, (cleaning_status_name,))
    result = cursor.fetchone()
    if result:
        return result['cleaning_id'], False # 取得成功

    # 存在しない場合は作成
    # cleaning_nameはcleaning_statusと同じ値をデフォルトで設定
    sql_insert = "INSERT INTO monitor_cleaning_status (cleaning_status, cleaning_name) VALUES (%s, %s) RETURNING cleaning_id;"
    try:
        cursor.execute(sql_insert, (cleaning_status_name, cleaning_status_name))
        new_id = cursor.fetchone()['cleaning_id']
        return new_id, True # 作成成功
    except psycopg2.IntegrityError:
        # 挿入が競合した場合（ほぼ同時に他のプロセスが挿入した場合）、再度SELECTを実行
        cursor.connection.rollback() # トランザクションをリセット
        cursor.execute(sql_select, (cleaning_status_name,))
        result = cursor.fetchone()
        if result:
            return result['cleaning_id'], False
        else:
            # 予期せぬエラー
            logger.error(f"{cleaning_status_name}の取得・作成に失敗しました。")
            raise

def photo_exists_in_dynamodb(image_file):
    """
    DynamoDBに同じ写真ファイル名のレコードが存在するか確認する。
    Djangoの: Detect.objects.filter(photo=image_file).exists() に相当。
    この機能には、DynamoDBテーブルに'photo'をキーとするGSI(Global Secondary Index)が必要です。
    """
    if not image_file:
        return False
    try:
        response = detect_table.query(
            IndexName=PHOTO_GSI_NAME,
            KeyConditionExpression=boto3.dynamodb.conditions.Key('photo').eq(image_file),
            Select='COUNT'
        )
        return response.get('Count', 0) > 0
    except Exception as e:
        # GSIが存在しない場合などのエラーを捕捉
        logger.error(f"DynamoDBでの写真存在チェックに失敗しました ({image_file}): {e}")
        # エラー時は安全のため「存在する」とみなし、重複登録を防ぐ
        return True

# =============================================================================
# Lambdaハンドラ
# =============================================================================

def lambda_handler(event, context):
    """
    メインの処理関数。
    """
    logger.info(f"★受信イベント: {event}")

    # Djangoのrequest.dataはリスト形式なので、eventがリストでない場合はリストに変換
    request_data = event if isinstance(event, list) else [event]

    conn = None
    items_to_write = []

    try:
        conn = get_db_connection()
        # 辞書形式で結果を取得するためのカーソル
        cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

        for data in request_data:
            # --- 撮影日時 ---
            shot_time_iso = data.get("shot_time")
            if not shot_time_iso:
                logger.error(f"shot_timeがありません。data={data}")
                continue # 次のデータへ
            shot_dt = datetime.datetime.fromisoformat(shot_time_iso).astimezone(JST)

            # --- 端末情報 ---
            serial_no = data.get("term", {}).get("serial_no")
            if not serial_no:
                logger.error(f"serial_noがありません。data={data}")
                continue

            term_info = get_terminal_and_shop_info(cursor, serial_no)
            if not term_info:
                logger.error(f"シリアル番号に紐づく端末が見つかりません。serial_no={serial_no}, data={data}")
                continue

            if not term_info['terminal_is_active']:
                logger.warning(f"端末がアクティブではありません。terminal_id={term_info['terminal_id']}")
                continue

            shop_id = term_info['shop_id']
            if not term_info['shop_is_active']:
                logger.warning(f"店舗がアクティブではありません。shop_id={shop_id}")
                continue

            terminal_id = term_info['terminal_id']

            # --- カメラ情報 ---
            camera_inf = data.get("camera_inf", {})
            camera_no = camera_inf.get("camera_no")
            if camera_no is None: # camera_noが0の場合もあるためis Noneでチェック
                logger.error(f"camera_noがありません。data={data}")
                continue

            cam_info = get_camera_info(cursor, terminal_id, camera_no)
            if not cam_info:
                logger.error(f"カメラが見つかりません。terminal_id={terminal_id}, camera_no={camera_no}, data={data}")
                continue

            if not cam_info['is_active']:
                logger.warning(f"カメラがアクティブではありません。camera_id={cam_info['camera_id']}")
                continue

            camera_id = cam_info['camera_id']

            # --- 判定結果 (results) ---
            results = data.get("results", [])
            for result in results:
                image_file = result.get("image_file")
                cleaning = result.get("cleaning")
                cleaning_score = result.get("cleaning_score")

                if not all([image_file, cleaning, cleaning_score is not None]):
                    logger.error(f"判定結果の必須項目が不足しています。data={data}, result={result}")
                    continue

                # --- AIモデル特定 ---
                model_name = result.get("cleaning_ai_model_name", "")
                model_version = result.get("cleaning_ai_version", "")
                ai_model_id = None
                if model_name and model_version:
                    ai_model_info = get_ai_model_info(cursor, model_name, model_version)
                    if ai_model_info:
                        ai_model_id = ai_model_info['ai_model_id']
                    else:
                        logger.warning(f"AIモデルが見つかりません。name={model_name}, version={model_version}")

                # --- 清掃ステータス特定/作成 ---
                try:
                    cleaning_status_id, _ = get_or_create_cleaning_status(cursor, cleaning)
                    conn.commit() # get_or_createでINSERTが発生した場合にコミット
                except Exception as e:
                    logger.error(f"CleaningStatusの取得/作成に失敗しました: {e}")
                    conn.rollback() # エラーが発生した場合はロールバック
                    continue

                # --- 重複チェック & 書き込みリストへの追加 ---
                if not photo_exists_in_dynamodb(image_file):
                    # DynamoDBのスキーマに合わせてアイテムを作成
                    item = {
                        'Date': shot_dt.strftime('%Y%m%d'),
                        'ShopCameraTime': f"{shop_id}_{camera_id}_{shot_dt.strftime('%H%M%S%f')}",
                        'detect_datetime': shot_dt.isoformat(),
                        'shop_id': shop_id,
                        'camera_id': camera_id,
                        'photo': image_file,
                        'cleaning_status_id': cleaning_status_id,
                        'cleaning_score': Decimal(str(cleaning_score)), # floatはDecimalに変換
                        'is_active': True,
                        'memo': "",
                        'created_at': datetime.datetime.now(JST).isoformat() # 監査用の作成日時
                    }
                    if ai_model_id is not None:
                        item['cleaning_ai_model_id'] = ai_model_id

                    items_to_write.append(item)
                else:
                    logger.info(f"写真が既に存在するためスキップします: {image_file}")

        # --- DynamoDBへの一括書き込み ---
        if items_to_write:
            logger.info(f"{len(items_to_write)} 件のデータをDynamoDBに書き込みます。")
            with detect_table.batch_writer() as batch:
                for item in items_to_write:
                    batch.put_item(Item=item)
            logger.info("書き込みが完了しました。")

        return {"result": True, "processed_count": len(items_to_write)}

    except Exception as e:
        logger.exception(e)
        if conn:
            conn.rollback()
        # 元のAPIと同様のレスポンスを返す
        return {"result": False, "error": str(e)}

    finally:
        if conn:
            conn.close()
            logger.info("データベース接続をクローズしました。")