<a href="https://colab.research.google.com/github/stephenjkaplan/snow-grooming-object-detection/blob/master/Object_Detection_Transfer_Learning_with_PyTorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Object Detection - Transfer Learning with PyTorch

This notebook is a general-purpose template for fine-tuning a Faster R-CNN model in PyTorch on custom images and annotations, in order to perform object detection for a domain-specific task.

It was initially written to perform this task and apply the model to video footage of ski resorts to serve as a proof of concept for autonomous snow grooming vehicles. More info [here](https://github.com/stephenjkaplan/snow-grooming-object-detection).


Stephen Kaplan, 9-16-2020

## Setup

### Download additional utility files.

**Torchvision**

Clones functionality from the `torchvision` library that is not directly accessible through a normal `pip install` command.

In [None]:
%%shell

# Fetch the TorchVision repository; only the helper modules under
# references/detection are needed from it.
git clone https://github.com/pytorch/vision.git
cd vision
git checkout v0.3.0

# Copy each detection helper one directory up so it sits next to the notebook.
for module in utils transforms coco_eval engine coco_utils; do
  cp "references/detection/${module}.py" ../
done

**Clone full repository containing**

### Global Variables

**Detectable Objects**

Define list of objects you plan to detect. These must match the data 
downloaded in the data acquisition step.

In [None]:
# Names of the object classes to detect. The order matters: later cells look
# entries up by their position in this list.
obj_class_labels = [
    'tree',
    'person',
    'street light',
]

**Current Working Directory**

Specify current working directory. If you are working in Google Colab, you should follow a similar convention as the example below.

In [None]:
# Project folder (here: on the mounted Google Drive). Keeping the trailing
# slash lets file names be appended to this string directly.
root_dir = '/content/drive/My Drive/Colab Notebooks/snow_grooming/'

### Imports 

Import libraries and mount Google Drive. If you are not using a Google Colab notebook, comment out the Colab-specific code. *Tip: Manually copy and paste the verification code that Google provides when mounting the Drive. Using the copy button doesn't seem to work well.*

In [None]:
import os
import sys
import time
from datetime import datetime
from google.colab import drive
drive.mount("/content/drive")

import numpy as np
from PIL import Image
from ytdownloader.downloader import Downloader

import cv2
import utils
import torch
import torchvision

# custom modules
# Put the project folder on the import path so the local `utilities` and
# `dataset` modules can be imported.
import sys
root_dir = '/content/drive/My Drive/Colab Notebooks/snow_grooming/'
sys.path.append(root_dir)

# An unfinished `from` statement used to sit between these two imports; it was
# a SyntaxError that prevented the whole cell from running, so it is removed.
from utilities import train_model, evaluate
from dataset import GoogleOpenImageDataset

### Set Device
Sets to CPU if GPU not available. In order to enable GPU in Google Colab, 
select `Runtime` --> `Change runtime type` and select `GPU` for `Hardware accelerator`.

In [None]:
# Use the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## Data Acquisition

### Download data.

### Explore data format.

In [None]:
# Build one dataset object over all requested classes so that individual
# samples can be inspected before any splitting is done.
dataset = GoogleOpenImageDataset(root_dir, obj_class_labels, max_images_per_class=5000, train=True)

In [None]:
# Display a single sample (index 40) to see what the dataset returns per item.
dataset[40]

## Model Selection

*Note: I would have preferred to do K-Folds Cross Validation while selecting the model, but iteration was too slow given the time constraints of the project. I chose to just use one training and validation set, but recognize that isn't an optimal strategy for hyperparameter selection.*

##### Create training / validation / test dataset splits.

In [None]:
# Build four separate dataset objects from the same arguments, one per split.
# The train and train+val objects are created with train=True; the val and
# test objects rely on the constructor's default for `train`.
_dataset_args = (root_dir, obj_class_labels)
dataset_train = GoogleOpenImageDataset(*_dataset_args, max_images_per_class=5000, train=True)
dataset_train_val = GoogleOpenImageDataset(*_dataset_args, max_images_per_class=5000, train=True)
dataset_val = GoogleOpenImageDataset(*_dataset_args, max_images_per_class=5000)
dataset_test = GoogleOpenImageDataset(*_dataset_args, max_images_per_class=5000)

In [None]:
total_size = len(dataset_test)

# First split: (train + val) versus test. Only the first fraction enters the
# arithmetic; the second size is the remainder, so both always sum to the total.
train_val_percent, test_percent = 0.80, 0.20
train_val_size = int(total_size * train_val_percent)
test_size = total_size - train_val_size
splits_1 = [train_val_size, test_size]

# Second split: train versus val, taken out of the train+val portion.
train_percent, val_percent = 0.80, 0.20
train_size = int(train_val_size * train_percent)
val_size = train_val_size - train_size
splits_2 = [train_size, val_size]

In [None]:
# Seed the RNG so the shuffled indices and both random splits are reproducible.
torch.manual_seed(1)
indices = torch.randperm(total_size).tolist()

# Carve off the test indices first, then divide the remainder into train/val.
train_val_idx, test_idx = torch.utils.data.random_split(indices, splits_1)
train_idx, val_idx = torch.utils.data.random_split(train_val_idx, splits_2)

# Restrict every dataset object to the indices belonging to its split.
_subset = torch.utils.data.Subset
dataset_train_val = _subset(dataset_train_val, train_val_idx)
dataset_train = _subset(dataset_train, train_idx)
dataset_val = _subset(dataset_val, val_idx)
dataset_test = _subset(dataset_test, test_idx)


def _make_loader(subset, batch_size, shuffle):
    """Wrap `subset` in a DataLoader using the worker count and collate function shared by all splits."""
    return torch.utils.data.DataLoader(
        subset, batch_size=batch_size, shuffle=shuffle, num_workers=4,
        collate_fn=utils.collate_fn)


# Training loaders shuffle and use batches of 2; evaluation loaders keep the
# order fixed and feed one image at a time.
data_loader_train_val = _make_loader(dataset_train_val, batch_size=2, shuffle=True)
data_loader_train = _make_loader(dataset_train, batch_size=2, shuffle=True)
data_loader_val = _make_loader(dataset_val, batch_size=1, shuffle=False)
data_loader_test = _make_loader(dataset_test, batch_size=1, shuffle=False)

##### Define hyperparameters.


In [None]:
# optimizer settings
learning_rate, momentum, weight_decay = 1e-3, 0.9, 5e-4

# learning rate schedule: the rate is multiplied by `gamma` every `step_size` epochs
step_size, gamma = 3, 0.1

# training length and the layer count forwarded to train_model
num_epochs = 10
trainable_layers = 3

##### Train Neural Network

In [None]:
# Model-selection run: train on the training split only and pass the
# validation loader with score_val=True. The return value is not stored here.
train_model(obj_class_labels, trainable_layers, device, learning_rate, momentum, 
            weight_decay, step_size, gamma, num_epochs, data_loader_train, 
            data_loader_val=data_loader_val, score_val=True)

## Final Model

#### Train Neural Network
Train final model with ALL training data.

In [None]:
# Final run: train on the combined train+val loader for 5 epochs and keep the
# returned model.
# NOTE(review): the test loader is passed as `data_loader_val` here; score_val
# is left at its default, so confirm that train_model does not use this loader
# in a way that influences training.
model = train_model(obj_class_labels, trainable_layers, device, learning_rate, momentum, 
                    weight_decay, step_size, gamma, num_epochs=5, 
                    data_loader_train=data_loader_train_val, data_loader_val=data_loader_test) 

#### Evaluate on test set.

In [None]:
# Score the final model on the test loader.
evaluate(model, data_loader_test, device=device)

#### Persist model.

In [None]:
# Create the folder that torch.save actually writes to. Checking a bare
# 'models' path would only inspect the current working directory, leaving the
# save to fail whenever <root_dir>/models does not exist yet.
models_dir = os.path.join(root_dir, 'models')
os.makedirs(models_dir, exist_ok=True)

# NOTE: this pickles the whole model object rather than a state_dict, so the
# model's class definitions must be importable when the file is loaded again.
torch.save(model, os.path.join(models_dir, 'final_model.pkl'))

## Prediction & Visualization

In [None]:
# Reload the persisted model from the project folder. The path is derived from
# `root_dir` instead of repeating the absolute Drive path, so it stays correct
# when the project folder changes.
# map_location places the stored tensors on the active device, which lets a
# model saved on a GPU runtime be loaded on a CPU-only runtime.
# NOTE: torch.load unpickles the file; only load model files you created yourself.
# NOTE(review): recent PyTorch releases default to weights_only=True, which
# rejects fully pickled models - confirm against the installed version.
model = torch.load(os.path.join(root_dir, 'models', 'final_model.pkl'), map_location=device)

##### Pick an image from the test set.

In [None]:
# Position of the example image within the test subset (not an index into the
# full dataset).
test_img_idx = 424

In [None]:
# Take the image of the chosen test sample; the second item of the pair is not needed.
img, _ = dataset_test[test_img_idx]

# Scale to 0-255, move the first axis to the end and convert to bytes so PIL
# can display it (presumably a channels-first float image in [0, 1] - confirm).
pixels = (img * 255).permute(1, 2, 0).byte().numpy()
Image.fromarray(pixels)

##### Make a bounding box prediction.

In [None]:
def make_boundary_box_prediction(image_no_box):
    """Run the global `model` on a single image and return its raw output.

    Args:
        image_no_box: Image tensor; it is moved to the global `device` and
            passed to the model as a one-element list.

    Returns:
        Whatever the model returns for that one-element batch.
    """
    # evaluation mode, and no gradient tracking during inference
    model.eval()
    with torch.no_grad():
        batch = [image_no_box.to(device)]
        return model(batch)

In [None]:
# Run the detector on the example image and display the raw prediction.
predict_example = make_boundary_box_prediction(img)
predict_example

##### Define function for drawing bounding boxes.

In [None]:
# Maps a predicted label id to (class name, colour tuple handed to the OpenCV
# drawing calls). Ids start at 1 and follow the order of obj_class_labels.
class_lookup_table = {
    class_id: (obj_class_labels[class_id - 1], color)
    for class_id, color in (
        (1, (255, 0, 0)),
        (2, (0, 255, 0)),
        (3, (255, 255, 0)),
    )
}

def draw_all_boundary_boxes(image_path, prediction, threshold=0.5):
    """Draw the predicted boxes and class names onto the image at `image_path`.

    Args:
        image_path: Path of the image file; it is read with ``cv2.imread``.
        prediction: Model output. Only ``prediction[0]`` is used, and it must
            provide 'boxes', 'scores' and 'labels' entries supporting ``.tolist()``.
        threshold: Detections with a score below this value are skipped.

    Returns:
        The array loaded by OpenCV (BGR channel order) with the boxes and
        labels drawn onto it.

    Raises:
        FileNotFoundError: If OpenCV cannot read `image_path`.
        KeyError: If a predicted label id is missing from `class_lookup_table`.
    """
    # get boundary boxes, scores, and labels from prediction
    detections = prediction[0]
    boxes = detections['boxes'].tolist()
    scores = detections['scores'].tolist()
    class_labels = detections['labels'].tolist()

    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread reports a missing/unreadable file by returning None
        # instead of raising, which would otherwise surface later as an
        # obscure drawing error (or as a silent None return value).
        raise FileNotFoundError(f'Could not read image: {image_path}')

    for box, score, label in zip(boxes, scores, class_labels):
        if score < threshold:
            continue

        name, color = class_lookup_table[label]

        # Boxes are presumably [x_min, y_min, x_max, y_max] (torchvision
        # detection convention) - confirm against the model in use.
        left, top, right, bottom = (int(v) for v in box)

        # Any two opposite corners describe the same rectangle.
        cv2.rectangle(image, (left, bottom), (right, top), color, 3)

        # 'street light' detections are displayed as 'pole'.
        obj_label = 'pole' if name == 'street light' else name

        # Place the label just above the box, but keep its baseline far enough
        # from the top edge that the text is not drawn outside the image.
        text_y = max(top - 10, 20)
        cv2.putText(image, obj_label, (left, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

    return image


In [None]:
# Translate the position inside the test subset back to an index of the full
# dataset, then look up the corresponding entry of `dataset.imgs`
# (used below as the image path).
test_img_idx_abs = test_idx[test_img_idx]
path = dataset.imgs[test_img_idx_abs]

In [None]:
image = draw_all_boundary_boxes(path, predict_example)
# The array comes from cv2.imread and is therefore in BGR order, while PIL
# interprets arrays as RGB. Convert before display so colours are not swapped.
Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

## Demo

Demo on ski resort video footage.

##### Download video from YouTube.



In [None]:
# Source video (alternative clip kept for reference).
#video_url = "https://www.youtube.com/watch?v=3tg_DOaUZ4Y"
video_url = "https://www.youtube.com/watch?v=LKBQ0J-RUF8"

# download options
video_quality, only_video = 1080, True

# optional trimming window (HH:MM:SS); only used when do_trim is True
do_trim = False
start, end = "00:28:16", "00:28:46"

# file name passed to the downloader
#output_file = 'pb600_winchcat_full.mp4'
output_file = 'groomer.mp4'

In [None]:
# Download the video using the settings defined in the previous cell.
yt_d = Downloader(video_url, output_file, quality=video_quality, only_vid=only_video)
yt_d.download()

if do_trim:
  # Presumably cuts the download to the [start, end] window while keeping the
  # untrimmed file (delete_original=False) - confirm against the library.
  yt_d.trim(start, end, delete_original=False)
  # NOTE(review): these file names are hard-coded and do not use `output_file`,
  # while the frame-splitting cell reads 'groomer.mp4' - confirm which file
  # trim() actually writes before enabling do_trim.
  os.rename('downloaded_vid_trimmed.mp4', 'downloaded_vid.mp4')

In [None]:
# Remove frame images left over from a previous run. Only regular files named
# exactly frame<number>.jpg (the pattern written when the video is split) are
# deleted; a plain substring test on 'frame' would also delete unrelated files
# such as 'dataframe.csv' and would raise on a directory called 'frames'.
for f in os.listdir():
    is_frame_image = f.startswith('frame') and f.endswith('.jpg') and f[5:-4].isdigit()
    if is_frame_image and os.path.isfile(f):
        os.remove(f)

##### Split video into frames.

In [None]:
#cap = cv2.VideoCapture('pb600_winchcat_full.mp4')
cap = cv2.VideoCapture('groomer.mp4')

# `i` counts the frames written so far (files frame0.jpg .. frame{i-1}.jpg);
# later cells reuse it as the total number of frames.
i = 0
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # no more frames (or the read failed)
            break
        cv2.imwrite(f'frame{i}.jpg', frame)
        i += 1
finally:
    # release the capture even if writing a frame fails part-way through
    cap.release()

# No display windows are opened in this cell, so there is nothing to destroy.
# Report the real count: `i` already equals the number of frames written.
print(f'{i} Frames Created.')

##### Draw bounding boxes on each frame.

In [None]:
# Build the image-to-tensor transform once; it does not depend on the frame,
# so there is no reason to recreate it on every iteration.
transforms = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])

# NOTE: each frame file is overwritten with its annotated version, so running
# this cell twice draws on top of frames that already contain boxes.
for idx in range(i):
    if idx % 100 == 0:
        print(f'Making boundary box predictions ({idx}/{i})...')

    frame_path = f'frame{idx}.jpg'

    # load image; the context manager closes the file once it has been converted
    with Image.open(frame_path) as frame_file:
        img_frame = frame_file.convert("RGB")

    # make prediction
    prediction = make_boundary_box_prediction(transforms(img_frame))

    # draw box
    img_frame = draw_all_boundary_boxes(frame_path, prediction, threshold=0.5)

    # resave image
    cv2.imwrite(frame_path, img_frame)

print('Done!')

In [None]:
# Spot-check one annotated frame.
# NOTE(review): the frame number is hard-coded; this file only exists when the
# video produced at least 4901 frames.
Image.open('frame4900.jpg').convert("RGB")

In [None]:
def convert_frames_to_video(num_frames, path_out, fps, start_frame=2000, end_frame=5000):
    """Assemble ``frame<idx>.jpg`` images from the working directory into a video.

    Frames are written to the output as they are read, so only one frame is
    held in memory at a time.

    Args:
        num_frames: Number of frame files available (indices ``0 .. num_frames - 1``).
        path_out: Destination path of the video file.
        fps: Frame rate of the output video.
        start_frame: Index of the first frame to include. Defaults to 2000,
            the value that used to be hard-coded.
        end_frame: Index one past the last frame to include (slice semantics;
            ``None`` means "until the last frame"). Defaults to 5000, the
            value that used to be hard-coded.

    Raises:
        ValueError: If none of the selected frames could be read, in which
            case no video is written.
    """
    # range slicing follows the same rules as slicing the list of file names
    frame_ids = range(num_frames)[start_frame:end_frame]
    total = len(frame_ids)

    out = None
    try:
        for position, idx in enumerate(frame_ids):
            if position % 100 == 0:
                print(f'Processing frame ({position}/{total})...')

            img = cv2.imread(f'frame{idx}.jpg')
            if img is None:
                # cv2.imread returns None for a missing/unreadable file: skip it
                continue

            if out is None:
                # The writer needs the frame size, so it is created from the
                # first readable frame; all frames are expected to share it.
                height, width = img.shape[:2]
                out = cv2.VideoWriter(path_out, cv2.VideoWriter_fourcc(*'MJPG'), fps, (width, height))

            out.write(img)
    finally:
        if out is not None:
            out.release()

    if out is None:
        raise ValueError(
            f'No readable frames in range [{start_frame}:{end_frame}] '
            f'of {num_frames} frames; nothing written to {path_out}')

# `i` already equals the number of frames written (frame0.jpg .. frame{i-1}.jpg),
# so it is passed unchanged; i + 1 would request a frame file that does not exist.
# os.path.join avoids the doubled slash, since root_dir ends with '/'.
convert_frames_to_video(i, os.path.join(root_dir, 'groomer_boxes.mp4'), fps=30)