Skip to content

Commit

Permalink
Merge pull request #11 from davidbrochart/aws
Browse files Browse the repository at this point in the history
Add support for AWS S3
  • Loading branch information
JohanMabille committed Nov 20, 2020
2 parents d297c92 + 26709fa commit b0fb930
Show file tree
Hide file tree
Showing 9 changed files with 388 additions and 14 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ install:
# Install mamba
- conda install mamba -c conda-forge
# Install host dependencies
- mamba create -n test nlohmann_json google-cloud-cpp cpp-filesystem zlib xtensor-io zarray python fsspec numpy numcodecs sh -c conda-forge
- mamba create -n test nlohmann_json google-cloud-cpp aws-sdk-cpp cpp-filesystem zlib xtensor-io zarray python fsspec numpy numcodecs sh -c conda-forge
# Install build dependencies
- mamba install -n test cmake -c conda-forge
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then
Expand All @@ -44,7 +44,7 @@ install:
# Activate test environment
- source activate test
# Install development version of zarrita
- mkdir dev_tmp
- mkdir dev_tmp
- cd dev_tmp
- wget https://github.com/alimanfoo/zarrita/archive/master.tar.gz -O zarrita.tar.gz -q
- mkdir zarrita
Expand Down
24 changes: 24 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ set(XTENSOR_ZARR_HEADERS
${XTENSOR_ZARR_INCLUDE_DIR}/xtensor-zarr/xzarr_array.hpp
${XTENSOR_ZARR_INCLUDE_DIR}/xtensor-zarr/xzarr_file_system_store.hpp
${XTENSOR_ZARR_INCLUDE_DIR}/xtensor-zarr/xzarr_gcs_store.hpp
${XTENSOR_ZARR_INCLUDE_DIR}/xtensor-zarr/xzarr_aws_store.hpp
${XTENSOR_ZARR_INCLUDE_DIR}/xtensor-zarr/xzarr_common.hpp
${XTENSOR_ZARR_INCLUDE_DIR}/xtensor-zarr/xzarr_compressor.hpp
${XTENSOR_ZARR_INCLUDE_DIR}/xtensor-zarr/xzarr_chunked_array.hpp
Expand Down Expand Up @@ -104,6 +105,29 @@ else()
message(WARNING "Google Cloud Storage not found - install google-cloud-cpp for GCS IO handler support")
endif()

if (NOT DEFINED BUILD_SHARED_LIBS)
set (BUILD_SHARED_LIBS ON)
find_package(AWSSDK COMPONENTS s3)
unset (BUILD_SHARED_LIBS)
else ()
find_package(AWSSDK COMPONENTS s3)
endif ()
message(STATUS "Trying to find AWS SDK for AWS S3 IO handler support")
if(${AWSSDK_FOUND})
message(STATUS "AWSSDK ${AWSSDK_VERSION} found, AWS S3 IO handler support enabled")
target_include_directories(xtensor-zarr
INTERFACE
${AWSSDK_INCLUDE_DIRS}
)
target_link_libraries(xtensor-zarr
INTERFACE
${AWSSDK_LINK_LIBRARIES}
)
else()
message(WARNING "AWSSDK not found - install aws-sdk-cpp for AWS S3 IO handler support")
endif()
message(STATUS "AWSSDK_LINK_LIBRARIES ${AWSSDK_LINK_LIBRARIES}")

if(DOWNLOAD_GTEST OR GTEST_SRC_DIR)
set(BUILD_TESTS ON)
endif()
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ conda install xtensor-zarr -c conda-forge

- `xtensor-zarr` depends on `xtensor` `^0.21.9`, `xtensor` `^0.8` and `nlohmann_json`.

- `google-cloud-cpp`, `cpp-filesystem`, `zlib`, and `blosc` are optional dependencies to `xtensor-zarr`.
- `google-cloud-cpp`, `aws-sdk-cpp`, `cpp-filesystem`, `zlib`, and `blosc` are optional dependencies to `xtensor-zarr`.

- `google-cloud-cpp` is required to access a store in Google Cloud Storage.
- `aws-sdk-cpp` is required to access a store in AWS S3.
- `cpp-filesystem` is required to access a store on the local file system.
- `zlib` is required for the GZip compressor.
- `blosc` is required for the Blosc compressor.

All four libraries are available for the conda package manager.
All five libraries are available for the conda package manager.

You can also install `xtensor-zarr` from source:

Expand Down
283 changes: 283 additions & 0 deletions include/xtensor-zarr/xzarr_aws_store.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
/***************************************************************************
* Copyright (c) Wolf Vollprecht, Sylvain Corlay and Johan Mabille *
* Copyright (c) QuantStack *
* *
* Distributed under the terms of the BSD 3-Clause License. *
* *
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#ifndef XTENSOR_ZARR_AWS_STORE_HPP
#define XTENSOR_ZARR_AWS_STORE_HPP

#include <iomanip>
#include <fstream>
#include <iostream>
#include <vector>
#include <string>

#include "xtensor-io/xio_aws_handler.hpp"
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/Object.h>

namespace xt
{
class xzarr_aws_stream
{
public:
xzarr_aws_stream(const Aws::String& path, const Aws::String& bucket, const Aws::S3::S3Client& client);
operator std::string() const;
xzarr_aws_stream& operator=(const std::vector<char>& value);
xzarr_aws_stream& operator=(const std::string& value);

private:
void assign(const char* value, std::size_t size);

Aws::String m_path;
Aws::String m_bucket;
const Aws::S3::S3Client& m_client;
};

class xzarr_aws_store
{
public:
template <class C>
using io_handler = xio_aws_handler<C>;

xzarr_aws_store(const std::string& root, const Aws::S3::S3Client& client);
xzarr_aws_stream operator[](const std::string& key) const;
void set(const std::string& key, const std::vector<char>& value);
void set(const std::string& key, const std::string& value);
std::string get(const std::string& key) const;
void list_dir(const std::string& prefix, std::vector<std::string>& keys, std::vector<std::string>& prefixes) const;
std::vector<std::string> list() const;
std::vector<std::string> list_prefix(const std::string& prefix) const;
void erase(const std::string& key);
void erase_prefix(const std::string& prefix);
const std::string& get_root() const;
xio_aws_config get_io_config() const;

private:
std::string m_root;
Aws::String m_bucket;
const Aws::S3::S3Client& m_client;
};

/***********************************
* xzarr_aws_stream implementation *
***********************************/

xzarr_aws_stream::xzarr_aws_stream(const Aws::String& path, const Aws::String& bucket, const Aws::S3::S3Client& client)
: m_path(path)
, m_bucket(bucket)
, m_client(client)
{
}

xzarr_aws_stream::operator std::string() const
{
Aws::S3::Model::GetObjectRequest object_request;
object_request.SetBucket(m_bucket);
object_request.SetKey(m_path);

Aws::S3::Model::GetObjectOutcome outcome = m_client.GetObject(object_request);

if (!outcome.IsSuccess())
{
auto err = outcome.GetError();
XTENSOR_THROW(std::runtime_error, std::string("Error: GetObject: ") + err.GetExceptionName().c_str() + ": " + err.GetMessage().c_str());
}

auto& reader = outcome.GetResultWithOwnership().GetBody();
std::string bytes(std::istreambuf_iterator<char>(reader), {});

return bytes;
}

xzarr_aws_stream& xzarr_aws_stream::operator=(const std::vector<char>& value)
{
assign(value.data(), value.size());
return *this;
}

xzarr_aws_stream& xzarr_aws_stream::operator=(const std::string& value)
{
assign(value.c_str(), value.size());
return *this;
}

void xzarr_aws_stream::assign(const char* value, std::size_t size)
{
Aws::S3::Model::PutObjectRequest request;
request.SetBucket(m_bucket);
request.SetKey(m_path);

std::shared_ptr<Aws::IOStream> writer = Aws::MakeShared<Aws::FStream>("SampleAllocationTag", m_path.c_str(), std::ios_base::in | std::ios_base::binary);
writer->write(value, size);
writer->flush();

request.SetBody(writer);

Aws::S3::Model::PutObjectOutcome outcome = m_client.PutObject(request);

if (!outcome.IsSuccess())
{
auto err = outcome.GetError();
XTENSOR_THROW(std::runtime_error, std::string("Error: PutObject: ") + err.GetExceptionName().c_str() + ": " + err.GetMessage().c_str());
}
}

/**********************************
* xzarr_aws_store implementation *
**********************************/

xzarr_aws_store::xzarr_aws_store(const std::string& root, const Aws::S3::S3Client& client)
: m_root(root)
, m_client(client)
{
if (m_root.empty())
{
XTENSOR_THROW(std::runtime_error, "Root directory cannot be empty");
}
while (m_root.back() == '/')
{
m_root.pop_back();
}
std::size_t i = m_root.find('/');
if (i == std::string::npos)
{
m_bucket = m_root.c_str();
m_root = "";
}
else
{
m_bucket = m_root.substr(0, i).c_str();
m_root = m_root.substr(i + 1);
}
}

xzarr_aws_stream xzarr_aws_store::operator[](const std::string& key) const
{
return xzarr_aws_stream((m_root + '/' + key).c_str(), m_bucket, m_client);
}

void xzarr_aws_store::set(const std::string& key, const std::vector<char>& value)
{
xzarr_aws_stream((m_root + '/' + key).c_str(), m_bucket, m_client) = value;
}

void xzarr_aws_store::set(const std::string& key, const std::string& value)
{
xzarr_aws_stream((m_root + '/' + key).c_str(), m_bucket, m_client) = value;
}

std::string xzarr_aws_store::get(const std::string& key) const
{
return xzarr_aws_stream((m_root + '/' + key).c_str(), m_bucket, m_client);
}

void xzarr_aws_store::list_dir(const std::string& prefix, std::vector<std::string>& keys, std::vector<std::string>& prefixes) const
{
Aws::S3::Model::ListObjectsRequest request;
std::string full_prefix = prefix;
if (!m_root.empty())
{
full_prefix = m_root + '/' + prefix;
}
request.WithBucket(m_bucket).WithPrefix(full_prefix.c_str());
auto outcome = m_client.ListObjects(request);
if (!outcome.IsSuccess())
{
auto err = outcome.GetError();
XTENSOR_THROW(std::runtime_error, std::string("Error: ListObjects: ") + err.GetExceptionName().c_str() + ": " + err.GetMessage().c_str());
}
Aws::Vector<Aws::S3::Model::Object> objects = outcome.GetResult().GetContents();

for (Aws::S3::Model::Object& object: objects)
{
auto key = object.GetKey();
std::size_t i = key.find('/');
if (i == std::string::npos)
{
keys.push_back(key.c_str());
}
else
{
key = key.substr(0, i + 1);
if (prefixes.empty())
{
prefixes.push_back(key.c_str());
}
else
{
if (prefixes.back() != key.c_str())
{
prefixes.push_back(key.c_str());
}
}
}
}
}

std::vector<std::string> xzarr_aws_store::list() const
{
return list_prefix("");
}

std::vector<std::string> xzarr_aws_store::list_prefix(const std::string& prefix) const
{
std::string full_prefix = prefix;
if (!m_root.empty())
{
full_prefix = m_root + '/' + prefix;
}
Aws::S3::Model::ListObjectsRequest request;
request.WithBucket(m_bucket).WithPrefix(full_prefix.c_str());
auto outcome = m_client.ListObjects(request);
if (!outcome.IsSuccess())
{
auto err = outcome.GetError();
XTENSOR_THROW(std::runtime_error, std::string("Error: ListObjects: ") + err.GetExceptionName().c_str() + ": " + err.GetMessage().c_str());
}
Aws::Vector<Aws::S3::Model::Object> objects = outcome.GetResult().GetContents();

std::vector<std::string> keys(objects.size());
std::transform(objects.begin(), objects.end(), keys.begin(), [](const auto& k) { return k.GetKey().c_str(); });
return keys;
}

void xzarr_aws_store::erase(const std::string& key)
{
Aws::S3::Model::DeleteObjectRequest request;
request.WithKey((m_root + '/' + key).c_str()).WithBucket(m_bucket);
Aws::S3::Model::DeleteObjectOutcome outcome = m_client.DeleteObject(request);
if (!outcome.IsSuccess())
{
auto err = outcome.GetError();
XTENSOR_THROW(std::runtime_error, std::string("Error: DeleteObject: ") + err.GetExceptionName().c_str() + ": " + err.GetMessage().c_str());
}
}

void xzarr_aws_store::erase_prefix(const std::string& prefix)
{
for (const auto& key: list_prefix(prefix))
{
erase(key);
}
}

const std::string& xzarr_aws_store::get_root() const
{
return m_root;
}

xio_aws_config xzarr_aws_store::get_io_config() const
{
xio_aws_config c = {m_client, m_bucket};
return c;
}

}

#endif
6 changes: 3 additions & 3 deletions include/xtensor-zarr/xzarr_file_system_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace xt
std::vector<std::string> list();
std::vector<std::string> list_prefix(const std::string& prefix);
void erase(const std::string& key);
void delete_prefix(const std::string& prefix);
void erase_prefix(const std::string& prefix);
void set(const std::string& key, const std::vector<char>& value);
void set(const std::string& key, const std::string& value);
std::string get(const std::string& key);
Expand Down Expand Up @@ -255,10 +255,10 @@ namespace xt
}

/**
* Delete all the keys with the given prefix from the store.
* Erase all the keys with the given prefix from the store.
* @param prefix the prefix
*/
void xzarr_file_system_store::delete_prefix(const std::string& prefix)
void xzarr_file_system_store::erase_prefix(const std::string& prefix)
{
fs::remove_all(m_root + '/' + prefix);
}
Expand Down
4 changes: 2 additions & 2 deletions include/xtensor-zarr/xzarr_gcs_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace xt
std::vector<std::string> list_prefix(const std::string& prefix);
void list_dir(const std::string& prefix, std::vector<std::string>& keys, std::vector<std::string>& prefixes);
void erase(const std::string& key);
void delete_prefix(const std::string& prefix);
void erase_prefix(const std::string& prefix);
void set(const std::string& key, const std::vector<char>& value);
void set(const std::string& key, const std::string& value);
std::string get(const std::string& key);
Expand Down Expand Up @@ -209,7 +209,7 @@ namespace xt
}
}

void xzarr_gcs_store::delete_prefix(const std::string& prefix)
void xzarr_gcs_store::erase_prefix(const std::string& prefix)
{
for (const auto& key: list_prefix(prefix))
{
Expand Down

0 comments on commit b0fb930

Please sign in to comment.