In [1]:
import cv2
from moviepy.editor import VideoFileClip
import time
import base64
import os
from openai import OpenAI 

from IPython.display import Image, display, Audio, Markdown


In [2]:
# Chat model used for every OpenAI request in this notebook.
MODEL = "gpt-4o"
# MODEL = "gpt-4-turbo"

# The API key is read from the environment so that no credential is stored in
# the notebook (or leaks through version control / shared copies).
openai_api_key = os.environ.get("OPENAI_API_KEY")
if not openai_api_key:
    raise RuntimeError(
        "Set the OPENAI_API_KEY environment variable before running this notebook."
    )
client = OpenAI(api_key=openai_api_key)

## Importing Grounding DINO

In [7]:
import tempfile
from typing import Dict, List, Union
import numpy as np
from dds_cloudapi_sdk import (
    DetectionTask,
    Client,
    Config,
    TextPrompt,
    DetectionModel,
    DetectionTarget,
)
from PIL import Image
import concurrent.futures

class GroundingDINOAPIWrapper:
    """API wrapper for Grounding DINO 1.5

    Args:
        token (str): The token for Grounding DINO 1.5 API. We are now opening free API access to Grounding DINO 1.5. For
            educators, students, and researchers, we offer an API with extensive usage times to
            support your educational and research endeavors. You can get free API token at here:
            https://deepdataspace.com/request_api

    """

    def __init__(self, token: str):
        self.client = Client(Config(token=token))

    def inference(self, prompt: Dict, return_mask: bool = False):
        """Run Grounding DINO 1.5 detection on a single image.

        Batch inference is not supported: each call handles exactly one image.

        Args:
            prompt (dict): Annotations with the following keys:
                - "image" (str | np.ndarray): Path to the image (e.g. "test1.jpg") or
                  an image array; it is forwarded to :meth:`get_image_url`.
                - "prompt" (str): Text prompt separated by '.' E.g. 'cate1 . cate2 . cate3'
            return_mask (bool): Whether to also request masks. Defaults to False.

        Returns:
            (Dict): Detection results as produced by :meth:`postprocess`, with keys:
                - "boxes" (List[List[float]]): One box per object, in format [xmin, ymin, xmax, ymax]
                - "categorys" (List[str]): The category of each object
                - "scores" (List[float]): The score of each object
                - "masks" (List): One decoded mask per object when ``return_mask`` is True,
                  otherwise an empty list
        """
        # Upload the image first; the detection task only accepts a URL.
        image_url = self.get_image_url(prompt["image"])
        if return_mask:
            targets = [DetectionTarget.Mask, DetectionTarget.BBox]
        else:
            targets = [DetectionTarget.BBox]
        task = DetectionTask(
            image_url=image_url,
            prompts=[TextPrompt(text=prompt["prompt"])],
            targets=targets,
            model=DetectionModel.GDino1_5_Pro,
        )
        self.client.run_task(task)
        return self.postprocess(task.result, task, return_mask)

    def postprocess(self, result, task, return_mask):
        """Postprocess the result from the API call

        The output lists follow the order of ``result.objects`` so that the
        i-th box, score, category (and mask) always describe the same object.

        Args:
            result (TaskResult): Task result with the following keys:
                - objects (List[DetectionObject]): Each DetectionObject has the following keys:
                    - bbox (List[float]): Box in xyxy format
                    - category (str): Detection category
                    - score (float): Detection score
                    - mask (DetectionObjectMask): Use mask.counts to parse RLE mask
            task (DetectionTask): The task object; only its ``rle2rgba`` method is
                used, and only when ``return_mask`` is True.
            return_mask (bool): Whether to return mask

        Returns:
            (Dict): Return dict in format:
                {
                    "scores": (List[float]): A list of scores for each object
                    "categorys": (List[str]): A list of categorys for each object
                    "boxes": (List[List[int]]): A list of boxes for each object
                    "masks": (List[PIL.Image]): A list of masks in the format of PIL.Image
                }
        """
        def process_object(obj):
            # Mask decoding is the only non-trivial work; skip it unless requested.
            mask = task.rle2rgba(obj.mask) if return_mask else None
            return obj.bbox, obj.score, obj.category, mask

        objects = list(result.objects)
        if return_mask:
            # executor.map yields results in submission order, unlike
            # as_completed, which would shuffle objects by finishing time.
            with concurrent.futures.ThreadPoolExecutor() as executor:
                processed = list(executor.map(process_object, objects))
        else:
            # Plain attribute reads do not benefit from a thread pool.
            processed = [process_object(obj) for obj in objects]

        boxes, scores, categorys, masks = [], [], [], []
        for box, score, category, mask in processed:
            boxes.append(box)
            scores.append(score)
            categorys.append(category)
            if mask is not None:
                masks.append(mask)

        return dict(boxes=boxes, categorys=categorys, scores=scores, masks=masks)

    def get_image_url(self, image: Union[str, np.ndarray]):
        """Upload Image to server and return the url

        Args:
            image (Union[str, np.ndarray]): The image to upload. Can be a file path or np.ndarray.
                If it is a np.ndarray, it will be saved to a temporary file.

        Returns:
            str: The url of the image
        """
        if isinstance(image, str):
            return self.client.upload_file(image)

        with tempfile.NamedTemporaryFile(delete=True, suffix=".png") as tmp_file:
            # image is in numpy format, convert to PIL Image and encode as PNG
            Image.fromarray(image).save(tmp_file, format="PNG")
            # The upload re-opens the file by path, so make sure buffered bytes
            # have been written out before handing the path over.
            tmp_file.flush()
            # Upload while the context is still open: the file is deleted on exit.
            return self.client.upload_file(tmp_file.name)

In [32]:
from typing import Dict

import numpy as np
from PIL import Image, ImageDraw, ImageFont, ImageOps
import random


def draw_mask(mask, draw, random_color=True):
    """Paint every non-zero pixel of a mask onto a drawing surface.

    Args:
        mask (np.array): Mask as a NumPy array; each non-zero entry is painted.
        draw (ImageDraw.Draw): Drawing object whose ``point`` method receives the pixels.
        random_color (bool): If True, pick a random RGB color for this mask;
            otherwise use a fixed blue. Alpha is always 153.
    """
    alpha = 153
    rgba = (
        (
            random.randint(0, 255),
            random.randint(0, 255),
            random.randint(0, 255),
            alpha,
        )
        if random_color
        else (30, 144, 255, alpha)
    )

    # argwhere yields one (row, col) index pair per non-zero entry; the drawing
    # API expects (x, y), hence the reversal.
    for row_col in np.argwhere(mask):
        draw.point(row_col[::-1], fill=rgba)

def visualize(image_pil: Image,
              result: Dict,
              draw_width: float = 2.0,
              return_mask=True,
              draw_score=True) -> Image:
    """Plot bounding boxes, labels and (optionally) masks on an image.

    Boxes and labels are drawn directly onto ``image_pil`` when it is a PIL
    image, so the caller's image object is modified. When masks are overlaid,
    a new RGB image is returned instead.

    Args:
        image_pil (PIL.Image | np.ndarray): The input image. A NumPy array is
            converted to a PIL Image first.
        result (Dict): The detection dictionary. The keys are:
                - boxes (List[List[float]]): A list of bounding boxes in shape (N, 4), [x1, y1, x2, y2] format.
                - scores (List[float]): A list of scores for each bounding box. shape (N)
                - categorys (List[str]): A list of categorys for each object
                - masks (List[PIL.Image]): Optional list of masks; the last channel of
                  each mask (or the mask itself when it is 2-D) selects the pixels to paint.
        draw_width (float): Outline width of the boxes; truncated to int. Defaults to 2.0.
        return_mask (bool): Overlay the masks when any are present. Defaults to True.
        draw_score (bool): Append the score to each label. Defaults to True.

    Returns:
        PIL.Image: The image with plotted bounding boxes, labels, and masks.
    """
    # Get the bounding boxes and labels from the target dictionary
    boxes = result["boxes"]
    scores = result["scores"]
    categorys = result["categorys"]
    masks = result.get("masks", [])

    # One random color per distinct category so that all boxes of a category match
    cate2color = {}
    for cate in set(categorys):
        cate2color[cate] = tuple(np.random.randint(0, 255, size=3).tolist())

    # Create a PIL ImageDraw object to draw on the input image
    if isinstance(image_pil, np.ndarray):
        image_pil = Image.fromarray(image_pil)
    draw = ImageDraw.Draw(image_pil)

    # The label font never changes, so load it once instead of once per box
    font = ImageFont.load_default()

    # Draw the box and its label for each detection
    for box, score, category in zip(boxes, scores, categorys):
        x0, y0, x1, y1 = (int(v) for v in box)
        color = cate2color[category]

        # Draw the box outline on the input image
        draw.rectangle([x0, y0, x1, y1], outline=color, width=int(draw_width))

        if draw_score:
            text = f"{category} {score:.2f}"
        else:
            text = f"{category}"

        # Measure the label so a filled background can be drawn behind it;
        # fall back to textsize for fonts without getbbox
        if hasattr(font, "getbbox"):
            bbox = draw.textbbox((x0, y0), text, font)
        else:
            w, h = draw.textsize(text, font)
            bbox = (x0, y0, w + x0, y0 + h)
        draw.rectangle(bbox, fill=color)
        draw.text((x0, y0), text, fill="white", font=font)

    # Draw the mask on the input image if masks are provided
    if len(masks) > 0 and return_mask:
        mask_image = Image.new("RGBA", image_pil.size, color=(0, 0, 0, 0))
        mask_draw = ImageDraw.Draw(mask_image)
        for mask in masks:
            mask_array = np.array(mask)
            # Multi-channel masks carry the region in their last channel;
            # a 2-D mask is used as it is.
            if mask_array.ndim == 3:
                mask_array = mask_array[:, :, -1]
            draw_mask(mask_array, mask_draw)

        image_pil = Image.alpha_composite(image_pil.convert("RGBA"), mask_image).convert("RGB")
    return image_pil

In [33]:
import argparse
import os
# from gdino import GroundingDINOAPIWrapper, visualize
from PIL import Image
import numpy as np

def get_args(argv=None):
    """Parse the command-line options of the Grounding DINO demo.

    Args:
        argv (list[str] | None): Arguments to parse. ``None`` (the default)
            keeps the original behavior of reading ``sys.argv``. Inside a
            notebook, pass an explicit list (e.g. ``[]``) because ``sys.argv``
            there holds the kernel's own launch arguments.

    Returns:
        argparse.Namespace: With attributes ``token`` (str or None) and
        ``box_threshold`` (float, default 0.3).
    """
    parser = argparse.ArgumentParser(description="Interactive Inference")
    parser.add_argument(
        "--token",
        type=str,
        help="The token for Grounding DINO 1.5 API. We are now opening free API "
             "access to Grounding DINO 1.5",
    )
    parser.add_argument(
        "--box_threshold", type=float, default=0.3, help="The threshold for box score"
    )
    return parser.parse_args(argv)



In [38]:
# Data location: defaults to the original local directory, but can be
# redirected with PI_DATA_DIR so the notebook runs on other machines.
DATA_DIR = os.environ.get("PI_DATA_DIR", "/Users/niccolofusai/Documents/pi/data")
image_path = os.path.join(DATA_DIR, "input", "image.png")
output_path = os.path.join(DATA_DIR, "output", "demo_output.jpg")

# The API token is read from the environment so that it is never stored in the notebook.
gdino_token = os.environ.get("GDINO_API_TOKEN")
if not gdino_token:
    raise RuntimeError(
        "Set the GDINO_API_TOKEN environment variable to your Grounding DINO API token."
    )
gdino = GroundingDINOAPIWrapper(gdino_token)

# Categories are separated by '.'.
# NOTE(review): the space before 'transparent' is echoed back in the returned
# category label (' transparent plastic packet') - confirm whether that is intended.
prompts = dict(image=image_path, prompt='cup.bowl.plate.chopstick.plastic bottle.spoon.fork.packet.container.robot gripper.trash bin.foil packet.plastic container. transparent plastic packet.cardboard food container')
results = gdino.inference(prompts)
print(results)

# now visualize the results
image_pil = Image.open(prompts['image'])
image_pil = visualize(image_pil, results)
# Convert RGBA to RGB before saving (JPEG has no alpha channel)
if image_pil.mode == 'RGBA':
    image_pil = image_pil.convert('RGB')
# dump the image to the disk
image_pil.save(output_path)

{'boxes': [[254.161376953125, 151.2242431640625, 317.3817443847656, 267.26177978515625], [327.18585205078125, 273.8308410644531, 360.1203308105469, 303.373291015625], [257.3326721191406, 285.97821044921875, 305.6603088378906, 416.70050048828125], [330.5942077636719, 208.18125915527344, 371.4006652832031, 280.07720947265625], [290.14483642578125, 149.88197326660156, 361.82049560546875, 260.062255859375], [223.63839721679688, 152.44775390625, 287.3518371582031, 266.7746276855469], [360.3450927734375, 272.4895324707031, 462.4224853515625, 322.0088806152344], [514.9003295898438, 267.6614990234375, 593.6388549804688, 308.60931396484375], [465.001953125, 322.6297607421875, 541.7236938476562, 340.9342041015625], [295.7071838378906, 304.67388916015625, 372.47747802734375, 367.7409362792969], [193.13531494140625, 251.0772247314453, 342.9294128417969, 345.8377685546875], [370.5850830078125, 232.21463012695312, 440.8970642089844, 273.5421447753906], [370.3722839355469, 287.18707275390625, 466.724

In [36]:
# Show which fields the detection result dictionary contains.
results.keys()

dict_keys(['boxes', 'categorys', 'scores', 'masks'])

In [37]:
# List the category label of each detected object.
results['categorys']

['cup',
 'trash bin',
 'foil packet',
 'cardboard food container',
 'trash bin',
 'chopstick',
 'packet',
 'cardboard food container',
 'trash bin',
 'plastic bottle',
 'trash bin',
 ' transparent plastic packet',
 'plate',
 'bowl',
 'spoon']