In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# specify substep parameters for interactive run
# this cell will be replaced during job run with the parameters from json within params subfolder
# NOTE: keep this cell limited to the `substep_params` assignment; it is left empty
# here, so an interactive run passes no substep-level parameters to NotebookSubstep.
substep_params={
}

In [None]:
# load pipeline and step parameters - do not edit
# Both objects are handed to NotebookSubstep in the interface cell below.
# NOTE(review): pprint=True presumably also prints the loaded parameters - confirm in SinaraML docs.
from sinara.substep import get_pipeline_params, get_step_params
pipeline_params = get_pipeline_params(pprint=True)
step_params = get_step_params(pprint=True)

In [None]:
# define substep interface
# Declares which entities this substep reads (inputs), which temporary folders it
# works in (tmp_entities) and which entities it publishes (outputs).
from sinara.substep import NotebookSubstep, ENV_NAME, PIPELINE_NAME, ZONE_NAME, STEP_NAME, RUN_ID, ENTITY_NAME, ENTITY_PATH, SUBSTEP_NAME

substep = NotebookSubstep(pipeline_params, step_params, substep_params)

substep.interface(
    inputs =
    [
        {STEP_NAME: "data_load", ENTITY_NAME: "cifar10_datasets"}, # images from data_load step
        {STEP_NAME: "data_load", ENTITY_NAME: "meta_cifar10_datasets"}, # meta information of dataset from data_load step
    ],
    tmp_entities =
    [    
        { ENTITY_NAME: "cifar10_datasets"}, # temporary copy of the images extracted from the Sinara archive
        { ENTITY_NAME: "meta_cifar10_datasets"}, # temporary copy of the dataset meta information from data_load step
        { ENTITY_NAME: "cifar10_train_dataset"}, # temporary cifar10 dataset for classifier training
        { ENTITY_NAME: "cifar10_val_dataset"}, # temporary cifar10 dataset for classifier evaluation
        { ENTITY_NAME: "cifar10_test_dataset"}, # temporary cifar10 dataset for classifier testing
    ],
    outputs = 
    [
        { ENTITY_NAME: "cifar10_train_dataset"}, # archived cifar10 dataset for classifier training
        { ENTITY_NAME: "cifar10_val_dataset"}, # archived cifar10 dataset for classifier evaluation
        { ENTITY_NAME: "cifar10_test_dataset"}, # archived cifar10 dataset for classifier testing
    ]
)

substep.print_interface_info()

# NOTE(review): presumably stops the notebook here when it is executed only to
# visualize the pipeline interface - confirm in SinaraML docs.
substep.exit_in_visualize_mode()

In [None]:
# specify all notebook wide libraries imports here
# Sinara lib imports is left in the place of their usage
from sklearn.model_selection import train_test_split
import numpy as np
import os.path as osp
import os
import cv2
import matplotlib.pyplot as plt
import plotly.express as px
import json
import shutil
from tqdm import tqdm

In [None]:
# run spark
# The Spark session is only used here to construct SinaraArchive, which moves
# files between the Sinara store and the temporary folders in the cells below.
from sinara.spark import SinaraSpark
from sinara.archive import SinaraArchive

# NOTE(review): the meaning of the positional argument 0 is not visible in this notebook - confirm in SinaraML docs.
spark = SinaraSpark.run_session(0)
archive = SinaraArchive(spark)
# last expression of the cell, so its return value is shown as the cell output
SinaraSpark.ui_url()

### Loading cifar10_datasets and meta_cifar10_datasets (from the previous step data_load)

In [None]:
# Entities published by the data_load step and the temporary folders of this substep
inputs = substep.inputs(step_name = "data_load")
tmp_entities = substep.tmp_entities()

# copy data from the previous step to tmp_entities
for store_path, tmp_dir in (
    (inputs.cifar10_datasets, tmp_entities.cifar10_datasets),
    (inputs.meta_cifar10_datasets, tmp_entities.meta_cifar10_datasets),
):
    archive.unpack_files_from_store_to_tmp(store_path=store_path, tmp_dir=tmp_dir)

## Get image paths for cifar10 dataset

In [None]:
# Folders with the extracted images, laid out as <split>/<class_id>/<image file>
dir_train_cifar10_dataset = osp.join(tmp_entities.cifar10_datasets, "train")
dir_test_cifar10_dataset = osp.join(tmp_entities.cifar10_datasets, "test")


def _list_class_images(dataset_dir, class_id):
    """Return the paths of the files stored in ``dataset_dir/class_id``.

    Args:
        dataset_dir: folder of one split ("train" or "test").
        class_id: name of the class sub-folder inside ``dataset_dir``.

    Returns:
        List of file paths sorted by file name; sub-directories are skipped.

    Raises:
        OSError: if the class folder does not exist or cannot be listed.
    """
    class_dir = osp.join(dataset_dir, class_id)
    # os.listdir returns entries in arbitrary order; sorting keeps the result stable
    candidate_paths = (osp.join(class_dir, img_name) for img_name in sorted(os.listdir(class_dir)))
    return [img_path for img_path in candidate_paths if not osp.isdir(img_path)]


# Class ids are the sub-folder names of the train split. Regular files lying next
# to the class folders are ignored, and the ids are sorted so that the image lists
# (and therefore the seeded train/validation split) are identical on every run.
class_ids = sorted(
    entry
    for entry in os.listdir(dir_train_cifar10_dataset)
    if osp.isdir(osp.join(dir_train_cifar10_dataset, entry))
)

train_cifar10_dataset = []
test_cifar10_dataset = []
for class_id in class_ids:
    # Get images from train dataset
    train_cifar10_dataset.extend(_list_class_images(dir_train_cifar10_dataset, class_id))
    # Get images from test dataset
    test_cifar10_dataset.extend(_list_class_images(dir_test_cifar10_dataset, class_id))

### Split the Cifar10 train images into Train and Validation parts (the Test part is used as loaded)

In [None]:
# Hold out a share of the data_load "train" images for validation.
# The "test" images collected above are kept as the test part without changes.
# The fixed random_state makes the split repeatable for the same input list.
train_cifar10_images, val_cifar10_images = train_test_split(
    train_cifar10_dataset,
    test_size=0.33,
    random_state=42,
)

### Get meta information of the cifar10 dataset

In [None]:
# Read the dataset meta information; below it is used to look up a display
# name by the class folder name (label_names.get(<class_id>)).
meta_json_path = osp.join(tmp_entities.meta_cifar10_datasets, 'meta_cifar10_datasets.json')
with open(meta_json_path, 'r') as meta_file:
    label_names = json.load(meta_file)

## Review Cifar10 Datasets

In [None]:
# Preview a random sample of training images in a grid.
# Dimensions of the plot grid (columns x rows)
W_grid = 5
H_grid = 5

# plt.subplots returns the figure and a (H_grid, W_grid) array of axes;
# each axes object is used to draw one image
fig, axes = plt.subplots(H_grid, W_grid, figsize = (10,10))

axes = axes.ravel() # flatten the H_grid x W_grid array of axes into a 1-D array

n_train = len(train_cifar10_images) # get the length of the train dataset

# A seeded generator keeps the preview identical across "Restart & Run All".
# Taking a prefix of a permutation samples without replacement, so no image is
# shown twice, and it also works when there are fewer images than grid cells.
preview_rng = np.random.default_rng(42)
preview_indices = preview_rng.permutation(n_train)[:W_grid * H_grid]

# Hide the axis frame of every cell, including cells that stay empty
for ax in axes:
    ax.axis('off')

for ax, image_index in zip(axes, preview_indices):
    # read and display an image with the selected index
    img_path = train_cifar10_images[image_index]
    label_index = osp.basename(osp.dirname(img_path))
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread returns None instead of raising when a file cannot be decoded
        ax.set_title("unreadable image", fontsize = 8)
        continue
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # OpenCV loads BGR, matplotlib expects RGB
    ax.imshow(img)
    # fall back to the class folder name if the meta file has no entry for it
    ax.set_title(label_names.get(label_index, label_index), fontsize = 8)

plt.subplots_adjust(hspace=0.4)

### Overview of the distribution of labeled data from train, valid and test dataset

In [None]:
def plot_barh_labels(cifar10_images: list, title_name: str = "", num_fig = 0):
    """Draw a horizontal bar chart with the number of images per class.

    The class of an image is the name of its parent folder; it is translated
    to a display name through the notebook-level ``label_names`` mapping.

    Args:
        cifar10_images: image file paths laid out as ``.../<class_id>/<file>``.
        title_name: title shown above the chart.
        num_fig: not used by the function; kept so existing calls keep working.

    Returns:
        None. The chart is rendered with ``plt.show()``.
    """
    cifar10_labels = [osp.basename(osp.dirname(fpath)) for fpath in cifar10_images]
    classes, counts = np.unique(cifar10_labels, return_counts=True)
    # Fall back to the raw class id when the meta information has no name for it,
    # so that an unknown class shows up on the chart instead of a None label.
    bar_names = [label_names.get(class_id, str(class_id)) for class_id in classes]
    fig, ax = plt.subplots()
    ax.barh(bar_names, counts)
    ax.set_title(title_name)
    ax.set_xlabel("Number of images")
    plt.show()

# Distribution of labeled data in every part of the dataset: train, validation, test.
# The last tuple item is num_fig; 0 is also the default value of that parameter.
for split_images, split_title, split_num_fig in (
    (train_cifar10_images, 'Class distribution in training dataset', 0),
    (val_cifar10_images, 'Class distribution in validation dataset', 1),
    (test_cifar10_dataset, 'Class distribution in testing dataset', 0),
):
    plot_barh_labels(split_images, split_title, split_num_fig)

### Save temporarily train, validation and test cifar10 datasets

In [None]:
# Save images for train, validation and test cifar10 datasets to tmp_entities
def prepare_cifar10_dataset_images(cifar10_data, dest_img_folder: str):
    """Copy images into ``dest_img_folder``, keeping one sub-folder per class.

    The class of an image is the name of its parent folder, so a source file
    ``.../<class_id>/<name>`` is copied to ``dest_img_folder/<class_id>/<name>``.
    An existing destination file with the same name is overwritten.

    Args:
        cifar10_data: iterable of source image file paths.
        dest_img_folder: root folder that receives the per-class sub-folders.

    Returns:
        None.
    """
    # Create the root even for an empty dataset, so the folder always exists
    # for whatever consumes it afterwards.
    os.makedirs(dest_img_folder, exist_ok=True)

    # Remember the class folders already created to avoid one makedirs call per image
    created_dirs = set()
    for source_img_path in tqdm(cifar10_data):
        label_img = osp.basename(osp.dirname(source_img_path))
        dest_dir = osp.join(dest_img_folder, label_img)
        if dest_dir not in created_dirs:
            os.makedirs(dest_dir, exist_ok=True)
            created_dirs.add(dest_dir)
        shutil.copyfile(source_img_path, osp.join(dest_dir, osp.basename(source_img_path)))

# Copy every part of the dataset into its own temporary entity folder
for split_images, split_tmp_dir in (
    (train_cifar10_images, tmp_entities.cifar10_train_dataset),
    (val_cifar10_images, tmp_entities.cifar10_val_dataset),
    (test_cifar10_dataset, tmp_entities.cifar10_test_dataset),
):
    prepare_cifar10_dataset_images(split_images, dest_img_folder=split_tmp_dir)

### Archiving train, validation and test cifar10 datasets to Sinara Storage

In [None]:
# Archive the prepared temporary folders (cifar10_train_dataset, cifar10_val_dataset,
# cifar10_test_dataset) into the outputs of step data_prep
outputs = substep.outputs()

for split_tmp_dir, split_store_path in (
    (tmp_entities.cifar10_train_dataset, outputs.cifar10_train_dataset),
    (tmp_entities.cifar10_val_dataset, outputs.cifar10_val_dataset),
    (tmp_entities.cifar10_test_dataset, outputs.cifar10_test_dataset),
):
    archive.pack_files_from_tmp_to_store(tmp_dir=split_tmp_dir, store_path=split_store_path)

In [None]:
# stop spark
# Ends the session started with SinaraSpark.run_session at the top of the notebook;
# nothing after this cell uses `spark` or `archive`.
SinaraSpark.stop_session()