In [None]:
# Enable IPython's autoreload extension. Mode 2 re-imports all modules before each cell run,
# so edits to local helper modules (e.g. utils.*) take effect without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
# specify substep parameters for interactive run
# this cell will be replaced during job run with the parameters from json within params subfolder
# An empty dict means no substep-level parameters are supplied when running interactively.
substep_params={}

In [None]:
# load pipeline and step parameters - do not edit
from sinara.substep import get_pipeline_params, get_step_params
# Pipeline-level parameters; handed to NotebookSubstep together with step and substep parameters.
# NOTE(review): pprint=True presumably also prints the loaded parameters — confirm against Sinara docs.
pipeline_params = get_pipeline_params(pprint=True)
# Step-level parameters; the training cells read the "train_params" and "val_params" entries from it.
step_params = get_step_params(pprint=True)

In [None]:
# define substep interface
# The interface declares three groups of entities for this substep:
#   inputs       - entities produced by another step (here: "data_prep") that this notebook reads
#   tmp_entities - scratch entities used while the notebook runs
#   outputs      - entities this notebook publishes when it finishes
from sinara.substep import NotebookSubstep, ENV_NAME, PIPELINE_NAME, ZONE_NAME, STEP_NAME, RUN_ID, ENTITY_NAME, ENTITY_PATH, SUBSTEP_NAME

substep = NotebookSubstep(pipeline_params, step_params, substep_params)

substep.interface(
    inputs =
    [ 
      { STEP_NAME: "data_prep", ENTITY_NAME: "train_dataset"}, # train dataset from data_prep step
      { STEP_NAME: "data_prep", ENTITY_NAME: "val_dataset"}, # val dataset from data_prep step
    ],
    tmp_entities = 
    [       
       { ENTITY_NAME: "train_dataset" }, # local copy of the train dataset, unpacked from the store
       { ENTITY_NAME: "val_dataset" }, # local copy of the val dataset, unpacked from the store
       { ENTITY_NAME: "classifier_train_work_dir"}, # working dir that receives the training checkpoints
       { ENTITY_NAME: "classifier_inference_files"} # staging dir for the files that are packed into the output
    ],
    outputs = 
    [
        { ENTITY_NAME: "classifier_inference_files"} # stored classifier files
    ]
)

# Show the resolved interface in the notebook output.
substep.print_interface_info()

# NOTE(review): presumably stops execution here when the notebook is run only to visualize
# the pipeline graph — confirm against Sinara docs.
substep.exit_in_visualize_mode()

In [None]:
# specify all notebook wide libraries imports here
# Sinara lib imports is left in the place of their usage
import os.path as osp
import os
from pathlib import Path
import glob
import json
import shutil

import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim

from utils.models import ResNet18
from utils.train_eval import init_params
from utils.train_eval import train, test

print(f"{torch.__version__=}")
print(f"{torchvision.__version__=}")

In [None]:
# Fail fast when no CUDA device is visible: the training cells below run on DEVICE = "cuda".
assert torch.cuda.is_available(), "Cuda not available"

# Report the GPU currently selected by torch and how many GPUs this process can see.
device_id = torch.cuda.current_device()
device_name = torch.cuda.get_device_name(device_id)
print(f"{device_name=}", f"{torch.cuda.device_count()=}", sep="\n")

In [None]:
# run spark
from sinara.spark import SinaraSpark
from sinara.archive import SinaraArchive

# Start the Spark session; the archive helper used for store <-> tmp file transfers is built on it.
# NOTE(review): the meaning of the positional argument 0 is not visible here — confirm against SinaraSpark docs.
spark = SinaraSpark.run_session(0)
archive = SinaraArchive(spark)
# Last expression of the cell, so its return value (presumably the Spark UI URL) is rendered as cell output.
SinaraSpark.ui_url()

### Loading cifar10 train and val image datasets (from the previous step data_prep)

In [None]:
# Resolve the store locations produced by the data_prep step and the local scratch directories.
data_prep_inputs = substep.inputs(step_name="data_prep")
tmp_entities = substep.tmp_entities()

# Unpack each dataset archive from the store into the tmp entity directory of the same name.
for dataset_entity in ("train_dataset", "val_dataset"):
    archive.unpack_files_from_store_to_tmp(
        store_path=getattr(data_prep_inputs, dataset_entity),
        tmp_entity_dir=getattr(tmp_entities, dataset_entity),
    )

## Setting up model training and evaluation

### Defining basic variables for training and evaluation

In [None]:
# Hyper-parameters are read from the step configuration; only the two constants at the
# bottom of this cell are fixed in the notebook itself.
train_params = step_params["train_params"]
val_params = step_params["val_params"]

# --- image preprocessing ---
MAX_IMAGE_SIZE = train_params['MAX_IMAGE_SIZE']
normalize_cfg = train_params['NORMALIZE']
MEAN_NORMALIZE = normalize_cfg["mean"]
STD_NORMALIZE = normalize_cfg["std"]

# --- loop length and data loading ---
EPOCH_COUNT = train_params['EPOCH_COUNT']
BATCH_TRAIN, WORKERS_TRAIN = train_params['BATCH'], train_params['WORKERS']
BATCH_VAL, WORKERS_VAL = val_params['BATCH'], val_params['WORKERS']

# --- SGD optimizer ---
sgd_cfg = train_params['OPTIMIZER_SGD']
OPTIMIZER_LR = sgd_cfg["learning_rate"]
OPTIMIZER_WEIGHT_DECAY = sgd_cfg["weight_decay"]
OPTIMIZER_MOMENTUM = sgd_cfg["momentum"]

# --- notebook-level constants ---
CHECKPOINT_INTERVAL = 5  # used by the training loop to decide when to write periodic checkpoints
DEVICE = "cuda"          # device passed to the model and to the train/test helpers

### Setting transform augmentation

In [None]:
def _build_transform(augmentations=()):
    """Compose the preprocessing pipeline shared by the train and val datasets.

    The pipeline is: resize to a MAX_IMAGE_SIZE square -> optional augmentations ->
    ToTensor -> Normalize. The configured mean/std are divided by 255 before being
    handed to Normalize.
    NOTE(review): the division suggests the config values are on a 0-255 scale — confirm.

    Args:
        augmentations: transforms inserted between the resize and the tensor conversion.

    Returns:
        A ``transforms.Compose`` instance.
    """
    return transforms.Compose([
        transforms.Resize(size=(MAX_IMAGE_SIZE, MAX_IMAGE_SIZE)),
        *augmentations,
        transforms.ToTensor(),
        transforms.Normalize(
            mean=torch.tensor(MEAN_NORMALIZE) / 255,
            std=torch.tensor(STD_NORMALIZE) / 255,
        ),
    ])


# Training images additionally get random sharpness, rotation and flip augmentations.
torch_transform_train = _build_transform(augmentations=[
    transforms.RandomAdjustSharpness(sharpness_factor=2),
    transforms.RandomRotation(degrees=45),
    transforms.RandomVerticalFlip(),
    transforms.RandomHorizontalFlip(),
])

# Validation images are only resized and normalized.
torch_transform_val = _build_transform()

### Setting pytorch dataloader

In [None]:
# ImageFolder datasets are built from the unpacked tmp directories, each with its own transform.
torch_train_dataset = torchvision.datasets.ImageFolder(
    root=tmp_entities.train_dataset,
    transform=torch_transform_train,
)
torch_val_dataset = torchvision.datasets.ImageFolder(
    root=tmp_entities.val_dataset,
    transform=torch_transform_val,
)

# Only the training loader shuffles; validation keeps the dataset order.
train_loader_kwargs = dict(batch_size=BATCH_TRAIN, shuffle=True, num_workers=WORKERS_TRAIN)
val_loader_kwargs = dict(batch_size=BATCH_VAL, shuffle=False, num_workers=WORKERS_VAL)

train_loader = torch.utils.data.DataLoader(torch_train_dataset, **train_loader_kwargs)
val_loader = torch.utils.data.DataLoader(torch_val_dataset, **val_loader_kwargs)

# Mapping class name -> class index taken from the training dataset; its length sets the
# number of model outputs and it is later written to categories.json.
class_names = torch_train_dataset.class_to_idx

### Setting model

In [None]:
# One output per class found in the training dataset; the model is moved to DEVICE right away.
net_classifier = ResNet18(num_classes=len(class_names)).to(DEVICE)

### Setting the loss function, optimizer and scheduler, and initializing the classifier parameters

In [None]:
# Loss used by both the train and the test helper.
criterion = nn.CrossEntropyLoss()

# All SGD hyper-parameters come from step_params["train_params"]["OPTIMIZER_SGD"].
# Momentum must use OPTIMIZER_MOMENTUM too; a literal value here would silently ignore the config.
optimizer = optim.SGD(
    net_classifier.parameters(),
    lr=OPTIMIZER_LR,
    momentum=OPTIMIZER_MOMENTUM,
    weight_decay=OPTIMIZER_WEIGHT_DECAY,
)

# NOTE(review): T_max is fixed at 200 regardless of EPOCH_COUNT, so with fewer epochs the cosine
# schedule only covers the start of its period — confirm whether T_max should follow EPOCH_COUNT.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)

# Initialize the model parameters in place (helper from utils.train_eval).
init_params(net_classifier)

## Start classifier training 

In [None]:
# Train for EPOCH_COUNT epochs and evaluate on the validation loader after each epoch.
# Files written into the training work dir:
#   best_ckpt.pth          - model with the highest validation accuracy seen so far
#   ckpt_<epoch>.pth       - periodic snapshot every CHECKPOINT_INTERVAL epochs and after the final epoch
#   latest_checkpoint.pth  - symlink to the most recent periodic snapshot
# NOTE: torch.save is given the whole module (not a state_dict), so loading these files
# requires the model class from utils.models to be importable.
work_dir = tmp_entities.classifier_train_work_dir
best_ckpt_path = osp.join(work_dir, "best_ckpt.pth")
latest_link_path = osp.join(work_dir, "latest_checkpoint.pth")

# Start below any reachable accuracy so the first epoch always produces best_ckpt.pth;
# the collection cell later copies that file unconditionally.
best_acc = float("-inf")
for epoch in range(EPOCH_COUNT):

    train(net=net_classifier,
          criterion=criterion,
          optimizer=optimizer,
          loader=train_loader,
          epoch=epoch,
          device=DEVICE)

    val_acc = test(net=net_classifier,
                   criterion=criterion,
                   optimizer=optimizer,
                   loader=val_loader,
                   epoch=epoch,
                   device=DEVICE)

    scheduler.step()

    # Save checkpoint.
    if val_acc > best_acc:
        print(f"Saving best weights with acc={round(val_acc, 2)}")
        torch.save(net_classifier, best_ckpt_path)
        best_acc = val_acc

    # `epoch` is 0-based, so (epoch + 1) counts completed epochs. The final epoch is always
    # snapshotted so latest_checkpoint.pth exists even when EPOCH_COUNT < CHECKPOINT_INTERVAL.
    is_last_epoch = epoch == EPOCH_COUNT - 1
    if (epoch + 1) % CHECKPOINT_INTERVAL == 0 or is_last_epoch:
        print(f"Saving weights for epoch {epoch}")
        ckpt_path = osp.join(work_dir, f"ckpt_{epoch}.pth")
        torch.save(net_classifier, ckpt_path)
        # lexists (unlike exists) is also True for a dangling symlink, which would otherwise
        # make os.symlink fail with FileExistsError.
        if osp.lexists(latest_link_path):
            os.remove(latest_link_path)
        os.symlink(ckpt_path, latest_link_path)

### Collecting classifier_inference_files

#### Collecting test image from a validation dataset

In [None]:
# Pick one random validation sample; ImageFolder.imgs holds (file path, class index) pairs.
image_index = np.random.randint(0, len(torch_val_dataset))
src_test_image_file_name = torch_val_dataset.imgs[image_index][0]

if osp.exists(src_test_image_file_name):
    # Keep the original extension and store the image as "test<ext>" next to the model files.
    test_image_file_extension = Path(src_test_image_file_name).suffix
    dst_test_image_file_name = osp.join(
        tmp_entities.classifier_inference_files,
        "test" + test_image_file_extension,
    )
    shutil.copy(src_test_image_file_name, dst_test_image_file_name)
else:
    raise FileNotFoundError(f"{src_test_image_file_name} was not found")

#### Collecting train results
(weights, config, test image) for subsequent transfer to other components

Since intermediate weights of the neural network can be created during training (for example, for epochs 10, 20, 30, etc.),
it doesn't make much sense to copy all of the intermediate files to another step in the pipeline.
Therefore, we copy only the required weights and configs into a separate directory, and that directory is what gets copied to the outputs.

In [None]:
# copy files - last and best model weights and config model to finished dir
best_chekpoint = osp.join(tmp_entities.classifier_train_work_dir, 'best_ckpt.pth')
shutil.copy(best_chekpoint, 
            osp.join(tmp_entities.classifier_inference_files, osp.basename(best_chekpoint))
           )

last_checkpoint = osp.join(tmp_entities.classifier_train_work_dir, 'latest_checkpoint.pth')
out_last_checkpoint = osp.join(tmp_entities.classifier_inference_files, "latest_checkpoint.pth")
shutil.copy(last_checkpoint, out_last_checkpoint)

with open(osp.join(tmp_entities.classifier_inference_files, 'categories.json'), 'w', encoding='utf-8') as f:
    json.dump(class_names, f, ensure_ascii=False, indent=4)

### Save collected classifier_inference_files

In [None]:
# Publish the staged inference files: pack the tmp directory into the output entity's store path.
outputs = substep.outputs()
archive.pack_files_from_tmp_to_store(
    tmp_entity_dir=tmp_entities.classifier_inference_files,
    store_path=outputs.classifier_inference_files,
)

In [None]:
# stop spark
# Shut down the Spark session started earlier in this notebook; run this last, after all archive operations.
SinaraSpark.stop_session()