## 숙지사항

>1. DeepSort를 사용함 (라이브러리 의존성 없게 코드 개선함)
2. YOLOv8 사용하도록 코드 개선 (기존에는 YOLOv3)
3. DeepSort의 내부 모델 일부를 뜯어내서 Similarity 모델로 사용
4. 가장 비슷한 사람 1명(GreenBox)과 그 외 비슷한 특징을 가진 N명(YellowBox)을 보여줌 (비슷한 특징을 가진 N명은 추 후 선택해서 변경할 수 있도록 할 예정)
5. sim_threshold 라는 하이퍼파라미터로 비슷한 특징의 기준 조절가능. 현재로써는 0.09가 적당해보이지만 0.085도 괜찮아보임.

In [None]:
target_image_name='test.png'
source_video_name='test.mp4'

In [None]:
sim_threshold=0.09

In [None]:
!pip3 install ultralytics imageio imageio[ffmpeg] imageio[pyav]

In [None]:
from deep_sort import nn_matching
from deep_sort.detection import Detection
from deep_sort.tracker import Tracker
from deep_sort import generate_detections as gdet

import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model
import os
import matplotlib.pyplot as plt
from ultralytics import YOLO
from pathlib import Path
import imageio.v3 as iio

In [None]:
det_model = YOLO("yolov8n.pt")

#image(frame)한장에 대해 객체 검출 후 crop 하는 함수
def detect_person(img, conf_threshold=0.5, ret_coord=False):
    result=det_model.cpu()(img, verbose=False)
    
    boxes_out=np.array([np.concatenate([np.array(box.xywh[0])[:2]-np.array(box.xywh[0])[2:]/2,np.array(box.xywh[0])[2:]]) for box in result[0].boxes if box.cls==0 and box.conf>=conf_threshold])
    conf_out=np.array([np.array(box.conf) for box in result[0].boxes if box.cls==0 and box.conf>=conf_threshold])
    
    boxes=np.array([np.array([box.xyxyn[0][1],box.xyxyn[0][0],box.xyxyn[0][3],box.xyxyn[0][2]]) for box in result[0].boxes if box.cls==0 and box.conf>=conf_threshold])
    
    _img=tf.repeat(img[None], len(boxes), axis=0)

    cropped_image=tf.image.crop_and_resize(_img, boxes, range(len(boxes)), (128,64))
    
    if ret_coord:
        return tf.image.crop_and_resize(_img, boxes, range(len(boxes)), (128,64)), boxes_out, conf_out
    else:
        return tf.image.crop_and_resize(_img, boxes, range(len(boxes)), (128,64))

In [None]:
# 비디오를 frames로 변환
frames=iio.imread(Path(source_video_name).read_bytes(), index=None)

In [None]:
#target image 차원 조정
img=iio.imread(target_image_name)[...,:3]
#target image에서 객체 crop - 어차피 1명.
target=detect_person(img)[0]

In [None]:
# tracking 할때 사용할, 가장 거리 작은 id 찾는 함수
def similar_id(dictionary, value, threshold=1e-5):
    r=None
    last=np.inf
    for k in dictionary.keys():
        if np.abs(value-dictionary[k])<last and np.abs(value-dictionary[k])<threshold:
            last=np.abs(value-dictionary[k])
            r=k
            
    return r

In [None]:
max_cosine_distance = 0.7
nn_budget = None

model_filename = 'mars-small128.pb'
encoder = gdet.create_box_encoder(model_filename, batch_size=1)
metric = nn_matching.NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)
tracker = Tracker(metric)

#프레임별로 id와 bbox 좌표 담는 리스트
tracked=[]
# 트랙 ID 별로 프레임에서 검출된 이미지를 모아두는 딕셔너리.(ID가 key)
objects={}

for frame in frames:
    #frame(단일 이미지)에서 사람 객체 검출 후 저장.
    crops,boxes,scores = detect_person(frame,ret_coord=True)
    
    #객체 이름(사물 이름) person 통일
    names = ['person' for _ in range(len(boxes))]
    # 한 프레임내의 bbox들에 대해 특징벡터로 변환
    features=encoder(frame,boxes)
    
    # 각 객체의 정보를 담고 있음
    detections = [Detection(bbox, score, class_name, feature) for bbox, score, class_name, feature in zip(boxes, scores, names, features)]
    
    # 각 객체의 정보를 바탕으로 각 트랙(ID) 상태 업데이트
    tracker.predict()
    tracker.update(detections)
    
    
    # Obtain info from the tracks
    # 각 track별로 현재 프레임의 좌표 저장
    tracked_bboxes = []
    for track in tracker.tracks:
        if not track.is_confirmed() or track.time_since_update > 5:
            continue 
        bbox = track.to_tlbr() # Get the corrected/predicted bounding box
        class_name = track.get_class() #Get the class name of particular object
        tracking_id = track.track_id # Get the ID for the particular track
        tracked_bboxes.append(bbox.tolist() + [tracking_id]) # Structure data, that we could use it with our draw_bbox function
    # 각 track 별로 crop 후 object에 저장
    for t in tracked_bboxes:
        _t=np.array(t[:4])
        _t=np.concatenate([_t[:2][::-1],_t[2:][::-1]])
        cropped=tf.image.crop_and_resize(frame[None], np.array([_t[:4]])/(frame.shape[:2]+frame.shape[:2]), [0], (128,64))[0].numpy()
        
        if not t[4] in objects.keys(): objects[t[4]]=[]
        
        objects[t[4]].append(cropped)
    
    # 현재 프레임의 추적 결과를 tracked 리스트에 저장.
    tracked.append(tracked_bboxes)

In [None]:
objects_sim={}
for i in tuple(objects.keys()):
    obs=objects[i]
    
    ret=[]
    for o in obs:
        feature1=encoder(np.array(target),[[0,0,64,128]])
        feature2=encoder(np.array(o),[[0,0,64,128]])
        ret.append(np.mean((feature1-feature2)**2)**0.5)
    # id 별로 유사도 저장.
    objects_sim[i]=ret
    
    
    

In [None]:
candidates={}
for k in objects_sim.keys():
    perc=np.percentile(objects_sim[k],10)
    #각 id 트랙의 하위10퍼센트 가지고만 평균
    sim=np.mean(np.array(objects_sim[k])[objects_sim[k]<=perc])
    
    #그러면 이제, 각 id 트랙별로 유사도 값은 1개 나옴. 그게 threshold보다 작으면 후보로 선정.
    #기존에는 후보들 중에서 가장 거리가 적은게 picked 됐었는데 이걸 앙상블 기법을 이용해야 함.
    if sim<=sim_threshold:
        candidates[k]=sim
        
        
        
        

In [None]:

def ensemble_similar_id(candidates_dicts, method="voting"):
    """
    앙상블 기법으로 최종 picked_id 결정
    :param candidates_dicts: 각 target별 candidates 딕셔너리 리스트
    :param method: 'voting' 또는 'mean' 지원
    :return: 최종 picked_id
    """
    import numpy as np

    all_ids = []
    for candidates in candidates_dicts:
        picked_id = similar_id(candidates, np.sort(list(candidates.values()))[0])
        all_ids.append(picked_id)

    if method == "voting":
        # 다수결로 ID 결정 (원핫 인코딩 + argmax 사용)
        unique_ids = list(set(all_ids))  # 고유 ID 추출
        id_to_index = {uid: idx for idx, uid in enumerate(unique_ids)}  # ID를 인덱스로 매핑

        # 원핫 벡터 초기화
        one_hot_counts = np.zeros(len(unique_ids), dtype=int)

        # 원핫 벡터에 등장 횟수 누적
        for pid in all_ids:
            one_hot_counts[id_to_index[pid]] += 1

        # argmax로 가장 많이 등장한 ID 선택
        final_id = unique_ids[np.argmax(one_hot_counts)]
    elif method == "mean":
        # 평균 유사도가 가장 낮은 ID
        mean_scores = {pid: np.mean([cand.get(pid, float("inf")) for cand in candidates_dicts]) for pid in all_ids}
        final_id = min(mean_scores, key=mean_scores.get)
    else:
        raise ValueError("지원하지 않는 method: 'voting' 또는 'mean'을 선택하세요.")

    return final_id



target_image_paths = ["/root/jupyter/Final_MP/deepsort_gwanghee/MP/MP/test.png"]
# 수정된 main loop
target_images = [iio.imread(image_path)[..., :3] for image_path in target_image_paths]  # 여러 target 이미지 로드
candidates_list = []

for target in target_images:
    # 각 target에 대해 유사도 계산
    objects_sim = {}
    for i in tuple(objects.keys()):
        obs = objects[i]
        ret = []
        for o in obs:
            feature1 = encoder(np.array(target), [[0, 0, 64, 128]])
            feature2 = encoder(np.array(o), [[0, 0, 64, 128]])
            ret.append(np.mean((feature1 - feature2) ** 2) ** 0.5)
        objects_sim[i] = ret

    candidates = {}
    for k in objects_sim.keys():
        perc = np.percentile(objects_sim[k], 10)
        sim = np.mean(np.array(objects_sim[k])[objects_sim[k] <= perc])
        
        #threshold수정(실험)
        if sim <= sim_threshold:
            candidates[k] = sim

    candidates_list.append(candidates)

# 앙상블 기법으로 최종 ID 결정
picked_id = ensemble_similar_id(candidates_list, method="voting")

# 이후 picked_id를 사용한 기존 출력 로직 유지
idx = picked_id
n = np.argmin(objects_sim[idx])

print("Target image:")
plt.imshow(target_images[0] / 255)  # 예시: 첫 target
plt.show()

print("Found target:")
plt.imshow(objects[idx][n] / 255)
plt.show()

# 비디오 출력 (기존 로직 유지)


In [None]:
idx=picked_id
n=np.argmin(objects_sim[idx])

print("Target image:")
plt.imshow(target/255)
plt.show()

print("Found target:")
plt.imshow(objects[idx][n]/255)
plt.show()

In [None]:
fourcc = cv2.VideoWriter_fourcc(*'DIVX')
out = cv2.VideoWriter('output.avi', fourcc, 29.7, frames.shape[1:3][::-1])

out_frames=[]

for i in range(len(frames)):
    _frame=frames[i].copy()
    for t in tracked[i]:
        x1, y1, x2, y2, pid = t

        bbox_color=(255,0,0)
        if pid==picked_id:
            bbox_color=(0,255,0)
        elif pid in candidates_list:
            bbox_color=(255,255,0)

        cv2.rectangle(_frame, (int(x1), int(y1)), (int(x2), int(y2)), bbox_color, 2)
        
    out.write(_frame[...,::-1])
    out_frames.append(_frame)
out.release()

In [None]:
# !pip install tensorflow==2.13.0 numpy==1.24.3




In [None]:
# import os

# #def replace_np_float(directory):
#     for root, _, files in os.walk(directory):
#         for file in files:
#             if file.endswith(".py"):
#                 filepath = os.path.join(root, file)
#                 with open(filepath, "r") as f:
#                     content = f.read()
#                 content = content.replace("np.float", "float")  # 또는 "np.float64"
#                 with open(filepath, "w") as f:
#                     f.write(content)

# # 수정할 프로젝트 디렉토리 경로를 입력
# replace_np_float("/root/jupyter/Final_MP/deepsort_gwanghee/MP/MP")


In [None]:
import cv2

# 동영상 파일 경로
video_path = "./output.avi"

# 동영상 파일 열기
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    print("Error: Could not open video.")
    exit()

# 동영상 재생
while True:
    ret, frame = cap.read()
    if not ret:  # 더 이상 프레임이 없으면 종료
        print("End of video.")
        break

    # 프레임 보여주기
    cv2.imshow("Video Playback", frame)

    # 'q'를 누르면 종료
    if cv2.waitKey(25) & 0xFF == ord('q'):
        break

# 자원 해제
cap.release()
cv2.destroyAllWindows()
