In [None]:
!nvidia-smi

In [None]:
import torch

DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# DEVICE = 'cpu'
MODEL_TYPE = "vit_h"

In [None]:
print(DEVICE,torch.cuda.is_available())

In [None]:
import os
HOME = os.getcwd()
print("HOME:", HOME)

### Run this cell if running for first time

In [None]:
# Run only if you dont have the below libraries 
pip install git+https://github.com/facebookresearch/segment-anything.git
pip install -q jupyter_bbox_widget roboflow dataclasses-json supervision

In [None]:
# UNCOMMENT FOR FIRST TIME

# %cd {HOME}
# !mkdir {HOME+"\\weights"}
%cd {HOME+"\\weights"}

!wget -q "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth"

In [None]:
import os

CHECKPOINT_PATH = os.path.join(HOME, "weights", "sam_vit_h_4b8939.pth")
print(CHECKPOINT_PATH, "; exist:", os.path.isfile(CHECKPOINT_PATH))

In [None]:
from segment_anything import sam_model_registry, SamPredictor

sam = sam_model_registry[MODEL_TYPE](checkpoint=CHECKPOINT_PATH).to(device=DEVICE)
# sam.to(DEVICE, dtype=torch.half, non_blocking=True)

In [None]:
# DO NOT RUN UNLESS YOU WANNA TEST MEMORY ISSUE

import os
os.environ['PYTORCH_CUDA_ALLOC_CONF']='max_split_size_mb:1024' #'garbage_collection_threshold:0.8,max_split_size_mb:512'

In [None]:
%cd {HOME}

import roboflow
from roboflow import Roboflow

roboflow.login()

rf = Roboflow()

project = rf.workspace("arosphenotyping-nxtf4").project("blueberry_object_detection")
# dataset = project.version(20).download("coco")

In [None]:
import time

time1 = time.time()

dataset = project.version(20) #Can be modified later
dataset_loc = os.path.join(HOME,dataset.name+"-"+dataset.version)
time2 = time.time()

print(time2-time1)



In [None]:
import pandas as pd
import math

def read_excluded():
    df = pd.read_csv("completed_list.csv", header=None, delimiter=',')
    # Create a dictionary of dictionaries
    result_dict = {'test': {}, 'train': {}, 'valid': {}}

    file_names = []
    flag = 0
    # Iterate through the DataFrame and populate the inner dictionaries
    for index, row in df.iterrows():
        if not isinstance(row[0], str):
            file_names = row[1:]
        else:
            type =  row[0].lower()
            for img_name, time in zip(file_names, row[1:]):
                print(img_name,time)
                if not math.isnan(float(time)):
                    result_dict[type][img_name] = float(time)
    del file_names
    return result_dict

times = read_excluded()
times = {'test': {}, 'train': {}, 'valid': {}}

In [None]:
import random
import cv2
import numpy as np
import sys

import supervision as sv


DATA_DIRECTORIES = [i for i in (os.listdir(dataset_loc)) if ".txt" not in i]
ANNOTATIONS_FILE_NAME = "_annotations.coco.json"

sam_predictor = SamPredictor(sam)

mask_annotator = sv.MaskAnnotator()
box_annotator = sv.BoxAnnotator()

random.seed(2023)

def segment(sam_predictor: SamPredictor, image: np.ndarray, xyxy: np.ndarray) -> np.ndarray:
    sam_predictor.set_image(image)
    result_masks = []
    for box in xyxy:
        masks, scores, logits = sam_predictor.predict(
            box=box,
            multimask_output=False
        )
        index = np.argmax(scores)
        result_masks.append(masks[index])
    return np.array(result_masks)




In [None]:
def image_count(subdirectory_path):
    # Specify the path to the subdirectory you want to count images in
    subdirectory_path = os.path.join(dataset_loc, 'TEST')

    # Initialize a count variable to keep track of the number of images
    image_count = 0

    # Iterate through the files in the subdirectory using os.scandir
    for entry in os.scandir(subdirectory_path):
        if entry.is_file() and entry.name.lower().endswith('.jpg'):
            image_count += 1
    return image_count



### Main Code

In [None]:
import time
import os
import sys
import pandas as pd

# Define your constants and variables here, including dataset_loc, ANNOTATIONS_FILE_NAME, sv.DetectionDataset, sam_predictor, segment, etc.
# Make sure times is properly initialized with completed image data

DATA_DIRECTORIES = ['test', 'train', 'valid']
image_names = []
def mask_annotate():
    try:
        for DATA_SET_SUBDIRECTORY in DATA_DIRECTORIES:
            IMAGES_DIRECTORY_PATH = os.path.join(dataset_loc, DATA_SET_SUBDIRECTORY)
            if len(times[DATA_SET_SUBDIRECTORY]) == image_count(IMAGES_DIRECTORY_PATH):
                continue

            # time_dir_start = time.time()
            print(DATA_SET_SUBDIRECTORY + " data Loading.......")
            
            ANNOTATIONS_FILE_PATH = os.path.join(IMAGES_DIRECTORY_PATH, ANNOTATIONS_FILE_NAME)

            object_detection_dataset = sv.DetectionDataset.from_coco(
                images_directory_path=IMAGES_DIRECTORY_PATH,
                annotations_path=ANNOTATIONS_FILE_PATH
            )
            # del object_detection_dataset
            # detections = object_detection_dataset.annotations
            image_names = list(object_detection_dataset.images.keys()) 
            completed_images = set(times[DATA_SET_SUBDIRECTORY].keys())

            for image_name in image_names:
                if image_name.replace(IMAGES_DIRECTORY_PATH + "\\", "") not in completed_images:
                    time_img_start = time.time()
                    # Process the image and annotations here
                    # detection = detections[image_name]
                    object_detection_dataset.annotations[image_name].mask = segment(sam_predictor=sam_predictor, image=cv2.imread(image_name, cv2.COLOR_BGR2RGB),xyxy=object_detection_dataset.annotations[image_name].xyxy)
                    time_img_end = time.time()
                    # image_name = image_name.replace(IMAGES_DIRECTORY_PATH + "\\", "")
                    print(image_name + " processing time: " + str(time_img_end - time_img_start) + " secs")
                    print(sys.getsizeof(object_detection_dataset))
                    times[DATA_SET_SUBDIRECTORY][image_name.replace(IMAGES_DIRECTORY_PATH + "\\", "")] = time_img_end - time_img_start
            # time_dir_end = time.time()
            # print("Data dir " + DATA_SET_SUBDIRECTORY + " finished. Time taken = " + str(time_dir_end - time_dir_start) + " secs")
            object_detection_dataset.as_coco(annotations_path=ANNOTATIONS_FILE_PATH, images_directory_path=IMAGES_DIRECTORY_PATH)
            
            object_detection_dataset1 = sv.DetectionDataset.from_coco(
                images_directory_path=IMAGES_DIRECTORY_PATH,
                annotations_path=ANNOTATIONS_FILE_PATH
            )

            print("************************************************************")
            print(object_detection_dataset == object_detection_dataset1)
    except MemoryError:
        # Handle memory allocation issues here, if needed
        print("Memory allocation failed. Not enough memory available. Trying to free up memory")
        object_detection_dataset.as_coco(annotations_path=ANNOTATIONS_FILE_PATH, images_directory_path=None)
        # del object_detection_dataset
        del object_detection_dataset
        print("Saving completed files stats to CSV file")
        pd.DataFrame.from_dict(times, orient='index').to_csv('completed_list.csv')
        mask_annotate()

# Call the mask_annotate function to start the processing
mask_annotate()


### Testing

In [1]:
import random
import cv2
import numpy as np

import roboflow
from roboflow import Roboflow
import supervision as sv
import os
from segment_anything import sam_model_registry, SamPredictor
import torch

import sv_coco_read

DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# DEVICE = 'cpu'
MODEL_TYPE = "vit_h"

HOME = os.getcwd()

# Download SAM Weights into weights directory
CHECKPOINT_PATH = os.path.join(HOME, "weights", "sam_vit_h_4b8939.pth")

sam = sam_model_registry[MODEL_TYPE](checkpoint=CHECKPOINT_PATH).to(device=DEVICE)

sam_predictor = SamPredictor(sam)

mask_annotator = sv.MaskAnnotator()
box_annotator = sv.BoxAnnotator()


# Segmentation algorithm function, as given in Roboflow Notebook
def segment(sam_predictor: SamPredictor, image: np.ndarray, xyxy: np.ndarray) -> np.ndarray:
    sam_predictor.set_image(image)
    result_masks = []
    for box in xyxy:
        masks, scores, logits = sam_predictor.predict(
            box=box,
            multimask_output=False
        )
        index = np.argmax(scores)
        result_masks.append(masks[index])
    return np.array(result_masks)

# Roboflow loading downloaded dataset from local 
roboflow.login()
rf = Roboflow()
project = rf.workspace("arosphenotyping-nxtf4").project("blueberry_object_detection") #Workspace_name , project_name
dataset = project.version(20) # Already downloaded the dataset


You are already logged into Roboflow. To make a different login, run roboflow.login(force=True).
loading Roboflow workspace...
loading Roboflow project...


In [2]:

# Paths definition
dataset_loc = os.path.join(HOME,dataset.name+"-"+dataset.version)
ANNOTATIONS_FILE_NAME = '_annotations.coco.json'
DATA_SET_SUBDIRECTORY = 'test'
IMAGES_DIRECTORY_PATH = os.path.join(dataset_loc, DATA_SET_SUBDIRECTORY) 
ANNOTATIONS_FILE_PATH = os.path.join(IMAGES_DIRECTORY_PATH, ANNOTATIONS_FILE_NAME)

# Loading the dataset using from_coco
object_detection_dataset = sv_coco_read.DetectionDataset.from_coco(
    images_directory_path=IMAGES_DIRECTORY_PATH,
    annotations_path=ANNOTATIONS_FILE_PATH
)
image_names = list(object_detection_dataset.images.keys()) 

In [5]:
# Getting a random image name
#
random.seed(random.choice([i for i in range(2024)]))
image_name = random.choice(image_names)
print(image_name.replace(IMAGES_DIRECTORY_PATH + "\\", ""))
# Applying segment function to update image annotation's mask attribute
object_detection_dataset.annotations[image_name].mask = segment(
sam_predictor=sam_predictor, image=cv2.imread(image_name, cv2.COLOR_BGR2RGB),xyxy=object_detection_dataset.annotations[image_name].xyxy)

# Saving annotation using as_coco (ISSUE HERE)
object_detection_dataset.as_coco(annotations_path=ANNOTATIONS_FILE_PATH, images_directory_path=IMAGES_DIRECTORY_PATH)

# Reading the dataset again to compare 
object_detection_dataset1 = sv_coco_read.DetectionDataset.from_coco(
    images_directory_path=IMAGES_DIRECTORY_PATH,
    annotations_path=ANNOTATIONS_FILE_PATH
)

# # Output is FALSE!
print(object_detection_dataset.annotations == object_detection_dataset1.annotations)


17-002_F2_23_12_jpg.rf.a336f28ee12e1ce650053f2c6da07bda.jpg
Entered save coco function!
False


In [6]:
object_detection_dataset1.as_coco(annotations_path=ANNOTATIONS_FILE_NAME, images_directory_path=IMAGES_DIRECTORY_PATH)


Entered save coco function!


In [5]:
# Reading the dataset again to compare 
object_detection_dataset1 = sv_coco_read.DetectionDataset.from_coco(
    images_directory_path=IMAGES_DIRECTORY_PATH,
    annotations_path=ANNOTATIONS_FILE_PATH
)
object_detection_dataset1.annotations[image_name].mask

In [7]:
print(object_detection_dataset1.annotations[image_name].mask)

None


In [8]:
print(object_detection_dataset.annotations[image_name].mask)

[[[False False False ... False False False]
  [False False False ... False False False]
  [False False False ... False False False]
  ...
  [False False False ... False False False]
  [False False False ... False False False]
  [False False False ... False False False]]

 [[False False False ... False False False]
  [False False False ... False False False]
  [False False False ... False False False]
  ...
  [False False False ... False False False]
  [False False False ... False False False]
  [False False False ... False False False]]

 [[False False False ... False False False]
  [False False False ... False False False]
  [False False False ... False False False]
  ...
  [False False False ... False False False]
  [False False False ... False False False]
  [False False False ... False False False]]

 ...

 [[False False False ... False False False]
  [False False False ... False False False]
  [False False False ... False False False]
  ...
  [False False False ... False False Fal

In [9]:
"H1-NC5313_F2_15_17A_jpg.rf.2f02ad05c2a744c67fc8920da818c6b6.jpg"

'H1-NC5313_F2_15_17A_jpg.rf.2f02ad05c2a744c67fc8920da818c6b6.jpg'

In [10]:
if ANNOTATIONS_FILE_NAME is not None:
    ans = save_coco_annotations(
        annotation_path=ANNOTATIONS_FILE_PATH,
        images=object_detection_dataset.images,
        annotations=object_detection_dataset.annotations,
        classes=object_detection_dataset.classes,
        min_image_area_percentage=0.0,
        max_image_area_percentage=1.0,
        approximation_percentage=0.0,
    )
    print(ans)


Entered save coco function!
{'info': {}, 'licenses': [{'id': 1, 'url': 'https://creativecommons.org/licenses/by/4.0/', 'name': 'CC BY 4.0'}], 'categories': [{'id': 0, 'name': 'blueberry', 'supercategory': 'common-objects'}, {'id': 1, 'name': 'blueberry', 'supercategory': 'common-objects'}], 'images': [{'id': 1, 'license': 1, 'file_name': 'H1-NC5286_-H2_2_3E_jpg.rf.0bb85d6b2bf556a137cec96c95664252.jpg', 'height': 3120, 'width': 4160, 'date_captured': '09/29/2023,15:51:54'}, {'id': 2, 'license': 1, 'file_name': '15-003_F2_18_21_jpg.rf.0378b9b5b771e54dec293d6af47330b5.jpg', 'height': 3120, 'width': 4160, 'date_captured': '09/29/2023,15:51:54'}, {'id': 3, 'license': 1, 'file_name': 'NCV-06-1_F2_24_16_jpg.rf.00b4c1fd026755c5847cf5de76274ac3.jpg', 'height': 4160, 'width': 3120, 'date_captured': '09/29/2023,15:51:54'}, {'id': 4, 'license': 1, 'file_name': 'H1-18-104_F2_36_2_jpg.rf.15c1c10b845208240bce02977c37de06.jpg', 'height': 4160, 'width': 3120, 'date_captured': '09/29/2023,15:51:54'}, {'

In [15]:
from __future__ import annotations

from typing import Dict, Iterator, List, Optional, Tuple
from abc import ABC, abstractmethod
from dataclasses import dataclass
from supervision.detection.core import Detections
from sv_coco_test import load_coco_annotations

@dataclass
class BaseDataset(ABC):
    @abstractmethod
    def __len__(self) -> int:
        pass

    @abstractmethod
    def split(
        self, split_ratio=0.8, random_state=None, shuffle: bool = True
    ) -> Tuple[BaseDataset, BaseDataset]:
        pass

@dataclass
class DetectionDataset(BaseDataset):
    """
    Dataclass containing information about object detection dataset.

    Attributes:
        classes (List[str]): List containing dataset class names.
        images (Dict[str, np.ndarray]): Dictionary mapping image name to image.
        annotations (Dict[str, Detections]): Dictionary mapping
            image name to annotations.
    """

    classes: List[str]
    images: Dict[str, np.ndarray]
    annotations: Dict[str, Detections]

    def __len__(self) -> int:
        """
        Return the number of images in the dataset.

        Returns:
            int: The number of images.
        """
        return len(self.images)

    def __iter__(self) -> Iterator[Tuple[str, np.ndarray, Detections]]:
        """
        Iterate over the images and annotations in the dataset.

        Yields:
            Iterator[Tuple[str, np.ndarray, Detections]]:
                An iterator that yields tuples containing the image name,
                the image data, and its corresponding annotation.
        """
        for image_name, image in self.images.items():
            yield image_name, image, self.annotations.get(image_name, None)

    def __eq__(self, other):
        if not isinstance(other, DetectionDataset):
            return False

        if set(self.classes) != set(other.classes):
            return False

        for key in self.images:
            if not np.array_equal(self.images[key], other.images[key]):
                return False
            if not self.annotations[key] == other.annotations[key]:
                return False

        return True
    @classmethod
    def from_coco(
        cls,
        images_directory_path: str,
        annotations_path: str,
        force_masks: bool = False,
    ) -> DetectionDataset:
        """
        Creates a Dataset instance from COCO formatted data.

        Args:
            images_directory_path (str): The path to the
                directory containing the images.
            annotations_path (str): The path to the json annotation files.
            force_masks (bool, optional): If True,
                forces masks to be loaded for all annotations,
                regardless of whether they are present.

        Returns:
            DetectionDataset: A DetectionDataset instance containing
                the loaded images and annotations.

        Example:
            ```python
            >>> import roboflow
            >>> from roboflow import Roboflow
            >>> import supervision as sv

            >>> roboflow.login()

            >>> rf = Roboflow()

            >>> project = rf.workspace(WORKSPACE_ID).project(PROJECT_ID)
            >>> dataset = project.version(PROJECT_VERSION).download("coco")

            >>> ds = sv.DetectionDataset.from_coco(
            ...     images_directory_path=f"{dataset.location}/train",
            ...     annotations_path=f"{dataset.location}/train/_annotations.coco.json",
            ... )

            >>> ds.classes
            ['dog', 'person']
            ```
        """
        classes, images, annotations = load_coco_annotations(
            images_directory_path=images_directory_path,
            annotations_path=annotations_path,
            force_masks=force_masks,
        )
        return DetectionDataset(classes=classes, images=images, annotations=annotations)


In [17]:
# Reading the dataset again to compare 
object_detection_dataset1 = DetectionDataset.from_coco(
    images_directory_path=IMAGES_DIRECTORY_PATH,
    annotations_path=ANNOTATIONS_FILE_NAME
)

TypeError: Can't instantiate abstract class DetectionDataset with abstract method split

In [8]:
object_detection_dataset1.annotations == object_detection_dataset.annotations

40

In [9]:
if ANNOTATIONS_FILE_NAME is not None:
    ans = save_coco_annotations(
        annotation_path=ANNOTATIONS_FILE_NAME,
        images=object_detection_dataset1.images,
        annotations=object_detection_dataset1.annotations,
        classes=object_detection_dataset1.classes,
        min_image_area_percentage=0.0,
        max_image_area_percentage=1.0,
        approximation_percentage=0.0,
    )
    print(ans)

Entered save coco function!
{'info': {}, 'licenses': [{'id': 1, 'url': 'https://creativecommons.org/licenses/by/4.0/', 'name': 'CC BY 4.0'}], 'categories': [{'id': 0, 'name': 'blueberry', 'supercategory': 'common-objects'}, {'id': 1, 'name': 'blueberry', 'supercategory': 'common-objects'}], 'images': [{'id': 1, 'license': 1, 'file_name': 'H1-NC5286_-H2_2_3E_jpg.rf.0bb85d6b2bf556a137cec96c95664252.jpg', 'height': 3120, 'width': 4160, 'date_captured': '09/29/2023,15:51:42'}, {'id': 2, 'license': 1, 'file_name': '15-003_F2_18_21_jpg.rf.0378b9b5b771e54dec293d6af47330b5.jpg', 'height': 3120, 'width': 4160, 'date_captured': '09/29/2023,15:51:42'}, {'id': 3, 'license': 1, 'file_name': 'NCV-06-1_F2_24_16_jpg.rf.00b4c1fd026755c5847cf5de76274ac3.jpg', 'height': 4160, 'width': 3120, 'date_captured': '09/29/2023,15:51:42'}, {'id': 4, 'license': 1, 'file_name': 'H1-18-104_F2_36_2_jpg.rf.15c1c10b845208240bce02977c37de06.jpg', 'height': 4160, 'width': 3120, 'date_captured': '09/29/2023,15:51:42'}, {'

### Modify Annotations once the images are cropped


In [None]:

for i in list(object_detection_dataset.annotations.values())[0]:
    print(i[0] - [100.0,50.0,100.0,50.0],type(i))
    break

### First Demo for masks

In [None]:
import random
import cv2
import numpy as np

import supervision as sv

DATA_DIRECTORIES = [i for i in (os.listdir(dataset_loc)) if ".txt" not in i]
ANNOTATIONS_FILE_NAME = "_annotations.coco.json"

sam_predictor = SamPredictor(sam)

mask_annotator = sv.MaskAnnotator()
box_annotator = sv.BoxAnnotator()

random.seed(2023)

def segment(sam_predictor: SamPredictor, image: np.ndarray, xyxy: np.ndarray) -> np.ndarray:
    sam_predictor.set_image(image)
    result_masks = []
    for box in xyxy:
        masks, scores, logits = sam_predictor.predict(
            box=box,
            multimask_output=False
        )
        index = np.argmax(scores)
        result_masks.append(masks[index])
    return np.array(result_masks)

DATA_SET_SUBDIRECTORY = 'test' 

IMAGES_DIRECTORY_PATH = os.path.join(dataset_loc, DATA_SET_SUBDIRECTORY)
ANNOTATIONS_FILE_PATH = os.path.join(IMAGES_DIRECTORY_PATH, ANNOTATIONS_FILE_NAME)

object_detection_dataset = sv.DetectionDataset.from_coco(
    images_directory_path=IMAGES_DIRECTORY_PATH,
    annotations_path=ANNOTATIONS_FILE_PATH
)

time_img_start = time.time()
image_names = list(object_detection_dataset.images.keys())
CLASSES = np.unique(object_detection_dataset.classes) 

image_name = random.choice(image_names)
image_name.replace(dataset_loc+"\\train\\","")
    
image = object_detection_dataset.images[image_name]
detections = object_detection_dataset.annotations[image_name]
# box_annotated_image = box_annotator.annotate(scene=image.copy(), detections=detections, skip_label=True)
# convert detections to masks
detections.mask = segment(
    sam_predictor=sam_predictor,
    image=cv2.cvtColor(image, cv2.COLOR_BGR2RGB),
    xyxy=detections.xyxy
)
time_img_end = time.time()
image_name = image_name.replace(IMAGES_DIRECTORY_PATH+"\\","")
print(image_name+ " processing time: " + str(time_img_end - time_img_start) + " secs")


# annotate image with detections
labels = [f"{CLASSES[class_id-1]} {class_id}" 
    for _, _, confidence, class_id, _ 
    in detections]
mask_annotated_image = mask_annotator.annotate(scene=image.copy(), detections=detections)

In [None]:
%matplotlib inline
sv.plot_image(mask_annotated_image,size = (6,6))

In [None]:
sv.plot_image(image,size = (6,6))

### Testing annotations

In [None]:
import pycocotools.coco

# Initialize the COCO API.
ANNOTATIONS_FILE_PATH1 = os.path.join(dataset_loc, DATA_SET_SUBDIRECTORY, "_annotations.coco.json")
coco = pycocotools.coco.COCO(ANNOTATIONS_FILE_PATH)

In [None]:
# Find the annotation that you want to modify.
ann_id = 1
ann = coco.loadAnns(ann_id)[0]


# Save the COCO annotations file.
coco.save_json("path/to/coco/annotations/instances_train2017.json")

In [None]:
# Find the annotation that has the same image name.
ann_id = coco.getAnnIds(imgIds=coco.getImgIds(image_name.replace(dataset_loc+"\\train\\","")))

# Print the annotation ID.
print(ann_id)

In [None]:
# Iterate over the images list and get the file name and id for each image
image_filenames = []
for i in coco.loadImgs(coco.getImgIds()):
    image_filenames.append((i['id'],i['file_name']))

# Print the image file names and IDs
print(image_filenames)

In [None]:
for i in coco.loadImgs(coco.getImgIds()):
    print(i)
    break

## Testing if Roboflow and SAM are working!

In [None]:
import cv2
import supervision as sv

image_bgr = cv2.imread(IMAGE_PATH)
image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

sam_result = mask_generator.generate(image_rgb)

In [None]:
mask_annotator = sv.MaskAnnotator()

detections = sv.Detections.from_sam(sam_result=sam_result)

annotated_image = mask_annotator.annotate(scene=image_bgr.copy(), detections=detections)

sv.plot_images_grid(
    images=[image_bgr, annotated_image],
    grid_size=(1, 2),
    titles=['source image', 'segmented image']
)

In [None]:
masks = [
    mask['segmentation']
    for mask
    in sorted(sam_result, key=lambda x: x['area'], reverse=True)
]

sv.plot_images_grid(
    images=masks,
    grid_size=(8, int(len(masks) / 8)),
    size=(16, 16)
)

In [None]:
print(sam_result[0])

In [None]:
# helper function that loads an image before adding it to the widget

import base64

def encode_image(filepath):
    with open(filepath, 'rb') as f:
        image_bytes = f.read()
    encoded = str(base64.b64encode(image_bytes), 'utf-8')
    return "data:image/jpg;base64,"+encoded

from jupyter_bbox_widget import BBoxWidget

widget = BBoxWidget()
widget.image = encode_image(EXAMPLE_IMAGE_PATH)
widget

In [None]:
widget.bboxes

In [None]:
import numpy as np

# default_box is going to be used if you will not draw any box on image above
default_box = {'x': 68, 'y': 247, 'width': 555, 'height': 678, 'label': ''}

box = widget.bboxes[0] if widget.bboxes else default_box
box = np.array([
    box['x'], 
    box['y'], 
    box['x'] + box['width'], 
    box['y'] + box['height']
])

In [None]:
import cv2

image_bgr = cv2.imread(EXAMPLE_IMAGE_PATH)
image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

mask_predictor.set_image(image_rgb)

masks, scores, logits = mask_predictor.predict(
    box=box,
    multimask_output=True
)

In [None]:
box_annotator = sv.BoxAnnotator(color=sv.Color.green())
mask_annotator = sv.MaskAnnotator(color=sv.Color.green())

detections = sv.Detections(xyxy=sv.mask_to_xyxy(masks=masks),mask=masks)
detections = detections[detections.area == np.max(detections.area)]

source_image = box_annotator.annotate(scene=image_bgr.copy(), detections=detections, skip_label=True)
segmented_image = mask_annotator.annotate(scene=image_bgr.copy(), detections=detections)

sv.plot_images_grid(
    images=[source_image, segmented_image],
    grid_size=(1, 2),
    titles=['source image', 'segmented image']
)

In [None]:
%cd {HOME}

import roboflow
from roboflow import Roboflow

roboflow.login()

rf = Roboflow()

In [None]:
project = rf.workspace("arosphenotyping-nxtf4").project("blueberry_object_detection")
dataset = project.version(20).download("coco")

In [None]:
DATA_SET_SUBDIRECTORY = "test"
ANNOTATIONS_FILE_NAME = "_annotations.coco.json"
IMAGES_DIRECTORY_PATH = os.path.join(dataset.location, DATA_SET_SUBDIRECTORY)
ANNOTATIONS_FILE_PATH = os.path.join(dataset.location, DATA_SET_SUBDIRECTORY, ANNOTATIONS_FILE_NAME)

In [None]:
import numpy as np
from dataclasses import dataclass
from typing import List, Tuple, Union, Optional
from dataclasses_json import dataclass_json
from supervision import Detections


@dataclass_json
@dataclass
class COCOCategory:
    id: int
    name: str
    supercategory: str


@dataclass_json
@dataclass
class COCOImage:
    id: int
    width: int
    height: int
    file_name: str
    license: int
    date_captured: str
    coco_url: Optional[str] = None
    flickr_url: Optional[str] = None


@dataclass_json
@dataclass
class COCOAnnotation:
    id: int
    image_id: int
    category_id: int
    segmentation: List[List[float]]
    area: float
    bbox: Tuple[float, float, float, float]
    iscrowd: int


@dataclass_json
@dataclass
class COCOLicense:
    id: int
    name: str
    url: str


@dataclass_json
@dataclass
class COCOJson:
    images: List[COCOImage]
    annotations: List[COCOAnnotation]
    categories: List[COCOCategory]
    licenses: List[COCOLicense]


def load_coco_json(json_file: str) -> COCOJson:
    import json

    with open(json_file, "r") as f:
        json_data = json.load(f)

    return COCOJson.from_dict(json_data)


class COCOJsonUtility:
    @staticmethod
    def get_annotations_by_image_id(coco_data: COCOJson, image_id: int) -> List[COCOAnnotation]:
        return [annotation for annotation in coco_data.annotations if annotation.image_id == image_id]

    @staticmethod
    def get_annotations_by_image_path(coco_data: COCOJson, image_path: str) -> Optional[List[COCOAnnotation]]:
        image = COCOJsonUtility.get_image_by_path(coco_data, image_path)
        if image:
            return COCOJsonUtility.get_annotations_by_image_id(coco_data, image.id)
        else:
            return None

    @staticmethod
    def get_image_by_path(coco_data: COCOJson, image_path: str) -> Optional[COCOImage]:
        for image in coco_data.images:
            if image.file_name == image_path:
                return image
        return None

    @staticmethod
    def annotations2detections(annotations: List[COCOAnnotation]) -> Detections:
        class_id, xyxy = [], []

        for annotation in annotations:
            x_min, y_min, width, height = annotation.bbox
            class_id.append(annotation.category_id)
            xyxy.append([
                x_min,
                y_min,
                x_min + width,
                y_min + height
            ])

        return Detections(
            xyxy=np.array(xyxy, dtype=int),
            class_id=np.array(class_id, dtype=int)
        )


In [None]:
coco_data = load_coco_json(json_file=ANNOTATIONS_FILE_PATH)

CLASSES = [
    category.name
    for category
    in coco_data.categories
    if category.supercategory != 'none'
]

IMAGES = [
    image.file_name
    for image
    in coco_data.images
]

In [None]:
CLASSES

In [None]:
import random
random.seed(2023)

In [None]:
EXAMPLE_IMAGE_NAME = random.choice(IMAGES)
EXAMPLE_IMAGE_PATH = os.path.join(dataset.location, DATA_SET_SUBDIRECTORY, EXAMPLE_IMAGE_NAME)

# load dataset annotations
annotations = COCOJsonUtility.get_annotations_by_image_path(coco_data=coco_data, image_path=EXAMPLE_IMAGE_NAME)
ground_truth = COCOJsonUtility.annotations2detections(annotations=annotations)

# small hack - coco numerate classes from 1, model from 0 + we drop first redundant class from coco json
ground_truth.class_id = ground_truth.class_id - 1

# load image
image_bgr = cv2.imread(EXAMPLE_IMAGE_PATH)
image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

# initiate annotator
box_annotator = sv.BoxAnnotator(color=sv.Color.red())
mask_annotator = sv.MaskAnnotator(color=sv.Color.red())

# annotate ground truth
annotated_frame_ground_truth = box_annotator.annotate(scene=image_bgr.copy(), detections=ground_truth, skip_label=True)

# run SAM inference
mask_predictor.set_image(image_rgb)

masks, scores, logits = mask_predictor.predict(
    box=ground_truth.xyxy[0],
    multimask_output=True
)

detections = sv.Detections(
    xyxy=sv.mask_to_xyxy(masks=masks),
    mask=masks
)
detections = detections[detections.area == np.mean(detections.area)]
annotated_image = mask_annotator.annotate(scene=image_bgr.copy(), detections=detections)

sv.plot_images_grid(
    images=[annotated_frame_ground_truth, annotated_image],
    grid_size=(1, 2),
    titles=['source image', 'segmented image']
)

In [None]:
import matplotlib.pyplot as plt

plt.imshow(image_rgb)