In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm.notebook import tqdm, trange
import torch.optim as optim
from torchvision import models
import numpy as np
from collections import deque
import random
import time
import torch.nn.functional as F
from torch.utils.data import ConcatDataset, DataLoader, Subset
import torchvision.models.segmentation as segmentation
from collections import defaultdict
import os

def set_seed(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's `random`, NumPy's legacy global RNG and PyTorch (CPU, plus
    all CUDA devices when CUDA is available). With CUDA present, cuDNN is also
    switched to deterministic mode with benchmarking disabled.

    Args:
        seed: Integer seed applied to every generator.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
        cudnn = torch.backends.cudnn
        cudnn.deterministic = True   # always pick deterministic cuDNN kernels
        cudnn.benchmark = False      # no auto-tuning, which may select different kernels per run

    # The interpreter reads PYTHONHASHSEED at start-up, so this assignment does
    # not change hashing in the running kernel; it is inherited by child processes.
    os.environ['PYTHONHASHSEED'] = str(seed)
set_seed(42)

### Explicit Heuristic Split Model

#### ZoeDepth - HuggingFace

In [None]:
# import os
# from pathlib import Path
# import torch
# import numpy as np
# from transformers import AutoImageProcessor, ZoeDepthForDepthEstimation
# from PIL import Image
# from tqdm import tqdm
# from torchvision import transforms

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# output_dir = Path('../data/depth')
# output_dir.mkdir(parents=True, exist_ok=True)

# image_processor = AutoImageProcessor.from_pretrained("Intel/zoedepth-nyu-kitti", use_fast=True)
# model = ZoeDepthForDepthEstimation.from_pretrained("Intel/zoedepth-nyu-kitti").to(device).eval()

# # Prepare image paths list
# image_paths = df['image_path'].tolist()

# batch_size = 10  # or whatever batch size you want
# for batch_idx in tqdm(range(0, len(image_paths), batch_size)):
#     batch_paths = image_paths[batch_idx:batch_idx + batch_size]
    
#     # Load images as PIL Images (no manual transform)
#     batch_images = [Image.open(img_path).convert("RGB") for img_path in batch_paths]
    
#     # Preprocess with ZoeDepth image processor
#     inputs = image_processor(images=batch_images, return_tensors="pt").to(device)
    
#     with torch.no_grad():
#         outputs = model(**inputs)
    
#     # Post-process depth maps to original sizes
#     source_sizes = [(img.height, img.width) for img in batch_images]
#     post_processed = image_processor.post_process_depth_estimation(
#         outputs,
#         source_sizes=source_sizes
#     )
    
#     for i, depth_dict in enumerate(post_processed):
#         # Get raw depth map
#         depth_array = depth_dict["predicted_depth"].cpu().numpy()
#         img_stem = Path(batch_paths[i]).stem
#         np.save(output_dir / f"{img_stem}.npy", depth_array)
#         # Save visualization PNG
#         depth_norm = (depth_array - depth_array.min()) / (depth_array.max() - depth_array.min())
#         depth_img = Image.fromarray((depth_norm * 255).astype(np.uint8))
#         depth_img.save(output_dir / f"{img_stem}_depth.png")



#### segmentation generation

In [None]:
from autodistill_grounded_sam import GroundedSAM
from autodistill.detection import CaptionOntology
from autodistill.utils import plot
import cv2
import pickle
import bz2


import gc
# Collect unreachable Python objects first so that any GPU tensors they still
# reference can be freed, then ask PyTorch to release its cached, unused CUDA memory.
gc.collect()
torch.cuda.empty_cache()

In [None]:
# Define an ontology that maps GroundedSAM prompts to class names.
# The ontology dictionary has the format {caption: class}, where `caption` is
# the prompt sent to the base model and `class` is the label saved for that
# caption in the generated annotations.
# Constructing GroundedSAM with the ontology also loads the model.
base_model = GroundedSAM(
    ontology=CaptionOntology(
        {
            "human . child . person": "human",
            "robot": "robot",
            "dog": "dog"
        }
    )
)

# Original note on the intended label set: human -> human, animal -> animal, robot -> robot.
# NOTE(review): the ontology above labels the third class "dog", not "animal" — confirm which is intended.

In [None]:
# # run inference on a single image
# results = base_model.predict(df['image_path'].iloc[0])

# plot(
#     image=cv2.imread(df['image_path'].iloc[0]),
#     classes=base_model.ontology.classes(),
#     detections=results
# )


In [None]:
# Load the bz2-compressed pickle of the "home" dataset into `dataset_home`.
# SECURITY: pickle.load can execute arbitrary code embedded in the file —
# only load archives that were produced by this project.
with bz2.BZ2File('autodistill_dataset_home.pbz2', 'rb') as f:
    dataset_home = pickle.load(f)

In [None]:
# with bz2.BZ2File('autodistill_dataset.pbz2', 'wb') as f:
#     pickle.dump(dataset, f, protocol=pickle.HIGHEST_PROTOCOL)


In [None]:

# Free unreachable Python objects, then release PyTorch's cached, unused CUDA memory.
gc.collect()
torch.cuda.empty_cache()

In [None]:
# dataset_home = base_model.label("../../socialsense/data/images/home", extension=".png")
# with bz2.BZ2File('autodistill_dataset_home.pbz2', 'wb') as f:
#     pickle.dump(dataset_home, f, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
# Load the uncompressed pickle into `dataset`.
# SECURITY: pickle.load can execute arbitrary code embedded in the file —
# only load files that were produced by this project.
with open("autodistill_temp_dataset.pkl", "rb") as f:
    dataset = pickle.load(f)

In [None]:
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import os
import supervision as sv
from tqdm.notebook import tqdm, trange

# Color palette keyed by integer class id.
# Each value is an RGBA tuple: the full tuple (alpha 100 of 255) tints the
# translucent mask overlay, and its first three components (RGB) are used for
# the solid bounding boxes and labels.
class_colors = {
    0: (255, 0, 0, 100),    # Red for class 0 (human)
    1: (0, 255, 0, 100),    # Green for class 1 (robot)
    2: (0, 0, 255, 100),    # Blue for class 2 (animal)
    3: (122, 122, 0, 100),  # Olive for class 3 — no label noted here; TODO confirm which class this is
    # Add more if you have more classes
}

def visualize_and_save_pil_colored(dataset, confidence_threshold=0.3, output_dir="../data/temp_masks_coloured"):
    """
    Draw masks, boxes and confidence labels on every image and save the result as PNG.

    For each dataset entry the detections are filtered by confidence, their
    masks are blended onto the image in the class color, and each box is
    outlined and labelled with "<class_id>: <confidence>". Images with no
    detection at or above the threshold are reported and not saved.

    Args:
        dataset: Iterable of (path, image, detections) tuples. `image` is an
            (H, W, 3) array whose channel axis is reversed before drawing
            (assumed to be BGR, e.g. loaded with OpenCV — TODO confirm).
            `detections` must expose `xyxy`, `confidence`, `class_id` and `mask`;
            `class_id` and `mask` may be None.
        confidence_threshold: Minimum confidence for a detection to be drawn.
        output_dir: Directory (created if missing) that receives one
            "<image stem>.png" per visualized image.
    """
    os.makedirs(output_dir, exist_ok=True)
    # Used for class ids that are missing from `class_colors` (white, translucent).
    fallback_color = (255, 255, 255, 100)

    for path, image, detections in tqdm(dataset):
        # Filter detections by confidence
        scores = np.asarray(detections.confidence)
        keep = scores >= confidence_threshold
        if not keep.any():
            print(f"No detections above threshold for image {path}")
            continue

        boxes = detections.xyxy[keep]
        confidences = scores[keep]
        # Detections without class ids are all drawn as class 0.
        if detections.class_id is not None:
            class_ids = detections.class_id[keep]
        else:
            class_ids = np.zeros(len(boxes), dtype=int)

        # Reverse the channel axis (BGR -> RGB) and work in RGBA so masks can be blended.
        pil_img = Image.fromarray(image[:, :, ::-1]).convert("RGBA")

        # Overlay masks with transparency and class colors
        if detections.mask is not None:
            transparent = Image.new("RGBA", pil_img.size)
            for mask, class_id in zip(detections.mask[keep], class_ids):
                color = class_colors.get(class_id, fallback_color)
                mask_img = Image.fromarray((mask * 255).astype(np.uint8), mode="L")
                # Class color where the mask is set, fully transparent elsewhere.
                tinted = Image.composite(Image.new("RGBA", pil_img.size, color), transparent, mask_img)
                pil_img = Image.alpha_composite(pil_img, tinted)

        draw = ImageDraw.Draw(pil_img)

        # Draw bounding boxes and confidence with class colors
        for box, conf, class_id in zip(boxes, confidences, class_ids):
            # Same fallback as the mask overlay, so an unknown class id no longer raises KeyError.
            color = class_colors.get(class_id, fallback_color)[:3]  # RGB only: boxes are opaque
            x1, y1, x2, y2 = map(int, box)
            draw.rectangle([x1, y1, x2, y2], outline=color, width=2)
            # The label is shifted up by 10 px per class id; presumably to keep labels of
            # different classes on the same box from overlapping — TODO confirm.
            draw.text((x1, y1 - 10*class_id), f"{class_id}: {conf:.2f}", fill=color)

        # Save the image as PNG
        filename = os.path.basename(path)
        save_path = os.path.join(output_dir, os.path.splitext(filename)[0] + ".png")
        pil_img.convert("RGB").save(save_path)


# Example usage



In [None]:
import numpy as np
from supervision import Detections

def calculate_iou(box1, box2):
    """Return the Intersection over Union of two boxes given as [x1, y1, x2, y2].

    A small epsilon (1e-6) is added to the union so the division is defined
    even when both boxes have zero area.
    """
    # Corners of the overlapping rectangle (may be inverted if the boxes are disjoint).
    left, top = max(box1[0], box2[0]), max(box1[1], box2[1])
    right, bottom = min(box1[2], box2[2]), min(box1[3], box2[3])

    # Negative extents mean no overlap along that axis, so clamp them to zero.
    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    intersection = overlap_w * overlap_h

    area_a = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area_b = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area_a + area_b - intersection
    return intersection / (union + 1e-6)

def remove_duplicates_any_class(detections: Detections, iou_threshold=0.9) -> Detections:
    """
    Remove duplicate detections using IoU and confidence, regardless of class.

    Detections are visited from highest to lowest confidence (ties keep their
    original order). Each visited detection is kept, and every remaining
    detection whose box IoU with it exceeds `iou_threshold` is dropped.

    Args:
        detections: supervision.Detections object. `confidence`, `class_id`
            and `mask` may be None; without confidences the original order is used.
        iou_threshold: IoU above which two boxes are considered duplicates.

    Returns:
        Filtered Detections object, ordered by descending confidence. Only
        `xyxy`, `confidence`, `class_id` and `mask` are carried over. An empty
        input is returned unchanged.
    """
    xyxy = detections.xyxy
    confidence = detections.confidence
    class_id = detections.class_id
    mask = detections.mask

    count = len(xyxy)
    if count == 0:
        # Nothing to de-duplicate. Rebuilding from empty lists would yield an
        # xyxy array of shape (0,) rather than (0, 4).
        return detections

    # Visit order: highest confidence first; the stable sort keeps ties in input order.
    if confidence is not None:
        order = np.argsort(-np.asarray(confidence), kind="stable")
    else:
        order = np.arange(count)

    # Greedy suppression (do NOT check class_id)
    suppressed = np.zeros(count, dtype=bool)
    keep = []
    for rank, current in enumerate(order):
        if suppressed[current]:
            continue
        keep.append(current)
        for other in order[rank + 1:]:
            if not suppressed[other] and calculate_iou(xyxy[current], xyxy[other]) > iou_threshold:
                suppressed[other] = True

    keep = np.asarray(keep, dtype=int)

    # Reconstruct Detections object; integer-array indexing preserves the (N, 4) / (N, H, W) shapes.
    return Detections(
        xyxy=xyxy[keep],
        confidence=confidence[keep] if confidence is not None else None,
        class_id=class_id[keep] if class_id is not None else None,
        mask=mask[keep] if mask is not None else None
    )

# Usage example


In [None]:
# Build `filtered_dataset`: the same (path, image, detections) tuples, with
# boxes overlapping a higher-confidence box by IoU > 0.95 removed (any class).
# NOTE(review): `dataset_746` is not defined in any cell visible here (the
# cells above load `dataset` and `dataset_home`) — confirm where it comes from,
# otherwise this cell fails on a fresh kernel.
filtered_dataset = []
for path, image, detections in dataset_746:
    filtered_detections = remove_duplicates_any_class(detections, iou_threshold=0.95)
    filtered_dataset.append((path, image, filtered_detections))

In [None]:
# visualize_and_save_pil_colored(dataset, confidence_threshold=0.3)

#### depth mean std

In [None]:
import numpy as np
import os
from tqdm.notebook import tqdm

def calculate_mean_std_for_npy(folder_path):
    """
    Compute the global mean and standard deviation over all .npy arrays in a folder.

    Every element of every array counts equally (the statistics are not
    per-file averages). Sums are accumulated in float64 and the population
    standard deviation is derived from E[x^2] - E[x]^2.

    Args:
        folder_path: Directory that is scanned (non-recursively) for files
            ending in '.npy'.

    Returns:
        (mean, std) as floats, or (None, None) when the folder contains no
        array elements at all.
    """
    total_sum = 0
    total_sum_sq = 0
    total_count = 0

    # Sorted so the floating-point accumulation order, and therefore the
    # result, does not depend on the order in which the OS lists the directory.
    files = sorted(f for f in os.listdir(folder_path) if f.endswith('.npy'))

    # Wrap the loop with tqdm for progress bar
    for filename in tqdm(files):
        values = np.load(os.path.join(folder_path, filename)).astype(np.float64)
        total_sum += values.sum()
        total_sum_sq += np.square(values).sum()
        total_count += values.size

    if total_count == 0:
        return None, None

    mean = total_sum / total_count
    # Rounding can push E[x^2] - E[x]^2 marginally below zero for (near-)constant
    # data; clamp so the square root never yields NaN.
    variance = max(total_sum_sq / total_count - mean ** 2, 0.0)
    std = np.sqrt(variance)
    return mean, std


In [None]:
# calculate_mean_std_for_npy('../data/depth')

#### mask fusion

In [None]:
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from PIL import Image
import os
from pathlib import Path

def process_segmentation_data(detections_dataset, output_dir, imagenet_mean=[0.485, 0.456, 0.406], confidence_threshold=0.3):
    """
    Process supervision Detections dataset to create social and environment images

    For every entry the image is re-read from disk, the masks of all
    detections at or above `confidence_threshold` are merged, and two images
    are written under the original file name:
      * <output_dir>/social/      - detected objects kept, everything else filled
      * <output_dir>/environment/ - detected objects filled, everything else kept

    Args:
        detections_dataset: Iterable of indexable items where item[0] is the
            image path and item[2] is the Detections object (item[1] is not used).
        output_dir: Base directory to save processed images
        imagenet_mean: RGB mean values in [0, 1] used for filling masked areas
        confidence_threshold: Minimum confidence for a detection's mask to be
            included. If a Detections object has no confidences, all of its
            masks are used.

    Raises:
        OSError: If an image cannot be read from its path.
    """

    # Create output directories
    social_dir = Path(output_dir) / "social"
    env_dir = Path(output_dir) / "environment"
    social_dir.mkdir(parents=True, exist_ok=True)
    env_dir.mkdir(parents=True, exist_ok=True)

    for item in tqdm(detections_dataset):
        image_path = item[0]
        detections = item[2]

        # Load original image; cv2.imread signals failure by returning None
        # instead of raising, so check explicitly.
        bgr_image = cv2.imread(str(image_path))
        if bgr_image is None:
            raise OSError(f"Could not read image (missing or unreadable): {image_path}")
        original_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
        height, width = original_image.shape[:2]

        # Get filename for saving
        filename = Path(image_path).name

        # Keep only the masks of sufficiently confident detections. Detections
        # without masks are passed on as None (combine_masks yields an empty mask).
        masks = detections.mask
        if masks is not None and detections.confidence is not None:
            masks = masks[np.asarray(detections.confidence) >= confidence_threshold]

        # Combine all masks into one
        combined_mask = combine_masks(masks, height, width)

        # Create social image (only detected objects visible)
        social_image = apply_mask_with_mean(
            original_image, combined_mask, imagenet_mean, keep_masked=True
        )

        # Create environment image (detected objects masked out)
        env_image = apply_mask_with_mean(
            original_image, combined_mask, imagenet_mean, keep_masked=False
        )

        # Save images
        save_image(social_image, social_dir / filename)
        save_image(env_image, env_dir / filename)


def combine_masks(masks, height, width, confidence_threshold=0.3):
    """
    Combine multiple masks into a single binary mask using union operation

    Any non-zero pixel in an input mask counts as foreground, so boolean,
    0/1 and 0/255 masks all produce the same 0/1 result.

    Args:
        masks: Sequence/array of individual (height, width) masks, or None
        height, width: Dimensions of the original image
        confidence_threshold: Unused; kept only so existing calls keep working.

    Returns:
        combined_mask: Single binary uint8 mask (1 = object, 0 = background)

    Raises:
        ValueError: If any mask does not have shape (height, width).
    """
    if masks is None or len(masks) == 0:
        return np.zeros((height, width), dtype=np.uint8)

    combined = np.zeros((height, width), dtype=bool)

    for mask in masks:
        mask = np.asarray(mask)
        # Ensure mask is the right size
        if mask.shape != (height, width):
            raise ValueError(f"Mask shape incorrect: {mask.shape}, should be {(height, width)}")

        # Union on foreground membership rather than on raw values, so the
        # result stays strictly 0/1 whatever scale the input masks use.
        combined |= (mask != 0)

    return combined.astype(np.uint8)

def apply_mask_with_mean(image, mask, imagenet_mean, keep_masked=True):
    """
    Apply mask to image and fill empty areas with ImageNet mean values

    Any non-zero mask pixel is treated as object, so the social and
    environment outputs are exact complements for every mask encoding
    (boolean, 0/1 or 0/255).

    Args:
        image: Original RGB image (H, W, 3); not modified
        mask: Mask (H, W) where non-zero = object, 0 = background
        imagenet_mean: RGB mean values [R, G, B] in range [0, 1]
        keep_masked: If True, keep masked areas (social). If False, remove masked areas (environment)

    Returns:
        processed_image: uint8 image with mask applied and filled with mean values
    """
    # astype returns a new array, so the caller's image is left untouched.
    processed_image = image.astype(np.float32) / 255.0

    object_mask = np.asarray(mask) != 0
    if keep_masked:
        # Social image: keep objects, fill background with mean
        fill_mask = ~object_mask
    else:
        # Environment image: keep background, fill objects with mean
        fill_mask = object_mask

    # A 2-D boolean index selects whole pixels; the 3-value mean is broadcast over them.
    processed_image[fill_mask] = np.asarray(imagenet_mean, dtype=np.float32)

    # Convert back to uint8
    return (processed_image * 255).astype(np.uint8)

def save_image(image, save_path):
    """
    Write an image array to disk.

    Args:
        image: RGB image array (H, W, 3)
        save_path: Destination path; passed unchanged to PIL's Image.save
    """
    # Wrap the array in a PIL image and write it out in one step.
    Image.fromarray(image).save(save_path)


In [None]:
# with bz2.BZ2File('autodistill_dataset_home.pbz2', 'rb') as f:
#     dataset_home = pickle.load(f)

# process_segmentation_data(
#     detections_dataset=dataset_home,
#     output_dir='../data/masked',
#     imagenet_mean=[0.485, 0.456, 0.406]  # ImageNet RGB means
# )
