In [1]:
"""
Solution for D2K Audubon project

Create images for bird detection
"""

import numpy as np
import utils.data_processing as dp
import utils.data_vis as vis
from utils.global_const import *

In [2]:
#######################################################################################
# Part 1 - Data processing

def test_get_file_names():
    ''' Test get_file_names() '''
    csv_files = dp.get_file_names(DATA_PATH + 'raw/', 'csv')
    jpg_files = dp.get_file_names(DATA_PATH + 'raw/', 'jpg')
    FILES['dataset'] = {'jpg': jpg_files, 'csv': csv_files}

test_get_file_names()

In [3]:
def test_csv_to_df():
    ''' Test csv_to_df() '''
    file_name = FILES['dataset']['csv'][0]
    data_df = dp.csv_to_df(file_name, COL_NAMES)
    # print(data_df)
    
test_csv_to_df()

In [4]:
def test_concat_frames():
    ''' Test concat_frames() ''' 
    concated_frame =  dp.concat_frames(FILES['dataset']['csv'], COL_NAMES)
    # print(concated_frame)

test_concat_frames()

In [5]:
def test_add_col():
    ''' Test add_col() '''
    values_dict = {}
    for key, vals in GROUPS.items():
        for val in vals:
            values_dict[val] = key
    frame = dp.concat_frames(FILES['dataset']['csv'], COL_NAMES)
    FRAMES['combined annotations'] = dp.add_col(frame, 'group_id', 'class_id', values_dict)
    # print(FRAMES['combined annotations'])

test_add_col()

In [6]:
def test_read_jpg():
    ''' test read_jpg() '''
    file_name = FILES['dataset']['jpg'][0]
    dp.read_jpg(file_name)

test_read_jpg()

In [7]:
#######################################################################################
# Part 2 - Data visualization

def test_plot_distribution():
    ''' Test plot_distribution() ''' 
    vis.plot_distribution(FRAMES['combined annotations'], "class_id", 
                          ("Frequency", "Bird Species", "Bird Species Distribution"), PLOTS_PATH, filt=100)
    vis.plot_distribution(FRAMES['combined annotations'], "group_id", 
                          ("Frequency", "Bird Group", "Bird Group Distribution"), PLOTS_PATH)
    vis.plot_distribution(FRAMES['combined annotations'].loc[FRAMES['combined annotations']['group_id'] == 'BRPE'], "class_id", 
                          ("Frequency", "Bird Species", "BRPE Bird Species Distribution"), PLOTS_PATH)
    vis.plot_distribution(FRAMES['combined annotations'].loc[FRAMES['combined annotations']['group_id'] == 'LGHT'], "class_id", 
                          ("Frequency", "Bird Species", "LGHT Bird Species Distribution"), PLOTS_PATH)            

# test_plot_distribution()

In [8]:
def test_plot_boxes():
    ''' Test plot_boxes() ''' 
    vis.plot_boxes(FILES['dataset']['jpg'][10], FILES['dataset']['csv'][10], 'Annonations', PLOTS_PATH)

# test_plot_boxes()

In [9]:
#######################################################################################
# Part 3 - Dataloader

# Split the dataset into trainset, testset, and valset
def test_split_img_annos():
    ''' Test split_img_annos() '''
    FILES['trainset'], FILES['testset'], FILES['valset'] = dp.split_img_annos(
        FILES['dataset']['jpg'], FILES['dataset']['csv'], (0.8, 0.1, 0.1), seed=2023)

test_split_img_annos()

In [10]:
# cropping
# Make a dataloader

import torch
import torchvision
from torchvision.transforms import functional as F
from utils.data_loader import BirdDataset, collate_fn

In [11]:
# Train a model
# https://pytorch.org/vision/main/models/generated/torchvision.models.detection.fasterrcnn_resnet50_fpn.html#torchvision.models.detection.fasterrcnn_resnet50_fpn
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights='DEFAULT')
dataset = BirdDataset(FILES['dataset'], F.to_tensor)
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=2, shuffle=True, num_workers=4,
    collate_fn=collate_fn # collate_fn is important otherwise it raises an error
) 

In [13]:
# For Training
images, targets = next(iter(data_loader))
images = list(image for image in images)
targets = [{k: v for k, v in t.items()} for t in targets]

In [14]:
output = model(images, targets)   # Returns losses and detections
# For inference
model.eval()
x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)]
predictions = model(x)  

In [15]:
predictions

[{'boxes': tensor([], size=(0, 4), grad_fn=<StackBackward0>),
  'labels': tensor([], dtype=torch.int64),
  'scores': tensor([], grad_fn=<IndexBackward0>)},
 {'boxes': tensor([], size=(0, 4), grad_fn=<StackBackward0>),
  'labels': tensor([], dtype=torch.int64),
  'scores': tensor([], grad_fn=<IndexBackward0>)}]

In [20]:
# use our dataset and defined transformations
trainset = BirdDataset(FILES['trainset'], F.to_tensor)
train_dataloader = torch.utils.data.DataLoader(
    trainset, batch_size=2, shuffle=True, num_workers=4,
    collate_fn=collate_fn # collate_fn is important otherwise it raises an error
) 

testset = BirdDataset(FILES['testset'], F.to_tensor)
test_dataloader = torch.utils.data.DataLoader(
    testset, batch_size=1, shuffle=False, num_workers=4,
    collate_fn=collate_fn # collate_fn is important otherwise it raises an error
) 

In [22]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# our dataset has two classes only - background and person
num_classes = 2

model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights='DEFAULT')
# move model to the right device
model.to(device)

# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005,
                            momentum=0.9, weight_decay=0.0005)

# and a learning rate scheduler which decreases the learning rate by
# 10x every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                               step_size=3,
                                               gamma=0.1)

In [None]:
def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq):
    model.train()
    metric_logger = utils.MetricLogger(delimiter="  ")
    metric_logger.add_meter('lr', utils.SmoothedValue(window_size=1, fmt='{value:.6f}'))
    header = 'Epoch: [{}]'.format(epoch)

    lr_scheduler = None
    if epoch == 0:
        warmup_factor = 1. / 1000
        warmup_iters = min(1000, len(data_loader) - 1)

        lr_scheduler = utils.warmup_lr_scheduler(optimizer, warmup_iters, warmup_factor)

    for images, targets in metric_logger.log_every(data_loader, print_freq, header):
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)

        losses = sum(loss for loss in loss_dict.values())

        # reduce losses over all GPUs for logging purposes
        loss_dict_reduced = utils.reduce_dict(loss_dict)
        losses_reduced = sum(loss for loss in loss_dict_reduced.values())

        loss_value = losses_reduced.item()

        if not math.isfinite(loss_value):
            print("Loss is {}, stopping training".format(loss_value))
            print(loss_dict_reduced)
            sys.exit(1)

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if lr_scheduler is not None:
            lr_scheduler.step()

        metric_logger.update(loss=losses_reduced, **loss_dict_reduced)
        metric_logger.update(lr=optimizer.param_groups[0]["lr"])

    return metric_logger

In [None]:
# let's train it for 10 epochs
from torch.optim.lr_scheduler import StepLR
num_epochs = 10

for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(model, optimizer, train_dataloader, device, epoch, print_freq=10)
    # update the learning rate
    lr_scheduler.step()
    # evaluate on the test dataset
    evaluate(model, test_dataloader, device=device)