Build CNN Classifier using Simulation data and test it using real images
==============================================================

Get Train and Test data using the simulation
----------------------------------------------



<div class="alert alert-block alert-info">
This section of the tutorial assumes that you have a functional ISAAC simulation working.
If not, you'll have to use pre-generated data for this tutorial purpose (not recommended).
</div>

Start the simulation:

`roslaunch isaac sim.launch gzclient:=true pose:="10.5 -8 4.5 0 0 0 1"`

Spawn the object

`roslaunch isaac_gazebo spawn_object.launch spawn:=sock`

Add an object to the simulation to be the anomaly (in this case a sock):

`rosrun img_analysis get_train_data -path_dataset $PATH_DATASET -vent_poses $VENT_POSES -other_poses $OTHER_POSES [OPTIONS]`

Arguments:
 - `path_dataset`        - Path to where to save the datasets, mandatory to define.
 - `vent_poses`          - *.txt* file containing the vent poses
 - `other_poses`         - *.txt* file containing the other non-vent poses
 - `robot_dist`          - Robot's distance to vent, standard is 1m
 - `train_pics_per_vent` - Number of pictures taken per vent/other for train data
 - `test_pics_per_vent`  - Number of pictures taken per vent/other for test data
 
 
 


Train CNN
-----------

In [None]:

# Parameters
data_dir = "data/vent"  # specify data path
classes = ["free", "obstacle"]  # specify the image classes
num_epochs = 30  # number of epochs to train
model_name = "model_cnn.pt"  # saved model name
trace_model_name = "traced_model_cnn.pt"  # saved traced model name

In [None]:
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
from PIL import Image
from torch import nn, optim
from torchvision import datasets, models, transforms



# Define transforms for the training data and testing data
train_transforms = transforms.Compose(
    [
        transforms.Resize([256, 256]),
        transforms.RandomRotation(30),
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

test_transforms = transforms.Compose(
    [
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

# Pass transforms in here, then run the next cell to see how the transforms look
train_data = datasets.ImageFolder(data_dir + "/train", transform=train_transforms)
test_data = datasets.ImageFolder(data_dir + "/test", transform=test_transforms)

trainloader = torch.utils.data.DataLoader(train_data, batch_size=64, shuffle=True)
testloader = torch.utils.data.DataLoader(test_data, batch_size=64)

In [None]:

# Visualize some of the data-----------------------
# helper function to un-normalize and display an image
def imshow(img):
    img = img / 2 + 0.5  # unnormalize
    plt.imshow(np.transpose(img, (1, 2, 0)))  # convert from Tensor image


# obtain one batch of training images
dataiter = iter(trainloader)
images, labels = dataiter.next()
images = images.numpy()  # convert images to numpy for display
# plot the images in the batch, along with the corresponding labels
fig = plt.figure(figsize=(25, 4))
# display 20 images
for idx in np.arange(20):
    ax = fig.add_subplot(2, 10, idx + 1, xticks=[], yticks=[])
    imshow(images[idx])
    ax.set_title(classes[labels[idx]])


In [None]:

# Use GPU if it's available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = models.densenet121(pretrained=True)

# Freeze parameters so we don't backprop through them
for param in model.parameters():
    param.requires_grad = False

model.classifier = nn.Sequential(
    nn.Linear(1024, 256),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(256, 3),
    nn.LogSoftmax(dim=1),
)

criterion = nn.NLLLoss()

# Only train the classifier parameters, feature parameters are frozen
optimizer = optim.Adam(model.classifier.parameters(), lr=0.003)

In [None]:

model.to(device)

epochs = num_epochs
steps = 0
running_loss = 0
print_every = 5
test_loss_min = np.Inf
for epoch in range(epochs):
    for inputs, labels in trainloader:
        steps += 1
        # Move input and label tensors to the default device
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        logps = model.forward(inputs)
        loss = criterion(logps, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        if steps % print_every == 0:
            test_loss = 0
            accuracy = 0
            model.eval()
            with torch.no_grad():
                for inputs, labels in testloader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    logps = model.forward(inputs)
                    batch_loss = criterion(logps, labels)

                    test_loss += batch_loss.item()

                    # Calculate accuracy
                    ps = torch.exp(logps)
                    top_p, top_class = ps.topk(1, dim=1)
                    equals = top_class == labels.view(*top_class.shape)
                    accuracy += torch.mean(equals.type(torch.FloatTensor)).item()

            print(
                "Epoch ",
                epoch + 1,
                "/",
                epochs,
                ".. " "Train loss: ",
                running_loss / print_every,
                ".. " "Test loss: ",
                test_loss / len(testloader),
                ".. " "Test accuracy: ",
                accuracy / len(testloader),
                "",
            )
            running_loss = 0

            # save model if validation loss has decreased
            if test_loss <= test_loss_min:
                torch.save(model.state_dict(), model_name)
                test_loss_min = test_loss

                model.to("cpu")
                # An example input you would normally provide to your model's forward() method.
                example = torch.rand(1, 3, 224, 224)
                # Use torch.jit.trace to generate a torch.jit.ScriptModule via tracing.
                traced_script_module = torch.jit.trace(model, example)
                output = traced_script_module(torch.ones(1, 3, 224, 224))
                print(output)
                traced_script_module.save(trace_model_name)
                model.to(device)

            model.train()


Validate Result using Real data
------------------------------

To validate the results using real data, let's use an image collected during ISS operations

In [None]:
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
from PIL import Image
from torch import nn, optim
from torchvision import datasets, models, transforms

# Open and display image
image = Image.open("data/bags/sock_iss.jpg")
imgplot = plt.imshow(image)
plt.show()

# Open model
model = models.densenet121(pretrained=True)
model.classifier = nn.Sequential(
    nn.Linear(1024, 256),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(256, 3),
    nn.LogSoftmax(dim=1),
)
model.load_state_dict(torch.load("model_cnn.pt"))
model.eval()

# Classify Image!
test_transforms = transforms.Compose(
    [
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)
image_tensor = test_transforms(image).float()
image_tensor = image_tensor.unsqueeze_(0)
output = model(image_tensor)

# Print Result
_, predicted = torch.max(output, 1)
print("Classification: ", classes[predicted])
