In [1]:
import os
import glob
import xml.etree.ElementTree as ET
import torch
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image

# Define your dataset class
class CustomDataset(Dataset):
    def __init__(self, image_folder, annotation_folder, transform=None):
        self.image_folder = image_folder
        self.annotation_folder = annotation_folder
        self.transform = transform

        # Get paths for images and annotations
        self.image_paths = glob.glob(os.path.join(image_folder, '*.jpg'))
        self.annotation_paths = [os.path.join(annotation_folder, os.path.basename(p).replace('.jpg', '.xml')) for p in self.image_paths]

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        annotation_path = self.annotation_paths[idx]

        # Load image
        image = Image.open(image_path).convert("RGB")

        # Load annotations
        boxes = self.parse_xml(annotation_path)

        # Filter out invalid bounding boxes
        boxes = [box for box in boxes if self.is_valid_box(box)]

        # Convert bounding box coordinates to tensors
        boxes = torch.as_tensor(boxes, dtype=torch.float32)

        # If there are no valid annotations, create dummy labels
        if len(boxes) == 0:
            boxes = torch.zeros((1, 4), dtype=torch.float32)
            labels = torch.tensor([0], dtype=torch.int64)  # Background class label
        else:
            labels = torch.ones(boxes.shape[0], dtype=torch.int64)  # Spaghetti class label

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels

        if self.transform:
            image = self.transform(image)

        return image, target


    def parse_xml(self, xml_path):
        tree = ET.parse(xml_path)
        root = tree.getroot()
        boxes = []
        for obj in root.findall('object'):
            bbox = obj.find('bndbox')
            if bbox is not None:
                xmin = int(bbox.find('xmin').text)
                ymin = int(bbox.find('ymin').text)
                xmax = int(bbox.find('xmax').text)
                ymax = int(bbox.find('ymax').text)
                boxes.append([xmin, ymin, xmax, ymax])
        return boxes

    def is_valid_box(self, box):
        xmin, ymin, xmax, ymax = box
        return xmax > xmin and ymax > ymin

# Define transforms
transform = transforms.Compose([transforms.ToTensor()])

# Initialize dataset
image_folder = "/content/drive/MyDrive/print fails.v1i.voc/train/img/"
annotation_folder = "/content/drive/MyDrive/print fails.v1i.voc/train/annot/"
train_dataset = CustomDataset(image_folder=image_folder, annotation_folder=annotation_folder, transform=transform)

# Define batch size
batch_size = 2  # Adjust as needed

# Initialize dataloader with custom collate_fn
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))

# Define model
model = fasterrcnn_resnet50_fpn(pretrained=True)
num_classes = 2  # Only one class (spaghetti)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Set device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

# Define optimizer and learning rate scheduler
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Train the model
num_epochs = 10  # Adjust as needed
for epoch in range(num_epochs):
    model.train()
    for images, targets in train_loader:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in target.items()} for target in targets]  # Ensure targets are on the same device as images
        optimizer.zero_grad()
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        losses.backward()
        optimizer.step()
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {losses.item()}")

# Save the trained model
torch.save(model.state_dict(), 'spaghetti_detection_model.pth')


Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|██████████| 160M/160M [00:02<00:00, 83.3MB/s]


Epoch [1/10], Loss: 0.5320544242858887
Epoch [2/10], Loss: 0.8433959484100342
Epoch [3/10], Loss: 0.6781300902366638
Epoch [4/10], Loss: 0.47257164120674133
Epoch [5/10], Loss: 0.6237234473228455
Epoch [6/10], Loss: 0.649628758430481
Epoch [7/10], Loss: 0.40053001046180725
Epoch [8/10], Loss: 0.2242920994758606
Epoch [9/10], Loss: 0.3859308958053589
Epoch [10/10], Loss: 0.24816091358661652


In [None]:
import os
import glob
import xml.etree.ElementTree as ET
import torch
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image

# Define your dataset class
class CustomDataset(Dataset):
    def __init__(self, image_folder, annotation_folder, transform=None):
        self.image_folder = image_folder
        self.annotation_folder = annotation_folder
        self.transform = transform

        # Get paths for images and annotations
        self.image_paths = glob.glob(os.path.join(image_folder, '*.jpg'))
        self.annotation_paths = [os.path.join(annotation_folder, os.path.basename(p).replace('.jpg', '.xml')) for p in self.image_paths]

        # Filter out samples with invalid bounding boxes
        print("Filtering out samples with invalid bounding boxes...")
        self.valid_samples = []
        for image_path, annotation_path in zip(self.image_paths, self.annotation_paths):
            boxes = self.parse_xml(annotation_path)
            if self.has_valid_boxes(boxes):
                self.valid_samples.append((image_path, annotation_path))
        print(f"Total valid samples: {len(self.valid_samples)}")

    def __len__(self):
        return len(self.valid_samples)

    def __getitem__(self, idx):
        image_path, annotation_path = self.valid_samples[idx]

        # Load image
        print(f"Loading image: {image_path}")
        image = Image.open(image_path).convert("RGB")

        # Load annotations
        print(f"Loading annotations: {annotation_path}")
        boxes = self.parse_xml(annotation_path)

        # Convert bounding box coordinates to tensors
        boxes = torch.as_tensor(boxes, dtype=torch.float32)

        # Create dummy labels
        labels = torch.ones(boxes.shape[0], dtype=torch.int64)  # Spaghetti class label

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels

        if self.transform:
            image = self.transform(image)

        return image, target

    def parse_xml(self, xml_path):
        tree = ET.parse(xml_path)
        root = tree.getroot()
        boxes = []
        for obj in root.findall('object'):
            bbox = obj.find('bndbox')
            if bbox is not None:
                xmin = int(bbox.find('xmin').text)
                ymin = int(bbox.find('ymin').text)
                xmax = int(bbox.find('xmax').text)
                ymax = int(bbox.find('ymax').text)
                boxes.append([xmin, ymin, xmax, ymax])
        return boxes

    def has_valid_boxes(self, boxes):
        return any(box[2] > box[0] and box[3] > box[1] for box in boxes)

# Define transforms
transform = transforms.Compose([transforms.ToTensor()])

# Initialize dataset
print("Initializing dataset...")
image_folder = "/content/drive/MyDrive/prob_dataset/train/img/"
annotation_folder = "/content/drive/MyDrive/prob_dataset/train/annot/"
train_dataset = CustomDataset(image_folder=image_folder, annotation_folder=annotation_folder, transform=transform)

# Define batch size
batch_size = 2  # Adjust as needed

# Initialize dataloader with custom collate_fn
print("Initializing dataloader...")
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))

# Define model
print("Defining model...")
model = fasterrcnn_resnet50_fpn(pretrained=True)
num_classes = 2  # Only one class (spaghetti)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Set device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)
print(f"Using device: {device}")

# Define optimizer and learning rate scheduler
print("Defining optimizer and learning rate scheduler...")
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Train the model
num_epochs = 10  # Adjust as needed
print("Training the model...")
for epoch in range(num_epochs):
    model.train()
    for images, targets in train_loader:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in target.items()} for target in targets]  # Ensure targets are on the same device as images
        optimizer.zero_grad()
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        losses.backward()
        optimizer.step()
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {losses.item()}")

# Save the trained model
print("Saving the trained model...")
torch.save(model.state_dict(), '/content/drive/MyDrive/prob_dataset/spaghetti_detection_model.pth')

print("Training completed.")


In [None]:
import torch
import torchvision.transforms as T
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import cv2

# Load the trained model
model = fasterrcnn_resnet50_fpn(pretrained=False)
num_classes = 2  # Background + Spaghetti
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
model.load_state_dict(torch.load('/content/drive/MyDrive/print fails.v1i.voc/train/spaghetti_detection_model.pth'))
model.eval()

# Define the transform to preprocess the image
transform = T.Compose([T.ToTensor()])

# Define the function to perform inference and draw bounding boxes
def predict(frame, prev_boxes=None, alpha=0.7):
    image_tensor = transform(frame).unsqueeze(0)

    # Perform inference
    with torch.no_grad():
        prediction = model(image_tensor)[0]

    # Get bounding boxes from the prediction
    current_boxes = prediction['boxes'].cpu().numpy().astype(int)

    # Apply temporal smoothing
    if prev_boxes is not None and len(prev_boxes) > 0:
        smoothed_boxes = alpha * current_boxes + (1 - alpha) * prev_boxes
        smoothed_boxes = smoothed_boxes.astype(int)
        for box in smoothed_boxes:
            xmin, ymin, xmax, ymax = box
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 0, 255), 2)
    else:
        for box in current_boxes:
            xmin, ymin, xmax, ymax = box
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 0, 255), 2)
        smoothed_boxes = current_boxes

    return frame, smoothed_boxes


# Define input and output video paths
input_video_path = "/content/drive/MyDrive/print fails.v1i.voc/3dprintfail.mp4"
output_video_path = "/content/drive/MyDrive/print fails.v1i.voc/detected_with_temporal_smoothing.mp4"

# Open the input video file
cap = cv2.VideoCapture(input_video_path)

# Get video properties
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_rate = cap.get(cv2.CAP_PROP_FPS)

# Create VideoWriter object to save the output video
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, frame_rate, (frame_width, frame_height))

# Process the first frame to initialize previous_boxes
ret, frame = cap.read()
prev_boxes = None

# Process each frame of the video
frame_count = 0
while ret:
    frame_with_boxes, prev_boxes = predict(frame, prev_boxes)
    out.write(frame_with_boxes)
    frame_count += 1
    print(f"Processed frame {frame_count}")
    ret, frame = cap.read()

# Release video objects
cap.release()
out.release()


Processed frame 1
Processed frame 2


ValueError: operands could not be broadcast together with shapes (0,4) (4,4) 

In [None]:
import torch
import torchvision.transforms as T
from PIL import Image, ImageDraw
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Load the trained model
model = fasterrcnn_resnet50_fpn(pretrained=False)
num_classes = 2  # Background + Spaghetti
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
model.load_state_dict(torch.load('/content/drive/MyDrive/print fails.v1i.voc/train/spaghetti_detection_model.pth'))
model.eval()

# Define the transform to preprocess the image
transform = T.Compose([T.ToTensor()])

# Define the function to perform inference and draw bounding boxes
def predict(image_path, output_path):
    image = Image.open(image_path).convert("RGB")
    image_tensor = transform(image).unsqueeze(0)

    # Perform inference
    with torch.no_grad():
        prediction = model(image_tensor)[0]

    # Draw bounding boxes on the image
    image_draw = image.copy()
    draw = ImageDraw.Draw(image_draw)
    for box in prediction['boxes']:
        draw.rectangle([(box[0], box[1]), (box[2], box[3])], outline="red")

    # Save the image with bounding boxes
    image_draw.save(output_path)

# Run inference on an input image and save the result
image_path = "/content/drive/MyDrive/prob_dataset/train/1672793083-107055_jpg.rf.56edc6f514bc2354fb6e0ae2c7bdfdb1.jpg"
output_path = "/content/drive/MyDrive/print fails.v1i.voc/result.jpg"

predict(image_path, output_path)
