In [1]:
!pip install pyarrow==0.15.*



In [2]:
!pip install petastorm[opencv,tf_gpu] albumentations





In [6]:
import os
root = '/home/jovyan/work'

import numpy as np
import pyarrow as pa
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
from petastorm.codecs import ScalarCodec, CompressedImageCodec
from petastorm.etl.dataset_metadata import get_schema_from_dataset_url, materialize_dataset

In [34]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *

# Based on `core-site.xml` in https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md doc
spark = SparkSession.builder \
    .master("local") \
    .appName("big_earth") \
    .config("spark.driver.extraClassPath", root + "/spark_dependencies/gcs-connector-hadoop2-latest.jar") \
    .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .config("fs.gs.project.id", "big-earth-252219") \
    .config("google.cloud.auth.service.account.enable", "true") \
    .config("google.cloud.auth.service.account.json.keyfile", root + "/.gcs/big-earth-252219-fb2e5c109f78.json") \
    .getOrCreate()
sc = spark.sparkContext

In [32]:
output_url = 'file://' + root + '/data/hello_world_dataset'

In [59]:
# arrow_image_schema = pa.schema([
#     ('image_name', pa.string()),
#     ('image_bytes', pa.binary())
# ])
# arrow_image_struct = pa.struct([
#     pa.field('image_name', pa.string()),
#     pa.field('image_bytes', pa.binary())
# ])

# spark_image_schema = from_arrow_schema(arrow_image_schema)

fields = [StructField("image_name", StringType()), StructField("image_bytes", BinaryType())]
spark_image_schema = StructType(fields)


def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    arr =  np.random.randint(0, 4000, dtype=np.uint16, size=(120, 120, 3))
#     p = pa.array([{'image_name': x, 'image_bytes': arr.tobytes()}], type=arrow_image_struct)
    return [x, bytearray(arr)]


def generate_hello_world_dataset(spark, output_url):
    rows_count = 10000
    filenames = ["name_{}".format(id) for id in range(rows_count)]
    rowgroup_size_mb = 256

    rows_rdd = sc.parallelize(filenames)\
       .map(row_generator)

    spark.createDataFrame(rows_rdd, spark_image_schema) \
       .coalesce(10) \
       .write \
       .mode('overwrite') \
       .parquet(output_url)
    
    
generate_hello_world_dataset(spark, output_url)

In [60]:
import pyarrow.parquet as pq


parquet_files = [root + "/data/hello_world_dataset/part-00000-a91053c4-cc36-41d2-bc08-578264106695-c000.snappy.parquet"]
parquet_file_row_groups = []
total_num_rows = 0
for parquet_file in parquet_files:
    open_parquet_handle = pq.ParquetFile(parquet_file)
    print(dir(open_parquet_handle))
    for row_group in range(open_parquet_handle.num_row_groups):
        parquet_file_row_groups.append(
            (open_parquet_handle, row_group)
        )
#     total_num_rows += open_parquet_handle.num_rows

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_build_nested_paths', '_get_column_indices', '_nested_paths_by_prefix', 'common_metadata', 'metadata', 'num_row_groups', 'read', 'read_row_group', 'read_row_groups', 'reader', 'scan_contents', 'schema']


In [82]:
parquet_file, row_group = parquet_file_row_groups[0]
table = parquet_file.read_row_group(row_group)

def deserialize_to_np(image_bytes):
    return np.frombuffer(image_bytes, dtype=np.uint16).reshape(120, 120, 3)

df = table.to_pandas()
df['image_bytes'] = df['image_bytes'].apply(deserialize_to_np)
df

Unnamed: 0,image_name,image_bytes
0,name_0,"[[[3060, 3203, 2367], [640, 1081, 1414], [1299..."
1,name_1,"[[[2134, 3184, 1565], [3816, 672, 3673], [2322..."
2,name_2,"[[[3190, 747, 3926], [1821, 3602, 892], [28, 1..."
3,name_3,"[[[222, 3103, 1], [2272, 1603, 3605], [2253, 1..."
4,name_4,"[[[3392, 1654, 3643], [1955, 2045, 226], [1921..."
...,...,...
1547,name_1547,"[[[2479, 2462, 905], [1010, 3360, 2925], [714,..."
1548,name_1548,"[[[3867, 884, 959], [380, 918, 3609], [371, 32..."
1549,name_1549,"[[[399, 218, 304], [3253, 3508, 267], [3429, 3..."
1550,name_1550,"[[[66, 2911, 3166], [3341, 1728, 1932], [404, ..."


In [40]:
import tensorflow as tf
from decimal import Decimal


_NUMPY_TO_TF_DTYPES_MAPPING = {
    np.bool: tf.bool,
    np.int8: tf.int8,
    np.int16: tf.int16,
    np.int32: tf.int32,
    np.int64: tf.int64,
    np.uint8: tf.uint8,
    np.uint16: tf.int32,
    np.uint32: tf.int64,
    np.float32: tf.float32,
    np.float64: tf.float64,
    np.string_: tf.string,
    np.unicode_: tf.string,
    np.str_: tf.string,
    np.bool_: tf.bool,
    Decimal: tf.string,
    np.datetime64: tf.int64,
}

def date_to_nsec_from_epoch(dt):
    return timegm(dt.timetuple()) * 1000000000


_date_to_nsec_from_epoch_vectorized = np.vectorize(date_to_nsec_from_epoch)


def _sanitize_field_tf_types(sample):
    """Takes a named tuple and casts/promotes types unknown to TF to the types that are known.
    Three casts that are currently implemented
      - Decimal to string
      - uint16 to int32
      - np.datetime64 to int64, as nanoseconds since unix epoch
    :param sample: named tuple or a dictionary
    :return: same type as the input with values casted to types supported by Tensorflow
    """
    next_sample_dict = sample._asdict()

    for k, v in next_sample_dict.items():
        if v is None:
            raise RuntimeError('Encountered "{}"=None. Tensorflow does not support None values as a tensor.'
                               'Consider filtering out these rows using a predicate.'.format(k))
        # Assuming conversion to the same numpy type is trivial and dirty cheap
        if isinstance(v, Decimal):
            # Normalizing decimals only to get rid of the trailing zeros (makes testing easier, assuming has
            # no other effect)
            next_sample_dict[k] = str(v.normalize())
        elif isinstance(v, np.ndarray) and np.issubdtype(v.dtype, np.datetime64):
            # Convert to nanoseconds from POSIX epoch
            next_sample_dict[k] = (v - np.datetime64('1970-01-01T00:00:00.0')) \
                .astype('timedelta64[ns]').astype(np.int64)
        elif isinstance(v, np.ndarray) and v.dtype == np.uint16:
            next_sample_dict[k] = v.astype(np.int32)
        elif isinstance(v, np.ndarray) and v.dtype == np.uint32:
            next_sample_dict[k] = v.astype(np.int64)
        elif isinstance(v, np.ndarray) and v.dtype.type in (np.bytes_, np.unicode_):
            if v.size != 0:
                next_sample_dict[k] = v.tolist()
        elif isinstance(v, np.ndarray) and v.dtype.kind == 'O' and isinstance(v[0], datetime.date):
            # Pyarrow 0.12.1 started returning python datetime.date when parquet column is a DateType() column.
            # Convert values in such column into nsec from epoch int64.
            next_sample_dict[k] = _date_to_nsec_from_epoch_vectorized(v)

    # Construct object of the same type as the input
    return sample.__class__(**next_sample_dict)

In [42]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

ImageDataGenerator

In [None]:
from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset
from albumentations import (
    Compose, Flip, VerticalFlip, Resize, Rotate, ToFloat
)

augmentations_train = Compose([
    Flip(p=0.5),
    Rotate(limit=(0, 360), p=0.5)
])


def foo(img):
    print(img)
    return img
    
with make_batch_reader(output_url, num_epochs=2) as reader:
    print(dir(reader))
#     dataset = make_petastorm_dataset(reader).map(foo)
#     for data in dataset:
#         break
        
        

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__next__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_apply_predicate_to_row_groups', '_apply_row_group_selector', '_create_ventilator', '_filter_row_groups', '_normalize_shuffle_options', '_partition_row_groups', '_results_queue_reader', '_workers_pool', 'batched_output', 'dataset', 'diagnostics', 'is_batched_reader', 'join', 'last_row_consumed', 'next', 'ngram', 'reset', 'schema', 'stop', 'stopped', 'ventilator']


In [None]:
img