
Commit

datacube/drivers/s3/storage/s3aio/s3lio.py
  - added get_data_unlabeled function to retrieve array data sharded across S3 objects.

datacube/drivers/s3/storage/s3aio/s3aio.py
  - fix num_streams divisor.
petewa committed May 11, 2017
1 parent 57a641e commit bd658e6
Showing 2 changed files with 85 additions and 24 deletions.
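
For context, a minimal sketch (not part of the commit) of the chunk-key naming scheme the sharding code in this diff relies on: each chunk of the array is stored as one S3 object whose key is the dataset base name plus a flattened chunk id, optionally prefixed with the first six hex characters of the key's md5 to spread objects across the S3 key space (the spread/use_hash options). The dataset name and chunk ids below are hypothetical.

import hashlib

# Hypothetical dataset name and flattened chunk indices, one per S3 object.
base_name = 'example_dataset'
chunk_ids = [0, 1, 2, 3]

# Plain keys: '<base_name>_<chunk_id>'.
keys = ['_'.join([base_name, str(i)]) for i in chunk_ids]

# Spread keys: a 6-character md5 prefix pushes objects into different key ranges.
spread_keys = [hashlib.md5(k.encode('utf-8')).hexdigest()[0:6] + '_' + k for k in keys]

print(keys)         # ['example_dataset_0', 'example_dataset_1', 'example_dataset_2', 'example_dataset_3']
print(spread_keys)  # same keys with md5-derived 6-character prefixes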
6 changes: 5 additions & 1 deletion datacube/drivers/s3/storage/s3aio/s3aio.py
@@ -61,6 +61,10 @@ def get_slice(self, array_slice, shape, dtype, s3_bucket, s3_key): # pylint: di
# - data size
# - data contiguity

# truncate array_slice to shape
# array_slice = [slice(max(0, s.start) - min(sh, s.stop)) for s, sh in zip(array_sliced, shape)]
array_slice = [slice(max(0, s.start), min(sh, s.stop)) for s, sh in zip(array_slice, shape)]

cdim = self.cdims(array_slice, shape)

try:
@@ -133,7 +137,7 @@ def work_get_slice(block, array_name, offset, s3_bucket, s3_key, shape, dtype):

num_processes = cpu_count()
pool = Pool(num_processes)
array_name = '_'.join(['SA3IO', str(uuid.uuid4()), str(os.getpid())])
array_name = '_'.join(['S3AIO', str(uuid.uuid4()), str(os.getpid())])
sa.create(array_name, shape=[s.stop - s.start for s in array_slice], dtype=dtype)
shared_array = sa.attach(array_name)

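As a standalone reference, a minimal sketch of the slice-truncation logic added to get_slice above: each requested slice is clamped to the bounds of the stored array's shape, so negative starts and out-of-range stops cannot index past the data. The slices and shape below are hypothetical.

# Clamp each requested slice to the stored array's shape (mirrors the line added above).
array_slice = [slice(-2, 10), slice(3, 7)]   # hypothetical request
shape = (8, 5)                               # hypothetical stored shape

clamped = [slice(max(0, s.start), min(sh, s.stop)) for s, sh in zip(array_slice, shape)]
print(clamped)  # [slice(0, 8, None), slice(3, 5, None)]
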
103 changes: 80 additions & 23 deletions datacube/drivers/s3/storage/s3aio/s3lio.py
@@ -10,6 +10,7 @@
import uuid
import hashlib
import numpy as np
from six import integer_types
from six.moves import map, zip
from itertools import repeat, product
from pathos.multiprocessing import ProcessingPool as Pool
@@ -30,23 +31,29 @@ class S3LIO(object):
def __init__(self, enable_s3=True, file_path=None):
self.s3aio = S3AIO(enable_s3, file_path)

def chunk_indices_1d(self, begin, end, step, bound_slice=None):
def chunk_indices_1d(self, begin, end, step, bound_slice=None, return_as_shape=False):
if bound_slice is None:
for i in range(begin, end, step):
yield slice(i, min(end, i + step))
if return_as_shape:
yield min(end, i + step) - i
else:
yield slice(i, min(end, i + step))
else:
bound_begin = bound_slice.start
bound_end = bound_slice.stop
end = min(end, bound_end)
for i in range(begin, end, step):
if i < bound_begin and i+step <= bound_begin:
continue
yield slice(max(i, bound_begin), min(end, i + step))
if return_as_shape:
yield min(end, i + step) - max(i, bound_begin)
else:
yield slice(max(i, bound_begin), min(end, i + step))

def chunk_indices_nd(self, shape, chunk, array_slice=None):
def chunk_indices_nd(self, shape, chunk, array_slice=None, return_as_shape=False):
if array_slice is None:
array_slice = repeat(None)
var1 = map(self.chunk_indices_1d, repeat(0), shape, chunk, array_slice)
var1 = map(self.chunk_indices_1d, repeat(0), shape, chunk, array_slice, repeat(return_as_shape))
return product(*var1)

def put_array_in_s3(self, array, chunk_size, base_name, bucket, spread=False):
@@ -56,6 +63,7 @@ def put_array_in_s3(self, array, chunk_size, base_name, bucket, spread=False):
if spread:
keys = [hashlib.md5(k.encode('utf-8')).hexdigest()[0:6] + '_' + k for k in keys]
self.shard_array_to_s3(array, idx, bucket, keys)
# s3_dataset_id, chunk, chunk_id
return list(zip(keys, idx, chunk_ids))

def put_array_in_s3_mp(self, array, chunk_size, base_name, bucket, spread=False):
@@ -107,19 +115,30 @@ def regular_index(self, query, dimension_range, shape, flatten=False):
# regular_index((-35+2*0.128, 149+2*0.128), ((-35,-34),(149,150)), (4000, 4000))
# regular_index((-35+0.128, 149+0.128), ((-35, -35+0.128),(149, 148+0.128)), (512, 512))

length = np.around([dr[1] - dr[0] for dr in dimension_range], S3LIO.DECIMAL_PLACES)
offset = [dr[0] for dr in dimension_range]
if all(isinstance(i, integer_types) for i in dimension_range):
length = dimension_range
offset = [0 for dr in dimension_range]
else:
length = np.around([dr[1] - dr[0] for dr in dimension_range], S3LIO.DECIMAL_PLACES)
offset = [dr[0] for dr in dimension_range]

point = np.around([q-o for q, o in zip(query, offset)], S3LIO.DECIMAL_PLACES)

result = np.floor([(p/l)*s for p, l, s in zip(point, length, shape)]).astype(int)
result = [min(r, s-1) for r, s in zip(result, shape)]

print(length, offset, point, result)

if flatten:
return self.s3aio.to_1d(tuple(result), shape)
# return self.s3aio.to_1d(tuple(result), shape)
macro_shape = tuple([(int(np.ceil(a/b))) for a, b in zip(dimension_range, shape)])
print(result, macro_shape)
return self.s3aio.to_1d(tuple(result), macro_shape)

return result

# labeled geo-coordinates data retrieval.
def get_data(self, base_location, macro_shape, micro_shape, dtype, labeled_slice, s3_bucket):
def get_data(self, base_location, dimension_range, micro_shape, dtype, labeled_slice, s3_bucket):
# shape and chunk are overloaded.
# should use macro_shape to mean shape of the array pre-chunking.
# should use micro_shape to mean chunk size of the array.
@@ -129,17 +148,55 @@ def get_data(self, base_location, macro_shape, micro_shape, dtype, labeled_slice
pass

# integer index data retrieval.
def get_data_unlabeled(self, base_location, macro_shape, micro_shape, dtype, array_slice, s3_bucket):
# shape and chunk are overloaded.
# should use macro_shape to mean shape of the array pre-chunking.
# should use micro_shape to mean chunk size of the array.
# slices = self.chunk_indices_nd(macro_shape, micro_shape, array_slice)
# chunk_ids = [np.ravel_multi_index(tuple([s.start for s in s]), macro_shape) for s in slices]
# keys = ['_'.join(["base_location", str(i)]) for i in chunk_ids]
# keys = [hashlib.md5(k.encode('utf-8')).hexdigest()[0:6] + '_' + k for k in keys]
# zipped = zip(keys, chunk_ids, slices)

# 1. create shared array of "macro_shape" size and of dtype
# 2. create 1 process per zipped task.
# 3. get slice from s3 and write into shared array.
pass
# pylint: disable=too-many-locals
def get_data_unlabeled(self, base_location, macro_shape, micro_shape, dtype, array_slice, s3_bucket,
use_hash=False):
# TODO(csiro):
# - use SharedArray for data
# - multiprocess the for loop depending on slice size.
# - not very efficient, redo
# - point retrieval via integer index instead of slicing operator.
#
# element_ids = [np.ravel_multi_index(tuple([s.start for s in s]), macro_shape) for s in slices]

# data slices for each chunk
slices = list(self.chunk_indices_nd(macro_shape, micro_shape, array_slice))

# chunk id's for each data slice
slice_starts = [tuple([s.start for s in s]) for s in slices]

chunk_ids = [self.s3aio.to_1d(tuple(np.floor([(p/float(s)) for p, s, in zip(c, micro_shape)]).astype(int)),
tuple([(int(np.ceil(a/float(b)))) for a, b in zip(macro_shape, micro_shape)]))
for c in slice_starts]

# chunk_sizes for each chunk
chunk_shapes = list(self.chunk_indices_nd(macro_shape, micro_shape, None, True))
chunk_shapes = [chunk_shapes[c] for c in chunk_ids]

# compute keys
keys = ['_'.join([base_location, str(i)]) for i in chunk_ids]
if use_hash:
keys = [hashlib.md5(k.encode('utf-8')).hexdigest()[0:6] + '_' + k for k in keys]

# create shared array
array_name = '_'.join(['S3LIO', str(uuid.uuid4()), str(os.getpid())])
sa.create(array_name, shape=[s.stop - s.start for s in array_slice], dtype=dtype)
data = sa.attach(array_name)

# calculate offsets
offset = tuple([i.start for i in array_slice])
# calculate data slices
data_slices = [tuple([slice(s.start-o, s.stop-o) for s, o in zip(s, offset)]) for s in slices]
local_slices = [tuple([slice((s.start % cs if s.start >= cs else s.start),
(s.stop % (cs+1)+1 if s.stop >= cs+1 else s.stop))
for s, cs in zip(s, micro_shape)]) for s in slices]

zipped = zip(keys, data_slices, local_slices, chunk_shapes, repeat(offset))

# get the slices and populate the data array.
for s3_key, data_slice, local_slice, shape, offset in zipped:
data[data_slice] = self.s3aio.get_slice(local_slice, shape, dtype, s3_bucket, s3_key)

sa.delete(array_name)

return data
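
A hedged end-to-end sketch of how the new retrieval path might be used, assuming the file-backed mode (enable_s3=False with a file_path) behaves as a local stand-in for S3; the bucket, dataset name, and window below are hypothetical, and the argument order follows the signatures in this diff.

import numpy as np
from datacube.drivers.s3.storage.s3aio.s3lio import S3LIO

# File-backed mode as a local stand-in for S3 (assumption for this sketch).
s3lio = S3LIO(enable_s3=False, file_path='/tmp/s3_emulation')

array = np.arange(4000 * 4000, dtype=np.int32).reshape(4000, 4000)
macro_shape = array.shape      # shape of the full, un-chunked array
micro_shape = (1000, 1000)     # per-chunk (per S3 object) shape

# Shard the array into objects named 'example_dataset_<chunk_id>'.
s3lio.put_array_in_s3(array, micro_shape, 'example_dataset', 'example-bucket')

# Read back an integer-indexed window spanning several chunks.
window = (slice(500, 1500), slice(2500, 3500))
data = s3lio.get_data_unlabeled('example_dataset', macro_shape, micro_shape,
                                np.int32, window, 'example-bucket')
print(data.shape)  # (1000, 1000)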
