## 최종 코드(23.08.05. 07:45 기준)

In [None]:
## Morning, Afternoon 확인
## 예비 코드 지움

import cv2
import pandas as pd
from ultralytics import YOLO
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from DenseNet import DenseNet, BottleNeck, Transition
from PIL import Image
from datetime import datetime
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   
os.environ["CUDA_VISIBLE_DEVICES"]="0"

device = torch.device('cuda:0')
densenet_model = torch.load('./model/model2.pth', map_location=torch.device(device))
# transform 밖으로 뺴서 한 번만 선언
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # 이미지 크기 조정
    transforms.ToTensor(),  # 이미지를 텐서로 변환
    transforms.Normalize(mean=[0.25, 0.25, 0.25], std=[0.1, 0.1, 0.1])
])

def predict_image_class(image, densenet_model, transform):
    
    if type(image)==str:
        image = Image.open(image).convert("RGB")

    # else 문으로 조건 추가
    else: 
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image=Image.fromarray(image)
    image = transform(image).unsqueeze(0).to(device)

    densenet_model.eval()
    with torch.no_grad():
        outputs = densenet_model(image)
        _, predicted = torch.max(outputs.data, 1)

    return predicted.item()


# cap = cv2.VideoCapture("./Video Data 2/Morning.mp4")
cap = cv2.VideoCapture("./Video Data 3/Afternoon.mp4")
model_car = YOLO("./model/yolov8s.pt")      # 우회전 tracker용 모델
model = YOLO("./model/best.pt")     # 횡단자 객체 탐지용 모델

datas = []                        # 5400 frame의 차량 객체 탐지 정보 저장
right_turn_dict = {}              # 우회전 확인용 딕셔너리
pause_dict = {}                   # 우측 건널목 횡단자 정지선 일시정지 확인용 딕셔너리
right_turn_pause_dict = {}        # 우회전(좌측 건널목) 횡단자 일시정지 확인용 딕셔너리
red_light_pause_dict = {}         # 적신호 정지서 일시정지 확인용 딕셔너리
right_turn_violated_dict = {}     # 최종 위반여부 확인용 딕셔너리
results_person_right_list = []    # 적신호 일시정지를 더 정밀히 확인하고 갑작스레 들어온 횡당자 경계 위한 리스트
results_person_left_list = []     # 우회전 일시정지를 더 정밀히 확인하고 갑작스레 들어온 횡당자 경계 위한 리스트
car_judging_dict = {}             # 차량 구분 딕셔너리
car_images_dict = {}              # 차량 이미지(array) 저장 딕셔너리
distance = 0.5                    # 일시정지 확인용 좌표 이동 기준 거리
frame_count = 0

# red_light = list(pd.read_csv('./대연교차로_오전.csv').전방차량신호) # 우측 차량 직진 신호 True=적신호, False=청신호
red_light = list(pd.read_csv('./대연교차로_오후.csv').전방차량신호) # 우측 차량 직진 신호 True=적신호, False=청신호
red_light = [True if i=='적색' else False for i in red_light]

dir_name = None

while True:
    ret, frame = cap.read()
    if not ret:
        break

    time_string = f'{frame_count//108000:02}:{(frame_count//1800)%60:02}:{(frame_count//30)%60:02}'
    time_string_name = f'{frame_count//108000:02}{(frame_count//1800)%60:02}{(frame_count//30)%60:02}{frame_count%30:02}'

    # 폴더 생성
    if dir_name != datetime.today().strftime("%Y%m%d"):
        dir_name = datetime.today().strftime("%Y%m%d")
        os.makedirs(f'./{dir_name}', exist_ok=True)

    # roi 조도 어둡게 설정.
    results_car = model_car.track(frame[290:790,960:1400], persist=True, classes=[2,5,7])[0].to('cuda:0')     # ROI 및 검출 클래스 지정
    results_car_data = results_car.boxes.data
    results_left = model(frame[530:790,290:915])[0].to('cuda:0')       # 버스 승객은 3에서도 검출되는 경우 발생
    results_right = model(frame[505:790,1150:1920])[0].to('cuda:0')

    # 현재==frame_count
    if results_car_data.size()[1]!=6:                             # 기본 size==(n,7)이나 (n,6)을 반환하여 오류 생기는 경우가 발생하여 조건 지정.

        datas.append(results_car_data)                            # 다른 방향 직진차량을 제외하기 위해 기존 좌표 저장.
        tmp = torch.cat(datas)                                         # 우회전 객체 검출 위해 2차원 텐서로 변환
        tmp = tmp[torch.isin(tmp[:,4],results_car_data[:,4])]     # 현재 프레임 존재하는 id의 좌표만 남김.
        tmp = tmp[torch.isin(tmp[:,4],tmp[(tmp[:,3]<400)&(tmp[:,2]>240)][:,4].unique())]    # 현재 검출 id 중 3차로에서 온 차량 여부 확인
        
        # bus는 먼저 구분한 후 DenseNet 사용하여 다득표로 분류
        for i in results_car_data[(results_car_data[:,3]>355)&(results_car_data[:,3]<385)][:,4]:
            if car_judging_dict.get(int(i)) is None:
                xyxy_value = results_car_data[results_car_data[:,4]==i].reshape(-1)
                target_image = frame[int(xyxy_value[1])+290:int(xyxy_value[3])+290,int(xyxy_value[0])+960:int(xyxy_value[2])+960]
                car_judging_dict[int(i)] = [1 if xyxy_value[6]==5 else predict_image_class(target_image, densenet_model, transform)]
                car_images_dict[int(i)] = np.array(frame[int(xyxy_value[1])+280:int(xyxy_value[3])+310,int(xyxy_value[0])+950:int(xyxy_value[2])+980])
            else:
                car_judging_dict[int(i)].append(1 if xyxy_value[6]==5 else predict_image_class(frame[int(xyxy_value[1])+290:int(xyxy_value[3])+290,int(xyxy_value[0])+960:int(xyxy_value[2])+960], densenet_model, transform))

        
        # 좌측 건널목, 우회전 진입 일시정지 판별
        results_person_left = results_left.boxes.data
        if list(results_person_left)==[]:
            results_person_left = 0
        else:
            results_person_left = results_person_left[~((results_person_left[:,0]>527)&(results_person_left[:,3]<73))][:,5].int().tolist()     # 좌측 건널목 우상단 인도 제거
            results_person_left = results_person_left.count(0)
        results_person_left_list = results_person_left_list[-29:]+[results_person_left]
        
        if list(results_right)==[]:
            results_person_right = 0
        else:
            results_person_right = results_right.boxes.data[:,5].int().tolist()
            results_person_right = results_person_right.count(0)
        results_person_right_list = results_person_right_list[-29:]+[results_person_right]

        # 현재 우회전일 경우
        right_turn = tmp[(tmp[:,3]>310)&(tmp[:,2]<216)]                      # 현재 우회전 여부 확인
        for i in right_turn[:,4].unique():
            i = int(i)
            right_turn_dict[i] = time_string                               # 우회전 감지 검증용 딕셔너리 

            if (results_person_left_list.count(0)<int(len(results_person_left_list)/2+1)) and results_person_left>0:
                right_turn_pause_test = pd.DataFrame(right_turn[right_turn[:,4]==i][-30:].cpu())
                right_turn_pause_test = ((right_turn_pause_test[1].rolling(15).mean() - right_turn_pause_test[1].rolling(5).mean()).abs()<distance) + ((right_turn_pause_test[2].rolling(15).mean() - right_turn_pause_test[2].rolling(5).mean()).abs()<distance)
                # 횡단자 일지정지 = True, 횡단자 위반 진행 = False
                right_turn_pause_dict[i] = (False, time_string) if np.sum(right_turn_pause_test[20:])==0 else (True, time_string)


            # 아래의 좌측 건널목, 우회전 진입 일시정지에서 입력한 right_turn_pause_dict에 해당 id가 있을 경우 조건문 실행.
            if right_turn_pause_dict.get(i):
                right_turn_pause_boolen, right_turn_frame = right_turn_pause_dict.get(i)        # (True or False, frame_number)로 구성
                if not right_turn_pause_boolen:
                    if right_turn_violated_dict.get(i):
                        v1, v2, v3, v4, v5 = right_turn_violated_dict[i]
                        right_turn_violated_dict[i] = (v1, v2, 1, v4, v5)
                        pd.DataFrame(right_turn_violated_dict, index=['마지막 위반 시점 시간', '정지선 일시정지 위반', '횡단자 보호 위반', '차종 구분', '범칙금']).T.reset_index(drop=True).to_excel(f'./{dir_name}/right_turn_violated_dict.xlsx')
                    else:
                        car_type = int(np.median(car_judging_dict.get(i)))
                        right_turn_violated_dict[i] = (right_turn_frame, 0, 1, '승용' if car_type==0 else '승합', 60000 if car_type==0 else 70000)    # right_turn_violated_dict이 최종 위반 확인용 딕셔너리. 1 = 위반.
                        pd.DataFrame(right_turn_violated_dict, index=['마지막 위반 시점 시간', '정지선 일시정지 위반', '횡단자 보호 위반', '차종 구분', '범칙금']).T.reset_index(drop=True).to_excel(f'./{dir_name}/right_turn_violated_dict.xlsx')
                        os.makedirs(f'./{dir_name}/{len(right_turn_violated_dict)-1:05}', exist_ok=True)

            # 아래의 우측 건널목, 횡단자 신호 일시정지에서 입력한 red_light_pause_dict에 해당 id가 있을 경우 조건문 실행
            if red_light_pause_dict.get(i):
                red_light_pause_boolen, red_light_pause_frame = red_light_pause_dict.get(i)
                if not red_light_pause_boolen:
                    if right_turn_violated_dict.get(i):
                        v1, v2, v3, v4, v5 = right_turn_violated_dict[i]
                        right_turn_violated_dict[i] = (v1, 1, v3, v4, v5)
                        pd.DataFrame(right_turn_violated_dict, index=['마지막 위반 시점 시간', '정지선 일시정지 위반', '횡단자 보호 위반', '차종 구분', '범칙금']).T.reset_index(drop=True).to_excel(f'./{dir_name}/right_turn_violated_dict.xlsx')
                    else:
                        car_type = int(np.median(car_judging_dict.get(i)))
                        right_turn_violated_dict[i] = (red_light_pause_frame, 1, 0, '승용' if car_type==0 else '승합', 60000 if car_type==0 else 70000)    # 1 = 위반
                        pd.DataFrame(right_turn_violated_dict, index=['마지막 위반 시점 시간', '정지선 일시정지 위반', '횡단자 보호 위반', '차종 구분', '범칙금']).T.reset_index(drop=True).to_excel(f'./{dir_name}/right_turn_violated_dict.xlsx')
                        os.makedirs(f'./{dir_name}/{len(right_turn_violated_dict)-1:05}', exist_ok=True)

        # 우측 건널목, 횡단자 신호 일시정지 판별
        # 적신호, 횡단자 없음
        if red_light[frame_count] and (results_person_right==0): # 횡단자가 있을 경우 움직이면 위반
            pause_tensor = tmp[torch.isin(tmp[:,4],tmp[(tmp[:,3]>140)&(tmp[:,3]<215)][:,4].unique())]
            # 140<ymax<215의 좌표를 보유한 적 있는 id 반환
            for i in pause_tensor[:,4].unique():
                pause = pause_tensor[pause_tensor[:,4]==i][-30:]
                # 최근 30frame의 좌표 반환
                if len(pause[(pause[:,3]>140)&(pause[:,3]<215)])==30:
                    # 최근 30frame의 좌표가 145<ymax<215일 때 작동
                    pause = pd.DataFrame(pause[pause[:,4]==i][-30:].cpu())
                    # 일시정지 위반 버스 검토
                    pause = ((pause[1].rolling(15).mean() - pause[1].rolling(5).mean()).abs()<distance) + ((pause[3].rolling(15).mean() - pause[3].rolling(5).mean()).abs()<distance) # True==일시정지
                    red_light_pause_dict[int(i)] = (True if np.sum(pause[20:])==10 else False, time_string)

        # 횡단자 있음
        if results_person_right>0:
            pause_tensor = tmp[torch.isin(tmp[:,4],tmp[(tmp[:,3]>140)&(tmp[:,3]<215)][:,4].unique())]
            # 140<ymax<215의 좌표를 보유한 적 있는 id 반환
            for i in pause_tensor[:,4].unique():
                pause = pause_tensor[pause_tensor[:,4]==i][-30:]
                # 최근 30frame의 좌표 반환
                if len(pause[(pause[:,3]>140)&(pause[:,3]<215)])==30:
                    # 최근 30frame의 좌표가 145<ymax<215일 때 작동
                    pause = pd.DataFrame(pause.cpu())
                    pause = ((pause[1].rolling(15).mean() - pause[1].rolling(5).mean()).abs()<distance) + ((pause[3].rolling(15).mean() - pause[3].rolling(5).mean()).abs()<distance) # True==일시정지
                    if np.sum(pause[20:])==10:
                        red_light_pause_dict[int(i)] = (True if np.sum(pause[20:])==10 else False, time_string)

                elif len(pause[pause[:,3]<240])==30:
                    # 최근 30frame의 좌표가 ymax<240일 때 작동
                    # 사람이 갑자기 튀어 나왔는지 확인
                    if results_person_right_list.count(0)<int(len(results_person_right_list)/2+1): 
                        pause = pd.DataFrame(pause.cpu())
                        pause = ((pause[1].rolling(15).mean() - pause[1].rolling(5).mean()).abs()<distance) + ((pause[3].rolling(15).mean() - pause[3].rolling(5).mean()).abs()<distance) # True==일시정지
                        if np.sum(pause[20:])==0:
                            red_light_pause_dict[int(i)] = (False, time_string)

    # # 진행 상황 확인용 코드. txt 파일은 열려도 덮어쓰기 가능
    # with open(f'./{dir_name}/for_checking.txt', 'w') as f:
    #     f.write(f'frame_count: {frame_count}, time_string: {time_string}')

    # # 영상 안 볼 거면 아래 3줄 주석 처리
    # cv2.imshow("frame", frame)
    # if cv2.waitKey(1) & 0xFF == ord("q"):
    #     break
    
    frame_count += 1

    if len(datas)>5399:                                                             # 메모리 절약을 위해 179초 동안의 frame만 저장
        datas = datas[30:]
cap.release()
cv2.destroyAllWindows()

In [None]:
## 예비 코드 남김

import cv2
import pandas as pd
from ultralytics import YOLO
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from DenseNet import DenseNet, BottleNeck, Transition
from PIL import Image
from datetime import datetime
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   
os.environ["CUDA_VISIBLE_DEVICES"]="0"

device = torch.device('cuda:0')
densenet_model = torch.load('./model/model2.pth', map_location=torch.device(device))
# transform 밖으로 뺴서 한 번만 선언
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # 이미지 크기 조정
    transforms.ToTensor(),  # 이미지를 텐서로 변환
    transforms.Normalize(mean=[0.25, 0.25, 0.25], std=[0.1, 0.1, 0.1])
])

def predict_image_class(image, densenet_model, transform):
    
    if type(image)==str:
        image = Image.open(image).convert("RGB")

    # else 문으로 조건 추가
    else: 
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image=Image.fromarray(image)
    image = transform(image).unsqueeze(0).to(device)

    densenet_model.eval()
    with torch.no_grad():
        outputs = densenet_model(image)
        _, predicted = torch.max(outputs.data, 1)

    return predicted.item()


# cap = cv2.VideoCapture("./Video Data 2/Morning.mp4")
cap = cv2.VideoCapture("./Video Data 3/Afternoon.mp4")
model_car = YOLO("./model/yolov8s.pt")      # 우회전 tracker용 모델
model = YOLO("./model/best.pt")     # 횡단자 객체 탐지용 모델

datas = []                        # 5400 frame의 차량 객체 탐지 정보 저장
right_turn_dict = {}              # 우회전 확인용 딕셔너리
pause_dict = {}                   # 우측 건널목 횡단자 정지선 일시정지 확인용 딕셔너리
right_turn_pause_dict = {}        # 우회전(좌측 건널목) 횡단자 일시정지 확인용 딕셔너리
red_light_pause_dict = {}         # 적신호 정지서 일시정지 확인용 딕셔너리
right_turn_violated_dict = {}     # 최종 위반여부 확인용 딕셔너리
results_person_right_list = []    # 적신호 일시정지를 더 정밀히 확인하고 갑작스레 들어온 횡당자 경계 위한 리스트
results_person_left_list = []     # 우회전 일시정지를 더 정밀히 확인하고 갑작스레 들어온 횡당자 경계 위한 리스트
car_judging_dict = {}             # 차량 구분 딕셔너리
car_images_dict = {}              # 차량 이미지(array) 저장 딕셔너리
distance = 0.5                    # 일시정지 확인용 좌표 이동 기준 거리
frame_count = 0

# red_light = list(pd.read_csv('./대연교차로_오전.csv').전방차량신호) # 우측 차량 직진 신호 True=적신호, False=청신호
red_light = list(pd.read_csv('./대연교차로_오후.csv').전방차량신호) # 우측 차량 직진 신호 True=적신호, False=청신호
red_light = [True if i=='적색' else False for i in red_light]

dir_name = None

# ligth_gamma = -50

while True:
    ret, frame = cap.read()
    if not ret:
        break

    time_string = f'{frame_count//108000:02}:{(frame_count//1800)%60:02}:{(frame_count//30)%60:02}'
    time_string_name = f'{frame_count//108000:02}{(frame_count//1800)%60:02}{(frame_count//30)%60:02}{frame_count%30:02}'

    # 폴더 생성
    if dir_name != datetime.today().strftime("%Y%m%d"):
        dir_name = datetime.today().strftime("%Y%m%d")
        os.makedirs(f'./{dir_name}', exist_ok=True)
        # os.makedirs(f'./{dir_name}/red_light_pause', exist_ok=True)
        # os.makedirs(f'./{dir_name}/right_turn_violated', exist_ok=True)
        # os.makedirs(f'./{dir_name}/right_turn_pause', exist_ok=True)
        # os.makedirs(f'./{dir_name}/car_judging', exist_ok=True)

    # roi 조도 어둡게 설정.
    results_car = model_car.track(frame[290:790,960:1400], persist=True, classes=[2,5,7])[0].to('cuda:0')     # ROI 및 검출 클래스 지정
    results_car_data = results_car.boxes.data
    # results_left = model(cv2.add(frame[530:790,290:915], (ligth_gamma, ligth_gamma, ligth_gamma, ligth_gamma))0].to('cuda:0')       # 버스 승객은 3에서도 검출되는 경우 발생
    # results_right = model(cv2.add(frame[530:790,1150:1920], (ligth_gamma, ligth_gamma, ligth_gamma, ligth_gamma)))[0].to('cuda:0')
    results_left = model(frame[530:790,290:915])[0].to('cuda:0')       # 버스 승객은 3에서도 검출되는 경우 발생
    results_right = model(frame[505:790,1150:1920])[0].to('cuda:0')

    # 현재==frame_count
    if results_car_data.size()[1]!=6:                             # 기본 size==(n,7)이나 (n,6)을 반환하여 오류 생기는 경우가 발생하여 조건 지정.

        datas.append(results_car_data)                            # 다른 방향 직진차량을 제외하기 위해 기존 좌표 저장.
        tmp = torch.cat(datas)                                         # 우회전 객체 검출 위해 2차원 텐서로 변환
        tmp = tmp[torch.isin(tmp[:,4],results_car_data[:,4])]     # 현재 프레임 존재하는 id의 좌표만 남김.
        tmp = tmp[torch.isin(tmp[:,4],tmp[(tmp[:,3]<400)&(tmp[:,2]>240)][:,4].unique())]    # 현재 검출 id 중 3차로에서 온 차량 여부 확인
        
        # bus는 먼저 구분한 후 DenseNet 사용하여 다득표로 분류
        for i in results_car_data[(results_car_data[:,3]>355)&(results_car_data[:,3]<385)][:,4]:
            if car_judging_dict.get(int(i)) is None:
                xyxy_value = results_car_data[results_car_data[:,4]==i].reshape(-1)
                target_image = frame[int(xyxy_value[1])+290:int(xyxy_value[3])+290,int(xyxy_value[0])+960:int(xyxy_value[2])+960]
                car_judging_dict[int(i)] = [1 if xyxy_value[6]==5 else predict_image_class(target_image, densenet_model, transform)]
                # car_judging_dict[int(i)] = [predict_image_class(target_image, densenet_model, transform)]
                # cv2.imwrite(f'./{dir_name}/car_judging/{time_string_name}.jpg', target_image)                # 차량 이미지 저장
                car_images_dict[int(i)] = np.array(frame[int(xyxy_value[1])+280:int(xyxy_value[3])+310,int(xyxy_value[0])+950:int(xyxy_value[2])+980])
            else:
                car_judging_dict[int(i)].append(1 if xyxy_value[6]==5 else predict_image_class(frame[int(xyxy_value[1])+290:int(xyxy_value[3])+290,int(xyxy_value[0])+960:int(xyxy_value[2])+960], densenet_model, transform))
                # car_judging_dict[int(i)].append(predict_image_class(frame[int(xyxy_value[1])+290:int(xyxy_value[3])+290,int(xyxy_value[0])+960:int(xyxy_value[2])+960], densenet_model, transform))

        
        # 좌측 건널목, 우회전 진입 일시정지 판별
        results_person_left = results_left.boxes.data
        if list(results_person_left)==[]:
            results_person_left = 0
        else:
            results_person_left = results_person_left[~((results_person_left[:,0]>527)&(results_person_left[:,3]<73))][:,5].int().tolist()     # 좌측 건널목 우상단 인도 제거
            results_person_left = results_person_left.count(0)
        results_person_left_list = results_person_left_list[-29:]+[results_person_left]
        
        if list(results_right)==[]:
            results_person_right = 0
        else:
            results_person_right = results_right.boxes.data[:,5].int().tolist()
            results_person_right = results_person_right.count(0)
        results_person_right_list = results_person_right_list[-29:]+[results_person_right]

        # 현재 우회전일 경우
        right_turn = tmp[(tmp[:,3]>310)&(tmp[:,2]<216)]                      # 현재 우회전 여부 확인
        for i in right_turn[:,4].unique():
            i = int(i)
            right_turn_dict[i] = time_string                               # 우회전 감지 검증용 딕셔너리
            # pd.DataFrame(right_turn_dict.items()).to_excel(f'./{dir_name}/right_turn_pause/right_turn_dict.xlsx', index=False) # 진행 상황 확인 위해 매번 저장 처리.

            if (results_person_left_list.count(0)<int(len(results_person_left_list)/2+1)) and results_person_left>0:
                right_turn_pause_test = pd.DataFrame(right_turn[right_turn[:,4]==i][-30:].cpu())
                right_turn_pause_test = ((right_turn_pause_test[1].rolling(15).mean() - right_turn_pause_test[1].rolling(5).mean()).abs()<distance) + ((right_turn_pause_test[2].rolling(15).mean() - right_turn_pause_test[2].rolling(5).mean()).abs()<distance)
                # 횡단자 일지정지 = True, 횡단자 위반 진행 = False
                right_turn_pause_dict[i] = (False, time_string) if np.sum(right_turn_pause_test[20:])==0 else (True, time_string)
                # pd.DataFrame(right_turn_pause_dict.items()).to_excel(f'./{dir_name}/right_turn_pause/right_turn_pause_dict.xlsx', index=False)
                # cv2.imwrite(f'./{dir_name}/right_turn_violated/{time_string_name}.jpg', frame)

            # 아래의 좌측 건널목, 우회전 진입 일시정지에서 입력한 right_turn_pause_dict에 해당 id가 있을 경우 조건문 실행.
            if right_turn_pause_dict.get(i):
                right_turn_pause_boolen, right_turn_frame = right_turn_pause_dict.get(i)        # (True or False, frame_number)로 구성
                if not right_turn_pause_boolen:
                    if right_turn_violated_dict.get(i):
                        v1, v2, v3, v4, v5 = right_turn_violated_dict[i]
                        right_turn_violated_dict[i] = (v1, v2, 1, v4, v5)
                        pd.DataFrame(right_turn_violated_dict, index=['마지막 위반 시점 시간', '정지선 일시정지 위반', '횡단자 보호 위반', '차종 구분', '범칙금']).T.reset_index(drop=True).to_excel(f'./{dir_name}/right_turn_violated_dict.xlsx')
                    else:
                        car_type = int(np.median(car_judging_dict.get(i)))
                        right_turn_violated_dict[i] = (right_turn_frame, 0, 1, '승용' if car_type==0 else '승합', 60000 if car_type==0 else 70000)    # right_turn_violated_dict이 최종 위반 확인용 딕셔너리. 1 = 위반.
                        pd.DataFrame(right_turn_violated_dict, index=['마지막 위반 시점 시간', '정지선 일시정지 위반', '횡단자 보호 위반', '차종 구분', '범칙금']).T.reset_index(drop=True).to_excel(f'./{dir_name}/right_turn_violated_dict.xlsx')
                        os.makedirs(f'./{dir_name}/{len(right_turn_violated_dict)-1:05}', exist_ok=True)

            # 아래의 우측 건널목, 횡단자 신호 일시정지에서 입력한 red_light_pause_dict에 해당 id가 있을 경우 조건문 실행
            if red_light_pause_dict.get(i):
                red_light_pause_boolen, red_light_pause_frame = red_light_pause_dict.get(i)
                if not red_light_pause_boolen:
                    if right_turn_violated_dict.get(i):
                        v1, v2, v3, v4, v5 = right_turn_violated_dict[i]
                        right_turn_violated_dict[i] = (v1, 1, v3, v4, v5)
                        pd.DataFrame(right_turn_violated_dict, index=['마지막 위반 시점 시간', '정지선 일시정지 위반', '횡단자 보호 위반', '차종 구분', '범칙금']).T.reset_index(drop=True).to_excel(f'./{dir_name}/right_turn_violated_dict.xlsx')
                    else:
                        car_type = int(np.median(car_judging_dict.get(i)))
                        right_turn_violated_dict[i] = (red_light_pause_frame, 1, 0, '승용' if car_type==0 else '승합', 60000 if car_type==0 else 70000)    # 1 = 위반
                        pd.DataFrame(right_turn_violated_dict, index=['마지막 위반 시점 시간', '정지선 일시정지 위반', '횡단자 보호 위반', '차종 구분', '범칙금']).T.reset_index(drop=True).to_excel(f'./{dir_name}/right_turn_violated_dict.xlsx')
                        os.makedirs(f'./{dir_name}/{len(right_turn_violated_dict)-1:05}', exist_ok=True)

        # 우측 건널목, 횡단자 신호 일시정지 판별
        # 적신호, 횡단자 없음
        if red_light[frame_count] and (results_person_right==0): # 횡단자가 있을 경우 움직이면 위반
            pause_tensor = tmp[torch.isin(tmp[:,4],tmp[(tmp[:,3]>140)&(tmp[:,3]<215)][:,4].unique())]
            # 140<ymax<215의 좌표를 보유한 적 있는 id 반환
            for i in pause_tensor[:,4].unique():
                pause = pause_tensor[pause_tensor[:,4]==i][-30:]
                # 최근 30frame의 좌표 반환
                if len(pause[(pause[:,3]>140)&(pause[:,3]<215)])==30:
                    # 최근 30frame의 좌표가 145<ymax<215일 때 작동
                    pause = pd.DataFrame(pause[pause[:,4]==i][-30:].cpu())
                    # 일시정지 위반 버스 검토
                    pause = ((pause[1].rolling(15).mean() - pause[1].rolling(5).mean()).abs()<distance) + ((pause[3].rolling(15).mean() - pause[3].rolling(5).mean()).abs()<distance) # True==일시정지
                    red_light_pause_dict[int(i)] = (True if np.sum(pause[20:])==10 else False, time_string)
                    # pd.DataFrame(red_light_pause_dict.items()).to_excel(f'./{dir_name}/red_light_pause/red_light_pause_dict.xlsx', index=False)
                    # cv2.imwrite(f'./{dir_name}/red_light_pause/{time_string_name}.jpg', frame)

        # 횡단자 있음
        if results_person_right>0:
            pause_tensor = tmp[torch.isin(tmp[:,4],tmp[(tmp[:,3]>140)&(tmp[:,3]<215)][:,4].unique())]
            # 140<ymax<215의 좌표를 보유한 적 있는 id 반환
            for i in pause_tensor[:,4].unique():
                pause = pause_tensor[pause_tensor[:,4]==i][-30:]
                # 최근 30frame의 좌표 반환
                if len(pause[(pause[:,3]>140)&(pause[:,3]<215)])==30:
                    # 최근 30frame의 좌표가 145<ymax<215일 때 작동
                    pause = pd.DataFrame(pause.cpu())
                    pause = ((pause[1].rolling(15).mean() - pause[1].rolling(5).mean()).abs()<distance) + ((pause[3].rolling(15).mean() - pause[3].rolling(5).mean()).abs()<distance) # True==일시정지
                    if np.sum(pause[20:])==10:
                        red_light_pause_dict[int(i)] = (True if np.sum(pause[20:])==10 else False, time_string)
                        # pd.DataFrame(red_light_pause_dict.items()).to_excel(f'./{dir_name}/red_light_pause/red_light_pause_dict.xlsx', index=False)
                        # cv2.imwrite(f'./{dir_name}/red_light_pause/{time_string_name}.jpg', frame)

                elif len(pause[pause[:,3]<240])==30:
                    # 최근 30frame의 좌표가 ymax<240일 때 작동
                    # 사람이 갑자기 튀어 나왔는지 확인
                    if results_person_right_list.count(0)<int(len(results_person_right_list)/2+1): 
                        pause = pd.DataFrame(pause.cpu())
                        pause = ((pause[1].rolling(15).mean() - pause[1].rolling(5).mean()).abs()<distance) + ((pause[3].rolling(15).mean() - pause[3].rolling(5).mean()).abs()<distance) # True==일시정지
                        if np.sum(pause[20:])==0:
                            red_light_pause_dict[int(i)] = (False, time_string)
                            # red_light_pause_dict[int(i)] = (True if np.sum(pause[20:])==10 else False, time_string_name)
                            # pd.DataFrame(red_light_pause_dict.items()).to_excel(f'./{dir_name}/red_light_pause/red_light_pause_dict.xlsx', index=False)
                            # cv2.imwrite(f'./{dir_name}/red_light_pause/{time_string_name}.jpg', frame)

    # # 확인용 이미지 저장. right_turn 폴더 생성 필요.
    # frame[290:790,960:1400] = results_car.plot()                                    # 트래킹 및 객체 탐지 이미지 전체 이미지와 합성
    # frame[530:790,290:915] = results_left.plot()                                    # 좌측 건널목
    # frame[505:790,1150:1920] = results_right.plot()                                 # 우측 건널목
    # cv2.rectangle(frame, (960, 290), (1400, 790), (255,0,0), 2)                     # ROI 청색 사각형 표시
    # cv2.rectangle(frame, (1220, 430), (1400, 505), (191,191,0), 2)                  # ROI 청녹 사각형 표시
    # cv2.rectangle(frame, (960, 690), (1060, 790), (0, 0, 255), 2)                   # ROI 적색 사각형 표시
    # cv2.rectangle(frame, (290, 530), (915, 790), (191, 0, 191), 2)                  # ROI 자색 사각형 표시. 좌측.
    # cv2.rectangle(frame, (1150, 505), (1920, 790), (191, 0, 191), 2)                # ROI 자색 사각형 표시. 우측.
    # cv2.rectangle(frame, (960, 645), (1400, 675), (90, 90, 0), 2)                   # ROI 녹색 사각형 표시.
    # cv2.putText(frame, f'Time: {time_string}', (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (191, 191, 191), 2)  # 좌상단에 원본 동영상 시간 표시
    # cv2.putText(frame, f'Frame: {frame_count}', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 191), 2)  # 좌상단에 frame 표시
    # cv2.imwrite(f'./{dir_name}/{time_string_name}.jpg',frame)                # 전체 이미지 저장

    # # 진행 상황 확인용 코드. txt 파일은 열려도 덮어쓰기 가능
    # with open(f'./{dir_name}/for_checking.txt', 'w') as f:
    #     f.write(f'frame_count: {frame_count}, time_string: {time_string}')

    frame_count += 1
    # # 영상 안 볼 거면 아래 3줄 주석 처리
    # cv2.imshow("frame", frame)
    # if cv2.waitKey(1) & 0xFF == ord("q"):
    #     break

    if len(datas)>5399:                                                             # 메모리 절약을 위해 179초 동안의 frame만 저장
        datas = datas[30:]
cap.release()
cv2.destroyAllWindows()

In [None]:
# 전체 이미지 객체 탐지 동영상 생성
import glob
from tqdm.notebook import tqdm

paths = glob.glob(f'./{dir_name}/*.jpg')
paths = list(pd.DataFrame([list(map(lambda x: int(x.split('frame')[-1][:-4]),paths)), paths]).T.set_index(0).sort_index()[1])

# 이미지를 영상으로 만들기
pathOut = f'./{dir_name}/tmp.mp4'
fps = 30
frame_array = []
for path in tqdm(paths):
    img = cv2.imread(path)
    height, width, layers = img.shape
    frame_array.append(img)
out = cv2.VideoWriter(pathOut,cv2.VideoWriter_fourcc(*'DIVX'), fps, (width,height))
for i in tqdm(range(len(frame_array))):
    # writing to a image array
    out.write(frame_array[i])
out.release()