Commit 7763b0c
datacube/drivers/s3/storage/s3aio/s3lio.py
  - Added S3 Labelled Array IO class (S3LIO)
    - labelled array reads/writes that span multiple S3 objects
    - stubs for get_data and get_data_unlabeled

datacube/drivers/s3/storage/s3aio/s3aio.py
  - moved functionality that spans multiple S3 objects into s3lio.py
  - added a function to return the underlying s3io instance
  - updated comments

datacube/drivers/s3/storage/s3aio/s3io.py
  - updated comments
petewa committed Mar 28, 2017
1 parent 90a728d commit 7763b0c
Showing 4 changed files with 109 additions and 50 deletions.
3 changes: 2 additions & 1 deletion datacube/drivers/s3/storage/s3aio/__init__.py
@@ -4,7 +4,8 @@
"""
from __future__ import absolute_import

from .s3lio import S3LIO
from .s3aio import S3AIO
from .s3io import S3IO

__all__ = ['S3AIO', 'S3IO']
__all__ = ['S3LIO', 'S3AIO', 'S3IO']
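
As context for the re-export change above, a minimal import sketch of the updated public surface. This is a hedged example: the constructor signatures are taken from the diffs below, the '/tmp/s3_emulation' path is a placeholder, and reading enable_s3=False as file-backed emulation is an assumption rather than documented behaviour.

# Sketch only: the classes re-exported by the updated __init__.py.
# enable_s3/file_path follow the __init__ signatures shown in this commit;
# the path value and the file-backed interpretation are assumptions.
from datacube.drivers.s3.storage.s3aio import S3LIO, S3AIO, S3IO

s3lio = S3LIO(enable_s3=False, file_path='/tmp/s3_emulation')
s3aio = S3AIO(enable_s3=False, file_path='/tmp/s3_emulation')
s3io = S3IO(False, '/tmp/s3_emulation')  # called positionally, as in the s3aio.py diff
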
52 changes: 4 additions & 48 deletions datacube/drivers/s3/storage/s3aio/s3aio.py
@@ -1,7 +1,7 @@
'''
S3AIO Class
Array wrapper class
Array access to a single S3 object
'''

@@ -25,6 +25,9 @@ class S3AIO(object):
def __init__(self, enable_s3=True, file_path=None):
self.s3io = S3IO(enable_s3, file_path)

def s3io(self):
return self.s3io

def bytes_to_array(self, data, shape, dtype):
array = np.empty(shape=shape, dtype=dtype)
array.data[0:len(data)] = data
@@ -33,53 +36,6 @@ def bytes_to_array(self, data, shape, dtype):
def copy_bytes_to_shared_array(self, shared_array, start, end, data):
shared_array.data[start:end] = data

def chunks_indices_1d(self, begin, end, step):
for i in range(begin, end, step):
yield slice(i, min(end, i + step))

def chunk_indices_nd(self, shape, chunk):
var1 = map(self.chunks_indices_1d, itertools.repeat(0), shape, chunk)
return itertools.product(*var1)

def put_array_in_s3(self, array, chunk_size, base_name, bucket):
idx = list(self.chunk_indices_nd(array.shape, chunk_size))
keys = [base_name+'_'+str(i) for i in range(len(idx))]
self.shard_array_to_s3_mp(array, idx, bucket, keys)
return list(zip(keys, idx))

def shard_array_to_s3(self, array, indices, s3_bucket, s3_keys):
# todo: multiprocess put_bytes or if large put_bytes_mpu
for s3_key, index in zip(s3_keys, indices):
self.s3io.put_bytes(s3_bucket, s3_key, bytes(array[index].data))

def work_shard_array_to_s3(self, args):
return self.work_shard_array_to_s3_impl(*args)

def work_shard_array_to_s3_impl(self, s3_key, index, array_name, s3_bucket):
array = sa.attach(array_name)
self.s3io.put_bytes(s3_bucket, s3_key, bytes(array[index].data))

def shard_array_to_s3_mp(self, array, indices, s3_bucket, s3_keys):
num_processes = cpu_count()
pool = Pool(num_processes)
array_name = '_'.join(['SA3IO', str(uuid.uuid4()), str(os.getpid())])
sa.create(array_name, shape=array.shape, dtype=array.dtype)
shared_array = sa.attach(array_name)
shared_array[:] = array

pool.map_async(self.work_shard_array_to_s3, zip(s3_keys, indices, repeat(array_name), repeat(s3_bucket)))
pool.close()
pool.join()
sa.delete(array_name)

def assemble_array_from_s3(self, array, indices, s3_bucket, s3_keys, dtype):
for s3_key, index in zip(s3_keys, indices):
b = self.s3io.get_bytes(s3_bucket, s3_key)
m = memoryview(b)
shape = tuple((i.stop - i.start) for i in index)
array[index] = np.ndarray(shape, buffer=m, dtype=dtype)
return array

def to_1d(self, index, shape):
return np.ravel_multi_index(index, shape)

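As a quick illustration of the to_1d helper that remains in S3AIO after the move (a sketch; the shape and index values are arbitrary):

import numpy as np

# to_1d wraps np.ravel_multi_index: it maps an N-D index to the offset of
# that element in a C-ordered (row-major) array of the given shape.
shape = (4000, 4000)
index = (2, 3)
flat = np.ravel_multi_index(index, shape)  # 2 * 4000 + 3 == 8003
# S3AIO().to_1d((2, 3), (4000, 4000)) should return the same value.
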
2 changes: 1 addition & 1 deletion datacube/drivers/s3/storage/s3aio/s3io.py
@@ -1,7 +1,7 @@
'''
S3IO Class
Low level byte read/writes to S3
Low level byte read/writes to a single S3 object
Single-threaded. set new_session = False
Multi-threaded. set new_session = True
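For reference, a hedged sketch of the low-level S3IO round trip that the classes above build on. Only put_bytes and get_bytes appear in these diffs; the bucket and key names are placeholders, and existing credentials plus an existing bucket are assumed.

# Minimal put/get round trip through S3IO, as used by S3AIO/S3LIO above.
# 'my-test-bucket' and 'example_key' are placeholders; get_bytes is assumed
# to return a bytes-like object, as implied by the memoryview() wrapping in
# assemble_array_from_s3.
s3io = S3IO(True, None)
s3io.put_bytes('my-test-bucket', 'example_key', b'hello datacube')
data = s3io.get_bytes('my-test-bucket', 'example_key')
assert bytes(data) == b'hello datacube'
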
102 changes: 102 additions & 0 deletions datacube/drivers/s3/storage/s3aio/s3lio.py
@@ -0,0 +1,102 @@
'''
S3LIO Class
Labeled Array access, backed by multiple S3 objects.
'''

import os
import uuid
import itertools
import numpy as np
from itertools import repeat
from multiprocessing import Pool, freeze_support, cpu_count
import SharedArray as sa
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
from pprint import pprint
from .s3aio import S3AIO


class S3LIO(object):

DECIMAL_PLACES = 6

def __init__(self, enable_s3=True, file_path=None):
self.s3aio = S3AIO(enable_s3, file_path)

def chunks_indices_1d(self, begin, end, step):
for i in range(begin, end, step):
yield slice(i, min(end, i + step))

def chunk_indices_nd(self, shape, chunk):
var1 = map(self.chunks_indices_1d, itertools.repeat(0), shape, chunk)
return itertools.product(*var1)

def put_array_in_s3(self, array, chunk_size, base_name, bucket):
idx = list(self.chunk_indices_nd(array.shape, chunk_size))
keys = [base_name+'_'+str(i) for i in range(len(idx))]
self.shard_array_to_s3_mp(array, idx, bucket, keys)
return list(zip(keys, idx))

def shard_array_to_s3(self, array, indices, s3_bucket, s3_keys):
# todo: multiprocess put_bytes or if large put_bytes_mpu
for s3_key, index in zip(s3_keys, indices):
self.s3aio.s3io.put_bytes(s3_bucket, s3_key, bytes(array[index].data))

def work_shard_array_to_s3(self, args):
return self.work_shard_array_to_s3_impl(*args)

def work_shard_array_to_s3_impl(self, s3_key, index, array_name, s3_bucket):
array = sa.attach(array_name)
self.s3aio.s3io.put_bytes(s3_bucket, s3_key, bytes(array[index].data))

def shard_array_to_s3_mp(self, array, indices, s3_bucket, s3_keys):
num_processes = cpu_count()
pool = Pool(num_processes)
array_name = '_'.join(['SA3IO', str(uuid.uuid4()), str(os.getpid())])
sa.create(array_name, shape=array.shape, dtype=array.dtype)
shared_array = sa.attach(array_name)
shared_array[:] = array

pool.map_async(self.work_shard_array_to_s3, zip(s3_keys, indices, repeat(array_name), repeat(s3_bucket)))
pool.close()
pool.join()
sa.delete(array_name)

def assemble_array_from_s3(self, array, indices, s3_bucket, s3_keys, dtype):
# TODO: parallelize this
for s3_key, index in zip(s3_keys, indices):
b = self.s3aio.s3io.get_bytes(s3_bucket, s3_key)
m = memoryview(b)
shape = tuple((i.stop - i.start) for i in index)
array[index] = np.ndarray(shape, buffer=m, dtype=dtype)
return array

# converts positional(spatial/temporal) coordinates to array integer coordinates
def regular_index(self, query, dimension_range, shape):
# regular_index((-35+2*0.128, 149+2*0.128), ((-35,-34),(149,150)), (4000, 4000))
# regular_index((-35+0.128, 149+0.128), ((-35, -35+0.128),(149, 148+0.128)), (512, 512))

length = np.around([dr[1] - dr[0] for dr in dimension_range], S3LIO.DECIMAL_PLACES)
offset = [dr[0] for dr in dimension_range]
point = np.around([q-o for q, o in zip(query, offset)], S3LIO.DECIMAL_PLACES)

result = np.floor([(p/l)*s for p, l, s in zip(point, length, shape)])
return result

# labeled geo-coordinates data retrieval.
def get_data(self, base_location, macro_shape, micro_shape, dtype, slice, s3_bucket):
# shape and chunk are overloaded.
# should use macro_shape to mean shape of the array pre-chunking.
# should use micro_shape to mean chunk size of the array.
pass

# integer index data retrieval.
def get_data_unlabeled(self, base_location, macro_shape, micro_shape, dtype, slice, s3_bucket):
# shape and chunk are overloaded.
# should use macro_shape to mean shape of the array pre-chunking.
# should use micro_shape to mean chunk size of the array.
pass
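
To make the intended workflow of the new class concrete, here is a usage sketch of the chunked write/read round trip, built only from the methods shown above (put_array_in_s3 followed by assemble_array_from_s3). The bucket name is a placeholder, and a working S3 setup plus the SharedArray dependency are assumed.

import numpy as np
from datacube.drivers.s3.storage.s3aio import S3LIO

s3lio = S3LIO()  # assumes S3 credentials and the target bucket already exist

# Write a 200x200 array as four 100x100 chunks. chunk_indices_nd slices the
# array into (100, 100) blocks and each block is stored as one S3 object
# named 'my_array_0' ... 'my_array_3'. The return value pairs each key with
# the slice of the original array it holds.
array = np.arange(200 * 200, dtype=np.uint16).reshape(200, 200)
keys_and_slices = s3lio.put_array_in_s3(array, (100, 100), 'my_array', 'my-test-bucket')

# Reassemble the array from the shards written above.
keys = [k for k, _ in keys_and_slices]
slices = [s for _, s in keys_and_slices]
result = np.empty((200, 200), dtype=np.uint16)
result = s3lio.assemble_array_from_s3(result, slices, 'my-test-bucket', keys, np.uint16)
assert (result == array).all()

On the labelled side, regular_index maps positional (spatial/temporal) coordinates into integer array coordinates: using the first example from the docstring comments above, a query of (-35 + 2*0.128, 149 + 2*0.128) against dimension_range ((-35, -34), (149, 150)) and shape (4000, 4000) floors to (1024, 1024), which is the kind of integer index the stubbed get_data is presumably meant to resolve.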
