# Wicker Hello World

In this tutorial we demonstrate how to write and read a dataset given only an S3 bucket as infrastructure.

### Environment Setup
Install wicker with the spark plugin into a new virtual environment.
For this tutorial you should also have numpy and pillow installed.

In [None]:
#!pip install "wicker[spark]" numpy pillow

Download and untar the CIFAR-10 dataset.

In [None]:
# curl and untar the Cifar dataset
# curl https://s3.amazonaws.com/fast-ai-imageclas/cifar10.tgz --output cifar10.tgz
# tar -xvf cifar10.tgz

### Writing the Dataset

This simple dataset consists of 10 classes and 60k images, partitioned into 50k train and 10k test images.
Each image has shape (32, 32, 3), so we can define the dataset schema with a string field for the label and a numpy field for the image. Note that we could also have defined an object field or ... for the image field.

In [None]:
from wicker import schema

DATASET_SCHEMA = schema.DatasetSchema(
    fields=[
        schema.StringField("label", description="ground truth label of our image"),
        schema.NumpyField("image", shape=(-1, -1, 3), dtype="uint8", description="image as a numpy array"),
    ],
    primary_keys=["label"],
)
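
As a rough illustration of what the `shape=(-1, -1, 3)` declaration above permits, here is a minimal pure-Python sketch. It assumes that `-1` marks a dimension of any size; `shape_matches` is a hypothetical helper written for this tutorial, not part of the wicker API.

```python
from typing import Sequence


def shape_matches(actual: Sequence[int], expected: Sequence[int]) -> bool:
    """Return True if `actual` fits `expected`, treating -1 as "any size".

    Hypothetical helper for illustration only; wicker performs its own
    schema validation when the dataset is written.
    """
    # The number of dimensions must agree before comparing sizes.
    if len(actual) != len(expected):
        return False
    # Each dimension must either be a wildcard or match exactly.
    return all(want == -1 or want == got for want, got in zip(expected, actual))


# A CIFAR-10 image is 32 x 32 with 3 colour channels.
print(shape_matches((32, 32, 3), (-1, -1, 3)))
# A grayscale image has no channel dimension, so it does not fit.
print(shape_matches((32, 32), (-1, -1, 3)))
```

With a numpy array `im`, the same check would be called as `shape_matches(im.shape, (-1, -1, 3))`.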


Additionally, we supply a name and version for the dataset.

In [None]:
DATASET_NAME = "cifar10"
DATASET_VERSION = "0.0.2"

Next, let's organize the data so we can more easily persist it in our dataset.
This dataset is relatively small (<200 MB), so we can just read everything into local memory.

In [None]:
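import os
import numpy as np
from PIL import Image

# root of the untarred archive; adjust to wherever you extracted cifar10.tgz
path = os.walk('/code/cifar10')
dataset = []

for root, directories, files in path:
    for file in files:
        # the directory layout is .../<partition>/<label>/<file>
        root_split = root.split('/')
        partition = root_split[-2]
        label = root_split[-1]
        # decode the image file into a numpy array
        im = np.asarray(Image.open(os.path.join(root, file)))
        dataset.append({'partition_name': partition, 'raw_data': {'label': label, 'image': im}})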
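
Before persisting anything, it can be worth checking that the records landed in the partitions you expect. The sketch below counts records per partition; `count_by_partition` is a hypothetical helper, and the three sample records are made-up stand-ins that only mirror the structure of the `dataset` list built above.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def count_by_partition(records: Iterable[Dict[str, Any]]) -> Counter:
    """Count how many records fall into each partition (hypothetical helper)."""
    return Counter(record["partition_name"] for record in records)


# Stand-in records with the same keys as the tutorial's `dataset` entries.
# The image is omitted (None) because only the partition name is counted.
sample = [
    {"partition_name": "train", "raw_data": {"label": "cat", "image": None}},
    {"partition_name": "train", "raw_data": {"label": "dog", "image": None}},
    {"partition_name": "test", "raw_data": {"label": "cat", "image": None}},
]

counts = count_by_partition(sample)
print(counts["train"], counts["test"])
```

Running `count_by_partition(dataset)` on the full list should report 50k train and 10k test records if the archive was read as described above.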

In this example we'll use the Spark plugin and a local Spark cluster to persist and shuffle our data.
To use the Spark API, you just need to supply an RDD in which each element pairs:

1. The name of the dataset partition
2. A dictionary of the data to be persisted

i.e. `pyspark.rdd.RDD[Tuple[str, Dict[str, Any]]]`
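
To make the element type concrete without starting Spark, here is a minimal sketch in plain Python. The alias `WickerRow` and the function `to_row` are hypothetical names introduced for illustration; the record layout matches the dictionaries built when the images were loaded.

```python
from typing import Any, Dict, Tuple

# Hypothetical alias for one RDD element: (partition name, data dictionary).
WickerRow = Tuple[str, Dict[str, Any]]


def to_row(record: Dict[str, Any]) -> WickerRow:
    """Turn one loaded record into a (partition, data) pair."""
    return record["partition_name"], record["raw_data"]


# Stand-in record; a real one would hold a numpy array under "image".
record = {
    "partition_name": "train",
    "raw_data": {"label": "airplane", "image": None},
}

row = to_row(record)
print(row[0])           # partition name
print(sorted(row[1]))   # keys of the data dictionary
```

The keys of the data dictionary are expected to line up with the field names declared in the dataset schema.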

Let's look at the example below using a local spark cluster:

In [None]:
from wicker.plugins.spark import persist_wicker_dataset
from pyspark.sql import SparkSession
import copy

In [None]:
# create a local spark session
spark_session = SparkSession.builder.appName("test").master("local[*]")
spark = spark_session.getOrCreate()
sc = spark.sparkContext

In [None]:
# create an RDD containing the partition name and raw data
# note: on a local spark cluster you will likely have to tune the number
# of partitions so that individual stages don't become too big.
# here we use 256
rdd = sc.parallelize(copy.deepcopy(dataset), numSlices=256)
data_rdd = rdd.map(lambda data_dict: data_dict["raw_data"])
partition_name_rdd = rdd.map(lambda data_dict: data_dict["partition_name"])
partition_rdd = partition_name_rdd.zip(data_rdd)
partition_rdd.cache()

In [None]:
persist_wicker_dataset(
    DATASET_NAME,
    DATASET_VERSION,
    DATASET_SCHEMA,
    partition_rdd,
)

That's it! Our data has been shuffled and is now in our S3 bucket.

### Reading the Dataset

Now let's read from the dataset.

In [None]:
from wicker.core.datasets import S3Dataset

ds = S3Dataset(DATASET_NAME, DATASET_VERSION, "train")

In [None]:
len(ds)

In [None]:
%%time
# the first read takes O(seconds)
x0 = ds[0]

In [None]:
%%time
# access to contiguous indices is fast
x1 = ds[1]

In [None]:
%%time
# in fact, the entire train dataset was loaded into memory since it is small
# by default, chunks of X MB are loaded at a time
x2 = ds[-1]
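
Because contiguous reads are fast, walking the dataset in index order is a natural access pattern. The sketch below groups consecutive indices into batches using only `len()` and integer indexing, the two operations shown above. `iter_batches` is a hypothetical helper, and a plain list stands in for the dataset so the example runs without S3 access.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def iter_batches(data: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive batches from anything supporting len() and data[i]."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(data), batch_size):
        # The last batch may be shorter than batch_size.
        stop = min(start + batch_size, len(data))
        yield [data[i] for i in range(start, stop)]


# Stand-in for the dataset: five fake examples.
fake_ds = [{"label": f"class_{i}"} for i in range(5)]

batches = list(iter_batches(fake_ds, batch_size=2))
print([len(batch) for batch in batches])
```

Under the assumption that the dataset object behaves like a sequence, `iter_batches(ds, 64)` would read examples in the same contiguous order.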