In [None]:
import sys
import platform
import torch
import pandas as pd
import sklearn as sk
from PIL import Image
import matplotlib.pyplot as plt
import torchvision.transforms as T
import torchvision
import numpy as np
import cv2
import random
import time
import os
import shutil

# Probe the available accelerators once and derive the preferred device string.
has_gpu = torch.cuda.is_available()
# is_built() only reports that this PyTorch binary was compiled with MPS support;
# is_available() additionally checks that MPS can actually be used on this machine.
# The hasattr guard keeps the cell working on PyTorch builds without torch.backends.mps.
has_mps = hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
device = "mps" if has_mps else "cuda" if has_gpu else "cpu"

# Summarise the runtime environment (platform, library versions, accelerators)
# so the notebook output records what it was executed on.
cuda_status = "available" if has_gpu else "NOT AVAILABLE"
mps_status = "AVAILABLE" if has_mps else "NOT AVAILABLE"

print(f"Python Platform: {platform.platform()}")
print(f"PyTorch Version: {torch.__version__}", end="\n\n")  # blank separator line
print(f"Python {sys.version}")
print(f"Pandas {pd.__version__}")
print(f"Scikit-Learn {sk.__version__}")
print("NVIDIA/CUDA GPU is", cuda_status)
print("MPS (Apple Metal) is", mps_status)
print(f"Target device is {device}")

In [None]:
# Load the COCO-pretrained Mask R-CNN (ResNet-50 FPN backbone).
# Newer torchvision releases deprecate `pretrained=True` in favour of the
# `weights=` enum; use the enum when this torchvision provides it and fall
# back to the legacy flag otherwise so the cell runs on either version.
_detection_models = torchvision.models.detection
if hasattr(_detection_models, "MaskRCNN_ResNet50_FPN_Weights"):
    model = _detection_models.maskrcnn_resnet50_fpn(
        weights=_detection_models.MaskRCNN_ResNet50_FPN_Weights.DEFAULT
    )
else:
    model = _detection_models.maskrcnn_resnet50_fpn(pretrained=True)
# Inference mode: the model is only used for prediction in this notebook.
model.eval()

In [None]:
# Lookup table from the integer label ids returned by the detection model to
# human-readable class names: get_prediction indexes this list with each
# predicted label, so the POSITION of every entry matters.
# The 'N/A' entries look like placeholders for label ids that have no class
# (presumably gaps in the COCO id numbering - TODO confirm); do not remove or
# reorder them, or every later name would shift to the wrong id.
COCO_INSTANCE_CATEGORY_NAMES = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A', 'N/A',
    'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
    'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
    'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table',
    'N/A', 'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
    'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book',
    'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

## FUNCTIONS

In [None]:
def get_prediction(img_path, threshold):
    """
    Runs the global Mask R-CNN `model` on one image and returns the detections
    whose confidence score is strictly greater than `threshold`.

    Parameters:
        img_path: path of the image file; it is opened with PIL and converted to RGB.
        threshold: minimum confidence score (exclusive) a detection must exceed to be kept.

    Returns:
        A tuple (masks, pred_boxes, pred_class), index-aligned per kept detection:
        masks      - boolean numpy array of shape (num_kept, H, W), True where the
                     predicted mask probability is above 0.5;
        pred_boxes - list of [(x1, y1), (x2, y2)] integer corner pairs;
        pred_class - list of class names taken from COCO_INSTANCE_CATEGORY_NAMES.
        All three are empty when no detection exceeds the threshold.
    """
    # Close the file handle as soon as the pixel data has been converted.
    with Image.open(img_path) as img_file:
        img = img_file.convert("RGB")
    img_tensor = T.ToTensor()(img)

    # Feed the image on whatever device the model currently lives on.
    model_device = next(model.parameters()).device
    with torch.no_grad():
        pred = model([img_tensor.to(model_device)])[0]

    # Move every output to the CPU before converting to numpy.
    scores = pred['scores'].detach().cpu().numpy()
    labels = pred['labels'].detach().cpu().numpy()
    boxes = pred['boxes'].detach().cpu().numpy()
    # torchvision documents the masks as [N, 1, H, W]; drop only the channel axis.
    # A bare squeeze() would also drop N when there is exactly one detection,
    # turning the per-detection axis into image rows.
    masks = (pred['masks'] > 0.5).squeeze(1).detach().cpu().numpy()

    # Boolean selection keeps exactly the detections above the threshold, and
    # none at all when every score is too low.
    keep = scores > threshold

    masks = masks[keep]
    pred_boxes = [
        [(int(box[0]), int(box[1])), (int(box[2]), int(box[3]))]
        for box in boxes[keep]
    ]
    pred_class = [COCO_INSTANCE_CATEGORY_NAMES[int(label)] for label in labels[keep]]
    return masks, pred_boxes, pred_class

In [None]:
def blur_mask(img_path, threshold=0.3):
    """
    Blurs the 'person' masks in an image and displays the result.

    Parameters:
        img_path: path of the image file to anonymise.
        threshold: confidence threshold forwarded to get_prediction.

    Returns:
        A tuple (img, no_detection):
        - when at least one 'person' is detected: (RGB numpy image with every
          person mask blurred, "");
        - when no 'person' is detected: (None, img_path).

    Raises:
        ValueError: if OpenCV cannot read the image, or if the predicted masks do
        not have the same height/width as the loaded image.
    """
    masks, boxes, pred_cls = get_prediction(img_path, threshold)

    # Keep only the masks labelled 'person'.
    person_masks = [
        np.asarray(mask, dtype=bool)
        for mask, label in zip(masks, pred_cls)
        if label == 'person'
    ]

    if not person_masks:
        print("No persons detected in the image.")
        print("Image Path is: " + img_path)
        return None, img_path

    # get_prediction loads the file with PIL, which does not apply the EXIF
    # orientation; tell OpenCV to ignore it as well so the masks line up.
    img = cv2.imread(img_path, cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION)
    if img is None:
        raise ValueError("OpenCV could not read image: " + img_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # Merge all person masks so the (expensive) full-image blur is computed
    # once instead of once per detected person.
    combined_mask = np.logical_or.reduce(person_masks)
    if combined_mask.shape != img.shape[:2]:
        raise ValueError(
            f"Mask shape {combined_mask.shape} does not match image shape "
            f"{img.shape[:2]} for {img_path}"
        )

    blurred = cv2.GaussianBlur(img, (41, 41), 30)  # blur the whole image ...
    img[combined_mask] = blurred[combined_mask]    # ... but copy back only the person pixels

    # Show the result at roughly one figure inch per 100 image pixels.
    height, width, _ = img.shape
    fig = plt.figure(figsize=(width / 100, height / 100))
    plt.imshow(img)
    plt.axis('off')
    plt.show()
    # Release the figure so batch runs do not accumulate open figures.
    plt.close(fig)

    return img, ""

In [None]:
def process_images(input_dir, output_dir):
    """
    Walks `input_dir` recursively, blurs persons in every image whose file name
    contains 'rgb', and writes the results to the mirrored location under
    `output_dir`. Files whose name contains 'bounding_boxes' are copied over
    unchanged. Outputs that already exist are skipped, so the function can be
    re-run to resume an interrupted batch.

    Parameters:
        input_dir: root folder to scan.
        output_dir: root folder receiving the mirrored folder structure.

    Returns:
        List of input image paths in which no person was detected (for those,
        the original image is saved instead of a blurred one).
    """
    unprocessed_images = []

    # create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # iterate through all folders and subfolders in the input directory
    for root, _dirs, files in os.walk(input_dir):
        # Resolve the mirrored output folder once per directory so that the
        # image branch and the bounding-box branch always target the folder
        # matching `root` (never a stale one left over from another file).
        output_subdir = os.path.join(output_dir, os.path.relpath(root, input_dir))

        for file in files:
            if 'rgb' in file:  # process only files with 'rgb' in their name
                img_path = os.path.join(root, file)
                output_img_path = os.path.join(output_subdir, file)

                # Check if the output image path already exists
                if os.path.exists(output_img_path):
                    print(f"{output_img_path} already exists, skipping processing.")
                    continue

                os.makedirs(output_subdir, exist_ok=True)

                # process the image
                processed_img, no_detection = blur_mask(img_path, threshold=0.3)
                print(output_img_path)

                if no_detection:
                    unprocessed_images.append(img_path)

                # save the blurred image, or fall back to the original when
                # blur_mask did not return a valid 3-dimensional image array
                if processed_img is not None and len(processed_img.shape) == 3:
                    Image.fromarray(processed_img).save(output_img_path)
                else:
                    print(f"No detection in {img_path}. Saving the original image.")
                    # context manager closes the source file after saving
                    with Image.open(img_path) as original_img:
                        original_img.save(output_img_path)

            if 'bounding_boxes' in file:  # adds bounding box files
                txt_input_path = os.path.join(root, file)
                txt_output_path = os.path.join(output_subdir, file)

                # Check if the bounding box file already exists
                if os.path.exists(txt_output_path):
                    print(f"{txt_output_path} already exists, skipping copy.")
                    continue

                # the folder may not exist yet if no image was written into it
                os.makedirs(output_subdir, exist_ok=True)

                # copy the txt file to the output directory
                shutil.copy(txt_input_path, txt_output_path)
                print(f"Uploaded {txt_output_path} successfully.")

    for path in unprocessed_images:
        print(f"No persons detected in image: {path}")

    print(f"There are {len(unprocessed_images)} images that were not processed successfully.")

    return unprocessed_images

## BATCH PROCESSING

In [None]:
# batch 1 - learned objects
input_dir = 'learned_objects_input1/'
output_dir = 'learned_objects'
# Trailing ';' suppresses the notebook echo of the returned list, which
# process_images already prints path by path.
process_images(input_dir=input_dir, output_dir=output_dir);

In [None]:
# batch 2 - learned objects
input_dir = 'learned_objects_input2/'
output_dir = 'learned_objects'
# Trailing ';' suppresses the notebook echo of the returned list, which
# process_images already prints path by path.
process_images(input_dir=input_dir, output_dir=output_dir);

In [None]:
# batch 3 - learned objects
input_dir = 'learned_objects_input3/'
output_dir = 'learned_objects'
# Trailing ';' suppresses the notebook echo of the returned list, which
# process_images already prints path by path.
process_images(input_dir=input_dir, output_dir=output_dir);

In [None]:
# batch 1 - test objects
input_dir = 'test_objects_input/'
output_dir = 'test_objects'
# Trailing ';' suppresses the notebook echo of the returned list, which
# process_images already prints path by path.
process_images(input_dir=input_dir, output_dir=output_dir);

In [None]:
# batch 2 - test objects
input_dir = 'test_objects_input2/'
output_dir = 'test_objects'
# The trailing ';' keeps the notebook from echoing the returned list.
process_images(input_dir=input_dir, output_dir=output_dir);