Import Libraries

In [10]:
import os
import shutil
from datetime import datetime
import numpy as np
import cv2
import time
from concurrent.futures import ThreadPoolExecutor
from tqdm.notebook import tqdm
from getpass import getpass
import dropbox
import http.client
from requests.exceptions import ChunkedEncodingError
import urllib3.exceptions
import requests

def copy_directory(source_dir, destination_dir):
    """Recursively copy the contents of ``source_dir`` into ``destination_dir``.

    The destination directory (including missing parents) is created when it
    does not exist yet. Sub-directories are copied by recursion; every other
    entry is copied with ``shutil.copy2``.

    Args:
        source_dir: Path of the directory whose contents are copied.
        destination_dir: Path of the directory that receives the copies.
    """
    destination_missing = not os.path.exists(destination_dir)
    if destination_missing:
        os.makedirs(destination_dir)

    for name in os.listdir(source_dir):
        src_path = os.path.join(source_dir, name)
        dst_path = os.path.join(destination_dir, name)

        # Plain entries are copied directly; only directories need recursion.
        if not os.path.isdir(src_path):
            shutil.copy2(src_path, dst_path)
            continue

        copy_directory(src_path, dst_path)

def base10_to_base36(number):
    """Convert an integer to its lowercase base-36 string representation.

    Digits ``0-9`` are followed by ``a-z``. Negative values are rendered with
    a leading ``-`` so that ``int(result, 36)`` returns the original number
    (previously a negative input silently produced an empty string).

    Args:
        number: Integer to convert.

    Returns:
        The base-36 representation of ``number`` as a string.
    """
    if number == 0:
        return '0'
    base36_chars = "0123456789abcdefghijklmnopqrstuvwxyz"
    sign = '-' if number < 0 else ''
    number = abs(number)
    digits = []
    while number > 0:
        number, remainder = divmod(number, 36)
        digits.append(base36_chars[remainder])
    # Digits were produced least-significant first.
    return sign + ''.join(reversed(digits))

def copy_directory_from_dropbox(source_dir, destination_dir, dbx=None, dbx_access_token=None, use_thread=True):
    """Recursively download a Dropbox folder of images into a local directory.

    Every entry of ``source_dir`` is listed (following pagination). Folders
    are downloaded by recursion. Files are fetched through a temporary link,
    decoded with OpenCV and written back with ``cv2.imwrite``, so only files
    that OpenCV can decode as images are copied.

    Each entry is attempted up to five times with a 5 second pause between
    attempts; an entry that still fails is reported on stdout and skipped.

    Args:
        source_dir: Dropbox path of the folder to download.
        destination_dir: Local directory that receives the files.
        dbx: Existing ``dropbox.Dropbox`` client. When ``None`` a client is
            created from ``dbx_access_token``.
        dbx_access_token: Access token used when ``dbx`` is ``None``. When it
            is also ``None`` the user is prompted for it.
        use_thread: Download the entries of each folder through a
            ``ThreadPoolExecutor`` when true, sequentially otherwise.
    """
    if dbx is None:
        if dbx_access_token is None:
            # `getpass` is imported as a function (`from getpass import getpass`),
            # so it is called directly; `getpass.getpass` raised AttributeError.
            dbx_access_token = getpass("Enter your DropBox access token: ")
        dbx = dropbox.Dropbox(dbx_access_token)

    os.makedirs(destination_dir, exist_ok=True)

    # Collect every page of the listing; copy the first page so the SDK
    # result object is not mutated.
    result = dbx.files_list_folder(source_dir)
    entries = list(result.entries)
    while result.has_more:
        result = dbx.files_list_folder_continue(result.cursor)
        entries.extend(result.entries)
    total_items = len(entries)

    def download_and_save(item, max_retries=5):
        """Download one entry (file or folder), retrying on any failure."""
        source_item_path = item.path_display
        destination_item_path = os.path.join(destination_dir, os.path.basename(source_item_path))
        for retry in range(max_retries):
            try:
                if isinstance(item, dropbox.files.FolderMetadata):
                    copy_directory_from_dropbox(source_item_path, destination_item_path, dbx=dbx, use_thread=use_thread)
                else:
                    link = dbx.files_get_temporary_link(source_item_path).link
                    response = requests.get(link, timeout=60)
                    # Turn HTTP error responses into a retry instead of
                    # trying to decode an error page as an image.
                    response.raise_for_status()
                    nparr = np.frombuffer(response.content, np.uint8)
                    image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                    if image is None:
                        raise ValueError("downloaded content could not be decoded as an image")
                    # cv2.imwrite reports some failures through its return value.
                    if not cv2.imwrite(destination_item_path, image):
                        raise OSError(f"could not write image to {destination_item_path}")
                break
            except Exception as e:
                if retry < max_retries - 1:
                    time.sleep(5)
                else:
                    print(f"\nUnable to download: {item.path_display}\n\tError: {e}")

    if use_thread:
        with ThreadPoolExecutor() as executor:
            # list(...) drains the lazy map so the progress bar advances and
            # all downloads finish before the executor is closed.
            list(tqdm(executor.map(download_and_save, entries), total=total_items, desc=f"Downloading {source_dir} :", unit=" image"))
    else:
        for item in tqdm(entries, total=total_items, desc=f"Downloading {source_dir} :", unit=" image"):
            download_and_save(item)

def download_datasets_from_dropbox(dbx=None, dbx_access_token=None, use_thread=False, datasets=None, include_all_datasets=True):
    """Download datasets from the team Dropbox into ``$ROOT_DIR/datasets``.

    When ``datasets`` is given, exactly those dataset directories (relative to
    the Dropbox datasets root) are downloaded. Otherwise every sub-folder of
    the ``real_world`` and ``benchmarks`` categories is discovered and
    downloaded. Discovered paths are lower-cased.

    Args:
        dbx: Existing ``dropbox.Dropbox`` client. When ``None`` a client is
            created from ``dbx_access_token``.
        dbx_access_token: Access token used when ``dbx`` is ``None``. When it
            is also ``None`` the user is prompted for it.
        use_thread: Forwarded to ``copy_directory_from_dropbox``.
        datasets: Optional iterable of dataset directories relative to the
            datasets root, e.g. ``"real_world/<name>"``. Leading and trailing
            slashes are ignored.
        include_all_datasets: Currently unused; kept so existing callers that
            pass it keep working.
    """
    if dbx is None:
        if dbx_access_token is None:
            dbx_access_token = getpass("Enter your DropBox access token: ")
        dbx = dropbox.Dropbox(dbx_access_token)

    dbx_datasets_dir = '/UMARV/ComputerVision/ScenePerception/datasets'

    if datasets is not None:
        # Strip slashes so the paths built below never contain "//".
        dataset_dirs = [str(dataset).strip("/") for dataset in datasets]
    else:
        dataset_dirs = []
        datasets_root_lower = dbx_datasets_dir.lower()
        for dataset_category in ["real_world", "benchmarks"]:
            # Collect dataset image directories from DropBox
            dataset_category_dir = f"{dbx_datasets_dir}/{dataset_category}"
            result = dbx.files_list_folder(dataset_category_dir)
            while True:
                for entry in result.entries:
                    if isinstance(entry, dropbox.files.FolderMetadata):
                        found_dataset_dir = entry.path_display.lower().replace(datasets_root_lower, "")
                        # The remainder starts with "/"; drop it so joining
                        # with the root does not produce a double slash.
                        dataset_dirs.append(found_dataset_dir.strip("/"))
                if not result.has_more:
                    break
                result = dbx.files_list_folder_continue(result.cursor)

    for dataset_dir in dataset_dirs:
        copy_directory_from_dropbox(
            source_dir = f"{dbx_datasets_dir}/{dataset_dir}",
            destination_dir = f"{os.getenv('ROOT_DIR')}/datasets/{dataset_dir}",
            dbx = dbx,
            dbx_access_token = dbx_access_token,
            use_thread = use_thread
        )

In [1]:
import os
import sys
from getpass import getpass
import torch.optim as optim
!pip install dropbox > /dev/null

Configure Environment

In [2]:
# Runtime configuration, published through environment variables.
os.environ.update({
    "ENVIRONMENT": "colab",
    "REPO_DIR": "/content/UMARV-CV-ScenePerception",
    "ROOT_DIR": "/content",
    "MODEL_ID": "32pzewvj",
})
# MODEL_DIR is derived from REPO_DIR and MODEL_ID, so it is assigned last.
os.environ["MODEL_DIR"] = "{}/models/model_{}".format(
    os.getenv("REPO_DIR"), os.getenv("MODEL_ID")
)

Configure git

In [3]:
# Fill in your GitHub branch
# The clone cell checks this branch out with `git clone -b`, so the branch
# must already exist on the remote repository. Leave it empty to be prompted.
git_branch = "user/PedroBrandao"

In [4]:
# Keep prompting until a branch name is available (skipped when git_branch
# was already filled in).
while not git_branch:
    git_branch = input("Enter your branch: ")

git_repo_url = "https://github.com/AwrodHaghiTabrizi/UMARV-CV-ScenePerception.git"
# Clone the repository with the chosen branch checked out, then change the
# working directory to REPO_DIR.
# NOTE(review): nothing checks that the clone succeeded; when it fails the
# %cd fails as well and later cells cannot import from the repository.
!git clone -b $git_branch $git_repo_url
%cd "{os.getenv('REPO_DIR')}"

Cloning into 'UMARV-CV-ScenePerception'...
fatal: Remote branch user/PedroBrandao not found in upstream origin
[Errno 2] No such file or directory: '/content/UMARV-CV-ScenePerception'
/home/brandaop/UMARV/ScenePerception/UMARV-CV-ScenePerception/models/model_32pzewvj/src/notebooks


  bkms = self.shell.db.get('bookmarks', {})


Import Repository Resources

In [17]:
# Make the repository-level sources importable.
sys.path.insert(0, f"{os.getenv('REPO_DIR')}/src")


# The model directory is inserted afterwards, also at index 0, so its modules
# take precedence over repository-level modules with the same name.
sys.path.insert(0, f"{os.getenv('MODEL_DIR')}/src")
# Star imports bring the helpers used by the cells below into the notebook
# namespace; they require REPO_DIR / MODEL_DIR to exist (i.e. a successful clone).
from methods import *
from architecture import *
from dataset import *

ModuleNotFoundError: No module named 'methods'

Download Datasets

In [16]:
# Read the Dropbox token without echoing it; later cells pass it on as
# `dbx_access_token`.
dbx_access_token = getpass("Enter your DropBox access token: ")

KeyboardInterrupt: Interrupted by user

In [None]:
# Set to True if datasets need to be downloaded to your google drive
download_datasets = False

# NOTE(review): the download_datasets_from_dropbox defined earlier in this
# notebook never reads `include_all_datasets`; a version imported from
# `methods` may behave differently - confirm which one is in scope.
# `upload_datasets_to_google_drive` is not defined in this notebook;
# presumably it comes from the star imports.
if download_datasets:
    download_datasets_from_dropbox(
        dbx_access_token = dbx_access_token,
        include_all_datasets = False,
        use_thread = True
    )
    upload_datasets_to_google_drive()

In [None]:
# Set to True to pull datasets from google drive to your colab environment
get_datasets = True

# `get_datasets_from_google_drive` is not defined in this notebook;
# presumably it is provided by the star imports from the repository.
if get_datasets:
    get_datasets_from_google_drive()

Code

In [5]:
# Training hyperparameters consumed by the cells below.
num_epochs = 50
batch_size = 20
val_batch_size = 20
val_ratio = 0.2                         # Share of the training set used for validation (presumably a fraction, i.e. 20% - confirm in create_datasets)
lookback = {"count": 0, "stride": 1}    # Prior frames model has access to

In [None]:
# Build everything the training loop needs: device, model, datasets,
# dataloaders, optimizer and loss. The helper functions are not defined in
# this notebook; presumably they come from the repository star imports.
device = set_device()
model = initialize_model(
    device = device,
    dbx_access_token = dbx_access_token,
    lookback = lookback,
    reset_weights = False
)

train_dataset, val_dataset = create_datasets(
    device = device,
    include_all_datasets = False,
    include_real_world_datasets = True,
    val_ratio = val_ratio,
    lookback = lookback
)

train_dataloader, val_dataloader = create_dataloaders(
    train_dataset = train_dataset,
    val_dataset = val_dataset,
    batch_size = batch_size,
    val_batch_size = val_batch_size
)

# SGD with momentum over all model parameters.
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
# NOTE(review): `nn` is not imported in any visible cell; presumably it is
# provided by one of the star imports - confirm.
criterion = nn.CrossEntropyLoss()

In [None]:
# Train the model. The call returns the model together with the training-loss
# history, the validation performance history and the best validation
# performance; the two histories are plotted in the next cell.
# NOTE(review): the meaning of `critiqueing_metric` and `auto_stop` is defined
# by training_loop, which is not visible here.
model, train_loss_hist, val_performance_hist, best_val_performance = training_loop(
    model = model,
    criterion = criterion,
    optimizer = optimizer,
    train_dataloader = train_dataloader,
    val_dataloader = val_dataloader,
    dbx_access_token = dbx_access_token,
    num_epochs = num_epochs,
    critiqueing_metric = "Accuracy",
    auto_stop = False
)

In [None]:
# Plot the training-loss history returned by the training cell.
graph_loss_history(train_loss_hist)

# Plot the selected validation metrics over the recorded history.
graph_performance_history(
    performance_hist = val_performance_hist,
    split = "Val",
    metrics = ["Accuracy", "Mean IoU"]
)

# Show model results for two samples of the validation dataset.
show_sample_results(
    model = model,
    dataset = val_dataset,
    device = device,
    num_samples = 2
)

In [None]:
# Evaluate the trained model on the benchmark datasets, using the same
# lookback configuration as in training.
# NOTE(review): `all_benchmarks` / `num_sample_results` semantics are defined
# by test_model_on_benchmarks, which is not visible here.
test_model_on_benchmarks(
    model = model,
    device = device,
    all_benchmarks = True,
    num_sample_results = 2,
    lookback = lookback
)

Push Changes (when ready)

In [None]:
# Fill in GitHub user info
# Values left empty here are requested interactively by the next cell.
git_username = ""
git_email = ""

In [None]:
while not git_username:
    git_username = input("Username left empty.\nGitHub username: ")
while not git_email:
    git_email = input("Email left empty.\nGitHub email: ")

!git config --global user.name $git_username
!git config --global user.email $git_email

git_access_token = getpass("Enter your GitHub access token: ")

git_push_url = f"https://{git_username}:{git_access_token}@{git_repo_url.replace('https://','')}"

commit_message = input("Commit message: ")
while not commit_message:
    commit_message = input("Commit message cannot be empty\nCommit message: ")

!git add .
!git commit -m "{commit_message}"
!git push $git_push_url $git_branch