In [1]:
!pip install -q groundingdino-py supervision opencv-python-headless pillow matplotlib numpy open_clip_torch



[notice] A new release of pip is available: 25.0.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import torch
import numpy as np
import cv2
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import supervision as sv
import os

# CLIP
import open_clip

# GroundingDINO
from groundingdino.util.inference import Model

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", DEVICE)


  from .autonotebook import tqdm as notebook_tqdm


Using device: cpu




In [4]:
import os
import urllib.request

os.makedirs("groundingdino_weights", exist_ok=True)

config_url = "https://raw.githubusercontent.com/IDEA-Research/GroundingDINO/main/groundingdino/config/GroundingDINO_SwinT_OGC.py"
weights_url = "https://huggingface.co/ShilongLiu/GroundingDINO/resolve/main/groundingdino_swint_ogc.pth"

config_path = "groundingdino_weights/GroundingDINO_SwinT_OGC.py"
weights_path = "groundingdino_weights/groundingdino_swint_ogc.pth"

print("Downloading config file...")
urllib.request.urlretrieve(config_url, config_path)

print("Downloading weights file...")
urllib.request.urlretrieve(weights_url, weights_path)

print("Download complete!")
print("Config saved at:", config_path)
print("Weights saved at:", weights_path)


Downloading config file...
Downloading weights file...
Download complete!
Config saved at: groundingdino_weights/GroundingDINO_SwinT_OGC.py
Weights saved at: groundingdino_weights/groundingdino_swint_ogc.pth


In [6]:
from groundingdino.util.inference import Model

dino = Model(
    model_config_path="groundingdino_weights/GroundingDINO_SwinT_OGC.py",
    model_checkpoint_path="groundingdino_weights/groundingdino_swint_ogc.pth",
    device="cpu"
)

print("GroundingDINO loaded successfully!")


final text_encoder_type: bert-base-uncased
GroundingDINO loaded successfully!


In [7]:
clip_model, _, clip_preprocess = open_clip.create_model_and_transforms(
    "ViT-B-32", pretrained="openai"
)
tokenizer = open_clip.get_tokenizer("ViT-B-32")

clip_model = clip_model.to(DEVICE)
clip_model.eval()




CLIP(
  (visual): VisionTransformer(
    (conv1): Conv2d(3, 768, kernel_size=(32, 32), stride=(32, 32), bias=False)
    (patch_dropout): Identity()
    (ln_pre): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    (transformer): Transformer(
      (resblocks): ModuleList(
        (0-11): 12 x ResidualAttentionBlock(
          (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          )
          (ls_1): Identity()
          (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (mlp): Sequential(
            (c_fc): Linear(in_features=768, out_features=3072, bias=True)
            (gelu): GELU(approximate='none')
            (c_proj): Linear(in_features=3072, out_features=768, bias=True)
          )
          (ls_2): Identity()
        )
      )
    )
    (ln_post): LayerNorm((768,), eps=1e-05, elementwise_affine

In [8]:
def normalize(v):
    return v / (np.linalg.norm(v) + 1e-10)

def cosine(a, b):
    return float(np.dot(a,b) / (np.linalg.norm(a)*np.linalg.norm(b)))

def pil_from_bgr(frame):
    return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

@torch.no_grad()
def encode_image(pil_img):
    img = clip_preprocess(pil_img).unsqueeze(0).to(DEVICE)
    emb = clip_model.encode_image(img)
    emb = emb / emb.norm(dim=-1, keepdim=True)
    return emb[0].cpu().numpy()

@torch.no_grad()
def encode_text(text):
    tokens = tokenizer([text]).to(DEVICE)
    emb = clip_model.encode_text(tokens)
    emb = emb / emb.norm(dim=-1, keepdim=True)
    return emb[0].cpu().numpy()

def fuse_embeddings(img_emb=None, txt_emb=None):
    if img_emb is not None and txt_emb is not None:
        return normalize(img_emb + txt_emb)
    if img_emb is not None:
        return img_emb
    if txt_emb is not None:
        return txt_emb
    raise ValueError("No query provided")


In [19]:
def detect_with_groundingdino(frame, text_query):
    from PIL import Image
    import cv2

    if frame is None:
        return None

    pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    detections = dino.predict_with_caption(
        image=pil_img,
        caption=text_query,
        box_threshold=0.3,
        text_threshold=0.25
    )

    return detections


In [16]:
def match_frame_dino(frame, query_emb, text_query, sim_threshold=0.28):
    
    detections = detect_with_groundingdino(frame, text_query)
    boxes = detections.xyxy

    similarities = []
    for box in boxes:
        x1, y1, x2, y2 = map(int, box)
        crop = frame[y1:y2, x1:x2]
        crop_pil = pil_from_bgr(crop)

        emb = encode_image(crop_pil)
        sim = cosine(emb, query_emb)
        similarities.append(sim)

    return boxes, similarities


In [20]:
def search_in_video_dino(
        video_path,
        output_path,
        text_query=None,
        image_query_path=None,
        sim_threshold=0.28,
        frame_skip=1):

    # Build QUERY embedding
    txt_emb = encode_text(text_query) if text_query else None
    img_emb = encode_image(Image.open(image_query_path)) if image_query_path else None
    query_emb = fuse_embeddings(img_emb, txt_emb)

    # Open video
    cap = cv2.VideoCapture(video_path)
    print(cap.isOpened())
    fps = cap.get(cv2.CAP_PROP_FPS)
    W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Output writer
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (W, H))

    results = []

    for frame_id in tqdm(range(total_frames)):
        ret, frame = cap.read()
        if not ret: break

        
        if frame_id % frame_skip != 0:
            out.write(frame); continue

        boxes, similarities = match_frame_dino(
            frame, query_emb, text_query=text_query, sim_threshold=sim_threshold
        )

        # Draw boxes
        for box, sim in zip(boxes, similarities):
            x1,y1,x2,y2 = map(int, box)
            color = (0,255,0) if sim > sim_threshold else (0,0,255)
            cv2.rectangle(frame, (x1,y1), (x2,y2), color, 2)
            cv2.putText(frame, f"{sim:.2f}", (x1,y1-8),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            if sim > sim_threshold:
                timestamp = frame_id / fps
                results.append((frame_id, timestamp, sim, (x1,y1,x2,y2)))

        out.write(frame)

    cap.release()
    out.release()

    print("Results saved to:", output_path)
    return results


In [21]:
results = search_in_video_dino(
    "input_2.mp4",
    "output_dino_carwheel.mp4",
    text_query="handbag"
)


True


  0%|          | 0/1317 [00:00<?, ?it/s]


error: OpenCV(4.12.0) :-1: error: (-5:Bad argument) in function 'cvtColor'
> Overload resolution failed:
>  - src is not a numpy array, neither a scalar
>  - Expected Ptr<cv::UMat> for argument 'src'


In [None]:
query_img = Image.open("wheel.jpg")
img_emb = encode_image(query_img)
text_query = "wheel"

results = search_in_video_dino(
    "input.mp4",
    "output_dino_wheelimg.mp4",
    text_query=text_query,
    image_query_path="wheel.jpg"
)


In [None]:
results = search_in_video_dino(
    "input.mp4",
    "output_dino_fusion.mp4",
    text_query="blue car",
    image_query_path="wheel.jpg"
)
