# 준비



In [25]:
!pip install requests numpy opencv-python boto3 ultralytics pandas torch

import torch
import requests
import numpy as np
import cv2
import yaml
import time
import boto3
import uuid
import pandas as pd
from datetime import datetime
from google.colab import drive
from google.colab import userdata
from ultralytics import YOLO
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from decimal import Decimal
import copy


# Google Drive 마운트
drive.mount('/content/drive')

# 환경 변수 설정
cctv_api_key = userdata.get('CCTV_API_KEY')
aws_region = userdata.get('AWS_REGION')
aws_access_key_id = userdata.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = userdata.get('AWS_SECRET_ACCESS_KEY')

dynamodb = boto3.resource(
    'dynamodb',
    region_name=aws_region,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key
)

lambda_client = boto3.client(
    'lambda',
    region_name=aws_region, # aws_region 변수를 사용하는 것이 더 좋습니다.
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key
)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [26]:
# --- User Table 가져오기 ---
def get_member_region_from_dynamodb(table_name='Member'):
  try:
      member_table = dynamodb.Table(table_name)
      unique_addresses = set()
      response = member_table.scan(
          ProjectionExpression='address'
      )
      items = response['Items']

      for item in items:
          if 'address' in item:
              unique_addresses.add(item['address'])

      while 'LastEvaluatedKey' in response:
          response = member_table.scan(
              ProjectionExpression='address',
              ExclusiveStartKey=response['LastEvaluatedKey']
          )
          items = response['Items']
          for item in items:
              if 'address' in item:
                  unique_addresses.add(item['address'])

      if unique_addresses: # 리스트 변환
          sorted_regions = sorted(list(unique_addresses))
          return sorted_regions
      else:
          print("member_table: 조회한 내역이 없습니다.")
  except Exception as e:
    print(e)

# --- region_table 가져오기 ---
def get_region_status_from_dynamodb(table_name='Region'):
  try:
    region_table = dynamodb.Table(table_name)
    all_items = []
    response = region_table.scan()
    all_items.extend(response.get('Items', []))

    while 'LastEvaluatedKey' in response:
        response = region_table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        all_items.extend(response.get('Items', []))

    region_status_data = {}
    for item in all_items:
        region = item.get('region')
        status = item.get('status')
        timestamp = item.get('timestamp')

        if isinstance(timestamp, Decimal):
            timestamp = int(timestamp)

        if region and status is not None:
            region_status_data[region] = {'status': status, 'timestamp': timestamp}
    if region_status_data:
      return region_status_data
    else:
      print("region_table: 조회한 내역이 없습니다.")

  except Exception as e:
    print(e)

# --- CCTV Table 가져오기 ---
def get_cctv_items_from_dynamodb(table_name='Cctv'):
    cctv_table = dynamodb.Table(table_name)
    all_items = []

    # DynamoDB Scan (5000개 데이터이므로 페이지네이션 처리 필수)
    response = cctv_table.scan()
    all_items.extend(response.get('Items', []))
    while 'LastEvaluatedKey' in response:
        response = cctv_table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        all_items.extend(response.get('Items', []))

    cctv_data_cache = {}
    for item in all_items:
        cctv_id = item.get('id')
        if cctv_id:
            if isinstance(cctv_id, Decimal):
              cctv_id = int(cctv_id)
            item_status = item.get('status')
            item_region = item.get('name')
            item_cctvurl = item.get('cctvUrl')
            if item_status is not None:
              item['status'] = bool(item_status) # 명시적으로 boolean으로 변환

            cctv_data_cache[cctv_id] = {'status': item_status, 'region': item_region, 'url':item_cctvurl}

    if cctv_data_cache:
      return cctv_data_cache
    else:
      print("CCTV Table: 조회한 내역이 없습니다.")

members_result = get_member_region_from_dynamodb()
regions_result = get_region_status_from_dynamodb()
cctvs_result = get_cctv_items_from_dynamodb()

In [27]:
print(members_result)
print(regions_result)
print(cctvs_result)

['서울특별시 강남구']
{'서울특별시 강남구': {'status': False, 'timestamp': 0}, '서울특별시 강동구': {'status': False, 'timestamp': 0}, '서울특별시 강서구': {'status': False, 'timestamp': 0}, '경기도 안성시': {'status': False, 'timestamp': 0}, '서울특별시 송파구': {'status': False, 'timestamp': 0}}
{'3635': {'status': False, 'region': '경상북도 안동시', 'url': 'http://cctvsec.ktict.co.kr/2277/W17spigfEvEpmYYDKVVNzw5crrPUmEya7IXvIce/j7/Ii7tp4oV6TZ9VOGR1ABIO4JIxLAdyqahGM87g0+bxQgyUmEHKlyOgdq9BCKVoTOM='}, '228': {'status': False, 'region': '전라남도 장성군', 'url': 'http://cctvsec.ktict.co.kr/8434/1kv4ybMYOIGyADuQ7nCw5hJLn6Ko/etvs3GGa0ht3tYoq5x++jKbRDCNEQ6/NPgguCF9JcjQ3H8asZgaMIe1q07G84SKkaQdzHdBIYg5hxI='}, '1668': {'status': False, 'region': '대전광역시 동구', 'url': 'http://cctvsec.ktict.co.kr/3343/QLo6wnU2D704lpBef2hHHRqQcTgntTYm6ck5BPKL2koOBziKoWUDfUYqYG/SNMowqDwIjlyeaaoXJDvoSvVRPeFCpfY517kX9KfFoMa+yjo='}, '4274': {'status': False, 'region': '경상남도 양산시', 'url': 'http://cctvsec.ktict.co.kr/631/pM5NvOxbHPdE3ZUPCpBPMV6jEEXUDiBLLy/BvJqZjVoUxxqcpBVw6CY5uLDi

In [28]:
## --- Lambda 함수 호출 함수 ---

# --- fcm 람다 호출 함수 ---
def invoke_lambda_fcm(region="불명", status=False):
    # 람다 함수에 전달할 페이로드 (JSON 형식)
    if region == "불명":
        return
    payload = {
        'region' : region,
        'status' : status
    }
    try:
        response = lambda_client.invoke(
            FunctionName='disaster-fcm', # Lambda 함수 이름
            InvocationType='Event', # 비동기 호출
            Payload=json.dumps(payload)
        )
        print(f"  [Lambda 호출] 지역: {region} 람다 함수 호출 완료. 상태 코드: {response['StatusCode']}")
        if response['StatusCode'] != 202: # Event 타입 호출 시 202 (Accepted)가 정상
             print(f"  [Lambda 경고] 람다 호출 상태 코드 비정상: {response['StatusCode']}")
    except Exception as e:
        print(f"  [Lambda 오류] 람다 함수 호출 실패: {e}")

# --- sns 람다 호출 함수 ---
def invoke_lambda_sns(region="불명", status=False):
    if region == "불명":
        return
    payload = {
        'region' : region,
        'status' : status
    }
    try:
        response = lambda_client.invoke(
            FunctionName='disaster-sns',
            InvocationType='Event',
            Payload=json.dumps(payload)
        )
        if response['StatusCode'] != 202:
             print(f"  [Lambda 경고] 람다 호출 상태 코드 비정상: {response['StatusCode']}")
    except Exception as e:
        print(f"  [Lambda 오류] 람다 함수 호출 실패: {e}")

# --- region db 변경 람다 호출 함수 ---
def invoke_lambda_region(region="불명", status=False):
    if region == "불명":
        return

    payload = {
        'region' : region,
        'status' : status
    }
    try:
        response = lambda_client.invoke(
            FunctionName='disaster-region-db',
            InvocationType='Event',
            Payload=json.dumps(payload)
        )
        if response['StatusCode'] != 202:
             print(f"  [Lambda 경고] 람다 호출 상태 코드 비정상: {response['StatusCode']}")
    except Exception as e:
        print(f"  [Lambda 오류] 람다 함수 호출 실패: {e}")

# --- cctv db 변경 람다 호출 함수 ---
def invoke_lambda_cctv(id="불명", status=False):
    if id == "불명":
        return
    payload = {
        'id' : id,
        'status' : status
    }
    try:
        response = lambda_client.invoke(
            FunctionName='disaster-cctv-db',
            InvocationType='Event',
            Payload=json.dumps(payload)
        )
        if response['StatusCode'] != 202:
             print(f"  [Lambda 경고] 람다 호출 상태 코드 비정상: {response['StatusCode']}")
    except Exception as e:
        print(f"  [Lambda 오류] 람다 함수 호출 실패: {e}")

# YOLOv11

In [None]:
MODEL = YOLO('/content/drive/MyDrive/Colab_Notebooks/models/best_nano_111.pt')
# MODEL.to('cuda') #GPU 있을 때만 사용 가능

In [None]:
def load_cctv_config(path="/content/drive/MyDrive/Colab_Notebooks/cctv_info.csv", regions=None):
    # CSV 파일 읽기
    df = pd.read_csv(path, encoding='euc-kr')  # 한글이 포함되어 있으니 euc-kr 사용

    # 지역 필터링이 없으면 전체 반환
    if regions is None or len(regions) == 0:
        return df.to_dict(orient='records')

    # 해당 지역만 필터링
    filtered_df = df[df['region'].isin(regions)]

    return filtered_df.to_dict(orient='records')

# 재난 탐지
def detect_disaster(cctv):
    try:
        cap = cv2.VideoCapture(cctv["cctvurl"])
        if not cap.isOpened():
            print(f"[{cctv['region']}] {cctv['cctvname']}: 스트리밍 열기 실패")
            return (cctv["cctvname"], False)
        ret, frame = cap.read()
        cap.release()
        if not ret or frame is None:
            print(f"[{cctv['region']}] {cctv['cctvname']}: 프레임 수신 실패")
            return (cctv["cctvname"], False)
    except Exception as e:
        print(f"[{cctv['region']}] {cctv['cctvname']}: 예외 발생 - {e}")
        return (cctv["cctvname"], False)

    results = MODEL(frame)
    predictions = results[0].boxes
    return (cctv["cctvname"], len(predictions) > 0)

# DynamoDB에 기록
def save_to_dynamodb(cctv_id, region, is_disaster):
    dynamodb = boto3.resource("dynamodb", region_name="ap-northeast-2")
    table = dynamodb.Table("DisasterDetectionResults")
    result = {
        "id": str(uuid.uuid4()),
        "timestamp": datetime.utcnow().isoformat(),
        "cctv_id": cctv_id,
        "region": region,
        "detected": is_disaster,
    }
    table.put_item(Item=result)

def monitor_cctvs(cctvs, interval=5, max_workers=10):
    while True:
        round_start = time.time()
        true_cnt = 0
        print(f"--- {datetime.now().strftime('%H:%M:%S')} | {len(cctvs)}개 CCTV 감지 시작 ---")

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(detect_disaster, cctv): cctv for cctv in cctvs}

            for future in as_completed(futures):
                cctv = futures[future]
                try:
                    name, detected = future.result()
                    print(f"[{cctv['region']}] {name}: 감지 결과 = {detected}")
                    # save_to_dynamodb(cctv['cctvname'], cctv['region'], detected)  # 나중에 저장
                    if detected:
                        true_cnt += 1
                except Exception as e:
                    print(f"[{cctv['region']}] {cctv['cctvname']}: 결과 처리 중 예외 - {e}")

        print(f"✔ 탐지: {true_cnt}")
        elapsed = time.time() - round_start
        wait_time = max(0, interval - elapsed)
        print(f"총 소요 시간: {elapsed:.2f}초 | 다음 감지까지 대기 시간: {wait_time:.2f}초\n")
        time.sleep(wait_time)

# YOLOv8n

In [None]:
MODEL = YOLO('/content/drive/MyDrive/Colab_Notebooks/models/yolov8n.pt')  # yolov8n.pt 또는 fine-tuned model
MODEL.to('cuda') #GPU 있을 때만 사용 가능

YOLO(
  (model): DetectionModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C2f(
        (cv1): Conv(
          (conv): Conv2d(32, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(48, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_s

In [None]:
# CCTV config 로딩
def load_cctv_config(path="/content/drive/MyDrive/Colab_Notebooks/cctv_info.csv", regions=None):
    df = pd.read_csv(path, encoding='euc-kr')
    if regions is None or len(regions) == 0:
        return df.to_dict(orient='records')
    return df[df['region'].isin(regions)].to_dict(orient='records')

# 화재/연기 감지
def detect_disaster(cctv):
    try:
        cap = cv2.VideoCapture(cctv["cctvurl"])
        if not cap.isOpened():
            print(f"[{cctv['region']}] {cctv['cctvname']}: 스트리밍 열기 실패")
            return (cctv["cctvname"], False)
        ret, frame = cap.read()
        cap.release()
        if not ret or frame is None:
            print(f"[{cctv['region']}] {cctv['cctvname']}: 프레임 수신 실패")
            return (cctv["cctvname"], False)
    except Exception as e:
        print(f"[{cctv['region']}] {cctv['cctvname']}: 예외 발생 - {e}")
        return (cctv["cctvname"], False)

    # YOLOv8 추론 (confidence 조정 가능)
    results = MODEL.predict(source=frame, conf=0.7, verbose=False)
    predictions = results[0].boxes
    return (cctv["cctvname"], len(predictions) > 0)

# 주기적 실행
def monitor_cctvs(cctvs, interval=5, max_workers=10):
    while True:
        round_start = time.time()
        true_cnt = 0
        print(f"--- {datetime.now().strftime('%H:%M:%S')} | {len(cctvs)}개 CCTV 감지 시작 ---")

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(detect_disaster, cctv): cctv for cctv in cctvs}
            for future in as_completed(futures):
                cctv = futures[future]
                try:
                    name, detected = future.result()
                    print(f"[{cctv['region']}] {name}: 감지 결과 = {detected}")
                    # save_to_dynamodb(cctv['cctvname'], cctv['region'], detected)  # 나중에 저장
                    if detected:
                        true_cnt += 1
                except Exception as e:
                    print(f" - [{cctv['region']}] {cctv['cctvname']}: 에러 = {e}")

        print(f"✔ 탐지: {true_cnt}")
        elapsed = time.time() - round_start
        wait_time = max(0, interval - elapsed)
        print(f"총 소요 시간: {elapsed:.2f}초 | 다음 감지까지 대기 시간: {wait_time:.2f}초\n")
        time.sleep(wait_time)

# YOLOv5s

In [44]:
members_address = copy.deepcopy(members_result)
region_status_cache = copy.deepcopy(regions_result)
cctv_data_cache = copy.deepcopy(cctvs_result)

# for i in range(1, 8):
#     cctv_data_cache[i] = {'status': False}

print(members_address)
print(region_status_cache)
print(cctv_data_cache)

['서울특별시 강남구']
{'서울특별시 강남구': {'status': False, 'timestamp': 0}, '서울특별시 강동구': {'status': False, 'timestamp': 0}, '서울특별시 강서구': {'status': False, 'timestamp': 0}, '경기도 안성시': {'status': False, 'timestamp': 0}, '서울특별시 송파구': {'status': False, 'timestamp': 0}}
{'3635': {'status': False, 'region': '경상북도 안동시', 'url': 'http://cctvsec.ktict.co.kr/2277/W17spigfEvEpmYYDKVVNzw5crrPUmEya7IXvIce/j7/Ii7tp4oV6TZ9VOGR1ABIO4JIxLAdyqahGM87g0+bxQgyUmEHKlyOgdq9BCKVoTOM='}, '228': {'status': False, 'region': '전라남도 장성군', 'url': 'http://cctvsec.ktict.co.kr/8434/1kv4ybMYOIGyADuQ7nCw5hJLn6Ko/etvs3GGa0ht3tYoq5x++jKbRDCNEQ6/NPgguCF9JcjQ3H8asZgaMIe1q07G84SKkaQdzHdBIYg5hxI='}, '1668': {'status': False, 'region': '대전광역시 동구', 'url': 'http://cctvsec.ktict.co.kr/3343/QLo6wnU2D704lpBef2hHHRqQcTgntTYm6ck5BPKL2koOBziKoWUDfUYqYG/SNMowqDwIjlyeaaoXJDvoSvVRPeFCpfY517kX9KfFoMa+yjo='}, '4274': {'status': False, 'region': '경상남도 양산시', 'url': 'http://cctvsec.ktict.co.kr/631/pM5NvOxbHPdE3ZUPCpBPMV6jEEXUDiBLLy/BvJqZjVoUxxqcpBVw6CY5uLDi

In [45]:
import cv2
import time
import pandas as pd
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import torch
import warnings

warnings.filterwarnings("ignore", category=FutureWarning)

# 모델 로드 (YOLOv5 기준)
MODEL = torch.hub.load("ultralytics/yolov5", "custom", path="/content/drive/MyDrive/Colab_Notebooks/models/yolov5s_best.pt")

# 프레임 수집 (끊김 시 재연결)
def get_frame(cctv_id, cctv_info, cap):
    try:
        ret, frame = cap.read()
        if not ret or frame is None:
            print(f"[{cctv_info['region']}] {cctv_info.get('cctvname', cctv_id)}: 프레임 수신 실패 → 재연결 시도")
            cap.release()
            new_cap = cv2.VideoCapture(cctv_info["url"]) # 'url' 키 사용
            if new_cap.isOpened():
                return (cctv_id, cctv_info, new_cap, None)  # 연결은 성공했지만 frame은 None
            else:
                print(f"[{cctv_info['region']}] {cctv_info.get('cctvname', cctv_id)}: 재연결 실패")
                return (cctv_id, cctv_info, None, None) # 재연결도 실패
        return (cctv_id, cctv_info, cap, frame)
    except Exception as e:
        print(f"[{cctv_info['region']}] {cctv_info.get('cctvname', cctv_id)}: 예외 발생 - {e}")
        return (cctv_id, cctv_info, None, None)

# 감지 함수
def detect_with_model(cctv_id, cctv_info, frame):
    try:
        results = MODEL(frame)
        predictions = results.pandas().xyxy[0]
        return (cctv_id, not predictions.empty)
    except Exception as e:
        print(f"[{cctv_info['region']}] {cctv_info.get('cctvname', cctv_id)}: 감지 실패 - {e}")
        return (cctv_id, False)

Using cache found in /root/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2025-6-18 Python-3.11.13 torch-2.6.0+cu124 CPU

Fusing layers... 
Model summary: 213 layers, 7012822 parameters, 0 gradients, 15.8 GFLOPs
Adding AutoShape... 


In [55]:
def monitor_all_cctvs_by_region_cycle_sequential(
    filtered_cctv_data, # DynamoDB에서 가져온 지역에 해당하는 모든 CCTV 데이터
    sorted_regions,     # DynamoDB에서 가져온 지역 순서
    cycle_interval=60   # 전체 사이클 완료 후 대기 시간 (초)
):
    global_stream_caps = {} # {cctv_id: (cctv_info_dict, VideoCapture_object)}

    # 1. 모든 지정된 CCTV 스트림 초기화 (직렬 방식)
    print("\n--- 모든 지정된 CCTV 스트림 초기화 중 (최초 1회) ---")
    for cctv_id, cctv_info in filtered_cctv_data.items(): # cctv_id와 cctv_info를 함께 가져옴
        cap = cv2.VideoCapture(cctv_info["url"]) # 'url' 키 사용
        if cap and cap.isOpened():
            global_stream_caps[cctv_id] = (cctv_info, cap)
            print(f"[{cctv_info['region']}] {cctv_info.get('cctvname', cctv_id)}: 스트림 열기 성공.")
        else:
            print(f"[{cctv_info['region']}] {cctv_info.get('cctvname', cctv_id)}: 스트림 열기 실패 또는 무효.")
        time.sleep(0.1) # 각 연결 시도 사이에 짧은 지연을 두어 서버 과부하 방지 (선택 사항)

    if not global_stream_caps:
        print("경고: 유효한 CCTV 스트림이 없어 감시를 시작할 수 없습니다.")
        return

    print(f"\n--- 총 {len(global_stream_caps)}개의 지정된 CCTV 스트림이 성공적으로 열렸습니다. ---")

    try:
        # 2. 무한 루프를 돌며 지역별로 순회하며 감시 사이클 반복
        while True:
            full_cycle_start_time = time.time()

            for current_region in sorted_regions:
                print(f"\n{'='*70}")
                print(f"======== {datetime.now().strftime('%H:%M:%S')} | [지역: {current_region}] CCTV 감지 시작 ========")
                print(f"{'='*70}\n")

                # 현재 지역에 해당하는 CCTV 스트림만 global_stream_caps에서 필터링
                cctvs_in_current_region = [
                    (cctv_id, cctv_data, cap_obj) # cctv_id도 함께 필터링
                    for cctv_id, (cctv_data, cap_obj) in global_stream_caps.items()
                    if cctv_data['region'] == current_region
                ]

                if not cctvs_in_current_region:
                    print(f"'{current_region}' 지역에 현재 유효한 CCTV 스트림이 없습니다. 다음 지역으로 넘어갑니다.")
                    continue

                region_detection_start_time = time.time()
                region_overall_status = False # 해당 지역 전체의 재난 감지 여부 (하나라도 감지되면 True)

                print(f"--- [지역: {current_region}] {len(cctvs_in_current_region)}개 CCTV 감지 ---")

                # --- 프레임 가져오기 및 감지 (모두 순차 처리) ---
                # 스트림 객체 업데이트를 위해 복사본 사용 (안전성)
                updated_global_stream_caps_local = global_stream_caps.copy()

                for cctv_id, cctv_info, cap_obj in cctvs_in_current_region:
                    # 1. 프레임 가져오기 (순차)
                    returned_cctv_id, returned_cctv_info, updated_cap, frame = get_frame(cctv_id, cctv_info, cap_obj)

                    # 스트림 객체 업데이트 로직 (끊어졌다가 재연결된 경우)
                    if updated_cap is None: # 스트림이 끊기거나 재연결 실패
                        if returned_cctv_id in updated_global_stream_caps_local:
                            print(f"[{returned_cctv_info['region']}] {returned_cctv_info.get('cctvname', returned_cctv_id)}: 스트림 문제 발생. 전역 목록에서 제거.")
                            updated_global_stream_caps_local.pop(returned_cctv_id)
                    elif updated_cap != global_stream_caps[returned_cctv_id][1]: # 재연결 성공 시 cap 객체 업데이트
                        updated_global_stream_caps_local[returned_cctv_id] = (returned_cctv_info, updated_cap)

                    if frame is not None: # 유효한 프레임만 감지 대상으로 처리
                        # 2. 감지 (모델 추론) (순차)
                        cctv_id_detected, detected = detect_with_model(returned_cctv_id, returned_cctv_info, frame)

                        if cctv_id_detected == '2504': # 예시: '2604' CCTV가 감지되면 True로 설정
                            detected = True
                        else:
                            detected = False

                        print(f"[{returned_cctv_info.get('cctvname', returned_cctv_id)} | {returned_cctv_info['region']}] 감지 결과 = {detected}")

                        # CCTV 캐시의 이전 상태 확인 (안전한 접근)
                        old_status = cctv_data_cache.get(cctv_id_detected, {}).get('status', False)

                        if detected != old_status:
                            if detected:
                                # T != F -> 재난 탐지 (CCTV status 변경 람다 함수 호출)
                                cctv_data_cache[cctv_id_detected]['status'] = True
                                invoke_lambda_cctv(cctv_id_detected, True)
                            else:
                                # F != T -> 재난 종식 (CCTV status 변경 람다 함수 호출)
                                cctv_data_cache[cctv_id_detected]['status'] = False
                                invoke_lambda_cctv(cctv_id_detected, False)
                        else:
                            # 상태가 동일하면 아무것도 하지 않음
                            pass

                        if detected:
                            region_overall_status = True # 해당 지역 내 하나라도 감지되면 지역 전체 상태 True
                    else:
                        print(f"[{returned_cctv_info['region']}] {returned_cctv_info.get('cctvname', returned_cctv_id)}: 유효한 프레임을 받지 못했습니다. 이번 라운드에서는 스킵합니다.")

                # 전역 스트림 캡 딕셔너리 업데이트 반영
                global_stream_caps.clear()
                global_stream_caps.update(updated_global_stream_caps_local)

                # 지역 전체 상태 업데이트 및 람다 호출
                # current_region은 이미 외부 루프에서 올바르게 설정되어 있습니다.
                region_cache_result = region_status_cache.get(current_region, {}).get('status', False)
                if region_overall_status != region_cache_result:
                    if region_overall_status:
                        # T != F (재난 탐지)
                        # 지역 DB 업데이트, FCM, SNS 호출
                        region_status_cache[current_region]['status'] = True
                        invoke_lambda_region(current_region, True)
                        invoke_lambda_fcm(current_region, True)
                        invoke_lambda_sns(current_region, True)
                    else:
                        # F != T (재난 종식)
                        # 지역 DB 업데이트만 호출 (알림은 필요에 따라 정책 변경)
                        region_status_cache[current_region]['status'] = False
                        invoke_lambda_region(current_region, False)
                else:
                    pass # 지역 상태가 동일하면 아무것도 하지 않음

                region_elapsed_time = time.time() - region_detection_start_time
                print(f"--- [지역: {current_region}] 감지 완료. 소요 시간: {region_elapsed_time:.2f}초 ---")
                print(f"최종 결과: {current_region} 상태: {region_overall_status}") # 지역별 최종 상태 출력

            # 모든 지역 순회가 끝난 후 전체 감지 완료 메시지 출력
            total_cycle_elapsed_time = time.time() - full_cycle_start_time
            print(f"\n\n{'='*70}")
            print(f"======== 전체 감지 완료 (소요 시간: {total_cycle_elapsed_time:.2f}초). 다음 감지를 시작합니다. ========")
            print(f"{'='*70}\n\n")

            # 전체 사이클 완료 후 cycle_interval 초 대기
            print(f"다음 사이클 시작 전 {cycle_interval}초 대기...")
            time.sleep(cycle_interval)

    except KeyboardInterrupt:
        print("\n--- 사용자 인터럽트 감지: 프로그램 종료 ---")
    except Exception as e:
        print(f"\n--- 예외 발생: {e} ---")
    finally:
        print("\n--- 모든 CCTV 스트림 해제 중 ---")
        for cctv_id, (cctv_data, cap_obj) in global_stream_caps.items():
            if cap_obj and cap_obj.isOpened():
                cap_obj.release()
                print(f"[{cctv_data['region']}] {cctv_data.get('cctvname', cctv_id)}: 스트림 해제 완료")


# 공통

In [56]:
if __name__ == "__main__":
    filtered_cctv_data = {
        cctv_id: cctv for cctv_id, cctv in cctv_data_cache.items()
        if cctv['region'] in members_address
    }
    print(filtered_cctv_data)
    print(len(filtered_cctv_data))

    if not filtered_cctv_data:
        print("CCTV 정보 로드 실패. 프로그램을 종료합니다.")
    else:
        # 4. 모든 CCTV 스트림을 열고 지역별로 순회하며 감시 시작
        # 이 함수 내에서 모든 스트림을 열고 유지하며, 지역별 라운드를 수행
        monitor_all_cctvs_by_region_cycle_sequential(
            filtered_cctv_data, # 필터링된 전체 CCTV 데이터 전달
            members_address,     # 지역 순서
            cycle_interval=10
            )

{'2604': {'status': False, 'region': '서울특별시 강남구', 'url': 'http://cctvsec.ktict.co.kr/80162/4cYWn9bSHnUhwX0v/CfJiWa08tuHRRXLVGpulUntm5XMWOnvvtVgYyhQeNvfzS4HhyIOTizcHK/TYBZQ/Cj0ka1CzLAwQANwhmUWueKUiG0='}, '2504': {'status': False, 'region': '서울특별시 강남구', 'url': 'http://cctvsec.ktict.co.kr/5766/jyRuRvddft0iB2ZQLxl6fVoBq0Htpmqghz9UPIh9ld8Sr7EitMx87J5JklWVckE7vSoQStBY0RppLR7AgxrDcXcg5PN9SmQEcozfrMgtrCk='}}
2

--- 모든 지정된 CCTV 스트림 초기화 중 (최초 1회) ---
[서울특별시 강남구] 2604: 스트림 열기 성공.
[서울특별시 강남구] 2504: 스트림 열기 성공.

--- 총 2개의 지정된 CCTV 스트림이 성공적으로 열렸습니다. ---


--- [지역: 서울특별시 강남구] 2개 CCTV 감지 ---
[2604 | 서울특별시 강남구] 감지 결과 = False
[2504 | 서울특별시 강남구] 감지 결과 = True
  [Lambda 호출] 지역: 서울특별시 강남구 람다 함수 호출 완료. 상태 코드: 202
--- [지역: 서울특별시 강남구] 감지 완료. 소요 시간: 2.58초 ---
최종 결과: 서울특별시 강남구 상태: True




다음 사이클 시작 전 10초 대기...

--- 사용자 인터럽트 감지: 프로그램 종료 ---

--- 모든 CCTV 스트림 해제 중 ---
[서울특별시 강남구] 2604: 스트림 해제 완료
[서울특별시 강남구] 2504: 스트림 해제 완료


# 사용 안 함

In [None]:
import requests

try:
    requests.post("https://your-lambda-url", json=data, timeout=0.2)
except requests.exceptions.ReadTimeout:
    # 굳이 결과 기다리지 않기 위해 타임아웃 짧게 설정
    pass

In [None]:
# 파일 변환 JSON -> CSV
import pandas as pd
import json
import os

def update_csv_with_json_data(
    existing_csv_path,
    json_input_path,
    output_csv_path,
    csv_encoding='euc-kr', # 기존 CSV 파일의 실제 인코딩에 맞춰주세요!
    json_encoding='utf-8'  # JSON 파일의 실제 인코딩에 맞춰주세요!
):
    """
    기존 CSV 파일에 JSON 파일의 데이터를 통합하여 업데이트합니다.
    cctvname을 기준으로 매칭하며, cctvurl, coordx, coordy 필드를 업데이트하거나 추가합니다.

    Args:
        existing_csv_path (str): 기존 주소 및 지역 정보가 있는 CSV 파일의 경로.
        json_input_path (str): 새로운 CCTV URL, 좌표 정보가 있는 JSON 파일의 경로.
        output_csv_path (str): 업데이트된 CSV 파일을 저장할 경로.
        csv_encoding (str): 기존 CSV 파일의 인코딩 (기본값 'euc-kr').
        json_encoding (str): JSON 파일의 인코딩 (기본값 'utf-8').
    """
    # 1. 기존 CSV 파일 로드
    if not os.path.exists(existing_csv_path):
        print(f"오류: 기존 CSV 파일 '{existing_csv_path}'을(를) 찾을 수 없습니다.")
        return False
    try:
        df_existing = pd.read_csv(existing_csv_path, encoding=csv_encoding)
        print(f"'{existing_csv_path}' 파일 로드 완료. (인코딩: {csv_encoding})")
    except Exception as e:
        print(f"오류: 기존 CSV 파일 로드 중 문제 발생. 인코딩 ({csv_encoding})을 확인하거나 파일이 유효한 CSV 형식인지 확인하세요: {e}")
        return False

    # 2. JSON 파일 로드 및 필요한 데이터 추출
    if not os.path.exists(json_input_path):
        print(f"오류: 입력 JSON 파일 '{json_input_path}'을(를) 찾을 수 없습니다.")
        return False
    try:
        with open(json_input_path, 'r', encoding=json_encoding) as f:
            json_data = json.load(f)

        # JSON 구조에서 'response' 키 아래 'data' 리스트를 DataFrame으로 정규화
        if 'response' in json_data and 'data' in json_data['response']:
            df_new = pd.json_normalize(json_data, record_path=['response', 'data'])
            # 필요한 열만 선택하여 매칭에 사용할 준비
            df_new = df_new[['cctvname', 'cctvurl', 'coordx', 'coordy']]
            print(f"'{json_input_path}' 파일에서 데이터 추출 완료.")
        else:
            print(f"오류: JSON 파일 '{json_input_path}'의 구조가 예상과 다릅니다.")
            print("최상위에 'response' 키가 있고 그 안에 'data' 키가 있는 JSON 구조여야 합니다.")
            return False
    except json.JSONDecodeError:
        print(f"오류: '{json_input_path}' 파일이 유효한 JSON 형식이 아닙니다. 파일 내용을 확인해주세요.")
        return False
    except Exception as e:
        print(f"오류: JSON 파일 로드 및 파싱 중 문제 발생: {e}")
        return False

    # 3. 데이터 통합 (cctvname을 기준으로 매칭)
    # 두 DataFrame 모두 cctvname을 인덱스로 설정하여 쉽게 업데이트합니다.
    # 만약 cctvname 컬럼이 없는 경우 오류가 발생할 수 있습니다.
    if 'cctvname' not in df_existing.columns:
        print(f"오류: 기존 CSV 파일 '{existing_csv_path}'에 'cctvname' 컬럼이 없습니다. 매칭 기준이 필요합니다.")
        return False
    if 'cctvname' not in df_new.columns:
        print(f"오류: JSON 파일에서 추출한 데이터에 'cctvname' 컬럼이 없습니다. 매칭 기준이 필요합니다.")
        return False

    df_existing = df_existing.set_index('cctvname')
    df_new = df_new.set_index('cctvname')

    # update() 메서드를 사용하여 매칭되는 행의 값을 업데이트합니다.
    # .update()는 기본적으로 NaN이 아닌 값만 업데이트합니다.
    df_existing.update(df_new)

    # 인덱스를 다시 컬럼으로 리셋 (cctvname이 다시 컬럼으로 돌아옴)
    df_updated = df_existing.reset_index()

    print("데이터 통합 완료.")

    # 4. 업데이트된 CSV 저장
    try:
        df_updated.to_csv(output_csv_path, index=False, encoding=csv_encoding)
        print(f"업데이트된 데이터가 '{output_csv_path}'에 성공적으로 저장되었습니다.")
        return True
    except Exception as e:
        print(f"오류: 업데이트된 CSV 파일 저장 중 문제 발생: {e}")
        return False

# --- 함수 사용 ---
if __name__ == "__main__":
    # ⚠️ 이곳에 실제 파일 경로를 입력해주세요!
    # 실제 파일 경로 예시 (사용자 환경에 맞게 수정)
    # 기존 CSV 파일 (address, region, cctvname 등이 포함된 파일)
    # cctv_info.csv 파일이 /content/drive/MyDrive/Colab_Notebooks/ 에 있다면
    existing_csv_file = '/content/drive/MyDrive/Colab_Notebooks/cctv_info_real.csv'

    # 새로운 CCTV URL, 좌표 정보가 포함된 JSON 파일
    # raw_cctv_data.json 파일이 /content/drive/MyDrive/Colab_Notebooks/ 에 있다면
    json_data_file = '/content/drive/MyDrive/Colab_Notebooks/cctv_info.json'

    # 업데이트된 CSV 파일을 저장할 경로 (기존 파일을 덮어쓰려면 existing_csv_file과 동일하게 설정)
    output_csv_file = '/content/drive/MyDrive/Colab_Notebooks/cctv_info_updated.csv'
    # 혹은 기존 파일을 덮어쓰고 싶다면:
    # output_csv_file = existing_csv_file

    # CSV 파일과 JSON 파일의 인코딩을 정확히 확인하여 설정해주세요!
    # 한글이 포함된 경우 CSV는 'euc-kr'인 경우가 많고, JSON은 'utf-8'인 경우가 많습니다.
    csv_file_encoding = 'euc-kr'
    json_file_encoding = 'utf-8'

    # 함수 호출
    print("--- 파일 통합 작업 시작 ---")
    success = update_csv_with_json_data(
        existing_csv_path=existing_csv_file,
        json_input_path=json_data_file,
        output_csv_path=output_csv_file,
        csv_encoding=csv_file_encoding,
        json_encoding=json_file_encoding
    )

    if success:
        print("\n--- 파일 통합 작업 완료 ---")
        # 업데이트된 CSV 파일의 처음 몇 줄을 확인하고 싶다면:
        try:
            df_final = pd.read_csv(output_csv_file, encoding=csv_file_encoding)
            print(f"\n최종 업데이트된 CSV 파일 '{output_csv_file}'의 내용 (첫 5줄):\n")
            print(df_final.head())
        except Exception as e:
            print(f"최종 CSV 파일 확인 중 오류 발생: {e}")
    else:
        print("\n--- 파일 통합 작업 실패 ---")

--- 파일 통합 작업 시작 ---
'/content/drive/MyDrive/Colab_Notebooks/cctv_info_real.csv' 파일 로드 완료. (인코딩: euc-kr)
'/content/drive/MyDrive/Colab_Notebooks/cctv_info.json' 파일에서 데이터 추출 완료.


ValueError: cannot reindex on an axis with duplicate labels