PuTTY(SSH 세션)에서 아래 명령을 실행:
 uvicorn app:app --host 0.0.0.0 --port 5555 &

./ngrok http 5555


In [None]:
#분석 요청 코드(파이썬)
import requests

# Public ngrok URL of the server's /predict endpoint
url = "https://c9b5-155-230-28-29.ngrok-free.app/predict"

# Path to a local test video file
video_path = "test_clip.mp4"

# (connect, read) timeouts in seconds. Without a timeout requests waits
# forever if the tunnel or the server stops responding.
REQUEST_TIMEOUT = (10, 120)

with open(video_path, "rb") as f:
    files = {"file": ("test_clip", f, "application/octet-stream")}
    response = requests.post(url, files=files, timeout=REQUEST_TIMEOUT)

print(response.status_code)
try:
    print(response.json())
except ValueError:
    # The body is not JSON (for example an HTML error page from the tunnel);
    # show the raw text instead of crashing.
    print(response.text)

200
{'class': 'burglary', 'confidence': 0.9146443009376526, 'detected_objects': ['person']}


소요시간

Preprocess: 4.6ms  
Inference: 4.1ms  
Postprocess: 110.0ms

In [None]:
#객체탐지 후 행동분석 서버 코드
#app2.py
import io
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.models import video as video_models
from torchvision.io import read_video
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from ultralytics import YOLO
import cv2
import numpy as np

# Behaviour classes; the index order is used to map the classifier's
# predicted index back to a name.
classes = ['normal', 'trespass', 'fight', 'dump', 'burglary', 'vandalism']

# Detector labels that trigger behaviour analysis. Clips whose first frame
# contains none of these are reported as "normal" without classification.
allowed_objects = [
    # People and vehicles. The COCO label emitted by YOLOv8 is 'motorcycle';
    # 'motorbike' is kept for models that use that label.
    'person', 'car', 'bus', 'truck', 'motorcycle', 'motorbike', 'bicycle', 'train',
    # Animals
    'dog', 'cat', 'bird', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe'
]

# Prefer the GPU but fall back to the CPU so the server also starts on
# machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Object-detection model, loaded from the 'yolov8n.pt' weights
yolo_model = YOLO('yolov8n.pt')
# Anomalous-behaviour classification model
def build_model(num_classes=6):
    """Create an R3D-18 video network with a ``num_classes``-way output layer.

    The network is built without pretrained weights and its final fully
    connected layer is replaced so the output size equals ``num_classes``.

    Args:
        num_classes: Number of output classes (default 6).

    Returns:
        The constructed model.
    """
    network = video_models.r3d_18(pretrained=False)
    feature_dim = network.fc.in_features
    network.fc = nn.Linear(feature_dim, num_classes)
    return network

# Build the classifier, restore the trained weights, move it to the target
# device and switch to inference mode.
model = build_model(num_classes=len(classes))
model.load_state_dict(
    torch.load("anomaly_detection_model.pth", map_location=device)
)
model = model.to(device).eval()

# Video preprocessing: resize every frame to 112x112, then normalise per
# channel. NOTE(review): the mean/std look like the Kinetics-400 statistics
# used by torchvision's video models - confirm.
transform = transforms.Compose([
    transforms.Resize((112, 112)),
    transforms.Normalize(
        mean=[0.43216, 0.394666, 0.37645],
        std=[0.22803, 0.22145, 0.216989],
    ),
])

# FastAPI application
app = FastAPI()

@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    """Detect objects in the first frame, then classify the clip's behaviour.

    The upload is written to a per-request temporary file and decoded. Its
    first frame is passed to YOLO; if none of the detected labels is in
    ``allowed_objects`` the clip is reported as ``normal`` without running
    the classifier. Otherwise the first 60 frames (the last frame is repeated
    when the clip is shorter) are resized, normalised and classified.

    Args:
        file: Uploaded video (multipart form field ``file``).

    Returns:
        JSONResponse with ``class``, ``confidence`` and ``detected_objects``
        on success; ``{"error": ...}`` with status 400 when the video has no
        frames, or with status 500 when any exception is raised.
    """
    # Imported locally so this handler does not rely on module-level imports.
    import os
    import tempfile

    # NOTE(review): decoding and inference run synchronously inside this
    # coroutine, so other requests wait while one clip is processed.
    temp_path = None
    try:
        contents = await file.read()

        # A unique temp file per request: a fixed shared name would let
        # concurrent uploads overwrite each other's video.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            temp_path = tmp.name
            tmp.write(contents)

        # Decode the video
        video, _, _ = read_video(temp_path, pts_unit="sec")  # (T, H, W, C)
        if video.shape[0] == 0:
            return JSONResponse({"error": "비디오 프레임이 없습니다."}, status_code=400)

        # Run YOLO on the first frame only (converted from RGB to BGR)
        first_frame = video[0].numpy()  # (H, W, C)
        first_frame_bgr = cv2.cvtColor(first_frame, cv2.COLOR_RGB2BGR)

        results = yolo_model.predict(
            source=first_frame_bgr,
            conf=0.4,
            classes=None,
            device=0 if torch.cuda.is_available() else "cpu",
        )
        names = results[0].names
        detected = results[0].boxes.cls.tolist()
        detected_labels = [names[int(cls)] for cls in detected]

        # Skip behaviour analysis when no relevant object was detected
        relevant_labels = [label for label in detected_labels if label in allowed_objects]
        if not relevant_labels:
            return JSONResponse({
                "class": "normal",
                "confidence": 1.0,
                "detected_objects": []
            })

        # Behaviour analysis
        video = video.permute(0, 3, 1, 2).float() / 255.0  # (T, C, H, W)
        clip_len = 60
        if video.shape[0] > clip_len:
            video = video[:clip_len]
        elif video.shape[0] < clip_len:
            # Pad short clips by repeating the last frame
            pad = clip_len - video.shape[0]
            video = torch.cat([video, video[-1:].repeat(pad, 1, 1, 1)], dim=0)

        video = transform(video)
        video = video.permute(1, 0, 2, 3).unsqueeze(0).to(device)  # (1, C, T, H, W)

        with torch.no_grad():
            logits = model(video)
            probs = torch.nn.functional.softmax(logits, dim=1)
            pred = torch.argmax(probs, dim=1).item()

        return JSONResponse({
            "class": classes[pred],
            "confidence": float(probs[0][pred]),
            "detected_objects": relevant_labels
        })

    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    finally:
        # Best-effort cleanup so uploaded videos do not accumulate on disk
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass


In [None]:
#이상행동 분석 서버코드
#app.py
import io
import torch
import torch.nn as nn
from torchvision import transforms
from torchvision.models import video as video_models
from torchvision.io import read_video
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List

# Behaviour classes: [normal, trespass, fight, dumping, burglary, vandalism]
classes = [
    'normal',
    'trespass',
    'fight',
    'dump',
    'burglary',
    'vandalism',
]

# Prefer the GPU when one is available, otherwise run on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Model construction
def build_model(num_classes=6):
    """Return an R3D-18 video model whose last layer outputs ``num_classes`` logits.

    The backbone is created without pretrained weights; only its final fully
    connected layer is replaced.

    Args:
        num_classes: Size of the classification head (default 6).

    Returns:
        The constructed model.
    """
    backbone = video_models.r3d_18(pretrained=False)
    backbone.fc = nn.Linear(
        in_features=backbone.fc.in_features,
        out_features=num_classes,
    )
    return backbone

# Load the model: build the architecture, restore the trained weights, move
# it to the selected device and put it in inference mode.
model = build_model(num_classes=len(classes))
model.load_state_dict(
    torch.load("anomaly_detection_model.pth", map_location=device)
)
model = model.to(device).eval()

# Preprocessing: resize frames to 112x112 and normalise each channel
_resize = transforms.Resize((112, 112))
_normalize = transforms.Normalize(
    mean=[0.43216, 0.394666, 0.37645],
    std=[0.22803, 0.22145, 0.216989],
)
transform = transforms.Compose([_resize, _normalize])

# FastAPI initialisation
app = FastAPI()

@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    """Classify the behaviour shown in an uploaded video clip.

    The upload is written to a per-request temporary file and decoded. The
    first 60 frames (the last frame is repeated when the clip is shorter) are
    resized, normalised and passed to the classifier.

    Args:
        file: Uploaded video (multipart form field ``file``).

    Returns:
        JSONResponse with ``class`` and ``confidence`` on success;
        ``{"error": ...}`` with status 400 when the video has no frames, or
        with status 500 when any exception is raised.
    """
    # Imported locally so this handler does not rely on module-level imports.
    import os
    import tempfile

    temp_path = None
    try:
        # Read the upload
        contents = await file.read()

        # A unique temp file per request: a fixed shared name would let
        # concurrent uploads overwrite each other's video.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            temp_path = tmp.name
            tmp.write(contents)

        # Decode the temporary file
        video, _, _ = read_video(temp_path, pts_unit="sec")  # (T, H, W, C)
        if video.shape[0] == 0:
            # Without this guard the padding below would have no frame to
            # repeat and the failure would surface as an opaque 500.
            return JSONResponse({"error": "비디오 프레임이 없습니다."}, status_code=400)

        video = video.permute(0, 3, 1, 2).float() / 255.0      # (T, C, H, W)

        clip_len = 60
        if video.shape[0] > clip_len:
            video = video[:clip_len]
        elif video.shape[0] < clip_len:
            # Pad short clips by repeating the last frame
            pad = clip_len - video.shape[0]
            video = torch.cat([video, video[-1:].repeat(pad, 1, 1, 1)], dim=0)

        video = transform(video)
        video = video.permute(1, 0, 2, 3).unsqueeze(0).to(device)  # (1, C, T, H, W)

        # Inference
        with torch.no_grad():
            logits = model(video)
            probs = torch.nn.functional.softmax(logits, dim=1)
            pred = torch.argmax(probs, dim=1).item()

        return JSONResponse({"class": classes[pred], "confidence": float(probs[0][pred])})

    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    finally:
        # Best-effort cleanup so uploaded videos do not accumulate on disk
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass

In [None]:
#ai 모델 코드
#ai.py
from torch.utils.data import Dataset
from torchvision.io import read_video
from torchvision import transforms
import os
import torch.nn as nn
from torchvision.models import video as video_models
from torch.utils.data import DataLoader
import torch
from tqdm import tqdm
from torch.utils.data import DataLoader

class VideoClipDataset(Dataset):
    """Dataset of fixed-length video clips stored as ``root_dir/<class>/*.mp4``.

    Each item is a ``(video, label)`` pair where ``video`` is a float tensor
    laid out as (C, T, H, W) with ``T == clip_len`` and ``label`` is the index
    of the clip's class within ``classes``.

    Args:
        root_dir: Directory that contains one sub-directory per class.
        classes: Class names; each is also the name of its sub-directory.
        clip_len: Number of frames every returned clip is trimmed/padded to.
    """

    def __init__(self, root_dir, classes, clip_len=60):
        self.root_dir = root_dir
        self.classes = classes
        self.clip_len = clip_len
        self.samples = []
        for label_idx, label in enumerate(classes):
            class_dir = os.path.join(root_dir, label)
            # sorted() keeps the sample order reproducible; os.listdir
            # returns entries in arbitrary order.
            for fname in sorted(os.listdir(class_dir)):
                # Case-insensitive so files such as "CLIP.MP4" are included
                if fname.lower().endswith(".mp4"):
                    self.samples.append((os.path.join(class_dir, fname), label_idx))

        self.transform = transforms.Compose([
            transforms.Resize((112, 112)),
            transforms.Normalize([0.43216, 0.394666, 0.37645],
                                 [0.22803, 0.22145, 0.216989])
        ])

    def __len__(self):
        """Return the number of clips found under ``root_dir``."""
        return len(self.samples)

    def _fit_clip_length(self, video, source="video"):
        """Trim or pad ``video`` along its first axis to ``clip_len`` frames.

        Longer clips keep their first ``clip_len`` frames; shorter clips are
        padded by repeating the last frame.

        Raises:
            ValueError: If ``video`` has no frames, so there is nothing to
                repeat for padding.
        """
        frames = video.shape[0]
        if frames == 0:
            raise ValueError(f"no frames decoded from {source}")
        if frames > self.clip_len:
            return video[:self.clip_len]
        if frames < self.clip_len:
            pad = self.clip_len - frames
            return torch.cat([video, video[-1:].repeat(pad, 1, 1, 1)], dim=0)
        return video

    def __getitem__(self, idx):
        """Load, length-normalise and preprocess the clip at index ``idx``.

        Returns:
            Tuple ``(video, label)`` with ``video`` laid out as (C, T, H, W).

        Raises:
            ValueError: If the video file decodes to zero frames.
        """
        video_path, label = self.samples[idx]
        video, _, _ = read_video(video_path, pts_unit='sec')  # (T, H, W, C)

        video = video.permute(0, 3, 1, 2).float() / 255.0  # (T, C, H, W)
        video = self._fit_clip_length(video, source=video_path)

        if self.transform:
            video = self.transform(video)

        # The model consumes channels-first clips: (C, T, H, W)
        video = video.permute(1, 0, 2, 3)

        return video, label

def build_model(num_classes=6):
    """Build the R3D-18 clip classifier used for training.

    The network is created without pretrained weights, then its final fully
    connected layer is swapped for one with ``num_classes`` outputs.

    Args:
        num_classes: Number of target classes (default 6).

    Returns:
        The constructed model.
    """
    net = video_models.r3d_18(pretrained=False)
    head_inputs = net.fc.in_features
    net.fc = nn.Linear(head_inputs, num_classes)
    return net

def train(model, dataloader, device, epochs=5):
    """Train ``model`` with Adam (lr=1e-4) and cross-entropy loss.

    After every epoch the mean loss and accuracy over all samples seen in
    that epoch are printed. Both are weighted by batch size so that a smaller
    final batch does not skew the reported numbers.

    Args:
        model: Network to train; it is moved to ``device``.
        dataloader: Iterable that yields ``(inputs, labels)`` batches.
        device: Device on which the model and batches are placed.
        epochs: Number of passes over ``dataloader``.

    Raises:
        ValueError: If ``dataloader`` yields no samples in an epoch.
    """
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

    model.train()
    for epoch in range(epochs):
        loss_sum, correct, seen = 0.0, 0, 0
        for x, y in tqdm(dataloader):
            x, y = x.to(device), y.to(device)
            logits = model(x)
            loss = criterion(logits, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # The criterion returns the batch mean, so scale it back to a
            # per-sample sum before accumulating.
            batch_size = y.size(0)
            loss_sum += loss.item() * batch_size
            correct += (logits.argmax(dim=1) == y).sum().item()
            seen += batch_size

        if seen == 0:
            # Previously this surfaced as a ZeroDivisionError in the print
            raise ValueError("dataloader yielded no samples")
        print(f"Epoch {epoch+1}: Loss {loss_sum/seen:.4f}, Acc {correct/seen:.4f}")

# Class names; each one is also the name of its sub-folder under the clip
# directory.
classes = [
    'normal',
    'trespass',
    'fight',
    'dump',
    'burglary',
    'vandalism',
]
clip_output_dir = r"D:\clips"

# Data pipeline: shuffled mini-batches of four clips
dataset = VideoClipDataset(clip_output_dir, classes)
dataloader = DataLoader(dataset, shuffle=True, batch_size=4)

# Train on the GPU when available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = build_model(num_classes=len(classes))
train(model, dataloader, device, epochs=10)


[Epoch 1]  Loss: 0.667  Accuracy: 74.9%


[Epoch 10]  Loss: 0.360  Accuracy: 83.9%