# In [1]:
import sys
sys.path.append('/DATA_17/ij/trt_inference/make_trt')
import make_onnx

# In [None]:
import time
import torch
from torch.backends import cudnn
from matplotlib import colors
import cv2 as cv
import numpy as np
import os
import statistics
from skimage import io, draw
from loguru import logger
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.preprocessing import MinMaxScaler
import yolox_detector as dt
import classification as cls
import convert
import copy
import logging
# Rebind `logger`: the loguru logger imported above is replaced by the
# standard-library root logger, with its level set to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialise PyTorch's CUDA state before any model is loaded.
torch.cuda.init()


# Build the per-category counter table, keyed by category name.
# Example: {"helmet": {'TP': 0, 'FP': 0, 'FN': 0}, "head": {'TP': 0, 'FP': 0, 'FN': 0}}
def making_mAP_dict(categoty_name):
    """Return a zero-initialised TP/FP/FN counter for every category.

    Args:
        categoty_name: Mapping whose *values* are the category names to track
            (the keys are ignored here).

    Returns:
        A dict mapping each category name to its own
        ``{'TP': 0, 'FP': 0, 'FN': 0}`` dict.
    """
    return {
        label: {'TP': 0, 'FP': 0, 'FN': 0}
        for label in categoty_name.values()
    }

# Extract the class-0 detector boxes, rescaled by `ratio`.
def visual(output, ratio, cls_conf=0.35):
    """Filter raw detector output down to confident class-0 boxes.

    Each row of ``output`` is read as: columns 0-3 the box corners
    (x_min, y_min, x_max, y_max), columns 4 and 5 two confidences that are
    multiplied into the final score, and column 6 the class id.

    Args:
        output: 2-D tensor of detections for one image, or ``None`` when the
            detector produced nothing.
        ratio: Scale factor the box coordinates are divided by.
        cls_conf: A detection is kept only when its score is strictly
            greater than this value.

    Returns:
        ``(bboxes, class_ids, scores)`` as parallel lists, where each box is
        ``[x_min, y_min, x_max, y_max]`` truncated to ints, or
        ``(None, None, None)`` when there is no output or no class-0
        detection passes the threshold.
    """
    # The detector produced no output at all.
    if output is None:
        return None, None, None

    output = output.cpu()
    # Divide out of place: `output[:, 0:4]` is a view, so `/=` would rescale
    # the caller's tensor whenever it already lives on the CPU.
    bboxes = output[:, 0:4] / ratio
    class_ids = output[:, 6]              # class index of every detection
    scores = output[:, 4] * output[:, 5]  # combined confidence per detection

    original_bboxes = []
    original_cls = []
    original_scores = []
    for box, raw_id, score in zip(bboxes, class_ids, scores):
        cls_id = int(raw_id)
        # Keep class 0 only (labelled "person" in the original comment).
        if cls_id == 0 and cls_conf < score:
            original_cls.append(cls_id)
            original_scores.append(score)
            original_bboxes.append(
                [int(box[0]), int(box[1]), int(box[2]), int(box[3])]
            )

    # Detections exist, but none is a class-0 box above the threshold.
    if len(original_bboxes) == 0:
        return None, None, None

    return original_bboxes, original_cls, original_scores

# Crop the detected bounding boxes out of the frame for the classifier.
def crop_detect(frame, bboxes, scores, min_score=0.5):
    """Cut every sufficiently confident detection out of ``frame``.

    Args:
        frame: Image array indexed as ``frame[y, x]``.
        bboxes: Sequence of ``[x_min, y_min, x_max, y_max]`` boxes.
        scores: Confidence for each box, parallel to ``bboxes``.
        min_score: Boxes scoring below this value are skipped.

    Returns:
        List of cropped image regions, in box order, with the low-score
        boxes left out (so it can be shorter than ``bboxes``).
    """
    cut_img = []
    for box, score in zip(bboxes, scores):
        if score < min_score:
            continue
        # Clamp to the frame origin. A negative coordinate means the box
        # sticks out past the top/left edge; taking abs() would mirror it
        # into the image and crop the wrong region.
        x0, y0, x1, y1 = (max(0, int(value)) for value in box[:4])
        cut_img.append(frame[y0:y1, x0:x1])
    return cut_img


# Classifier-only path: crop the ground-truth boxes out of the frame so they
# can be fed to the classifier.
def crop_cls(frame,num_frame):
    """Crop every ground-truth box recorded for ``num_frame``.

    Reads the module-level ``correct_json`` / ``correct_json_key``.

    Args:
        frame: Image array indexed as ``frame[y, x]``.
        num_frame: Frame key to look up in the ground truth.

    Returns:
        ``(crops, boxes)`` as parallel lists, where each box is
        ``[x_min, y_min, x_max, y_max]`` after taking the absolute value of
        every coordinate; ``(None, None)`` when the frame has no
        ground-truth entry.
    """
    if num_frame not in correct_json_key:
        return None, None

    crops = []
    boxes = []
    for regions in correct_json[num_frame].values():
        for region in regions:
            left, top, right, bottom = (abs(region[k]) for k in range(4))
            boxes.append([left, top, right, bottom])
            crops.append(frame[top:bottom, left:right])
    return crops, boxes


# Draw the labelled bounding boxes (used when saving an annotated video).
def vis(frame, boxes,cls_output):
    """Draw each box and its predicted category label onto ``frame``.

    The frame is modified in place and also returned. Category names come
    from the module-level ``categoty_name`` mapping.

    Args:
        frame: Image to draw on.
        boxes: Sequence of ``[x_min, y_min, x_max, y_max]`` boxes.
        cls_output: Per-box classifier result; ``cls_output[i][0]`` is read
            as the predicted class id of ``boxes[i]``.

    Returns:
        The same ``frame`` object, annotated.
    """
    img = frame
    if len(boxes) == 0:
        return img

    font = cv.FONT_HERSHEY_SIMPLEX
    # One scale for both measuring and drawing, so the label background
    # actually covers the text (it used to be measured at 0.2, drawn at 0.4).
    font_scale = 0.4
    palette = [(0, 0, 255), (0, 255, 0), (200, 200, 255), (100, 100, 255)]
    txt_color = (255, 255, 255)
    txt_bk_color = (0, 0, 0)

    for i, box in enumerate(boxes):
        pred = int(cls_output[i][0])
        x0 = int(box[0])
        y0 = int(box[1])
        x1 = int(box[2])
        y1 = int(box[3])

        # Fall back to the raw id for classes without a configured name
        # instead of leaving `category` unbound (or stale from the last box).
        category = categoty_name.get(pred, str(pred))
        text = '{}'.format(f'{category}')
        # Wrap around the palette so an unexpected id cannot index past it.
        box_color = palette[pred % len(palette)]

        txt_size = cv.getTextSize(text, font, font_scale, 1)[0]
        cv.rectangle(img, (x0, y0), (x1, y1), box_color, 2)

        # Filled background strip behind the label.
        cv.rectangle(
            img,
            (x0, y0 + 1),
            (x0 + txt_size[0] + 1, y0 + int(1.5*txt_size[1])),
            txt_bk_color,
            -1
        )
        cv.putText(img, text, (x0, y0 + txt_size[1]), font, font_scale, txt_color, thickness=1)

    return img


# Save the predicted boxes as JSON.
def save_pred_json(save_path,pred_json):
    """Write ``pred_json`` to ``save_path`` as JSON.

    Args:
        save_path: Destination file path; an existing file is overwritten.
        pred_json: JSON-serialisable prediction mapping. Note that the
            ``json`` module writes non-string dict keys (e.g. int frame
            numbers) as strings.
    """
    # Imported here because the file never imports `json` at module level,
    # which made this function fail with NameError.
    import json

    with open(save_path, 'w') as outfile:
        json.dump(pred_json, outfile)


# Driver that compares the predicted JSON against the ground-truth JSON.
def comparison(predict_json):
    """Accumulate TP/FP/FN per category and print the resulting metrics.

    Reads the module-level ``categoty_name``, ``correct_json`` and
    ``c_category``. Every frame of the ground truth is scored against
    ``predict_json`` through ``update_mAP``; the metrics are then printed
    for each category.

    Args:
        predict_json: Mapping of frame key -> {category name: list of boxes}.
    """
    # Build the zeroed counter table.
    categoty_mAP = making_mAP_dict(categoty_name)

    # Walk the ground truth frame by frame, updating the counters.
    for frame_nb in correct_json.keys():
        categoty_mAP = update_mAP(frame_nb,correct_json,predict_json,categoty_mAP)

    # Report.
    for cate in categoty_mAP.keys():
        TP = categoty_mAP[cate]['TP']
        FP = categoty_mAP[cate]['FP']
        FN = categoty_mAP[cate]['FN']
        if TP != 0:
            precision = TP/(TP+FP)
            recall = TP/(TP+FN)
            # No true negatives exist for detection, so this "accuracy" is
            # TP over every counted outcome.
            accuracy = TP/(TP+FN+FP)
            # Harmonic mean of precision and recall. The previous expression
            # lacked parentheses around the denominator and evaluated to
            # 4 * recall.
            F1_score = 2*precision*recall/(precision+recall)
            print(f'{cate}_accuracy : ',accuracy)
            print(f'{cate}_F1_score : ',F1_score)
            print(f'{cate}_recall : ',recall)
            print(f'{cate}_precision : ',precision)
            print(f'categoty_mAP[{cate}]',categoty_mAP[cate])
        else :
            # With TP == 0 the ratios are undefined; dump the raw counters.
            print('영상내 등장하는 카테고리 : ',c_category)
            print(f'{cate} TP is 0 ',categoty_mAP)
            print(f'{cate}_FP : 예측은{cate} 실제론 다른것 : ',FP)

# Update the TP/FP/FN counters (confusion counts) for one frame.
def update_mAP(frame_nb,correct_json,predict_json,categoty_mAP):
    """Score one ground-truth frame against the predictions.

    Only categories that have an entry in ``categoty_mAP`` are counted; any
    other category found in either JSON is ignored.

    Args:
        frame_nb: Frame key; must exist in ``correct_json``.
        correct_json: Ground truth, frame key -> {category: list of boxes}.
        predict_json: Predictions in the same layout. A missing frame, or a
            frame stored as ``None``, means "nothing predicted".
        categoty_mAP: Counter table, category ->
            ``{'TP': int, 'FP': int, 'FN': int}``; updated in place.

    Returns:
        The same ``categoty_mAP`` object.
    """
    corr = correct_json[frame_nb]        # ground-truth objects of this frame
    pred = predict_json.get(frame_nb)    # predicted objects of this frame

    # Nothing was predicted for this frame: every tracked ground-truth box
    # is a miss.
    if pred is None:
        for category in corr.keys() & categoty_mAP.keys():
            categoty_mAP[category]['FN'] += len(corr[category])
        return categoty_mAP

    c_key = corr.keys()
    p_key = pred.keys()

    # Categories present on both sides: match the boxes by IoU.
    for category in p_key & c_key:
        if category not in categoty_mAP:
            continue
        matched = len(calculate_iou(corr[category], pred[category]))
        # Ground-truth boxes left without a match.
        categoty_mAP[category]['FN'] += abs(matched - len(corr[category]))
        # Predicted boxes left without a match.
        categoty_mAP[category]['FP'] += abs(matched - len(pred[category]))
        categoty_mAP[category]['TP'] += matched

    # Categories present on one side only. The previous guard tested whether
    # the whole key *set* was a member of the category names, which is never
    # true, so these boxes were silently dropped from the counts.
    for category in p_key ^ c_key:
        if category not in categoty_mAP:
            continue
        if category in c_key:
            # Only in the ground truth: all of its boxes were missed.
            categoty_mAP[category]['FN'] += len(corr[category])
        else:
            # Only in the predictions: all of its boxes are spurious.
            categoty_mAP[category]['FP'] += len(pred[category])

    return categoty_mAP



# Intersection-over-union of two boxes.
def IoU(box1, box2, iou_threshold=0.5):
    """Return the IoU of two boxes, or ``None`` when it is below threshold.

    Boxes are ``(x1, y1, x2, y2)`` with inclusive pixel coordinates, hence
    the ``+ 1`` in every width/height.

    Args:
        box1: First box.
        box2: Second box.
        iou_threshold: Minimum IoU that counts as a match.

    Returns:
        The IoU as a float when it is at least ``iou_threshold``, otherwise
        ``None``. Degenerate boxes whose union area is not positive also
        yield ``None``.
    """
    box1_area = (box1[2] - box1[0] + 1) * (box1[3] - box1[1] + 1)
    box2_area = (box2[2] - box2[0] + 1) * (box2[3] - box2[1] + 1)

    # Corners of the intersection rectangle.
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    # Width and height of the intersection, clamped at zero when disjoint.
    w = max(0, x2 - x1 + 1)
    h = max(0, y2 - y1 + 1)

    inter = w * h
    union = box1_area + box2_area - inter
    # Inverted or empty boxes can make the union zero or negative; treat
    # that as "no match" rather than dividing by zero.
    if union <= 0:
        return None

    iou = inter / union
    if iou < iou_threshold:
        return None
    return float(iou)

            
            
# Compare the boxes pairwise and keep the highest-IoU partner of each one.
def calculate_iou(correct, predict):
    """Pair every ground-truth box with its best-overlapping predicted box.

    Args:
        correct: Ground-truth boxes.
        predict: Predicted boxes.

    Returns:
        A list of ``[[c_box, p_box], [iou]]`` entries, one per ground-truth
        box for which ``IoU`` returned a value for some predicted box. When
        there are more matches than predicted boxes, the list is passed
        through ``compare_box`` first, since one predicted box must then
        have been matched more than once.
    """
    # NOTE(review): a predicted box can still be paired with two
    # ground-truth boxes when the number of matches does not exceed
    # len(predict); that case is not de-duplicated here.
    best = []
    for c_box in correct:
        top_pair = None
        top_score = None
        for p_box in predict:
            score = IoU(c_box, p_box)
            if score is None:
                continue
            # Strictly-greater comparison: the first of equal scores wins.
            if top_score is None or score > top_score:
                top_pair = [c_box, p_box]
                top_score = score
        if top_score is not None:
            best.append([top_pair, [top_score]])

    if len(best) > len(predict):
        return compare_box(best)

    return best

# When there are more matches than predicted boxes, stop one predicted box
# from being counted for several ground-truth boxes.
def compare_box(best):
    """Keep only the highest-IoU match for each predicted box.

    Args:
        best: List of ``[[c_box, p_box], [iou]]`` entries, as built by
            ``calculate_iou``.

    Returns:
        A new list (deep copies; ``best`` is left untouched) containing, for
        every distinct ``p_box``, the single entry with the highest IoU, in
        the original order. On an IoU tie the earliest entry wins.
    """
    # Index of the winning entry for every predicted box. Boxes are
    # converted to tuples because lists cannot be dict keys.
    winners = {}
    for index, entry in enumerate(best):
        key = tuple(entry[0][1])
        holder = winners.get(key)
        if holder is None or entry[1][0] > best[holder][1][0]:
            winners[key] = index

    # The previous pairwise removal only dropped a later entry when the
    # earlier one was strictly better, and raised ValueError when the same
    # entry was removed twice (three or more matches on one predicted box).
    return [copy.deepcopy(best[index]) for index in sorted(winners.values())]



def Classification(frame,num_frame,cls,predict_json):
    """Classify the ground-truth crops of one frame and record the result.

    Used to evaluate the classifier on its own: the boxes come from the
    ground truth (via ``crop_cls``), not from the detector.

    Args:
        frame: Current video frame.
        num_frame: Frame key.
        cls: Classifier object providing ``preprocess_multi``, ``inference``
            and ``postprocess``.
        predict_json: Prediction mapping, updated in place.

    Returns:
        ``(result_frame, predict_json)``. ``predict_json[num_frame]`` is set
        to ``None`` when the frame has no ground-truth entry.
    """
    cut_img_list, box_list = crop_cls(frame,num_frame)
    if cut_img_list is None:
        predict_json[num_frame] = None
        return frame, predict_json

    # The frame is listed in the ground truth but holds no boxes: there is
    # nothing to classify.
    if not cut_img_list:
        return frame, predict_json

    input_data = cls.preprocess_multi(cut_img_list)

    # Inference.
    cls_output = cls.inference(input_data)
    cls_output = cls.postprocess(cls_output)

    # Draw the predictions. `vis` takes three arguments; the stray fourth
    # one used to raise TypeError.
    result_frame = vis(frame, box_list,cls_output)

    for i, box in enumerate(box_list):
        # NOTE(review): assumes cls_output[i][0] is the predicted class id
        # of box i, the same way `vis` and `Detection` read it. The name
        # `predict` used here before was never defined. Confirm against
        # the classifier's postprocess().
        idx = int(cls_output[i][0])
        if idx in categoty_name.keys():
            category = categoty_name[idx]
            if num_frame not in predict_json:
                predict_json[num_frame] = dict()
            if category not in predict_json[num_frame]:
                predict_json[num_frame][category] = list()
            predict_json[num_frame][category].append(box)
    return result_frame, predict_json

     

def _load_yolo_detector():
    """Build and load the YOLO detector once, then reuse it on later calls."""
    detector = getattr(_load_yolo_detector, "_cached", None)
    if detector is None:
        detector = make_onnx.YoloDetector()
        detector.load()
        _load_yolo_detector._cached = detector
    return detector


def Detection(frame,num_frame,yd,cls,predict_json):
    """Detect objects in one frame, classify the crops, record the result.

    Args:
        frame: Current video frame as an H x W x C array; the caller resizes
            it to 640 x 640 beforehand.
        num_frame: Frame key.
        yd: Unused; kept for the earlier TensorRT detector interface.
        cls: Classifier object providing ``preprocess``, ``inference`` and
            ``postprocess``.
        predict_json: Prediction mapping, updated in place with
            ``{category name: [box, ...]}`` for this frame.

    Returns:
        ``(result_frame, predict_json)``; ``result_frame`` is the input
        frame, since drawing is currently disabled.
    """
    # Loading the model for every frame was accidental; reuse one instance.
    ydd = _load_yolo_detector()

    # HWC -> CHW must be an axis transpose. `np.reshape` keeps the memory
    # order and therefore scrambles the pixels across the channel planes.
    chw = np.ascontiguousarray(np.transpose(frame, (2, 0, 1)))
    # Keep `frame` as the numpy image; the tensor gets its own name so the
    # crops below are cut from the image rather than from the batch tensor.
    batch = torch.from_numpy(chw).float().unsqueeze(0).cuda()
    with torch.no_grad():
        outputs = ydd.model(batch)

    # The frame is fed to the network without further scaling, so the boxes
    # are already in frame coordinates. (The old value was computed from the
    # 4-D tensor shape, i.e. from the batch and channel sizes.)
    ratio = 1.0

    # Keep only the wanted class.
    # NOTE(review): assumes outputs[0] is already the per-image detection
    # table that `visual` expects; confirm against make_onnx.
    bboxes, _, scores = visual(outputs[0], ratio, 0.5)

    result_frame = frame
    if bboxes is None:
        return result_frame, predict_json

    cut_img_list = crop_detect(frame,bboxes,scores)
    if not cut_img_list:
        # Nothing to classify; the frame stays unrecorded, like a frame
        # without detections.
        return result_frame, predict_json

    input_data = cls.preprocess(cut_img_list)

    # Inference.
    cls_output = cls.inference(input_data)
    cls_output = cls.postprocess(cls_output)
    cls_output = cls_output[0]

    if num_frame not in predict_json:
        predict_json[num_frame] = dict()

    # Build predict_json from the classifier output. Row i is paired with
    # box i: `visual` already dropped every score <= 0.5, so `crop_detect`
    # skipped nothing. The min() guards against extra classifier rows.
    for i in range(min(cls_output.shape[0], len(bboxes))):
        pred = int(cls_output[i][0])
        box = bboxes[i]
        # Only labels listed in categoty_name are recorded.
        if pred in categoty_name.keys():
            category = categoty_name[pred]
            if category not in predict_json[num_frame]:
                predict_json[num_frame][category] = list()
            predict_json[num_frame][category].append(box)

    return result_frame, predict_json



class Model:
    """Loads the classifier engine and evaluates a video against the ground truth."""

    def __init__(self):
        # The TensorRT YOLOX detector is currently disabled; `Detection`
        # builds its own detector, so `yd` is only a placeholder.
        self.trt_engine_path2 = '/DATA_17/trt_test/engines/suit_fp16_0728/suit_best_ij_fp16_064.trt'
        self.cls = cls.module_load(logger)
        self.yd = 1
        self.cls.load(self.trt_engine_path2)

    # Read the video and run the whole pipeline.
    def proc(self,video_path, output_file):
        """Run detection/classification on every frame, then print metrics.

        Args:
            video_path: Input video file.
            output_file: Path of the output video. The writer is opened, but
                frame writing is currently disabled.
        """
        cap = cv.VideoCapture(video_path)
        width = cap.get(cv.CAP_PROP_FRAME_WIDTH)  # float
        height = cap.get(cv.CAP_PROP_FRAME_HEIGHT)  # float
        fps = cap.get(cv.CAP_PROP_FPS)

        save_path = output_file
        logger.info(f"video save_path is {save_path}")
        vid_writer = cv.VideoWriter(
            save_path, cv.VideoWriter_fourcc(*"mp4v"), fps, (int(width), int(height))
        )

        predict_json = {}
        print('fps',fps)

        # Pipeline switches: detector + classifier, or classifier on the
        # ground-truth boxes only.
        detection = True
        classification = False

        try:
            if cap.isOpened():
                while True:
                    # frame = the original image.
                    ret, frame = cap.read()
                    if not ret:
                        break
                    frame = cv.resize(frame, dsize=(640,640))

                    # Index of the frame that was just read.
                    num_frame = round(cap.get(cv.CAP_PROP_POS_FRAMES)) - 1

                    if detection:
                        result_frame, predict_json = Detection(frame,num_frame,self.yd,self.cls,predict_json)

                    # Classifier-only evaluation.
                    if classification:
                        result_frame, predict_json = Classification(frame,num_frame,self.cls,predict_json)

                    # To save the annotated video, call
                    # vid_writer.write(result_frame) here.
        finally:
            # Release the capture and the writer even when a frame fails.
            cap.release()
            vid_writer.release()

        # Compare the ground truth with the predictions.
        comparison(predict_json)

        # To keep the predictions, save them with
        # save_pred_json(<path>, predict_json) here.



# Input video and the path the output video writer is opened on.
intput_path = './input/worker_val_1.mp4'
output_path = './output/worker_val_1.mp4'



# Load the converted ground-truth JSON. `correct_json`, `correct_json_key`,
# `c_category` and `categoty_name` are module-level names read by the
# functions above.
data_path = './json/worker_val_1.json'
correct_json, c_category = convert.json_convert(data_path)
correct_json_key = correct_json.keys()
# Alternative label set: categoty_name = {0 : "helmet", 1 : "head", 2 : "error"}
# Maps the classifier's class id to the category name used in the JSON files.
categoty_name = {0 : "worker", 1 : "non_worker", 2 : 'error'} 
m = Model()

# Run the evaluation; the metrics are printed by comparison().
m.proc(intput_path,output_path)

