In [None]:
import os
import numpy as np
import torch
from PIL import Image

import torchvision.transforms as T


class ClothingDataset(torch.utils.data.Dataset):
    """Clothing-parsing dataset yielding Mask R-CNN style ``(image, target)`` pairs.

    Images are read from ``<root>/png_images/IMAGES`` and label masks from
    ``<root>/png_masks/MASKS``; the two sorted directory listings are paired by
    position. Every distinct non-zero grey level in a mask is treated as one
    object whose class id is that grey level; grey level 0 is background.

    Args:
        root: Dataset root directory containing the two sub-folders above.
        transforms: Callable applied to the PIL image only, or ``None``.
        train: When true (the default, which matches the previous behaviour)
            the image and its mask are flipped horizontally together with
            probability 0.5. Pass ``False`` to get deterministic samples.

    ``__getitem__`` returns ``None`` for samples without any usable object, so
    the DataLoader needs a ``collate_fn`` that drops ``None`` entries.
    """

    IMAGE_DIR = "png_images/IMAGES"
    MASK_DIR = "png_masks/MASKS"

    def __init__(self, root, transforms, train=True):
        self.root = root
        self.transforms = transforms
        self.train = train
        # Sort both listings so that image i and mask i belong to the same sample.
        self.imgs = sorted(os.listdir(os.path.join(root, self.IMAGE_DIR)))
        self.masks = sorted(os.listdir(os.path.join(root, self.MASK_DIR)))

    @staticmethod
    def _build_target(mask, idx):
        """Convert a 2-D label mask into a detection target dict.

        Args:
            mask: 2-D integer array; 0 is background, any other value is a class id.
            idx: Sample index, stored as ``image_id``.

        Returns:
            A dict with ``boxes`` (N, 4) float32 in ``[xmin, ymin, xmax, ymax]``
            order, ``labels`` (N,) int64, ``masks`` (N, H, W) uint8, ``image_id``,
            ``area`` and ``iscrowd``; or ``None`` when no object with a
            positive-size box is present.
        """
        mask = np.asarray(mask)

        # Select the background by value rather than by position: np.unique
        # sorts its output, so "drop the first id" is only right when 0 occurs.
        obj_ids = np.unique(mask)
        obj_ids = obj_ids[obj_ids != 0]
        if len(obj_ids) == 0:
            return None

        # One boolean mask per object id; these stay aligned with boxes/labels.
        masks = mask == obj_ids[:, None, None]

        boxes = []
        for instance in masks:
            rows, cols = np.nonzero(instance)
            boxes.append([cols.min(), rows.min(), cols.max(), rows.max()])
        boxes = np.asarray(boxes, dtype=np.float32)

        # torchvision detection models reject boxes with zero width or height,
        # so objects confined to a single row or column are dropped.
        keep = (boxes[:, 2] > boxes[:, 0]) & (boxes[:, 3] > boxes[:, 1])
        if not keep.any():
            return None
        boxes, masks, obj_ids = boxes[keep], masks[keep], obj_ids[keep]

        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

        return {
            "boxes": boxes,
            "labels": torch.as_tensor(obj_ids, dtype=torch.int64),
            "masks": torch.as_tensor(masks, dtype=torch.uint8),
            "image_id": torch.tensor([idx]),
            "area": area,
            "iscrowd": torch.zeros((len(obj_ids),), dtype=torch.int64),
        }

    def __getitem__(self, idx):
        """Load sample ``idx``; returns ``(image, target)`` or ``None``."""
        img_path = os.path.join(self.root, self.IMAGE_DIR, self.imgs[idx])
        mask_path = os.path.join(self.root, self.MASK_DIR, self.masks[idx])
        img = Image.open(img_path).convert("RGB")
        mask = Image.open(mask_path).convert("L")  # grey-scale label map

        # The flip must hit image and mask together, so the coin is tossed once
        # here and a deterministic (p=1) flip is applied to both.
        if self.train and torch.rand(1) >= 0.5:
            flip = T.RandomHorizontalFlip(p=1.0)
            img = flip(img)
            mask = flip(mask)

        target = self._build_target(np.array(mask), idx)
        if target is None:
            return None

        if self.transforms is not None:
            img = self.transforms(img)
        return img, target

    def __len__(self):
        return len(self.imgs)

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; force_remount=True asks Colab to
# re-mount even if the drive is already mounted.
drive.mount('/content/drive', force_remount=True)

In [None]:
import os
# Work from the "Colab Notebooks" folder of the mounted Drive (not the Drive root).
os.chdir('/content/drive/My Drive/Colab Notebooks')
os.listdir('.')  # Lists the files in the current directory

In [None]:
import torchvision.transforms as T

# Sanity check: load one label mask as 8-bit grey-scale and look at the shape
# of the resulting tensor.
img = Image.open("/content/drive/My Drive/Colab Notebooks/png_masks/MASKS/seg_0001.png").convert("L")
img_tensor = T.PILToTensor()(img)
img_tensor.shape
# img_tensor.unique()  # uncomment to list the grey levels present in this mask

In [None]:
#-----------------IGNORE---------------------------------------------
# Scratch cell (marked IGNORE by the author): builds a Faster R-CNN detector
# and swaps its box classifier head. The `model` created here is overwritten
# by later cells.

import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# load a model pre-trained on COCO
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

# replace the classifier with a new one, that has
# num_classes which is user-defined
# NOTE(review): the training cells use num_classes = 59 and the label list in
# plot_prediction has 59 entries (ids 0..58); 58 here looks one short -- confirm.
num_classes = 58
# get number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features
# replace the pre-trained head with a new one
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
print(model)

In [None]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor


def get_model_instance_segmentation(num_classes):
    """Return a COCO-pretrained Mask R-CNN whose heads predict ``num_classes`` classes.

    Both the box classifier and the mask predictor of the pretrained
    ``maskrcnn_resnet50_fpn`` are swapped for freshly initialised heads sized
    for ``num_classes`` outputs (background counts as one class). The mask head
    uses a 128-channel hidden layer.
    """
    hidden_layer = 128

    # Instance segmentation model pre-trained on COCO.
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights="DEFAULT")
    roi_heads = model.roi_heads

    # New box head: same input width as the pretrained one, new class count.
    box_in_features = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(box_in_features, num_classes)

    # New mask head: same input channels as the pretrained one, new class count.
    mask_in_channels = roi_heads.mask_predictor.conv5_mask.in_channels
    roi_heads.mask_predictor = MaskRCNNPredictor(
        mask_in_channels, hidden_layer, num_classes
    )

    return model

In [None]:
# Label ids run from 0 ("null"/background) to 58, so the heads need 59 outputs;
# this matches the num_classes used by the training cells.
model = get_model_instance_segmentation(num_classes=59)
print(model)

In [None]:
import torchvision.transforms as T

def get_transform(train=False):
    """Build the image-only preprocessing pipeline.

    Args:
        train: Accepted so that callers written as ``get_transform(train=...)``
            keep working; it does not change the pipeline. This transform only
            ever sees the image, so geometric augmentation (which would also
            have to move the mask and boxes) cannot be done here.

    Returns:
        A ``T.Compose`` that converts a PIL image to a tensor and then to
        ``torch.float``.
    """
    return T.Compose([
        T.PILToTensor(),
        T.ConvertImageDtype(torch.float),
    ])

In [None]:
import torch
import torchvision
from torchvision import transforms, utils
# def collate_fn(batch):
#     return tuple(zip(*batch))
def collate_fn(batch):
    """Collate detection samples, skipping ``None`` entries.

    ``None`` samples are filtered out and the remaining ``(image, target)``
    pairs are transposed into ``(images, targets)`` tuples. A batch with no
    remaining samples yields an empty tuple.
    """
    kept = filter(lambda sample: sample is not None, batch)
    return tuple(zip(*kept))

# Smoke test: push one batch from the dataset through an off-the-shelf COCO
# Faster R-CNN to check that the samples have the format the model expects.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")
dataset = ClothingDataset('/content/drive/My Drive/Colab Notebooks', transforms=get_transform())
data_loader = torch.utils.data.DataLoader(
 dataset, batch_size=2, shuffle=True,collate_fn=collate_fn)
# For Training
images,targets = next(iter(data_loader))
images = list(image for image in images)
targets = [{k: v for k, v in t.items()} for t in targets]
# Called with targets before model.eval(): torchvision detection models return
# a dict of losses in training mode (no detections).
output = model(images,targets)
# For inference
model.eval()
x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)]
predictions = model(x)           # Returns predictions

In [None]:
def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq):
    """Run one optimisation pass over ``data_loader``.

    For every batch the images and target tensors are moved to ``device``, the
    model's loss terms are summed, and one optimizer step is taken. The summed
    loss, the individual loss terms and the current learning rate are reported
    through a ``MetricLogger`` every ``print_freq`` iterations.

    NOTE(review): ``MetricLogger``, ``SmoothedValue`` and ``reduce_dict`` are not
    defined in the visible cells; presumably they come from torchvision's
    detection reference ``utils.py`` -- confirm they are imported before this runs.
    """
    model.train()

    metric_logger = MetricLogger(delimiter="  ")
    lr_meter = SmoothedValue(window_size=1, fmt='{value:.6f}')
    metric_logger.add_meter('lr', lr_meter)
    header = 'Epoch: [{}]'.format(epoch)

    for images, targets in metric_logger.log_every(data_loader, print_freq, header):
        # Inputs and targets must live on the same device as the model.
        images = [image.to(device) for image in images]
        targets = [
            {key: value.to(device) for key, value in target.items()}
            for target in targets
        ]

        # In training mode the model returns a dict of named loss terms.
        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())

        # The reduced copies are only used for logging; backprop uses `losses`.
        loss_dict_reduced = reduce_dict(loss_dict)
        losses_reduced = sum(loss_dict_reduced.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        metric_logger.update(loss=losses_reduced, **loss_dict_reduced)
        metric_logger.update(lr=optimizer.param_groups[0]["lr"])

# train on the GPU or on the CPU, if a GPU is not available
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# 59 outputs: label id 0 (background / "null") plus clothing ids 1..58
num_classes = 59
# NOTE(review): this root differs from the '.../My Drive/Colab Notebooks' path
# used by the smoke-test cell -- confirm which folder holds the data.
# Both datasets are constructed identically; only the index split below
# separates train from test.
dataset = ClothingDataset('/content/drive/MyDrive/Clothing DataSet', get_transform())
dataset_test = ClothingDataset('/content/drive/MyDrive/Clothing DataSet', get_transform())

# split the dataset into train and test sets (last 50 shuffled indices are held out)
indices = torch.randperm(len(dataset)).tolist()
dataset = torch.utils.data.Subset(dataset, indices[:-50])
dataset_test = torch.utils.data.Subset(dataset_test, indices[-50:])
print(len(dataset))

# define training and validation data loaders
data_loader = torch.utils.data.DataLoader(dataset, batch_size=2, shuffle=True, num_workers=2,collate_fn=collate_fn)

data_loader_test = torch.utils.data.DataLoader(dataset_test, batch_size=1, shuffle=False, num_workers=2,collate_fn=collate_fn)

# get the model using our helper function
model = get_model_instance_segmentation(num_classes)

# move model to the right device
model.to(device)

# construct an optimizer over the trainable parameters only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
# and a learning rate scheduler: multiply the LR by 0.1 every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
# Alternative that was tried:
# optimizer=torch.optim.Adam(model.parameters(), lr=0.003, betas=(0.9, 0.9), eps=1)
# train for num_epochs epochs; with step_size=3 the first LR decay only happens
# after the last of these 3 epochs
num_epochs = 3

for epoch in range(num_epochs):
  train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
  lr_scheduler.step()
  # evaluate(model, data_loader_test, device=device)


In [None]:
# NOTE(review): this is a second, identical definition of train_one_epoch; the
# previous training cell defines the same function. Keep the two in sync or
# delete one of them.
def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq):
    """Train ``model`` for one pass over ``data_loader``, logging every ``print_freq`` steps.

    NOTE(review): ``MetricLogger``, ``SmoothedValue`` and ``reduce_dict`` are not
    defined in the visible cells; presumably they come from torchvision's
    detection reference ``utils.py`` -- confirm.
    """
    model.train()
    metric_logger = MetricLogger(delimiter="  ")
    metric_logger.add_meter('lr', SmoothedValue(window_size=1, fmt='{value:.6f}'))
    header = 'Epoch: [{}]'.format(epoch)

    for images, targets in metric_logger.log_every(data_loader, print_freq, header):
        # move the batch to the training device
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # called with targets in training mode, the model returns a dict of losses
        loss_dict = model(images, targets)

        losses = sum(loss for loss in loss_dict.values())
        # reduced copies are used for logging only; backprop uses `losses`
        loss_dict_reduced = reduce_dict(loss_dict)
        losses_reduced = sum(loss for loss in loss_dict_reduced.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        metric_logger.update(loss=losses_reduced, **loss_dict_reduced)
        metric_logger.update(lr=optimizer.param_groups[0]["lr"])

# train on the GPU or on the CPU, if a GPU is not available
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# 59 outputs: label id 0 (background / "null") plus clothing ids 1..58
num_classes = 59

# Build both datasets from scratch. Re-using a `dataset` variable left behind by
# an earlier cell would split an already-split Subset, and the "test" indices
# would then point at images that are also used for training.
data_root = '/content/drive/MyDrive/Clothing DataSet'
dataset = ClothingDataset(data_root, get_transform())
dataset_test = ClothingDataset(data_root, get_transform())

# split the dataset into train and test sets (last 50 shuffled indices are held out)
indices = torch.randperm(len(dataset)).tolist()
dataset = torch.utils.data.Subset(dataset, indices[:-50])
dataset_test = torch.utils.data.Subset(dataset_test, indices[-50:])

# define training and validation data loaders
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=2, shuffle=True, num_workers=2, collate_fn=collate_fn)

data_loader_test = torch.utils.data.DataLoader(
    dataset_test, batch_size=1, shuffle=False, num_workers=2, collate_fn=collate_fn)

# get the model using our helper function
model = get_model_instance_segmentation(num_classes)

# move model to the right device
model.to(device)

# construct an optimizer over the trainable parameters only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
# and a learning rate scheduler: multiply the LR by 0.1 every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# let's train it for 10 epochs
num_epochs = 10

for epoch in range(num_epochs):
    train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
    lr_scheduler.step()


In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# The next line overrides the choice above, so evaluation always uses the CPU.
# NOTE(review): the model and its inputs have to be on the same device when
# evaluate() runs -- confirm the model is on the CPU too.
device=torch.device('cpu')
def calculate_iou(box1, box2):
    """
    Calculate the Intersection over Union (IoU) between two bounding boxes.

    Widths and heights are measured pixel-inclusively (``x2 - x1 + 1``).

    Arguments:
    - box1 (tuple or list): Coordinates of the first bounding box in the format (x1, y1, x2, y2).
    - box2 (tuple or list): Coordinates of the second bounding box in the format (x1, y1, x2, y2).

    Returns:
    - iou (float): Intersection over Union (IoU) between the two bounding boxes;
      0.0 when the union area is not positive (degenerate input).
    """
    x1, y1, x2, y2 = box1
    x1_, y1_, x2_, y2_ = box2

    # Size of the intersection rectangle, clamped at zero for disjoint boxes.
    inter_w = max(0, min(x2, x2_) - max(x1, x1_) + 1)
    inter_h = max(0, min(y2, y2_) - max(y1, y1_) + 1)
    inter_area = inter_w * inter_h

    box1_area = (x2 - x1 + 1) * (y2 - y1 + 1)
    box2_area = (x2_ - x1_ + 1) * (y2_ - y1_ + 1)

    union_area = box1_area + box2_area - inter_area
    if union_area <= 0:
        # Inverted/degenerate boxes: avoid dividing by zero.
        return 0.0

    return inter_area / float(union_area)



def evaluate_bounding_boxes(predictions, target, iou_threshold=0.5):
    """Compute precision, recall and F1 of predicted boxes for one image.

    A prediction counts as correct when it overlaps a not-yet-matched
    ground-truth box of the same label with IoU strictly above
    ``iou_threshold``. Predictions are matched greedily in order of decreasing
    score and every ground-truth box can be matched at most once.

    Arguments:
    - predictions: iterable of ``(box, score, label)`` tuples.
    - target: dict with ``"boxes"`` and ``"labels"`` sequences of equal length.
    - iou_threshold: minimum IoU (exclusive) for a match.

    Returns:
    - (precision, recall, f1_score), where precision is
      correct / number of predictions and recall is
      correct / number of ground-truth boxes; each is 0 when its denominator is 0.
    """
    total_predictions = len(predictions)
    total_ground_truth = len(target["boxes"])
    correct_predictions = 0
    matched_gt = set()

    # Highest-scoring predictions get first pick of the ground-truth boxes.
    ranked = sorted(predictions, key=lambda item: item[1], reverse=True)

    for pred_box, pred_score, pred_label in ranked:
        best_iou = iou_threshold
        best_match = None

        for gt_index, (gt_box, gt_label) in enumerate(zip(target["boxes"], target["labels"])):
            if gt_index in matched_gt or pred_label != gt_label:
                continue
            iou = calculate_iou(pred_box, gt_box)
            if iou > best_iou:
                best_iou = iou
                best_match = gt_index

        if best_match is not None:
            matched_gt.add(best_match)
            correct_predictions += 1

    precision = correct_predictions / total_predictions if total_predictions > 0 else 0
    recall = correct_predictions / total_ground_truth if total_ground_truth > 0 else 0

    if precision + recall > 0:
        f1_score = 2 * (precision * recall) / (precision + recall)
    else:
        f1_score = 0

    return precision, recall, f1_score



def evaluate(model, data_loader, device):
    """Report average box precision/recall of ``model`` over ``data_loader``.

    For every image only the highest-scoring predicted box of each predicted
    label is kept; those boxes are scored against the ground truth with
    ``evaluate_bounding_boxes`` and the per-image precision and recall are
    averaged by the metric logger. Average model time, precision and recall
    are printed at the end.

    Args:
        model: Detection model. It is moved to ``device`` (in place) and put in
            eval mode so that it sits on the same device as the inputs.
        data_loader: Yields ``(images, targets)`` batches.
        device: Device to run inference on.
    """
    import time  # local import: the visible cells never import `time`

    model.to(device)
    model.eval()
    metric_logger = MetricLogger(delimiter="  ")
    header = "Test:"

    with torch.no_grad():
        for images, targets in metric_logger.log_every(data_loader, 100, header):
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            model_time = time.time()
            outputs = model(images)
            model_time = time.time() - model_time

            metric_logger.update(model_time=model_time)

            for output, target in zip(outputs, targets):
                pred_boxes = output["boxes"].cpu().numpy()
                pred_scores = output["scores"].cpu().numpy()
                pred_labels = output["labels"].cpu().numpy()

                # Keep one prediction per label: the one with the highest score.
                predictions = []
                for label in np.unique(pred_labels):
                    label_indices = np.where(pred_labels == label)[0]
                    best = label_indices[np.argmax(pred_scores[label_indices])]
                    predictions.append((pred_boxes[best], pred_scores[best], label))

                ground_truth = {
                    "boxes": target["boxes"].cpu().numpy(),
                    "labels": target["labels"].cpu().numpy(),
                }

                precision, recall, f1_score = evaluate_bounding_boxes(predictions, ground_truth)

                metric_logger.update(recall=recall, precision=precision)

    # Print evaluation metrics
    print("DONE (t={:.2f}s).".format(metric_logger.meters['model_time'].global_avg))
    print("Avg Precision: {:.4f}".format(metric_logger.meters['precision'].global_avg))
    print("Avg Recall: {:.4f}".format(metric_logger.meters['recall'].global_avg))

# Score the current model on the held-out loader, using the device selected above.
evaluate(model, data_loader_test, device=device)



In [None]:
#only showing top 8 predictions for each picture
# `dataset_test` is the held-out split created by the training cell. It is
# reused as-is: drawing a fresh random split here would select images the
# model was trained on.
img, _ = dataset_test[14]
# put the model in evaluation mode
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
img = img.to(device)

with torch.no_grad():
    prediction = model([img])

In [None]:
# Scale the float image tensor by 255, move channels last and convert to uint8
# so PIL can display it.
Image.fromarray(img.mul(255).permute(1, 2, 0).byte().cpu().numpy())


In [None]:
# Raw model output for the image above; later cells read prediction[0]["boxes"],
# ["labels"], ["scores"] and ["masks"].
prediction

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

def select_top_predictions(boxes, labels, scores, max_boxes=8, score_threshold=0.0):
    """Pick the boxes to draw: best box per label, highest scores first.

    Args:
        boxes, labels, scores: Parallel sequences describing the detections.
        max_boxes: Maximum number of entries to return.
        score_threshold: Only entries with a score strictly above this are kept.

    Returns:
        A list of ``(label, score, box)`` tuples with ``label`` as a Python
        int, sorted by decreasing score. Label 0 (background) is never returned.
    """
    best_per_label = {}
    for box, label, score in zip(boxes, labels, scores):
        label = int(label)
        if label == 0:
            continue
        if label not in best_per_label or score > best_per_label[label][0]:
            best_per_label[label] = (score, box)

    ranked = sorted(best_per_label.items(), key=lambda item: item[1][0], reverse=True)
    selected = [
        (label, score, box)
        for label, (score, box) in ranked
        if score > score_threshold
    ]
    return selected[:max_boxes]


def plot_prediction(image, prediction, max_boxes=8, score_threshold=0.0):
    """Draw the top predicted boxes of one image with label and score captions.

    Args:
        image: Float image tensor in channel-first layout, as fed to the model.
        prediction: Model output list; only ``prediction[0]`` is read
            (``boxes``, ``labels``, ``scores``).
        max_boxes: Maximum number of boxes to draw (one per label at most).
        score_threshold: Boxes scoring at or below this value are skipped.

    The name of every drawn label is also printed.
    """
    # Index in this list == label id; entry 0 is the background placeholder.
    labname = [
        'null', 'accessories', 'bag', 'belt', 'blazer', 'blouse', 'bodysuit',
        'boots', 'bra', 'bracelet', 'cape', 'cardigan', 'clogs', 'coat',
        'dress', 'earrings', 'flats', 'glasses', 'gloves', 'hair', 'hat',
        'heels', 'hoodie', 'intimate', 'jacket', 'jeans', 'jumper',
        'leggings', 'loafers', 'necklace', 'panties', 'pants', 'pumps',
        'purse', 'ring', 'romper', 'sandals', 'scarf', 'shirt', 'shoes',
        'shorts', 'skin', 'skirt', 'sneakers', 'socks', 'stockings', 'suit',
        'sunglasses', 'sweater', 'sweatshirt', 'swimwear', 't-shirt', 'tie',
        'tights', 'top', 'vest', 'wallet', 'watch', 'wedges',
    ]

    fig, ax = plt.subplots(1)
    ax.imshow(Image.fromarray(image.mul(255).permute(1, 2, 0).byte().cpu().numpy()))
    boxes = prediction[0]['boxes'].cpu().numpy()
    labels = prediction[0]['labels'].cpu().numpy()
    scores = prediction[0]['scores'].cpu().numpy()

    chosen = select_top_predictions(boxes, labels, scores, max_boxes, score_threshold)
    for label, score, box in chosen:
        xmin, ymin, xmax, ymax = box

        # Outline of the detection.
        rect = patches.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                                 linewidth=1, edgecolor='r', facecolor='none')
        ax.add_patch(rect)

        # Caption: label name at the top-left corner, score just below it.
        label_text = f"Label: {labname[label]}"
        score_text = f"Score: {score:.2f}"
        ax.text(xmin, ymin, label_text, fontsize=8, color='r')
        ax.text(xmin, ymin + 15, score_text, fontsize=8, color='r')
        print(labname[label])

    plt.axis('off')
    plt.show()


# Draw the predicted boxes for the sample loaded above (image moved back to the CPU for plotting).
plot_prediction(img.cpu(), prediction)

In [None]:
# Same inference steps as for the previous sample, now for test sample 16.
# `img` and `prediction` are read by the plotting cell that follows.
img, _ = dataset_test[16]
# put the model in evaluation mode
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
img = img.to(device)

# no gradients are needed for inference
with torch.no_grad():
    prediction = model([img])
# display the input image (float tensor scaled to 0..255, channels last)
Image.fromarray(img.mul(255).permute(1, 2, 0).byte().cpu().numpy())


In [None]:
# Draw the predicted boxes for test sample 16.
plot_prediction(img.cpu(), prediction)

In [None]:
# Same inference steps again, now for test sample 22.
# `img` and `prediction` are read by the plotting and mask cells that follow.
img, _ = dataset_test[22]
# put the model in evaluation mode
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
img = img.to(device)

# no gradients are needed for inference
with torch.no_grad():
    prediction = model([img])
# display the input image (float tensor scaled to 0..255, channels last)
Image.fromarray(img.mul(255).permute(1, 2, 0).byte().cpu().numpy())

In [None]:
# Draw the predicted boxes for test sample 22.
plot_prediction(img.cpu(), prediction)

In [None]:
import numpy as np
from PIL import Image
# Export one predicted mask of the most recent `prediction` as a grey-scale PNG.
mask_tensor = prediction[0]["masks"].cpu().numpy()  # Convert tensor to numpy array
# NOTE(review): index 0 assumes the model returned at least one detection.
mask_index = 0 # Index of the mask to print

mask_array = np.squeeze(mask_tensor[mask_index])  # Get the selected mask array and remove unnecessary dimensions
# Scale to 0..255; assumes mask values lie in [0, 1] -- TODO confirm.
mask_image = Image.fromarray((mask_array * 255).astype(np.uint8))

# Written to the current working directory.
mask_image.save("mask_image.png")
mask_image


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Assuming prediction[0]["masks"] is a tensor
mask_tensor = prediction[0]["masks"].cpu().numpy()  # Convert tensor to numpy array

# Create a blank canvas with the same dimensions as the masks
num_masks = mask_tensor.shape[0]  # Number of masks
height, width = mask_tensor.shape[-2:]  # Height and width of each mask
canvas = np.zeros((height, width))

# Overlay each mask on the canvas by summing the mask values
for mask_index in range(num_masks):
    mask_array = np.squeeze(mask_tensor[mask_index])  # Get the mask array
    canvas += mask_array

# Clip (not rescale) the summed values to [0, 1]: pixels covered by several
# masks can add up to more than 1
canvas = np.clip(canvas, 0, 1)

# Display the canvas
plt.imshow(canvas, cmap='gray')
plt.axis('off')
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm

# Same overlay as the previous cell, rendered with the inferno colour map.
# Assuming prediction[0]["masks"] is a tensor
mask_tensor = prediction[0]["masks"].cpu().numpy()  # Convert tensor to numpy array

# Create a blank canvas with the same dimensions as the masks
num_masks = mask_tensor.shape[0]  # Number of masks
height, width = mask_tensor.shape[-2:]  # Height and width of each mask
canvas = np.zeros((height, width))

# Overlay each mask on the canvas by summing the mask values
for mask_index in range(num_masks):
    mask_array = np.squeeze(mask_tensor[mask_index])  # Get the mask array
    canvas += mask_array

# Clip (not rescale) the summed values to [0, 1]: pixels covered by several
# masks can add up to more than 1
canvas = np.clip(canvas, 0, 1)

# Map the clipped canvas through the inferno colour map (returns an RGBA array)
mask_rgb = cm.inferno(canvas)

# Display the colour-mapped mask
plt.imshow(mask_rgb)
plt.axis('off')
plt.show()
