Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions inference/core/workflows/core_steps/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@
from inference.core.workflows.core_steps.transformations.detections_filter.v1 import (
DetectionsFilterBlockV1,
)
from inference.core.workflows.core_steps.transformations.detections_merge.v1 import (
DetectionsMergeBlockV1,
)
from inference.core.workflows.core_steps.transformations.detections_transformation.v1 import (
DetectionsTransformationBlockV1,
)
Expand Down Expand Up @@ -524,6 +527,7 @@ def load_blocks() -> List[Type[WorkflowBlock]]:
BlurVisualizationBlockV1,
BoundingBoxVisualizationBlockV1,
BoundingRectBlockV1,
DetectionsMergeBlockV1,
ByteTrackerBlockV2,
CacheGetBlockV1,
CacheSetBlockV1,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from typing import Any, Dict, List, Literal, Optional, Type
from uuid import uuid4

import numpy as np
import supervision as sv
from pydantic import ConfigDict, Field

from inference.core.workflows.execution_engine.entities.base import OutputDefinition
from inference.core.workflows.execution_engine.entities.types import (
INSTANCE_SEGMENTATION_PREDICTION_KIND,
KEYPOINT_DETECTION_PREDICTION_KIND,
OBJECT_DETECTION_PREDICTION_KIND,
Selector,
)
from inference.core.workflows.prototypes.block import (
BlockResult,
WorkflowBlock,
WorkflowBlockManifest,
)

# Name of the single output field produced by the block (see describe_outputs).
OUTPUT_KEY: str = "predictions"

# Short/long descriptions surfaced in the block's JSON schema and docs UI.
SHORT_DESCRIPTION = "Merge multiple detections into a single bounding box."
LONG_DESCRIPTION = """
The `DetectionsMerge` block combines multiple detections into a single bounding box that encompasses all input detections.
This is useful when you want to:
- Merge overlapping or nearby detections of the same object
- Create a single region that contains multiple detected objects
- Simplify multiple detections into one larger detection

The resulting detection will have:
- A bounding box that contains all input detections
- The classname of the merged detection is set to "merged_detection" by default, but can be customized via the `class_name` parameter
- The confidence is set to the lowest confidence among all detections
"""


class DetectionsMergeManifest(WorkflowBlockManifest):
    """Manifest (schema + metadata) for the Detections Merge workflow block."""

    model_config = ConfigDict(
        json_schema_extra={
            "name": "Detections Merge",
            "version": "v1",
            "short_description": SHORT_DESCRIPTION,
            "long_description": LONG_DESCRIPTION,
            "license": "Apache-2.0",
            "block_type": "transformation",
            "ui_manifest": {
                "section": "transformation",
                "icon": "fal fa-object-union",
                "blockPriority": 5,
            },
        }
    )
    type: Literal["roboflow_core/detections_merge@v1"]
    # All three kinds carry xyxy bounding boxes, which is the only geometry the
    # merge relies on — keypoint predictions are accepted for that reason.
    predictions: Selector(
        kind=[
            OBJECT_DETECTION_PREDICTION_KIND,
            INSTANCE_SEGMENTATION_PREDICTION_KIND,
            KEYPOINT_DETECTION_PREDICTION_KIND,
        ]
    ) = Field(
        description="Object detection predictions to merge into a single bounding box.",
        examples=["$steps.object_detection_model.predictions"],
    )
    class_name: str = Field(
        default="merged_detection",
        description="The class name to assign to the merged detection.",
    )

    @classmethod
    def describe_outputs(cls) -> List[OutputDefinition]:
        # Regardless of the input kind, the merged result is a plain box,
        # so the output is always an object-detection prediction.
        return [
            OutputDefinition(name=OUTPUT_KEY, kind=[OBJECT_DETECTION_PREDICTION_KIND]),
        ]

    @classmethod
    def get_execution_engine_compatibility(cls) -> Optional[str]:
        return ">=1.3.0,<2.0.0"


def calculate_union_bbox(detections: sv.Detections) -> np.ndarray:
    """Return one (1, 4) xyxy box covering every box in *detections*.

    An empty input yields an empty (0, 4) float32 array.
    """
    if not len(detections):
        return np.array([], dtype=np.float32).reshape(0, 4)

    boxes = detections.xyxy
    # Union box: smallest top-left corner, largest bottom-right corner.
    top_left = boxes[:, :2].min(axis=0)
    bottom_right = boxes[:, 2:].max(axis=0)
    return np.concatenate([top_left, bottom_right]).reshape(1, 4)


def get_lowest_confidence_index(detections: sv.Detections) -> int:
    """Return the index of the lowest-confidence detection (0 if no scores)."""
    confidences = detections.confidence
    # Without confidence scores there is nothing to compare — default to 0.
    if confidences is None:
        return 0
    return int(confidences.argmin())


class DetectionsMergeBlockV1(WorkflowBlock):
    """Workflow block collapsing all input detections into a single union box."""

    @classmethod
    def get_manifest(cls) -> Type[WorkflowBlockManifest]:
        return DetectionsMergeManifest

    def run(
        self,
        predictions: sv.Detections,
        class_name: str = "merged_detection",
    ) -> BlockResult:
        # Nothing to merge — emit an empty detections container.
        if predictions is None or len(predictions) == 0:
            empty = sv.Detections(xyxy=np.array([], dtype=np.float32).reshape(0, 4))
            return {OUTPUT_KEY: empty}

        union_bbox = calculate_union_bbox(predictions)
        weakest_idx = get_lowest_confidence_index(predictions)

        # The merged box inherits the weakest constituent's confidence
        # (conservative: the union is only as trustworthy as its worst part).
        source_confidence = predictions.confidence
        merged_confidence = (
            None
            if source_confidence is None
            else np.array([source_confidence[weakest_idx]], dtype=np.float32)
        )

        merged_detection = sv.Detections(
            xyxy=union_bbox,
            confidence=merged_confidence,
            # Fixed class_id of 0 for merged detection
            class_id=np.array([0], dtype=np.int32),
            data={
                "class_name": np.array([class_name]),
                "detection_id": np.array([str(uuid4())]),
            },
        )

        return {OUTPUT_KEY: merged_detection}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import numpy as np
import pytest
import supervision as sv

from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS
from inference.core.managers.base import ModelManager
from inference.core.workflows.core_steps.common.entities import StepExecutionMode
from inference.core.workflows.execution_engine.core import ExecutionEngine
from tests.workflows.integration_tests.execution.workflows_gallery_collector.decorators import (
add_to_workflows_gallery,
)

# Minimal workflow definition used by the test below (and the gallery entry):
# a YOLOv8n object detection step feeding the detections-merge transformation.
DETECTIONS_MERGE_WORKFLOW = {
    "version": "1.0",
    "inputs": [
        {"type": "WorkflowImage", "name": "image"},
    ],
    "steps": [
        # Detect objects with a public YOLOv8n checkpoint.
        {
            "type": "ObjectDetectionModel",
            "name": "detection",
            "image": "$inputs.image",
            "model_id": "yolov8n-640",
        },
        # Collapse all detections into a single union bounding box.
        {
            "type": "roboflow_core/detections_merge@v1",
            "name": "detections_merge",
            "predictions": "$steps.detection.predictions",
        },
    ],
    "outputs": [
        {
            "type": "JsonField",
            "name": "result",
            "selector": "$steps.detections_merge.predictions",
        }
    ],
}


@add_to_workflows_gallery(
    category="Basic Workflows",
    use_case_title="Workflow with detections merge",
    use_case_description="""
This workflow demonstrates how to merge multiple object detections into a single bounding box.
This is useful when you want to:
- Combine overlapping detections of the same object
- Create a single region that contains multiple detected objects
- Simplify multiple detections into one larger detection
    """,
    workflow_definition=DETECTIONS_MERGE_WORKFLOW,
    workflow_name_in_app="merge-detections",
)
def test_detections_merge_workflow(
    model_manager: ModelManager,
    dogs_image: np.ndarray,
) -> None:
    """End-to-end check: detection + merge yields one in-bounds union box."""
    # given
    init_parameters = {
        "workflows_core.model_manager": model_manager,
        "workflows_core.api_key": None,
        "workflows_core.step_execution_mode": StepExecutionMode.LOCAL,
    }
    engine = ExecutionEngine.init(
        workflow_definition=DETECTIONS_MERGE_WORKFLOW,
        init_parameters=init_parameters,
        max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS,
    )

    # when
    result = engine.run(runtime_parameters={"image": [dogs_image]})

    # then
    assert len(result) == 1, "One set of outputs expected"
    assert "result" in result[0], "Output must contain key 'result'"
    merged = result[0]["result"]
    assert isinstance(merged, sv.Detections), "Output must be instance of sv.Detections"
    assert len(merged) == 1, "Should have exactly one merged detection"
    assert "class_name" in merged.data, "Should have class_name in data"
    assert "detection_id" in merged.data, "Should have detection_id in data"

    # The merged box must be geometrically sane and within the image.
    x1, y1, x2, y2 = merged.xyxy[0]
    image_height, image_width = dogs_image.shape[:2]
    assert 0 <= x1 <= image_width, "x1 should be within image bounds"
    assert 0 <= y1 <= image_height, "y1 should be within image bounds"
    assert 0 <= x2 <= image_width, "x2 should be within image bounds"
    assert 0 <= y2 <= image_height, "y2 should be within image bounds"
    assert x2 > x1, "x2 should be greater than x1"
    assert y2 > y1, "y2 should be greater than y1"

    # Both dogs should be covered, so the union box cannot be tiny.
    assert x2 - x1 > 100, "Merged box should be reasonably wide"
    assert y2 - y1 > 100, "Merged box should be reasonably tall"
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import numpy as np
import pytest
import supervision as sv

from inference.core.workflows.core_steps.transformations.detections_merge.v1 import (
DetectionsMergeBlockV1,
DetectionsMergeManifest,
calculate_union_bbox,
)


def test_calculate_union_bbox():
    # given: two overlapping boxes whose union spans (10, 10)-(25, 25)
    boxes = np.array([[10, 10, 20, 20], [15, 15, 25, 25]])
    detections = sv.Detections(xyxy=boxes)

    # when
    union_bbox = calculate_union_bbox(detections)

    # then
    expected_bbox = np.array([[10, 10, 25, 25]])
    assert np.allclose(
        union_bbox, expected_bbox
    ), f"Expected bounding box to be {expected_bbox}, but got {union_bbox}"


@pytest.mark.parametrize("type_alias", ["roboflow_core/detections_merge@v1"])
def test_detections_merge_validation_when_valid_manifest_is_given(
    type_alias: str,
) -> None:
    # given: a raw manifest dict with every supported field populated
    raw_manifest = {
        "type": type_alias,
        "name": "detections_merge",
        "predictions": "$steps.od_model.predictions",
        "class_name": "custom_merged",
    }

    # when
    parsed = DetectionsMergeManifest.model_validate(raw_manifest)

    # then: parsing must round-trip to an equivalent directly-built manifest
    expected = DetectionsMergeManifest(
        type=type_alias,
        name="detections_merge",
        predictions="$steps.od_model.predictions",
        class_name="custom_merged",
    )
    assert parsed == expected


def test_detections_merge_block() -> None:
    # given: two overlapping "person" boxes with distinct confidences
    block = DetectionsMergeBlockV1()
    detections = sv.Detections(
        xyxy=np.array([[10, 10, 20, 20], [15, 15, 25, 25]]),
        confidence=np.array([0.9, 0.8]),
        class_id=np.array([1, 1]),
        data={"class_name": np.array(["person", "person"])},
    )

    # when
    output = block.run(predictions=detections)

    # then: one union box, lowest confidence kept, default class name assigned
    assert isinstance(output, dict)
    assert "predictions" in output
    merged = output["predictions"]
    assert len(merged) == 1
    assert np.allclose(merged.xyxy, np.array([[10, 10, 25, 25]]))
    assert np.allclose(merged.confidence, np.array([0.8]))
    assert np.allclose(merged.class_id, np.array([0]))
    assert merged.data["class_name"][0] == "merged_detection"
    assert isinstance(merged.data["detection_id"][0], str)


def test_detections_merge_block_with_custom_class() -> None:
    # given: same fixture as the default-class test
    block = DetectionsMergeBlockV1()
    detections = sv.Detections(
        xyxy=np.array([[10, 10, 20, 20], [15, 15, 25, 25]]),
        confidence=np.array([0.9, 0.8]),
        class_id=np.array([1, 1]),
        data={"class_name": np.array(["person", "person"])},
    )

    # when: a caller-supplied class name overrides the default
    output = block.run(predictions=detections, class_name="custom_merged")

    # then
    assert isinstance(output, dict)
    assert "predictions" in output
    merged = output["predictions"]
    assert len(merged) == 1
    assert np.allclose(merged.xyxy, np.array([[10, 10, 25, 25]]))
    assert np.allclose(merged.confidence, np.array([0.8]))
    assert np.allclose(merged.class_id, np.array([0]))
    assert merged.data["class_name"][0] == "custom_merged"
    assert isinstance(merged.data["detection_id"][0], str)


def test_detections_merge_block_empty_input() -> None:
    # given: a detections object holding zero boxes
    block = DetectionsMergeBlockV1()
    no_detections = sv.Detections(xyxy=np.zeros((0, 4), dtype=np.float32))

    # when
    output = block.run(predictions=no_detections)

    # then: the block must return an empty, well-shaped result rather than fail
    assert isinstance(output, dict)
    assert "predictions" in output
    merged = output["predictions"]
    assert len(merged) == 0
    assert isinstance(merged.xyxy, np.ndarray)
    assert merged.xyxy.shape == (0, 4)