# Visual Robot Collision Identification

We are deploying a Kuka robot in a factory, and we want to implement a remote visual monitoring system. Part of this system includes automatic identification of collisions. We will train a machine learning model on image data to build this system. We've provided some code from previous assignments to facilitate this task.

**You should download this notebook and complete it on Colab or another platform that can access GPU hardware. For submission, please attach the notebook printout to your PDF submission on Gradescope, and also re-upload the completed notebook with outputs here.**

In [2]:
!pip install pybullet

In [3]:
import os
import time
import pybullet as p
import numpy as np

from matplotlib import pyplot as plt
import matplotlib.image as mpl_img
from tqdm import tqdm
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
import torchvision.transforms as transforms
from torchvision.models import resnet18, ResNet18_Weights
from torch.utils.data import Dataset, DataLoader

In [4]:
# Set up the simulation
sim_id = p.connect(p.DIRECT)

In [5]:
# PyBullet has a lot of built-in data (e.g., robot models), so let's get access to it
import pybullet_data
p.setAdditionalSearchPath(pybullet_data.getDataPath())

In [6]:
# load up the robots!
p.resetSimulation()
plane_id=p.loadURDF('plane.urdf',
            physicsClientId=sim_id)
robot_id=p.loadURDF("kuka_iiwa/model.urdf",
            basePosition=[0,0,0],
            baseOrientation=p.getQuaternionFromEuler([0,0,0]),
            useFixedBase=True,
            physicsClientId=sim_id,
            globalScaling=1,
            flags=p.URDF_USE_IMPLICIT_CYLINDER)
cube_id=p.loadURDF('cube.urdf',
            basePosition=[0.5, 0, 0.5],
            physicsClientId=sim_id,
            globalScaling=0.3)
sphere_id=p.loadURDF('sphere2.urdf',
            basePosition=[0, 0.5, 0.5],
            physicsClientId=sim_id,
            globalScaling=0.3)
p.getNumBodies()

In [7]:
def is_collision():
    p.performCollisionDetection(physicsClientId=sim_id)
    all_contact_points = [cp for cp in p.getContactPoints(bodyA=robot_id) \
                          if cp[1] != plane_id and cp[2] != plane_id and cp[8] < 0]
    # cp[1] is first collision object, cp[2] is second collision object
    # cp[8] is collision distance, where NEGATIVE value indicates penetration (pos value is separation)
    return len(all_contact_points) > 0
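The filter inside `is_collision()` can be pulled out into a pure function, which makes the tuple indexing easier to check without a running simulator. The helper below is hypothetical (`penetrating_contacts` is not part of PyBullet). It assumes each contact is a tuple laid out like the entries returned by `p.getContactPoints()`, where indices 1 and 2 are the body IDs and index 8 is the contact distance.

```python
def penetrating_contacts(contact_points, ignore_body_id):
    """Return contacts that involve real penetration and do not touch `ignore_body_id`.

    Hypothetical helper mirroring the filter in is_collision(); each contact is
    assumed to follow the p.getContactPoints() layout:
      cp[1] = body A id, cp[2] = body B id, cp[8] = contact distance
    """
    return [
        cp for cp in contact_points
        if cp[1] != ignore_body_id      # skip contacts where body A is the floor
        and cp[2] != ignore_body_id     # skip contacts where body B is the floor
        and cp[8] < 0                   # negative distance means the bodies overlap
    ]


def make_contact(body_a, body_b, distance):
    """Build a fake 9-field contact tuple for testing (fields 3-7 are dummies)."""
    return (0, body_a, body_b, -1, -1, None, None, None, distance)


# Fake contacts: plane id 0, robot id 1, cube id 2
contacts = [
    make_contact(1, 0, -0.01),   # robot touching the plane: ignored
    make_contact(1, 2, 0.02),    # robot near the cube but separated: ignored
    make_contact(1, 2, -0.003),  # robot penetrating the cube: counted
]
print(len(penetrating_contacts(contacts, ignore_body_id=0)))  # → 1
```

Real contact tuples from PyBullet carry more fields than the nine faked here, but only indices 1, 2 and 8 matter for this filter.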

In [8]:
JOINT_LIMITS = np.array([np.pi * 3/4, np.pi * 2/3, np.pi * 3/4, np.pi * 2/3, np.pi * 3/4, np.pi * 2/3, np.pi * 3/4])

# Part 1: Model Selection (4 pts)

We will be using a pre-trained ResNet-18 model, which is a type of convolutional neural network. Briefly answer the following:
* Why is a convolutional neural network a good choice for this task?
* Why is a pre-trained model a good choice for this task?


# Part 2: Data Collection (8 pts)

We will generate training data by collecting 1000 images of the robot in random configurations, each labeled with its collision status. We can do so by repeating the following steps:
* Sample a random configuration within the joint limits.
* Find its collision status using `is_collision()`.
* Generate and store the RGB image using `showImage()` (needs to be amended).
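Assuming the joint limits are symmetric (joint `i` may move within `[-JOINT_LIMITS[i], +JOINT_LIMITS[i]]`, which is our reading of the array defined above), the sampling step reduces to one vectorized NumPy call. The helper name `sample_configuration` and the seeded generator are illustrative choices, not part of the provided code; `JOINT_LIMITS` is redefined only so the block runs on its own.

```python
import numpy as np

# Same values as the JOINT_LIMITS cell above, repeated so this block is self-contained
JOINT_LIMITS = np.array([np.pi * 3/4, np.pi * 2/3, np.pi * 3/4, np.pi * 2/3,
                         np.pi * 3/4, np.pi * 2/3, np.pi * 3/4])

def sample_configuration(joint_limits, rng):
    """Draw one joint configuration uniformly from [-limit, +limit) for each joint."""
    return rng.uniform(low=-joint_limits, high=joint_limits)

rng = np.random.default_rng(seed=0)  # seeded so the dataset can be regenerated
q = sample_configuration(JOINT_LIMITS, rng)
print(q.shape)  # → (7,)
```

Each sampled angle could then be written into the simulator with `p.resetJointState(robot_id, joint_index, angle)` for joint indices 0 to 6 before calling `is_collision()`.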

The images should be saved in a folder called ``robot_imgs``, with file names of the form ``pose{sample_number}_{collision_status}.png``, where ``collision_status`` is 1 for collision and 0 for no collision. The expected directory layout is shown below.

```
robot_imgs/
  pose0_1.png
  pose1_1.png
  pose2_1.png
  pose3_0.png
  pose4_0.png
  pose5_1.png
  pose6_0.png
```
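Both writing and reading the dataset depend on this naming scheme, so it can help to keep the formatting and parsing logic in one place. The sketch below uses hypothetical helpers `make_filename()` and `parse_filename()`; the regular expression assumes names follow the `pose{sample_number}_{collision_status}.png` pattern exactly.

```python
import re

# Matches e.g. 'pose12_0.png' and captures the sample number and the 0/1 label
FILENAME_PATTERN = re.compile(r"^pose(\d+)_([01])\.png$")

def make_filename(sample_number, collided):
    """Build a name such as 'pose3_0.png' from a sample index and a collision flag."""
    return f"pose{sample_number}_{int(collided)}.png"

def parse_filename(filename):
    """Return (sample_number, label) for a valid name, or None for anything else."""
    match = FILENAME_PATTERN.match(filename)
    if match is None:
        return None  # e.g. stray files that are not robot images
    return int(match.group(1)), int(match.group(2))

print(make_filename(5, True))          # → pose5_1.png
print(parse_filename("pose12_0.png"))  # → (12, 0)
print(parse_filename("notes.txt"))     # → None
```

Returning `None` for non-matching names lets a loader skip unexpected files in the folder instead of crashing on them.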



In [9]:
# run this if you need to regenerate the images
import shutil
if os.path.exists('robot_imgs'):
    shutil.rmtree('robot_imgs')
os.makedirs('robot_imgs', exist_ok=True)

In [10]:
def showImage(img_idx, collision_status, cameraPos=[2, 2, 2]):
    # Render the current scene from a fixed camera aimed at the robot base
    viewMatrix = p.computeViewMatrix(
                cameraEyePosition=cameraPos,
                cameraTargetPosition=[0, 0, 0],
                cameraUpVector=[0, 0, 1])
    projectionMatrix = p.computeProjectionMatrixFOV(
                fov=60.0,
                aspect=1.0,
                nearVal=0.1,
                farVal=10)
    width, height, rgbImg, depthImg, segImg = p.getCameraImage(
                width=512,
                height=512,
                viewMatrix=viewMatrix,
                projectionMatrix=projectionMatrix)

    # TODO: Save the image in folder structure as shown above.
    # HINT: use matplotlib.image (already imported as mpl_img)
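The TODO needs the rendered image as an array that Matplotlib can save. The sketch assumes `rgbImg` from `p.getCameraImage()` holds RGBA bytes for a height-by-width image (PyBullet may return either a flat list or a NumPy array, depending on how it was built); the helper name `to_rgb_array` is hypothetical.

```python
import numpy as np

def to_rgb_array(rgb_pixels, width, height):
    """Convert PyBullet's RGBA camera buffer into a (height, width, 3) uint8 array.

    Works whether rgb_pixels is a flat list or an already-shaped NumPy array.
    """
    rgba = np.asarray(rgb_pixels, dtype=np.uint8).reshape(height, width, 4)
    return rgba[:, :, :3]  # drop the alpha channel

# Fake 2x2 all-red RGBA buffer standing in for getCameraImage() output
fake_pixels = [255, 0, 0, 255] * 4
rgb = to_rgb_array(fake_pixels, width=2, height=2)
print(rgb.shape)  # → (2, 2, 3)
```

Inside `showImage()`, the converted array could then be written with `mpl_img.imsave(os.path.join('robot_imgs', f'pose{img_idx}_{int(collision_status)}.png'), rgb)`. Matplotlib may still store the PNG with an alpha channel, so the loader should not assume exactly three channels on disk.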


In [11]:
# TODO: Sample and save 1000 images of robot configurations and their collision status
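One way to structure this TODO is a loop that only coordinates the steps listed in Part 2, with the simulator-specific pieces passed in as functions so the loop can be checked without PyBullet. `collect_samples` and its parameters are hypothetical; in the notebook, `apply_fn` would call `p.resetJointState()` for each joint, `collision_fn` would be `is_collision`, and `save_fn` would be `showImage`.

```python
def collect_samples(num_samples, sample_fn, apply_fn, collision_fn, save_fn):
    """Generic data-collection loop; all four callables are supplied by the caller.

    sample_fn()          -> a joint configuration
    apply_fn(q)          -> moves the robot to configuration q
    collision_fn()       -> True if the robot is in collision
    save_fn(idx, status) -> renders and saves image number idx with its label
    Returns the list of labels so the class balance can be inspected.
    """
    labels = []
    for idx in range(num_samples):
        q = sample_fn()
        apply_fn(q)
        status = int(collision_fn())
        save_fn(idx, status)
        labels.append(status)
    return labels

# Dry run with stand-in callables instead of PyBullet
saved = []
labels = collect_samples(
    num_samples=4,
    sample_fn=lambda: [0.0] * 7,
    apply_fn=lambda q: None,
    collision_fn=lambda: len(saved) % 2 == 0,  # alternate collision / free
    save_fn=lambda idx, status: saved.append((idx, status)),
)
print(labels)  # → [1, 0, 1, 0]
```

Returning the labels makes it easy to check the class balance (for example `sum(labels) / len(labels)`); if one class dominates, raw accuracy becomes a misleading metric.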


# Part 3: Dataset Class (10 pts)

We will be using PyTorch to train a model. We need a ``Dataset`` subclass, which gives data to PyTorch in the form of tuples ``(image, label)``. We also want to be able to split the images into training and validation sets: `desired_indices` lists the sample numbers (the `{sample_number}` in each file name) that a given dataset object should load, so the two sets can be built from disjoint index lists. Feel free to read this tutorial (from which much of the code in this assignment was adapted) for help: [PyTorch DataLoading](https://pytorch.org/tutorials/beginner/data_loading_tutorial.html).

Complete the main loop in the ``load_all_images()`` function in the ``RobotCollisionDataset`` class below by performing the following:

* Retrieve each image with index in `desired_indices`.
* Convert the image to a tensor and ensure that its dimensions are `(3,512,512)`. [`torch.from_numpy()`](https://pytorch.org/docs/stable/generated/torch.from_numpy.html) and [`torch.permute()`](https://pytorch.org/docs/stable/generated/torch.permute.html) may be useful here.
* Append the tensor to the `images` list and the corresponding collision status (1/0) to the `labels` list.
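PIL and NumPy store images as `(height, width, channels)`, while PyTorch's convolution layers expect `(channels, height, width)`. The reordering can be checked in plain NumPy before moving to tensors. `to_chw` is a hypothetical helper, and slicing to three channels assumes the saved PNGs may carry an extra alpha channel.

```python
import numpy as np

def to_chw(image_hwc):
    """Reorder an (H, W, C) image to (C, H, W), keeping only the first 3 channels."""
    rgb = image_hwc[:, :, :3]          # drop alpha if the PNG was saved as RGBA
    return np.ascontiguousarray(rgb.transpose(2, 0, 1))

fake = np.zeros((512, 512, 4), dtype=np.float32)
print(to_chw(fake).shape)  # → (3, 512, 512)
```

Because the result is contiguous, `torch.from_numpy(to_chw(image))` would give the same tensor as `torch.from_numpy(image[:, :, :3].copy()).permute(2, 0, 1)`.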

In [12]:
class RobotCollisionDataset(Dataset):
    """Robot collision image dataset."""
    def __init__(self, root_dir, desired_indices):
        """
        Arguments:
            root_dir (string): Directory with all the images.
            desired_indices (list of int): Sample numbers of the images to load.
        """
        self.root_dir = root_dir
        self.desired_indices = desired_indices
        self.images, self.labels = self.load_all_images()

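        # Per-channel mean and std on the 0-255 pixel scale, since load_image()
        # returns unscaled float32 values; then downsize to ResNet's usual 224x224 input
        self.transform = transforms.Compose([
            transforms.Normalize((201, 212, 230), (42, 33, 23)),
            transforms.Resize((224, 224))
        ])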

    def load_all_images(self):
        """
        Returns:
        -> images: list of robot image tensors loaded from self.root_dir
        -> labels: labels[i] = 1 if images[i] depicts a collision, 0 otherwise
        """
        images = list()
        labels = list()
        assert os.path.exists(self.root_dir)

        for filename in os.listdir(self.root_dir):
            # TODO: Load all images from file directory
            pass

        assert len(labels) == len(images)
        return images, labels

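    # Thanks https://stackoverflow.com/a/7769424
    def load_image(self, infilename):
        # convert('RGB') drops any alpha channel (e.g. from PNGs saved by matplotlib)
        img = Image.open(infilename).convert('RGB')
        data = np.asarray(img, dtype="float32")  # shape (H, W, 3), values in [0, 255]
        return data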

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        return self.transform(self.images[idx]), self.labels[idx]
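The normalization constants in `RobotCollisionDataset` are on the 0-255 pixel scale because `load_image()` returns unscaled `float32` values, while `imshow()` in Part 6 divides the same constants by 255 to map normalized tensors back to `[0, 1]` for display. The arithmetic below is a NumPy re-implementation (not torchvision itself) that checks the two are consistent; `normalize` and `denormalize_to_unit` are hypothetical names.

```python
import numpy as np

MEAN = np.array([201.0, 212.0, 230.0])  # per-channel mean, 0-255 scale
STD = np.array([42.0, 33.0, 23.0])      # per-channel std, 0-255 scale

def normalize(pixel):
    """What transforms.Normalize computes per channel: (x - mean) / std."""
    return (pixel - MEAN) / STD

def denormalize_to_unit(z):
    """Inverse used by imshow(): (std / 255) * z + mean / 255, giving values in [0, 1]."""
    return (STD / 255) * z + MEAN / 255

white = np.array([255.0, 255.0, 255.0])
z = normalize(white)
print(np.round(z, 2))
print(denormalize_to_unit(z))  # recovers white / 255, i.e. all ones
```

A pure-white pixel maps to roughly 1.29, 1.30 and 1.09 in the three channels, and the inverse brings it back to 1.0, so the two sets of constants agree.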

# Part 4: Loading the Data and Model (6 pts)

Now we can create our dataset and store it in two `RobotCollisionDataset` objects. First define `train_dataset` containing 900 images, and `val_dataset` containing the remaining 100 images. This should only require a couple lines of code.
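Because every image comes from an independently sampled configuration, a contiguous split (`range(900)` and `range(900, 1000)`) would work. The sketch below shows a shuffled split instead, which would stay valid even if the collection order were ever correlated with the label. `split_indices` and its `seed` argument are hypothetical.

```python
import numpy as np

def split_indices(num_images, num_train, seed=0):
    """Randomly partition sample numbers 0..num_images-1 into train and val lists."""
    rng = np.random.default_rng(seed)
    order = rng.permutation(num_images)
    train = sorted(order[:num_train].tolist())
    val = sorted(order[num_train:].tolist())
    return train, val

train_indices, val_indices = split_indices(1000, 900)
print(len(train_indices), len(val_indices))  # → 900 100
```

The two lists would then be passed as `desired_indices`, e.g. `RobotCollisionDataset('robot_imgs', train_indices)` and `RobotCollisionDataset('robot_imgs', val_indices)`.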

Next, we will load a pre-trained ResNet model. To use it for our collision detection task, we need to ensure that the model has the correct output layer dimensionality. Recall that the output layer of a classification network produces one score (logit) per category, and a softmax over these scores gives the probability distribution over categories. Natively, ResNet has 1000 dimensions in the output layer because it was trained for a 1000-category classification task (ImageNet).

Load the `resnet18` model with default weights into a variable called `model` and change its dimensionality as necessary. This should only require a couple lines of code. This tutorial may be helpful: [Transfer Learning for Computer Vision](https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html).

In [13]:
# TODO: Create train and validation datasets


In [14]:
# TODO: Load and adjust a pre-trained model


# Part 5: Training the Model (4 pts)

The last function we need to write is one that will train and validate the model. We have a partial implementation of `train_model()` below. Complete the loop portion that performs prediction and obtains a loss on an input. Additionally, if the current `phase` is `'train'`, you will need to perform an optimization step on the model (this second step would not be done if `phase` is `'val'`).

Note that this function essentially replicates the example shown in the [Transfer Learning](https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html) tutorial. You are free to replicate any code from there to complete the implementation.
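The body of the `with torch.set_grad_enabled(...)` block could look like the function below. It follows the same pattern as the linked tutorial, but `forward_step` is a hypothetical wrapper introduced here so the step can be tested on its own; inside `train_model()` the lines would be inlined, producing the `loss` and `preds` variables that the statistics lines already expect. The gradients are assumed to have been zeroed beforehand, as `train_model()` already does.

```python
import torch
import torch.nn as nn

def forward_step(model, criterion, optimizer, inputs, labels, train):
    """One batch of the train/val loop: predict, score, and (if training) update."""
    with torch.set_grad_enabled(train):
        outputs = model(inputs)              # raw logits, shape (batch, num_classes)
        _, preds = torch.max(outputs, 1)     # predicted class = index of largest logit
        loss = criterion(outputs, labels)    # CrossEntropyLoss takes logits directly
        if train:
            loss.backward()                  # compute gradients
            optimizer.step()                 # update the weights
    return loss, preds

# Tiny stand-in model so the step can be checked without images or a GPU
torch.manual_seed(0)
toy_model = nn.Linear(4, 2)
toy_optimizer = torch.optim.SGD(toy_model.parameters(), lr=0.1)
x = torch.randn(8, 4)
y = torch.randint(0, 2, (8,))
toy_optimizer.zero_grad()
loss, preds = forward_step(toy_model, nn.CrossEntropyLoss(), toy_optimizer, x, y, train=True)
print(loss.item(), preds.shape)
```

In the `'val'` phase the same call with `train=False` runs without building a computation graph and never touches the weights.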

In [15]:
def train_model(model, criterion, optimizer, scheduler, dataloaders, num_epochs=25, device='cuda'):
    since = time.time()
    best_model_params_path = 'best_model_params.pt'
    torch.save(model.state_dict(), best_model_params_path)
    best_acc = 0.0

    train_losses = list()
    train_accs = list()
    val_losses = list()
    val_accs = list()

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data
            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # TODO: Obtain model prediction and loss on inputs
                # If in training phase, compute gradients and perform optimization step
                with torch.set_grad_enabled(phase == 'train'):
                    pass

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'train':
                train_losses.append(epoch_loss)
                train_accs.append(epoch_acc)
            else:
                val_losses.append(epoch_loss)
                val_accs.append(epoch_acc)

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                torch.save(model.state_dict(), best_model_params_path)

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # load best model weights
    model.load_state_dict(torch.load(best_model_params_path))
    return model, train_losses, train_accs, val_losses, val_accs

# Part 6: Putting Everything Together (8 pts)

Now we are ready to put everything together. The following code sets up the required components (model, criterion, optimizer, scheduler) and then runs the training function that you completed above. There are also some parameters, e.g. learning rate and number of epochs, that you can experiment with after verifying initial success.

**NOTE: Set your runtime hardware to ``T4 GPU`` only when you are ready to train the full model. Do not do so before that, or you will risk using up your GPU quota. It takes almost two hours to train on CPU, but only about five minutes on GPU.**

Run the provided code, and address the prompts below.

1. Use ``plot_losses()`` to plot the training and validation losses. Describe your observations and how they indicate that our learning task succeeded (or failed).

2. Use ``visualize_model()`` to see some images along with their predictions. Do the predictions appear mostly correct? Do you see any failure cases?

3. Experiment with changing the learning rate and number of epochs. You can just try a lower and a higher value for each parameter separately. Comment on changes in model performance.

4. Experiment with loading a ResNet model that isn't pre-trained. Comment on changes in model performance.

In [16]:
learning_rate = 0.001
num_epochs = 25

data_loader_train = DataLoader(train_dataset,
                            batch_size=64, shuffle=True)
data_loader_val = DataLoader(val_dataset,
                            batch_size=64, shuffle=True)
dataloaders = {'train': data_loader_train,
               'val': data_loader_val}
device = 'cuda' if torch.cuda.is_available() else 'cpu'

model = model.to(device)
criterion = nn.CrossEntropyLoss()
# Observe that all parameters are being optimized
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
# Decay LR by a factor of 0.1 every 7 epochs
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
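Because `train_model()` calls `scheduler.step()` once per epoch, the learning rate follows a staircase. The plain-Python function below reproduces that arithmetic (it is not the PyTorch implementation; `step_lr` is a hypothetical name) and is handy when choosing `num_epochs`: with the defaults, epochs 0-6 train at 1e-3, epochs 7-13 at 1e-4, epochs 14-20 at 1e-5, and epochs 21-24 at 1e-6.

```python
def step_lr(initial_lr, epoch, step_size=7, gamma=0.1):
    """Learning rate in effect during `epoch` (0-based) under a StepLR-style schedule."""
    return initial_lr * gamma ** (epoch // step_size)

# Print the rate at the start of each 7-epoch stage for the default settings
for epoch in range(0, 25, 7):
    print(f"epochs {epoch}-{min(epoch + 6, 24)}: lr = {step_lr(0.001, epoch):.0e}")
```

With such a small rate in the final stage, those last few epochs likely change the weights very little, which is worth keeping in mind when experimenting with more epochs.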

In [17]:
model, train_losses, train_accs, val_losses, val_accs = train_model(model, criterion, optimizer, scheduler, dataloaders, num_epochs, device=device)

In [18]:
def plot_losses(train_losses, val_losses):
    plt.plot(train_losses, label='train')
    plt.plot(val_losses, label='val')
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.show()

def imshow(inp, title=None):
    """Display image for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([201/255, 212/255, 230/255])
    std = np.array([42/255, 33/255, 23/255])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated

def visualize_model(model, dataloaders, num_images=6, class_names={0:'free', 1:'collision'}):
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders['val']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images//2, 2, images_so_far)
                ax.axis('off')
                ax.set_title(f'predicted: {class_names[preds[j].item()]}; actual: {class_names[labels[j].item()]}')
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
        model.train(mode=was_training)

In [19]:
plot_losses(train_losses, val_losses)

In [21]:
visualize_model(model, dataloaders)