In [None]:
# Enable IPython's autoreload extension in mode 2, so imported modules are
# reloaded before each cell execution and edits to local helper modules are
# picked up without restarting the kernel
%load_ext autoreload
%autoreload 2

In [None]:
# Substep parameters used for an interactive run.
# During a job run this cell is replaced with the parameters from the JSON file in the "params" subfolder.
# "cifar10_datasets_url" is the URL of the archive that is downloaded and unpacked further below.
substep_params={
    "cifar10_datasets_url": "https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"  
}

In [None]:
# load pipeline and step parameters - do not edit
# Both results are passed to NotebookSubstep when the substep interface is defined.
# NOTE(review): pprint=True presumably prints the loaded parameters - confirm against the sinara docs
from sinara.substep import get_pipeline_params, get_step_params
pipeline_params = get_pipeline_params(pprint=True)
step_params = get_step_params(pprint=True)

In [None]:
# specify all notebook wide libraries imports here
# Sinara lib imports is left in the place of their usage
import os
import os.path as osp
import json

In [None]:
# Declare the substep interface: the temporary entities this notebook works with
# and the entities it stores for the following steps
from sinara.substep import NotebookSubstep, ENV_NAME, PIPELINE_NAME, ZONE_NAME, STEP_NAME, RUN_ID, ENTITY_NAME, ENTITY_PATH, SUBSTEP_NAME

substep = NotebookSubstep(pipeline_params, step_params, substep_params)

# Temporary working entities of this substep
tmp_entity_specs = [
    {ENTITY_NAME: "downloaded_archives"},    # temporarily downloaded tar.gz archive
    {ENTITY_NAME: "raw_cifar10_datasets"},   # raw files extracted from the downloaded archive
    {ENTITY_NAME: "cifar10_datasets"},       # images exported from the raw cifar10 files
    {ENTITY_NAME: "meta_cifar10_datasets"},  # meta info of the cifar10 dataset
]

# Entities stored for use in the next steps
output_specs = [
    {ENTITY_NAME: "cifar10_datasets"},       # exported images
    {ENTITY_NAME: "meta_cifar10_datasets"},  # dataset meta info
]

substep.interface(tmp_entities=tmp_entity_specs, outputs=output_specs)

substep.print_interface_info()

substep.exit_in_visualize_mode()

In [None]:
# run spark
from sinara.spark import SinaraSpark
from sinara.archive import SinaraArchive

# NOTE(review): the meaning of the 0 argument is not visible in this notebook - confirm against SinaraSpark.run_session
spark = SinaraSpark.run_session(0)
# archive is used at the end of the notebook to pack the tmp entities into the step outputs
archive = SinaraArchive(spark)
# Last expression of the cell, so its return value is displayed as the cell output
SinaraSpark.ui_url()

### Downloading and unpacking the CIFAR-10 tar archive

In [None]:
# Temporary entities declared in the substep interface; the cells below read
# their locations as attributes (e.g. tmp_entities.downloaded_archives)
tmp_entities = substep.tmp_entities()
# URL of the archive to download, taken from the substep parameters
cifar10_datasets_url = substep_params["cifar10_datasets_url"]

In [None]:
# Download cifar_datasets
!wget {cifar10_datasets_url} -O {osp.join(tmp_entities.downloaded_archives, osp.basename(cifar10_datasets_url))}

In [None]:
# unzip
!tar -zxf {osp.join(tmp_entities.downloaded_archives, osp.basename(cifar10_datasets_url))} --directory {tmp_entities.raw_cifar10_datasets}

### Extracting the raw CIFAR-10 dataset into images

In [None]:
# The CIFAR-10 python archive contains 5 training batches of 10,000 images each
# (50,000 training images in total) and one test batch with 10,000 images
from utils.extract_cifar10 import load_cifar_pickle, unpickle, save_cifar_image

# NOTE(review): judging by their names, the helpers above unpickle the downloaded files;
# unpickling can execute arbitrary code, so cifar10_datasets_url must point to a trusted source


def _export_images(images, labels, dataset_dir, first_image_id=0):
    """Save every image as <dataset_dir>/<label_id>/image_<id>.png.

    Image ids are assigned consecutively starting at first_image_id.
    Returns the next unused image id, so that numbering can continue
    across several batches.
    """
    image_id = first_image_id
    for index, label_id in enumerate(labels):
        out_path = osp.join(dataset_dir, str(label_id), f"image_{image_id}.png")
        save_cifar_image(images[index], out_path)
        image_id += 1
    return image_id


count_batch_cifar10 = 5
dir_raw_batches_cifar10 = osp.join(tmp_entities.raw_cifar10_datasets, "cifar-10-batches-py")
filepath_meta_cifar10 = osp.join(dir_raw_batches_cifar10, "batches.meta")
filepath_test_batch_cifar10 = osp.join(dir_raw_batches_cifar10, "test_batch")

# Map the numeric label id (position in the list) to the label name
cifar10_meta = unpickle(filepath_meta_cifar10)
label_names = dict(enumerate(cifar10_meta["label_names"]))

dir_train_cifar10_dataset = osp.join(tmp_entities.cifar10_datasets, "train")
dir_test_cifar10_dataset = osp.join(tmp_entities.cifar10_datasets, "test")

# Create the train and test directories with one sub-directory per label id
for dataset_dir in (dir_train_cifar10_dataset, dir_test_cifar10_dataset):
    os.makedirs(dataset_dir, exist_ok=True)
    for label_id in label_names:
        os.makedirs(osp.join(dataset_dir, str(label_id)), exist_ok=True)

# Export the training batches; image ids keep counting across batches so that
# file names stay unique within each label directory
image_id = 0
for batch_id in range(1, count_batch_cifar10 + 1):
    filepath_train_batch_cifar10 = osp.join(dir_raw_batches_cifar10, f"data_batch_{batch_id}")
    train_images, train_labels = load_cifar_pickle(filepath_train_batch_cifar10)
    image_id = _export_images(train_images, train_labels, dir_train_cifar10_dataset, first_image_id=image_id)

# Export the test batch; its image ids start again at 0
test_images, test_labels = load_cifar_pickle(filepath_test_batch_cifar10)
_export_images(test_images, test_labels, dir_test_cifar10_dataset)

# Save the label id -> label name mapping as meta info for the cifar10 datasets.
# JSON object keys are always strings, so the integer label ids are written as "0", "1", ...
filepath_meta_json = osp.join(tmp_entities.meta_cifar10_datasets, 'meta_cifar10_datasets.json')
with open(filepath_meta_json, 'w', encoding='utf-8') as f:
    json.dump(label_names, f)

### Archiving cifar10_datasets and meta_cifar10_datasets for the next step

In [61]:
# Pack the cifar10_datasets and meta_cifar10_datasets tmp entities into the
# same-named outputs of this step
outputs = substep.outputs()

for entity_name in ("cifar10_datasets", "meta_cifar10_datasets"):
    archive.pack_files_from_tmp_to_store(
        tmp_dir=getattr(tmp_entities, entity_name),
        store_path=getattr(outputs, entity_name),
    )

                                                                                

In [62]:
# Stop spark
# Done last: the session is no longer needed once the outputs have been packed to the store
SinaraSpark.stop_session()