In [1]:
from PyTorchEstimator import PyTorchEstimator
from azureml.core.workspace import Workspace
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Build (or reuse) a local Spark session. getOrCreate() keeps this cell safe to
# re-run: constructing SparkContext('local') a second time in the same kernel
# fails because only one SparkContext may be active per process.
spark = SparkSession.builder.master('local').getOrCreate()
# Keep the `sc` name bound to the session's underlying SparkContext.
sc = spark.sparkContext

In [None]:
# Get the CIFAR10 dataset as Python dictionary
import os, tarfile, pickle
import urllib.request
cdnURL = "https://amldockerdatasets.azureedge.net"
# Please note that this is a copy of the CIFAR10 dataset originally found here:
# http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
dataFile = "cifar-10-python.tar.gz"
dataURL = cdnURL + "/CIFAR10/" + dataFile
if not os.path.isfile(dataFile):
    # Download under a temporary name and rename only after the transfer
    # finishes, so an interrupted download never leaves a truncated archive
    # that the isfile() check above would accept on the next run.
    partialFile = dataFile + ".part"
    urllib.request.urlretrieve(dataURL, partialFile)
    os.replace(partialFile, dataFile)
with tarfile.open(dataFile, "r:gz") as f:
    testBatchName = "cifar-10-batches-py/test_batch"
    # extractfile() returns None when the member is not a regular file or link.
    testBatchFile = f.extractfile(testBatchName)
    if testBatchFile is None:
        raise ValueError("%s in %s is not a regular file" % (testBatchName, dataFile))
    # SECURITY: pickle.load can execute arbitrary code. The archive is fetched
    # over the network and is not checksum-verified here, so only run this
    # against a source you trust.
    with testBatchFile:
        test_dict = pickle.load(testBatchFile, encoding="latin1")

In [None]:
# Create the images with labels from CIFAR dataset,
# reformat the labels using OneHotEncoder
import array
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.sql.functions import col
from pyspark.sql.types import *

def reshape_image(record):
    """Convert one (image, label, filename) record into Spark-friendly values.

    The image is reshaped to (3, 32, 32) and flattened again, which keeps the
    element order but fails if the image does not hold exactly 3*32*32 values.
    Every element is converted to a Python float.

    Assumes `image` exposes NumPy-style `reshape`/`flatten` — TODO confirm
    against the caller.

    Returns:
        A tuple (list of floats, label, filename); label and filename are
        passed through unchanged.
    """
    pixels, label, filename = record
    flat_pixels = pixels.reshape(3, 32, 32).flatten()
    return list(map(float, flat_pixels)), label, filename

# UDFs used below: an identity function declared to return array<double>,
# and a converter from a Python list to a dense ML vector.
convert_to_double = udf(lambda x: x, ArrayType(DoubleType()))
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

# Pair every image with its label and filename, distribute the records and
# convert each image to a flat list of floats.
image_rdd = zip(test_dict["data"], test_dict["labels"], test_dict["filenames"])
image_rdd = spark.sparkContext.parallelize(image_rdd).map(reshape_image)

imagesWithLabels = image_rdd.toDF(["images", "labels", "filename"])

# Replace the array column with a dense vector column and drop the filename.
images_as_vector = list_to_vector_udf(convert_to_double(col("images")))
imagesWithLabels = (
    imagesWithLabels
    .withColumn("images", images_as_vector)
    .select("images", "labels")
)

# One-hot encode the labels; dropLast=False keeps a slot for every category.
ohe = OneHotEncoderEstimator(
    inputCols=["labels"],
    outputCols=["tmplabels"],
    dropLast=False,
)
ohe_model = ohe.fit(imagesWithLabels)
imagesWithLabels = (
    ohe_model.transform(imagesWithLabels)
    .select("images", "tmplabels")
    .withColumnRenamed("tmplabels", "labels")
)

imagesWithLabels.printSchema()

# Mark the DataFrame for caching; the count() below is the first action on it.
imagesWithLabels.cache()
print(imagesWithLabels.count())

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def array_to_string(my_list):
    """Render an iterable as a bracketed, comma-separated string.

    Each element is converted with str() and the pieces are joined with ","
    (no spaces), e.g. [1, 2, 3] -> "[1,2,3]". An empty iterable gives "[]".
    """
    joined = ','.join(map(str, my_list))
    return '[{}]'.format(joined)

# Expose the plain-Python formatter as a Spark UDF that returns a string.
array_to_string_udf = udf(array_to_string, StringType())

# Build a copy of the dataset in which both columns are stored as strings:
# first the images column, then the labels column.
images_as_text = imagesWithLabels.withColumn(
    'images', array_to_string_udf(imagesWithLabels['images']))
df = images_as_text.withColumn(
    'labels', array_to_string_udf(images_as_text['labels']))

# Preview the first rows and confirm the resulting column types.
df.show(5)
df.printSchema()

In [None]:
# Persist both variants of the dataset as parquet, replacing earlier output:
# the vector-typed DataFrame first, then the string-encoded one.
for _frame, _target in (
    (imagesWithLabels, 'file:///tmp/data/data.parquet'),
    (df, 'file:///tmp/data/str_data.parquet'),
):
    _frame.write.mode('overwrite').parquet(_target)

In [None]:
# Re-check the string-encoded DataFrame: column types, then the first 3 rows.
df.printSchema()
df.show(n=3)

In [2]:
from petastorm.pytorch import DataLoader
from petastorm import make_batch_reader, TransformSpec
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
import numpy as np

In [1]:
# Requires the petastorm import cell to have been run first in this kernel;
# otherwise `make_batch_reader` is undefined and this cell raises NameError.
dataset_url = 'file:///tmp/data/str_data.parquet'
# Open the reader as a context manager so it is stopped and its resources are
# released when the block exits, instead of staying open for the kernel's
# lifetime.
with make_batch_reader(dataset_url) as reader:
    # Pull a single batch as a smoke test that the string-encoded dataset can
    # be read back.
    sample = next(iter(reader))
# Show only the column names; the batch itself holds long string values.
print(sample._fields)
# NOTE(review): wrapping `reader` in petastorm's DataLoader was left disabled in
# the original cell; the string columns presumably need converting to numeric
# arrays first — confirm before enabling.

NameError: name 'make_batch_reader' is not defined

In [None]:
import pandas as pd
# Read both datasets back from the location the write cell stored them in
# (/tmp/data). The earlier relative 'data/...' paths only resolved when the
# kernel's working directory happened to be /tmp.
pq_data = pd.read_parquet('/tmp/data/str_data.parquet', engine='pyarrow')
# Printed explicitly: a bare expression in the middle of a cell is not shown.
print(pq_data.loc[0, 'labels'])

df_try = spark.read.parquet('file:///tmp/data/data.parquet')
# toPandas() collects the whole DataFrame onto the driver.
pd_try = df_try.toPandas()
# Last expression of the cell: displays the Python type of a stored image.
type(pd_try.loc[0, 'images'])

In [None]:
# Completion marker: printed once every cell above has finished running.
print("done.")