In [1]:
import json
import numpy as np
from pycocotools import mask as mask_util
import os

def check_empty_masks(json_path):
    """
    Check for empty masks in a COCO format JSON file.

    Each annotation with a non-empty ``segmentation`` is converted to RLE
    with pycocotools and flagged when the resulting mask area is zero.
    Problems that prevent a meaningful conversion (a polygon with fewer than
    three points, an ``image_id`` that is not listed under ``images``, or an
    exception raised while converting) are collected separately.  A summary
    and the first ten entries of each list are printed, and both lists are
    written to a JSON report whose path is derived from ``json_path``.

    Annotations without a segmentation (missing, ``None`` or ``[]``) are
    skipped and appear in neither list.

    Args:
        json_path: Path to the JSON annotation file

    Returns:
        Tuple ``(empty_masks, invalid_polygons)``; each is a list of dicts
        describing one finding (annotation id, image id, image file name,
        reason, plus the offending data).
    """
    print(f"\nLoading {json_path}...")
    with open(json_path, 'r') as f:
        coco_data = json.load(f)

    images = {img['id']: img for img in coco_data['images']}

    empty_masks = []
    invalid_polygons = []
    total_annotations = len(coco_data['annotations'])

    print(f"Checking {total_annotations} annotations...")

    for idx, ann in enumerate(coco_data['annotations']):
        if idx % 1000 == 0:
            print(f"Progress: {idx}/{total_annotations}")

        segm = ann.get('segmentation')
        if segm is None or segm == []:
            continue

        ann_id = ann.get('id', idx)
        img_id = ann.get('image_id')
        img_info = images.get(img_id)
        if img_info is None:
            # Without the image entry there is no height/width to rasterise
            # against; report the annotation instead of aborting the scan.
            invalid_polygons.append({
                'annotation_id': ann_id,
                'image_id': img_id,
                'image_file': None,
                'reason': 'Annotation references an unknown image_id',
                'segmentation': str(segm)[:100]  # Truncate for readability
            })
            continue

        h, w = img_info['height'], img_info['width']

        # Fields shared by every finding recorded for this annotation.
        origin = {
            'annotation_id': ann_id,
            'image_id': img_id,
            'image_file': img_info['file_name'],
        }

        try:
            # Convert to RLE format (this is what the code does internally)
            if isinstance(segm, list):
                # Polygon format: report degenerate polygons and keep them
                # out of the conversion.  frPyObjects dispatches on the
                # length of the first element, so a two-point polygon (four
                # numbers) would otherwise be read as a bounding box.
                valid_polys = []
                for poly in segm:
                    if len(poly) < 6:  # Need at least 3 points (x,y pairs)
                        invalid_polygons.append({
                            **origin,
                            'reason': f'Polygon has only {len(poly)//2} points',
                            'polygon': poly
                        })
                    else:
                        valid_polys.append(poly)

                if not valid_polys:
                    # Every polygon was already reported as invalid; there
                    # is nothing left to rasterise.
                    continue

                # Convert polygon to RLE
                rles = mask_util.frPyObjects(valid_polys, h, w)
                rle = mask_util.merge(rles)

            elif isinstance(segm['counts'], list):
                # Uncompressed RLE
                rle = mask_util.frPyObjects(segm, h, w)
            else:
                # Already compressed RLE
                rle = segm

            # Check if mask area is zero
            area = mask_util.area(rle)

            if area == 0:
                empty_masks.append({
                    **origin,
                    'reason': 'Zero area after RLE conversion',
                    'segmentation': segm,
                    'bbox': ann.get('bbox', None)
                })

        except Exception as e:
            # Any failure while inspecting or converting the segmentation
            # is recorded so one malformed annotation cannot stop the scan.
            invalid_polygons.append({
                **origin,
                'reason': f'Error during conversion: {str(e)}',
                'segmentation': str(segm)[:100]  # Truncate for readability
            })

    # Print summary
    print("\n" + "="*80)
    print("SUMMARY")
    print("="*80)
    print(f"Total annotations: {total_annotations}")
    print(f"Empty masks found: {len(empty_masks)}")
    print(f"Invalid polygons found: {len(invalid_polygons)}")

    # Print detailed results
    if empty_masks:
        print("\n" + "-"*80)
        print("EMPTY MASKS DETAILS:")
        print("-"*80)
        for i, mask_info in enumerate(empty_masks[:10]):  # Show first 10
            print(f"\n{i+1}. Annotation ID: {mask_info['annotation_id']}")
            print(f"   Image ID: {mask_info['image_id']}")
            print(f"   Image file: {mask_info['image_file']}")
            print(f"   Reason: {mask_info['reason']}")
            if 'bbox' in mask_info and mask_info['bbox']:
                print(f"   BBox: {mask_info['bbox']}")

        if len(empty_masks) > 10:
            print(f"\n... and {len(empty_masks) - 10} more")

    if invalid_polygons:
        print("\n" + "-"*80)
        print("INVALID POLYGONS DETAILS:")
        print("-"*80)
        for i, poly_info in enumerate(invalid_polygons[:10]):  # Show first 10
            print(f"\n{i+1}. Annotation ID: {poly_info['annotation_id']}")
            print(f"   Image ID: {poly_info['image_id']}")
            print(f"   Image file: {poly_info['image_file']}")
            print(f"   Reason: {poly_info['reason']}")

        if len(invalid_polygons) > 10:
            print(f"\n... and {len(invalid_polygons) - 10} more")

    # Save detailed report to file.  Only a trailing '.json' is swapped for
    # the report suffix: str.replace would also rewrite '.json' inside
    # directory names, and for a path without '.json' it would return the
    # input path unchanged, so the report would overwrite the annotations.
    path_str = str(json_path)
    report_suffix = '_mask_check_report.json'
    if path_str.endswith('.json'):
        report_path = path_str[:-len('.json')] + report_suffix
    else:
        report_path = path_str + report_suffix
    report = {
        'summary': {
            'total_annotations': total_annotations,
            'empty_masks_count': len(empty_masks),
            'invalid_polygons_count': len(invalid_polygons)
        },
        'empty_masks': empty_masks,
        'invalid_polygons': invalid_polygons
    }

    with open(report_path, 'w') as f:
        json.dump(report, f, indent=2)

    print(f"\nDetailed report saved to: {report_path}")

    return empty_masks, invalid_polygons


def check_multiple_datasets(base_path="./results/plants_coco_640",
                            datasets=("train", "test")):
    """
    Check the annotation JSON file of several dataset splits.

    For every split name the file
    ``<base_path>/<name>/<name>_annotations.json`` is passed to
    ``check_empty_masks``.  Splits whose file does not exist are skipped
    with a warning and are absent from the result.

    Args:
        base_path: Base path to the dataset
        datasets: Names of the split sub-directories to check; defaults to
            the train and test splits

    Returns:
        Dict mapping each processed split name to
        ``{'empty_masks': [...], 'invalid_polygons': [...]}`` as returned
        by ``check_empty_masks``.
    """
    all_results = {}

    print("="*80)
    print("CHECKING MULTIPLE DATASETS")
    print("="*80)

    for dataset in datasets:
        json_path = os.path.join(base_path, dataset, f"{dataset}_annotations.json")

        if not os.path.exists(json_path):
            print(f"\nWarning: {json_path} does not exist. Skipping...")
            continue

        print(f"\n{'#'*80}")
        print(f"# Processing {dataset.upper()} dataset")
        print(f"{'#'*80}")

        empty_masks, invalid_polygons = check_empty_masks(json_path)
        all_results[dataset] = {
            'empty_masks': empty_masks,
            'invalid_polygons': invalid_polygons
        }

    # Print overall summary
    print("\n\n" + "="*80)
    print("OVERALL SUMMARY FOR ALL DATASETS")
    print("="*80)
    for dataset, results in all_results.items():
        print(f"\n{dataset.upper()}:")
        print(f"  Empty masks: {len(results['empty_masks'])}")
        print(f"  Invalid polygons: {len(results['invalid_polygons'])}")

    return all_results


# Usage
if __name__ == "__main__":
    # Run the checker over the train and test splits under this root.
    results = check_multiple_datasets(
        base_path="./result/plants_coco_p640_o0_6tiff/images"
    )

    # To inspect a single annotation file instead, call check_empty_masks directly:
    # train_empty, train_invalid = check_empty_masks("./results/plants_coco_640/train/train_annotations.json")
    # test_empty, test_invalid = check_empty_masks("./results/plants_coco_640/test/test_annotations.json")

CHECKING MULTIPLE DATASETS

################################################################################
# Processing TRAIN dataset
################################################################################

Loading ./result/plants_coco_p640_o0_6tiff/images/train/train_annotations.json...
Checking 9296 annotations...
Progress: 0/9296
Progress: 1000/9296
Progress: 2000/9296
Progress: 3000/9296
Progress: 4000/9296
Progress: 5000/9296
Progress: 6000/9296
Progress: 7000/9296
Progress: 8000/9296
Progress: 9000/9296

SUMMARY
Total annotations: 9296
Empty masks found: 9
Invalid polygons found: 0

--------------------------------------------------------------------------------
EMPTY MASKS DETAILS:
--------------------------------------------------------------------------------

1. Annotation ID: 482
   Image ID: 6
   Image file: 20211222_094342_petiole_test_00002_patch_010.tif
   Reason: Zero area after RLE conversion
   BBox: [0, 0, 118, 131]

2. Annotation ID: 1391
   Image ID: 13


In [4]:
import json
from collections import defaultdict
from pycocotools import mask as mask_util

def ann_to_rle(segm, h, w):
    """Turn a COCO segmentation into an RLE, or return None if unusable.

    A list is treated as polygons: entries that are not lists of at least
    six numbers (three points) are discarded and the rest are rasterised at
    size ``h`` x ``w`` and merged into one RLE.  A dict with a ``counts``
    key is treated as RLE: list counts (uncompressed) are converted with
    pycocotools, anything else is returned unchanged.  Every other input,
    including ``None`` and an empty list, yields ``None``.
    """
    if isinstance(segm, list):
        # Polygon format; an empty list simply leaves nothing usable.
        usable = [
            candidate
            for candidate in segm
            if isinstance(candidate, list) and len(candidate) >= 6
        ]
        if not usable:
            return None
        return mask_util.merge(mask_util.frPyObjects(usable, h, w))

    if isinstance(segm, dict) and "counts" in segm:
        uncompressed = isinstance(segm["counts"], list)
        # Non-list counts mean the RLE is already compressed.
        return mask_util.frPyObjects(segm, h, w) if uncompressed else segm

    return None

def clean_coco_json(
    input_json_path: str,
    output_json_path: str,
    drop_images_without_annotations: bool = False,
) -> None:
    """Write a copy of a COCO annotation file without unusable annotations.

    An annotation is removed when its ``image_id`` is not listed under
    ``images``, when it has no segmentation (missing, ``None`` or ``[]``),
    when the segmentation cannot be converted to RLE, or when the converted
    mask has zero area.  The number of removals per reason is printed.  All
    other content of the input file is written back unchanged.

    Args:
        input_json_path: Path of the COCO JSON file to read.
        output_json_path: Path the cleaned COCO JSON is written to.
        drop_images_without_annotations: When true, images that no kept
            annotation refers to are removed from ``images`` as well.
    """
    # Cap on per-annotation conversion warnings so a badly broken file does
    # not flood the output; the removal counters still cover every failure.
    max_conversion_warnings = 5

    print(f"\nLoading {input_json_path} ...")
    with open(input_json_path, "r") as f:
        coco = json.load(f)

    # Read "images" once so the lookup table and the optional image
    # filtering below agree even when the key is absent from the file.
    images = coco.get("images", [])
    images_by_id = {img["id"]: img for img in images}
    anns = coco.get("annotations", [])

    kept_anns = []
    removed = {
        "missing_segmentation": 0,
        "invalid_polygon_or_rle": 0,
        "zero_area_mask": 0,
        "missing_image_id": 0,
    }
    conversion_failures = 0

    print(f"Checking {len(anns)} annotations ...")

    for idx, ann in enumerate(anns):
        if idx % 2000 == 0 and idx > 0:
            print(f"Progress: {idx}/{len(anns)}")

        img_id = ann.get("image_id", None)
        if img_id not in images_by_id:
            removed["missing_image_id"] += 1
            continue

        img = images_by_id[img_id]
        h, w = img["height"], img["width"]

        segm = ann.get("segmentation", None)
        if segm is None or segm == []:
            removed["missing_segmentation"] += 1
            continue

        try:
            rle = ann_to_rle(segm, h, w)
            area = None if rle is None else float(mask_util.area(rle))
        except Exception as exc:
            # A segmentation the converter rejects is exactly the kind of
            # entry this function exists to remove; count it as invalid
            # instead of aborting the whole run.
            removed["invalid_polygon_or_rle"] += 1
            if conversion_failures < max_conversion_warnings:
                print(
                    f"  Warning: annotation {ann.get('id', idx)} "
                    f"could not be converted: {exc}"
                )
            conversion_failures += 1
            continue

        if area is None:
            removed["invalid_polygon_or_rle"] += 1
            continue

        if area <= 0:
            removed["zero_area_mask"] += 1
            continue

        kept_anns.append(ann)

    # Optionally drop images with no annotations
    if drop_images_without_annotations:
        referenced_ids = {ann["image_id"] for ann in kept_anns}
        kept_images = [img for img in images if img["id"] in referenced_ids]
        dropped_images = len(images) - len(kept_images)
        coco["images"] = kept_images
        print(f"Dropped {dropped_images} images with no remaining annotations.")

    coco["annotations"] = kept_anns

    print("\nRemoved annotation counts:")
    for k, v in removed.items():
        print(f"  {k}: {v}")
    print(f"Kept annotations: {len(kept_anns)} / {len(anns)}")

    print(f"Saving cleaned COCO to {output_json_path} ...")
    with open(output_json_path, "w") as f:
        json.dump(coco, f)

    print("Done.")

if __name__ == "__main__":
    # One entry per split: (text printed before the banner rule, banner
    # title, input file, output file).  Images left without annotations are
    # dropped for both splits.
    cleaning_jobs = (
        (
            "",
            "CLEANING TRAIN ANNOTATIONS",
            "./result/plants_coco_p640_o0_6tiff/images/train/train_annotations.json",
            "./result/plants_coco_p640_o0_6tiff/images/train/train_annotations_cleaned.json",
        ),
        (
            "\n",
            "CLEANING TEST ANNOTATIONS",
            "./result/plants_coco_p640_o0_6tiff/images/test/test_annotations.json",
            "./result/plants_coco_p640_o0_6tiff/images/test/test_annotations_cleaned.json",
        ),
    )

    for lead, banner, source_path, target_path in cleaning_jobs:
        print(lead + "=" * 60)
        print(banner)
        print("=" * 60)
        clean_coco_json(
            input_json_path=source_path,
            output_json_path=target_path,
            drop_images_without_annotations=True,
        )

    print("\n" + "=" * 60)
    print("ALL CLEANING COMPLETE")
    print("=" * 60)

CLEANING TRAIN ANNOTATIONS

Loading ./result/plants_coco_p640_o0_6tiff/images/train/train_annotations.json ...
Checking 9296 annotations ...
Progress: 2000/9296
Progress: 4000/9296
Progress: 6000/9296
Progress: 8000/9296
Dropped 0 images with no remaining annotations.

Removed annotation counts:
  missing_segmentation: 0
  invalid_polygon_or_rle: 0
  zero_area_mask: 9
  missing_image_id: 0
Kept annotations: 9287 / 9296
Saving cleaned COCO to ./result/plants_coco_p640_o0_6tiff/images/train/train_annotations_cleaned.json ...
Done.

CLEANING TEST ANNOTATIONS

Loading ./result/plants_coco_p640_o0_6tiff/images/test/test_annotations.json ...
Checking 3230 annotations ...
Progress: 2000/3230
Dropped 0 images with no remaining annotations.

Removed annotation counts:
  missing_segmentation: 0
  invalid_polygon_or_rle: 0
  zero_area_mask: 2
  missing_image_id: 0
Kept annotations: 3228 / 3230
Saving cleaned COCO to ./result/plants_coco_p640_o0_6tiff/images/test/test_annotations_cleaned.json ...


In [5]:
import json
import numpy as np
from pycocotools import mask as mask_util

def check_empty_masks(json_path):
    """
    Check for empty masks in a COCO format JSON file.

    Each annotation with a non-empty ``segmentation`` is converted to RLE
    with pycocotools and flagged when the resulting mask area is zero.
    Problems that prevent a meaningful conversion (a polygon with fewer than
    three points, an ``image_id`` that is not listed under ``images``, or an
    exception raised while converting) are collected separately.  A summary
    and the first ten entries of each list are printed, and both lists are
    written to a JSON report whose path is derived from ``json_path``.

    Annotations without a segmentation (missing, ``None`` or ``[]``) are
    skipped and appear in neither list.

    Args:
        json_path: Path to the JSON annotation file

    Returns:
        Tuple ``(empty_masks, invalid_polygons)``; each is a list of dicts
        describing one finding (annotation id, image id, image file name,
        reason, plus the offending data).
    """
    print(f"Loading {json_path}...")
    with open(json_path, 'r') as f:
        coco_data = json.load(f)

    images = {img['id']: img for img in coco_data['images']}

    empty_masks = []
    invalid_polygons = []
    total_annotations = len(coco_data['annotations'])

    print(f"Checking {total_annotations} annotations...")

    for idx, ann in enumerate(coco_data['annotations']):
        if idx % 1000 == 0:
            print(f"Progress: {idx}/{total_annotations}")

        segm = ann.get('segmentation')
        if segm is None or segm == []:
            continue

        ann_id = ann.get('id', idx)
        img_id = ann.get('image_id')
        img_info = images.get(img_id)
        if img_info is None:
            # Without the image entry there is no height/width to rasterise
            # against; report the annotation instead of aborting the scan.
            invalid_polygons.append({
                'annotation_id': ann_id,
                'image_id': img_id,
                'image_file': None,
                'reason': 'Annotation references an unknown image_id',
                'segmentation': str(segm)[:100]  # Truncate for readability
            })
            continue

        h, w = img_info['height'], img_info['width']

        # Fields shared by every finding recorded for this annotation.
        origin = {
            'annotation_id': ann_id,
            'image_id': img_id,
            'image_file': img_info['file_name'],
        }

        try:
            # Convert to RLE format (this is what the code does internally)
            if isinstance(segm, list):
                # Polygon format: report degenerate polygons and keep them
                # out of the conversion.  frPyObjects dispatches on the
                # length of the first element, so a two-point polygon (four
                # numbers) would otherwise be read as a bounding box.
                valid_polys = []
                for poly in segm:
                    if len(poly) < 6:  # Need at least 3 points (x,y pairs)
                        invalid_polygons.append({
                            **origin,
                            'reason': f'Polygon has only {len(poly)//2} points',
                            'polygon': poly
                        })
                    else:
                        valid_polys.append(poly)

                if not valid_polys:
                    # Every polygon was already reported as invalid; there
                    # is nothing left to rasterise.
                    continue

                # Convert polygon to RLE
                rles = mask_util.frPyObjects(valid_polys, h, w)
                rle = mask_util.merge(rles)

            elif isinstance(segm['counts'], list):
                # Uncompressed RLE
                rle = mask_util.frPyObjects(segm, h, w)
            else:
                # Already compressed RLE
                rle = segm

            # Check if mask area is zero
            area = mask_util.area(rle)

            if area == 0:
                empty_masks.append({
                    **origin,
                    'reason': 'Zero area after RLE conversion',
                    'segmentation': segm,
                    'bbox': ann.get('bbox', None)
                })

        except Exception as e:
            # Any failure while inspecting or converting the segmentation
            # is recorded so one malformed annotation cannot stop the scan.
            invalid_polygons.append({
                **origin,
                'reason': f'Error during conversion: {str(e)}',
                'segmentation': str(segm)[:100]  # Truncate for readability
            })

    # Print summary
    print("\n" + "="*80)
    print("SUMMARY")
    print("="*80)
    print(f"Total annotations: {total_annotations}")
    print(f"Empty masks found: {len(empty_masks)}")
    print(f"Invalid polygons found: {len(invalid_polygons)}")

    # Print detailed results
    if empty_masks:
        print("\n" + "-"*80)
        print("EMPTY MASKS DETAILS:")
        print("-"*80)
        for i, mask_info in enumerate(empty_masks[:10]):  # Show first 10
            print(f"\n{i+1}. Annotation ID: {mask_info['annotation_id']}")
            print(f"   Image ID: {mask_info['image_id']}")
            print(f"   Image file: {mask_info['image_file']}")
            print(f"   Reason: {mask_info['reason']}")
            if 'bbox' in mask_info and mask_info['bbox']:
                print(f"   BBox: {mask_info['bbox']}")

        if len(empty_masks) > 10:
            print(f"\n... and {len(empty_masks) - 10} more")

    if invalid_polygons:
        print("\n" + "-"*80)
        print("INVALID POLYGONS DETAILS:")
        print("-"*80)
        for i, poly_info in enumerate(invalid_polygons[:10]):  # Show first 10
            print(f"\n{i+1}. Annotation ID: {poly_info['annotation_id']}")
            print(f"   Image ID: {poly_info['image_id']}")
            print(f"   Image file: {poly_info['image_file']}")
            print(f"   Reason: {poly_info['reason']}")

        if len(invalid_polygons) > 10:
            print(f"\n... and {len(invalid_polygons) - 10} more")

    # Save detailed report to file.  Only a trailing '.json' is swapped for
    # the report suffix: str.replace would also rewrite '.json' inside
    # directory names, and for a path without '.json' it would return the
    # input path unchanged, so the report would overwrite the annotations.
    path_str = str(json_path)
    report_suffix = '_mask_check_report.json'
    if path_str.endswith('.json'):
        report_path = path_str[:-len('.json')] + report_suffix
    else:
        report_path = path_str + report_suffix
    report = {
        'summary': {
            'total_annotations': total_annotations,
            'empty_masks_count': len(empty_masks),
            'invalid_polygons_count': len(invalid_polygons)
        },
        'empty_masks': empty_masks,
        'invalid_polygons': invalid_polygons
    }

    with open(report_path, 'w') as f:
        json.dump(report, f, indent=2)

    print(f"\n\nDetailed report saved to: {report_path}")

    return empty_masks, invalid_polygons


# Usage
if __name__ == "__main__":
    # Re-check the cleaned train and test annotation files in turn.
    # Change these to your actual paths.
    for json_path in (
        "./result/plants_coco_p640_o0_6tiff/images/train/train_annotations_cleaned.json",
        "./result/plants_coco_p640_o0_6tiff/images/test/test_annotations_cleaned.json",
    ):
        empty_masks, invalid_polygons = check_empty_masks(json_path)

Loading ./result/plants_coco_p640_o0_6tiff/images/train/train_annotations_cleaned.json...
Checking 9287 annotations...
Progress: 0/9287
Progress: 1000/9287
Progress: 2000/9287
Progress: 3000/9287
Progress: 4000/9287
Progress: 5000/9287
Progress: 6000/9287
Progress: 7000/9287
Progress: 8000/9287
Progress: 9000/9287

SUMMARY
Total annotations: 9287
Empty masks found: 0
Invalid polygons found: 0


Detailed report saved to: ./result/plants_coco_p640_o0_6tiff/images/train/train_annotations_cleaned_mask_check_report.json
Loading ./result/plants_coco_p640_o0_6tiff/images/test/test_annotations_cleaned.json...
Checking 3228 annotations...
Progress: 0/3228
Progress: 1000/3228
Progress: 2000/3228
Progress: 3000/3228

SUMMARY
Total annotations: 3228
Empty masks found: 0
Invalid polygons found: 0


Detailed report saved to: ./result/plants_coco_p640_o0_6tiff/images/test/test_annotations_cleaned_mask_check_report.json
