In [9]:
import sys
import os
import torch
import random
import numpy as np
import glob
import json
import cv2
import pandas as pd
from time import time
from PIL import Image
import torch.nn.functional as F
from torch import nn
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter

# --- PROJECT-LOCAL LIBRARIES (make sure their folders are located under /code) ---
sys.path.append('/code')
# Note: the imports below depend on the code folders that were copied into the Docker image.
try:
    from transformers import AutoImageProcessor, AutoModel
    from mobilesamv2.promt_mobilesamv2 import ObjectAwareModel
    from mobilesamv2 import SamPredictor
    from mobilesamv2.modeling import Sam
    from tinyvit.tiny_vit import TinyViT
    from mobilesamv2.modeling import PromptEncoder, MaskDecoder, TwoWayTransformer
except ImportError as e:
    # NOTE(review): the failure is only printed; execution continues, so any
    # later use of a name that failed to import will raise NameError.
    print(f"‚ùå L·ªói Import: {e}. H√£y ki·ªÉm tra l·∫°i c·∫•u tr√∫c folder trong Docker.")

# --- SEED FUNCTION ---
def seed_everything(seed=42):
    """Seed every random number generator used in this file.

    Covers Python's ``random``, the ``PYTHONHASHSEED`` environment variable,
    NumPy, and PyTorch (CPU and CUDA), and switches cuDNN to deterministic,
    non-benchmarking mode.

    Args:
        seed: Integer seed applied to all generators.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Seed all random number generators with 42 before any model is constructed.
seed_everything(42)
print("‚úÖ ƒê√£ c·ªë ƒë·ªãnh Seed: 42")

‚úÖ ƒê√£ c·ªë ƒë·ªãnh Seed: 42


In [12]:
# ====================== DOCKER CONFIG ======================
class DockerConfig:
    """Paths, model locations and thresholds used to process one video.

    The three root folders used to be hard-coded; they can now be overridden
    per instance, while the defaults remain the original values.

    Args:
        video_id: Name of the video's sub-folder inside the data and
            template roots.
        data_base: Root folder holding one sub-folder per video (each with a
            ``drone_video.mp4``). Defaults to ``DEFAULT_DATA_BASE``.
        results_base: Output folder. Defaults to ``DEFAULT_RESULTS_BASE``.
        segment_base: Root folder of the template images and masks.
            Defaults to ``DEFAULT_SEGMENT_BASE``.
    """

    # Original hard-coded locations, kept as defaults.
    DEFAULT_DATA_BASE = "D:/code/detect/public_test/public_test/samples"
    DEFAULT_RESULTS_BASE = "/result"
    DEFAULT_SEGMENT_BASE = "/segment_objects"

    def __init__(self, video_id="test", data_base=None, results_base=None, segment_base=None):
        # Standard path layout expected by the organizers
        self.video_id = video_id
        self.data_base = self.DEFAULT_DATA_BASE if data_base is None else data_base            # Input folder
        self.results_base = self.DEFAULT_RESULTS_BASE if results_base is None else results_base  # Output folder
        self.segment_base = self.DEFAULT_SEGMENT_BASE if segment_base is None else segment_base  # Template folder (copied into docker)

        self.video_path = os.path.join(self.data_base, video_id, "drone_video.mp4")

        # Template images (used for feature comparison) and their masks
        self.template_img_dir = os.path.join(self.segment_base, video_id, "original_images")
        self.template_mask_dir = os.path.join(self.segment_base, video_id, "mask_images")

        # MODEL PATHS (local files - the HuggingFace hub is not used)
        self.dino_model_id = "./weight/DINO"
        self.sam_checkpoint = './weight/mobile_sam.pt'
        self.yolo_model = './weight/ObjectAwareModel.pt'

        # THRESHOLDS
        self.SCORE_THRESHOLD = 0.50
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.conf_thres = 0.25
        self.min_area_ratio = 0.0005
        self.max_area_ratio = 0.15

        # TRACKING CONFIG
        self.IOU_THRESHOLD = 0.3
        self.MAX_AGE = 10
        self.MIN_HITS = 3

# ====================== HELPER CLASSES (copied from your code) ======================

class DinoV3FeatureExtractor(nn.Module):
    """Frozen DINO backbone that turns an image into a patch-feature map.

    Args:
        model_id: Local directory of the pretrained model and its image
            processor (loaded with ``local_files_only=True``).
        device: Torch device the model and the inputs are moved to.
    """

    def __init__(self, model_id, device):
        super().__init__()
        self.device = device
        # Load from a local path
        self.processor = AutoImageProcessor.from_pretrained(model_id, local_files_only=True)
        self.model = AutoModel.from_pretrained(model_id, local_files_only=True).to(device)
        self.model.eval()
        self.patch_size = getattr(self.model.config, "patch_size", 14)
        # Checkpoints with register tokens place them between the CLS token
        # and the patch tokens; configs without the field are treated as 0.
        self.num_register_tokens = getattr(self.model.config, "num_register_tokens", 0) or 0
        for param in self.model.parameters():
            param.requires_grad = False

    def forward(self, img_pil):
        """Return the patch features of ``img_pil`` as ``(B, C, h_grid, w_grid)``.

        The grid size is the processed image size divided by the patch size.
        """
        inputs = self.processor(images=img_pil, return_tensors="pt").to(self.device)
        pixel_values = inputs['pixel_values']
        h_img, w_img = pixel_values.shape[2], pixel_values.shape[3]
        h_grid = h_img // self.patch_size
        w_grid = w_img // self.patch_size
        num_patches = h_grid * w_grid

        with torch.no_grad():
            outputs = self.model(**inputs)

        last_hidden_state = outputs.last_hidden_state
        # Skip CLS *and* the register tokens; dropping only CLS would read the
        # registers as patches and cut off the last patches of the grid.
        first_patch = 1 + self.num_register_tokens
        patch_tokens = last_hidden_state[:, first_patch:first_patch + num_patches, :]

        B, N, C = patch_tokens.shape
        if N != num_patches:
            # Fewer tokens than expected: treat them as a square grid and
            # resample it to the expected grid size.
            side = int(np.sqrt(N))
            if side * side != N:
                raise ValueError(
                    f"Cannot arrange {N} patch tokens into a square grid "
                    f"(expected {num_patches} tokens for a {h_grid}x{w_grid} grid)."
                )
            patch_tokens = patch_tokens.permute(0, 2, 1).reshape(B, C, side, side)
            patch_tokens = F.interpolate(patch_tokens, size=(h_grid, w_grid), mode='bilinear')
            return patch_tokens

        feat_map = patch_tokens.reshape(B, h_grid, w_grid, C).permute(0, 3, 1, 2)
        return feat_map

class FFAProcessor:
    """Mask-weighted average pooling of a feature map."""

    @staticmethod
    def apply_ffa(feat_map, mask):
        """Average ``feat_map`` over the region selected by ``mask``.

        The mask is resized (nearest neighbour) to the spatial size of the
        feature map, used as per-location weights, and the weighted sum is
        divided by the mask sum (plus a small epsilon).

        Args:
            feat_map: 4-D feature tensor whose last two dims are spatial.
            mask: 4-D mask tensor; converted to float before resizing.

        Returns:
            Tensor with the two spatial dimensions reduced away.
        """
        spatial_size = feat_map.shape[-2:]
        weights = F.interpolate(mask.float(), size=spatial_size, mode='nearest')
        weighted_total = (feat_map * weights).sum(dim=(2, 3))
        weight_total = weights.sum(dim=(2, 3)) + 1e-6  # epsilon keeps an empty mask from dividing by zero
        return weighted_total / weight_total

class SimilarityModel:
    """Scores candidate crops against a small bank of template features.

    Args:
        cfg: Config object providing ``device`` and ``dino_model_id``.
    """

    def __init__(self, cfg):
        self.device = cfg.device
        self.extractor = DinoV3FeatureExtractor(cfg.dino_model_id, self.device)
        self.template_features = None

    def load_templates(self, img_dir, mask_dir):
        """Load up to three templates (``img_1`` .. ``img_3``) and their masks.

        Pairs where either the image or the mask file is missing are skipped.

        Returns:
            ``True`` when at least one template was loaded, else ``False``.
            On failure the template bank is cleared so that features of a
            previously loaded video can never be used by accident.
        """
        self.template_features = None
        feats = []
        for i in range(1, 4):
            img_path = os.path.join(img_dir, f"img_{i}.jpg")
            mask_path = os.path.join(mask_dir, f"img_{i}.png")
            if not (os.path.exists(img_path) and os.path.exists(mask_path)):
                continue
            img = Image.open(img_path).convert('RGB')
            mask = np.array(Image.open(mask_path).convert('L')) > 128
            feats.append(self.extract_features(img, mask.astype(np.float32)))

        if not feats:
            return False  # no template could be loaded
        stacked = torch.stack(feats).to(self.device)
        self.template_features = F.normalize(stacked, p=2, dim=1)
        return True

    def extract_features(self, img_pil, mask_np):
        """Return the L2-normalised, mask-pooled feature of one image.

        Args:
            img_pil: Image passed to the feature extractor.
            mask_np: 2-D NumPy mask; it is given batch and channel dims here.
        """
        m = torch.from_numpy(mask_np).float().unsqueeze(0).unsqueeze(0).to(self.device)
        feat_map = self.extractor(img_pil)
        feat = FFAProcessor.apply_ffa(feat_map, m)
        return F.normalize(feat, p=2, dim=1).squeeze(0)

    def compute_scores(self, feat):
        """Compare ``feat`` with every loaded template.

        Returns:
            ``(s1, s2, s3, avg, app_bonus)`` where ``s1..s3`` are the
            similarities to the first three templates (``0.0`` for templates
            that do not exist), ``avg`` is the mean over all templates and
            ``app_bonus`` is ``0.7 * best + 0.3 * avg``.

        Raises:
            RuntimeError: If no templates have been loaded.
        """
        if self.template_features is None:
            raise RuntimeError("compute_scores() called before templates were loaded")
        sims = torch.matmul(feat.unsqueeze(0), self.template_features.T).squeeze(0)
        # Pad/truncate to exactly three values so that 1-2 templates still
        # report their real similarities and more than 3 do not break unpacking.
        per_template = sims.detach().cpu().reshape(-1).tolist()
        s1, s2, s3 = (per_template + [0.0, 0.0, 0.0])[:3]
        avg = sims.mean().item()
        best = sims.max().item()
        app_bonus = 0.7 * best + 0.3 * avg
        return s1, s2, s3, avg, app_bonus

    @staticmethod
    def size_penalty(bbox, area):
        """Map the box-area / frame-area ratio to a fixed score.

        Args:
            bbox: ``[x1, y1, x2, y2]``.
            area: Frame area in the same units as the box (must be non-zero).
        """
        w, h = bbox[2] - bbox[0], bbox[3] - bbox[1]
        ratio = (w * h) / area
        if ratio < 0.0001:
            return 0.8
        if ratio <= 0.001:
            return 1.0
        if ratio <= 0.01:
            return 0.9
        if ratio <= 0.05:
            return 0.8
        return 0.2

# --- SORT TRACKING UTILS ---
def iou_batch(bb_test, bb_gt):
    """Compute the pairwise IoU between two sets of ``[x1, y1, x2, y2]`` boxes.

    Args:
        bb_test: Array-like of N boxes (one box per row).
        bb_gt: Array-like of M boxes (one box per row).

    Returns:
        ``(N, M)`` float array of IoU values. Pairs whose union area is not
        positive (e.g. two zero-area boxes) get ``0.0`` instead of NaN, so
        the matrix is always safe to hand to an assignment solver.
    """
    bb_test = np.asarray(bb_test)
    bb_gt = np.asarray(bb_gt)
    if bb_test.size == 0 or bb_gt.size == 0:
        return np.zeros((bb_test.shape[0], bb_gt.shape[0]))

    # Broadcast to (N, 1, 4) against (1, M, 4).
    bb_gt = np.expand_dims(bb_gt, 0)
    bb_test = np.expand_dims(bb_test, 1)

    xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
    yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
    xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
    yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
    inter = np.maximum(0., xx2 - xx1) * np.maximum(0., yy2 - yy1)

    area_test = (bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
    area_gt = (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1])
    union = area_test + area_gt - inter

    # Divide only where the union is positive; everything else stays 0.
    iou = np.zeros(union.shape, dtype=float)
    np.divide(inter, union, out=iou, where=union > 0)
    return iou

class KalmanBoxTracker:
    """Single-object tracker: a Kalman filter over a bounding box.

    The filter state has 7 components; the first four are the measured
    ``[center_x, center_y, area, aspect_ratio]`` and the transition matrix
    adds components 4-6 to components 0-2 on every step.
    """

    count = 0  # class-wide counter used to hand out tracker ids

    def __init__(self, bbox):
        """Start a tracker from an initial ``[x1, y1, x2, y2]`` box."""
        kf = KalmanFilter(dim_x=7, dim_z=4)

        # Identity plus the three couplings (0<-4, 1<-5, 2<-6).
        transition = np.eye(7, dtype=np.float32)
        transition[0, 4] = 1
        transition[1, 5] = 1
        transition[2, 6] = 1
        kf.F = transition
        # Only the first four state components are observed.
        kf.H = np.eye(4, 7, dtype=np.float32)

        kf.P[4:, 4:] *= 1000.
        kf.P *= 10.
        kf.Q[-1, -1] *= 0.01
        kf.Q[4:, 4:] *= 0.01
        kf.R[2:, 2:] *= 10.
        kf.x[:4] = self.convert_bbox_to_z(bbox)
        self.kf = kf

        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1

        self.time_since_update = 0
        self.hits = 0
        self.hit_streak = 0
        self.age = 0
        self.history = []
        self.last_score = 0.0

    def update(self, bbox, score=0.0):
        """Correct the filter with an observed box and remember its score."""
        self.kf.update(self.convert_bbox_to_z(bbox))
        self.last_score = score
        self.hits += 1
        self.hit_streak += 1
        self.time_since_update = 0
        self.history = []

    def predict(self):
        """Advance the filter one step and return the predicted ``(1, 4)`` box."""
        state = self.kf.x
        # Stop the area from being pushed to zero or below by its rate.
        if (state[6] + state[2]) <= 0:
            state[6] *= 0.0
        self.kf.predict()
        self.age += 1
        # A step without a preceding update breaks the hit streak.
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        predicted_box = self.convert_x_to_bbox(self.kf.x)
        self.history.append(predicted_box)
        return self.history[-1]

    def get_state(self):
        """Return the current box estimate as a ``(1, 4)`` array."""
        return self.convert_x_to_bbox(self.kf.x)

    @staticmethod
    def convert_bbox_to_z(bbox):
        """Convert ``[x1, y1, x2, y2]`` to a ``(4, 1)`` ``[cx, cy, area, ratio]`` column."""
        left, top = bbox[0], bbox[1]
        width = bbox[2] - left
        height = bbox[3] - top
        center_x = left + width / 2.
        center_y = top + height / 2.
        area = width * height
        aspect = width / float(height)
        return np.array([center_x, center_y, area, aspect]).reshape((4, 1))

    @staticmethod
    def convert_x_to_bbox(x, score=None):
        """Convert a state whose first entries are ``[cx, cy, area, ratio]`` to a ``(1, 4)`` box.

        ``score`` is accepted but not used.
        """
        width = np.sqrt(x[2] * x[3])
        height = x[2] / width
        half_w = width / 2.
        half_h = height / 2.
        corners = [x[0] - half_w, x[1] - half_h, x[0] + half_w, x[1] + half_h]
        return np.array(corners).reshape((1, 4))

def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    """Match detection boxes to predicted tracker boxes by IoU.

    Args:
        detections: Array of detection boxes, one per row.
        trackers: Array of tracker boxes, one per row.
        iou_threshold: Pairs with an IoU below this value are not accepted
            as matches.

    Returns:
        ``(matches, unmatched_detections, unmatched_trackers)`` where
        ``matches`` is a ``(K, 2)`` array of ``[detection_idx, tracker_idx]``
        rows and the other two are index arrays.
    """
    if len(trackers) == 0:
        no_matches = np.empty((0, 2), dtype=int)
        every_detection = np.arange(len(detections))
        return no_matches, every_detection, np.empty((0, 5), dtype=int)

    iou_matrix = iou_batch(detections, trackers)

    if min(iou_matrix.shape) == 0:
        matched_indices = np.empty((0, 2))
    else:
        above = (iou_matrix > iou_threshold).astype(np.int32)
        # When every row and column has at most one candidate (and at least
        # one exists), the pairing can be read off directly.
        one_to_one = above.sum(1).max() == 1 and above.sum(0).max() == 1
        if one_to_one:
            matched_indices = np.stack(np.where(above), axis=1)
        else:
            # Otherwise solve the assignment that maximises total IoU.
            assignment = linear_sum_assignment(-iou_matrix)
            matched_indices = np.array(assignment).T

    paired_dets = matched_indices[:, 0]
    paired_trks = matched_indices[:, 1]
    unmatched_detections = [d for d in range(len(detections)) if d not in paired_dets]
    unmatched_trackers = [t for t in range(len(trackers)) if t not in paired_trks]

    # Reject assigned pairs whose overlap is too small.
    accepted = []
    for pair in matched_indices:
        det_idx, trk_idx = pair[0], pair[1]
        if iou_matrix[det_idx, trk_idx] < iou_threshold:
            unmatched_detections.append(det_idx)
            unmatched_trackers.append(trk_idx)
            continue
        accepted.append(pair.reshape(1, 2))

    if accepted:
        matches = np.concatenate(accepted, axis=0)
    else:
        matches = np.empty((0, 2), dtype=int)

    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)

# ====================== LOADING MODELS ======================
def _build_mobile_sam(checkpoint_path, device):
    """Assemble the SAM model, load its checkpoint and put it in eval mode.

    Sub-modules are created in the same order as the keyword arguments of
    the original single-expression construction.
    """
    image_encoder = TinyViT(
        img_size=1024,
        in_chans=3,
        num_classes=1000,
        embed_dims=[64, 128, 160, 320],
        depths=[2, 2, 6, 2],
        num_heads=[2, 4, 5, 10],
        window_sizes=[7, 7, 14, 7],
    )
    prompt_encoder = PromptEncoder(
        embed_dim=256,
        image_embedding_size=(64, 64),
        input_image_size=(1024, 1024),
        mask_in_chans=16,
    )
    mask_decoder = MaskDecoder(
        num_multimask_outputs=3,
        transformer=TwoWayTransformer(depth=2, embedding_dim=256, mlp_dim=2048, num_heads=8),
        transformer_dim=256,
    )
    model = Sam(
        image_encoder=image_encoder,
        prompt_encoder=prompt_encoder,
        mask_decoder=mask_decoder,
    )
    weights = torch.load(checkpoint_path, map_location=device)
    # strict=False: keys that do not line up with the checkpoint are tolerated.
    model.load_state_dict(weights, strict=False)
    model.to(device).eval()
    return model


# The config is only used here for its model paths and device.
dummy_cfg = DockerConfig("Lifejacket_0")
print("‚è≥ ƒêang load models...")

# 1. Template-similarity model
sim_model = SimilarityModel(dummy_cfg)

# 2. YOLO-based object-aware box proposer
yolo = ObjectAwareModel(dummy_cfg.yolo_model)

# 3. SAM and its predictor wrapper
sam = _build_mobile_sam(dummy_cfg.sam_checkpoint, dummy_cfg.device)
predictor = SamPredictor(sam)

print("‚úÖ Load xong to√†n b·ªô Model!")

‚è≥ ƒêang load models...
‚úÖ Load xong to√†n b·ªô Model!


In [None]:
# Find every test-case folder under the samples directory.
search_path = "D:/code/detect/public_test/public_test/samples/*"
test_cases = [os.path.basename(p) for p in glob.glob(search_path) if os.path.isdir(p)]
print(f"üîé T√¨m th·∫•y {len(test_cases)} videos: {test_cases}")

all_predicted_time = []
all_results_json = {}  # per-video predictions in the organizers' format

for video_id in test_cases:
    print(f"‚ñ∂Ô∏è ƒêang x·ª≠ l√Ω: {video_id}")

    # --- START TIMING ---
    t1 = time()

    # Config for the current video
    cfg = DockerConfig(video_id)

    # Load templates (when this fails, every frame gets an empty prediction)
    templates_loaded = sim_model.load_templates(cfg.template_img_dir, cfg.template_mask_dir)

    cap = cv2.VideoCapture(cfg.video_path)
    W, H = int(cap.get(3)), int(cap.get(4))

    # Reset the tracker state for the new video
    trackers = []
    KalmanBoxTracker.count = 0
    video_predictions = []  # results of this video
    frame_idx = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Only run the pipeline when templates were loaded; otherwise the
        # frame is reported with an empty box.
        best_obj = None
        if templates_loaded:
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # 1. YOLO detect
            results = yolo(rgb, conf=cfg.conf_thres, verbose=False)
            boxes = results[0].boxes.xyxy.cpu().numpy() if results and results[0].boxes is not None else []

            high_score_candidates = []
            if len(boxes) > 0:
                predictor.set_image(rgb)
                for box in boxes:
                    x1, y1, x2, y2 = map(int, box)
                    # Discard proposals that are too small or too large for the frame.
                    if not (cfg.min_area_ratio <= (x2-x1)*(y2-y1)/(W*H) <= cfg.max_area_ratio):
                        continue

                    masks, _, _ = predictor.predict(box=box, multimask_output=False)
                    if len(masks) == 0:
                        continue
                    mask = masks[0]

                    # Tight bounding box of the predicted mask.
                    yy, xx = np.where(mask)
                    if len(yy) == 0:
                        continue
                    y1b, y2b, x1b, x2b = yy.min(), yy.max()+1, xx.min(), xx.max()+1
                    crop = rgb[y1b:y2b, x1b:x2b]
                    crop_mask = mask[y1b:y2b, x1b:x2b]

                    feat = sim_model.extract_features(Image.fromarray(crop), crop_mask)
                    s1, s2, s3, avg_match, app_bonus = sim_model.compute_scores(feat)
                    size_pen = sim_model.size_penalty([x1b, y1b, x2b, y2b], W * H)
                    final_score = 0.7 * avg_match + 0.25 * app_bonus + 0.05 * size_pen

                    if final_score > cfg.SCORE_THRESHOLD:
                        high_score_candidates.append({
                            'bbox': np.array([x1b, y1b, x2b, y2b]),
                            'score': final_score
                        })

            # 2. SORT logic
            if len(high_score_candidates) > 0:
                dets_for_track = np.array([c['bbox'] for c in high_score_candidates])
            else:
                dets_for_track = np.empty((0, 4))

            trks_for_track = np.zeros((len(trackers), 4))
            for t, trk in enumerate(trackers):
                trks_for_track[t, :] = trk.predict()[0]

            # Drop trackers whose prediction went NaN from BOTH the tracker
            # list and the box matrix, so that the tracker indices returned by
            # the association step keep pointing at the right tracker.
            valid = ~np.isnan(trks_for_track).any(axis=1)
            if not valid.all():
                trackers = [trk for trk, ok in zip(trackers, valid) if ok]
                trks_for_track = trks_for_track[valid]

            matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets_for_track, trks_for_track, iou_threshold=cfg.IOU_THRESHOLD)

            for m in matched:
                trackers[m[1]].update(dets_for_track[m[0]], score=high_score_candidates[m[0]]['score'])
            for i in unmatched_dets:
                trk = KalmanBoxTracker(dets_for_track[i])
                trk.update(dets_for_track[i], score=high_score_candidates[i]['score'])
                trackers.append(trk)

            # Pick the best tracker among those updated in this frame
            active_trackers = []
            for trk in trackers:
                # Report a track once it has MIN_HITS consecutive hits, or
                # unconditionally during the first frames.
                if (trk.time_since_update < 1) and (trk.hit_streak >= cfg.MIN_HITS or frame_idx <= 5):
                    active_trackers.append({"bbox": trk.get_state()[0], "score": trk.last_score, "id": trk.id})

            if active_trackers:
                best_obj = max(active_trackers, key=lambda x: x['score'])

            # Retire trackers that have not been matched for more than
            # MAX_AGE frames; without this the list grows for the whole video.
            trackers = [trk for trk in trackers if trk.time_since_update <= cfg.MAX_AGE]

        # Save the result of this frame
        frame_res = {
            "frame_id": frame_idx,
            "box": best_obj['bbox'].astype(int).tolist() if best_obj else [],
            "score": float(best_obj['score']) if best_obj else 0.0,
            "track_id": int(best_obj['id']) if best_obj else -1
        }
        video_predictions.append(frame_res)
        frame_idx += 1

    cap.release()
    # --- STOP TIMING ---
    t2 = time()

    predicted_time = int((t2 - t1) * 1000)  # milliseconds

    # Record the processing time
    all_predicted_time.append({"id": video_id, "answer": "processed", "time": predicted_time})

    # Record the predictions (a list of per-frame objects; adjust to the
    # format required by the organizers if needed)
    all_results_json[video_id] = video_predictions

# --- WRITE OUTPUT FILES ---
output_dir = "/result"
os.makedirs(output_dir, exist_ok=True)

# 1. time_submission.csv with the required columns: id, answer, time
df_time = pd.DataFrame(all_predicted_time)
df_time.to_csv(os.path.join(output_dir, "time_submission.csv"), index=False)

# 2. jupyter_submission.json
# The exact layout depends on the task statement; here the aggregated
# {video_id: predictions} dict is dumped as a single file.
with open(os.path.join(output_dir, "jupyter_submission.json"), 'w') as f:
    json.dump(all_results_json, f, indent=2)

print("\n‚úÖ HO√ÄN TH√ÄNH! K·∫øt qu·∫£ ƒë√£ l∆∞u t·∫°i /result/")
print(df_time)

üîé T√¨m th·∫•y 6 videos: ['BlackBox_0', 'BlackBox_1', 'CardboardBox_0', 'CardboardBox_1', 'LifeJacket_0', 'LifeJacket_1']
‚ñ∂Ô∏è ƒêang x·ª≠ l√Ω: BlackBox_0
‚ñ∂Ô∏è ƒêang x·ª≠ l√Ω: BlackBox_1
