Skip to content
Permalink
Browse files

Integrate S3 storage driver with HTTP client cache.

This allows copying from one S3 object to another.  We generally try to avoid doing this but there are a few cases where it is needed and the tests do it quite a bit.

One thing to look out for here is that reads require the http client to be explicitly released by calling httpClientDone().  This means than clients could grow if they are not released properly.  The http statistics will hopefully alert us if this is happening.
  • Loading branch information...
dwsteele committed Jun 11, 2019
1 parent ced42d6 commit fdd375b63d3962845efbb38a7020d852143b97fd
@@ -517,7 +517,7 @@ storage/remote/write.o: storage/remote/write.c build.auto.h common/assert.h comm
storage/s3/read.o: storage/s3/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/http/client.h common/io/http/header.h common/io/http/query.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h storage/info.h storage/read.h storage/read.intern.h storage/s3/read.h storage/s3/storage.h storage/s3/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/s3/read.c -o storage/s3/read.o

storage/s3/storage.o: storage/s3/storage.c build.auto.h common/assert.h common/crypto/hash.h common/debug.h common/encode.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/http/client.h common/io/http/common.h common/io/http/header.h common/io/http/query.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h common/type/xml.h storage/info.h storage/read.h storage/read.intern.h storage/s3/read.h storage/s3/storage.h storage/s3/storage.intern.h storage/s3/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
storage/s3/storage.o: storage/s3/storage.c build.auto.h common/assert.h common/crypto/hash.h common/debug.h common/encode.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/http/cache.h common/io/http/client.h common/io/http/common.h common/io/http/header.h common/io/http/query.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h common/type/xml.h storage/info.h storage/read.h storage/read.intern.h storage/s3/read.h storage/s3/storage.h storage/s3/storage.intern.h storage/s3/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/s3/storage.c -o storage/s3/storage.o

storage/s3/write.o: storage/s3/write.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/http/client.h common/io/http/header.h common/io/http/query.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h common/type/xml.h storage/info.h storage/read.h storage/read.intern.h storage/s3/storage.h storage/s3/storage.intern.h storage/s3/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
@@ -18,6 +18,9 @@ S3 Storage Read
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
#define STORAGE_READ_S3_TYPE StorageReadS3
#define STORAGE_READ_S3_PREFIX storageReadS3

typedef struct StorageReadS3
{
MemContext *memContext; // Object mem context
@@ -35,6 +38,15 @@ Macros for function logging
#define FUNCTION_LOG_STORAGE_READ_S3_FORMAT(value, buffer, bufferSize) \
objToLog(value, "StorageReadS3", buffer, bufferSize)

/***********************************************************************************************************************************
Mark http client as done so it can be reused
***********************************************************************************************************************************/
OBJECT_DEFINE_FREE_RESOURCE_BEGIN(STORAGE_READ_S3, LOG, logLevelTrace)
{
httpClientDone(this->httpClient);
}
OBJECT_DEFINE_FREE_RESOURCE_END(LOG);

/***********************************************************************************************************************************
Open the file
***********************************************************************************************************************************/
@@ -53,14 +65,13 @@ storageReadS3Open(THIS_VOID)
bool result = false;

// Request the file
storageS3Request(this->storage, HTTP_VERB_GET_STR, this->interface.name, NULL, NULL, false, true);

// On success
this->httpClient = storageS3HttpClient(this->storage);
this->httpClient = storageS3Request(this->storage, HTTP_VERB_GET_STR, this->interface.name, NULL, NULL, false, true).httpClient;

if (httpClientResponseCodeOk(this->httpClient))
{
memContextCallbackSet(this->memContext, storageReadS3FreeResource, this);
result = true;

}
// Else error unless ignore missing
else if (!this->interface.ignoreMissing)
THROW_FMT(FileMissingError, "unable to open '%s': No such file or directory", strPtr(this->interface.name));
@@ -89,6 +100,28 @@ storageReadS3(THIS_VOID, Buffer *buffer, bool block)
FUNCTION_LOG_RETURN(SIZE, ioRead(httpClientIoRead(this->httpClient), buffer));
}

/***********************************************************************************************************************************
Close the file
***********************************************************************************************************************************/
static void
storageReadS3Close(THIS_VOID)
{
THIS(StorageReadS3);

FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STORAGE_READ_S3, this);
FUNCTION_LOG_END();

ASSERT(this != NULL);
ASSERT(this->httpClient != NULL);

memContextCallbackClear(this->memContext);
storageReadS3FreeResource(this);
this->httpClient = NULL;

FUNCTION_LOG_RETURN_VOID();
}

/***********************************************************************************************************************************
Has file reached EOF?
***********************************************************************************************************************************/
@@ -137,6 +170,7 @@ storageReadS3New(StorageS3 *storage, const String *name, bool ignoreMissing)

.ioInterface = (IoReadInterface)
{
.close = storageReadS3Close,
.eof = storageReadS3Eof,
.open = storageReadS3Open,
.read = storageReadS3,
@@ -9,6 +9,7 @@ S3 Storage
#include "common/crypto/hash.h"
#include "common/encode.h"
#include "common/debug.h"
#include "common/io/http/cache.h"
#include "common/io/http/common.h"
#include "common/log.h"
#include "common/memContext.h"
@@ -81,7 +82,7 @@ Object type
struct StorageS3
{
MemContext *memContext;
HttpClient *httpClient; // Http client to service requests
HttpClientCache *httpClientCache; // Http client cache to service requests
const StringList *headerRedactList; // List of headers to redact from logging

const String *bucket; // Bucket to store data in
@@ -91,7 +92,7 @@ struct StorageS3
const String *securityToken; // Security token, if any
size_t partSize; // Part size for multi-part upload
unsigned int deleteMax; // Maximum objects that can be deleted in one request
const String *host; // Defaults to {bucket}.{endpoint}
const String *bucketEndpoint; // Set to {bucket}.{endpoint}
unsigned int port; // Host port

// Current signing key and date it is valid for
@@ -157,7 +158,7 @@ storageS3Auth(
// Set required headers
httpHeaderPut(httpHeader, S3_HEADER_CONTENT_SHA256_STR, payloadHash);
httpHeaderPut(httpHeader, S3_HEADER_DATE_STR, dateTime);
httpHeaderPut(httpHeader, S3_HEADER_HOST_STR, this->host);
httpHeaderPut(httpHeader, S3_HEADER_HOST_STR, this->bucketEndpoint);

if (this->securityToken != NULL)
httpHeaderPut(httpHeader, S3_HEADER_TOKEN_STR, this->securityToken);
@@ -274,17 +275,20 @@ storageS3Request(
STRDEF("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855") :
bufHex(cryptoHashOne(HASH_TYPE_SHA256_STR, body)));

// Get an http client
HttpClient *httpClient = httpClientCacheGet(this->httpClientCache);

// Process request
Buffer *response = httpClientRequest(this->httpClient, verb, uri, query, requestHeader, body, returnContent);
Buffer *response = httpClientRequest(httpClient, verb, uri, query, requestHeader, body, returnContent);

// Error if the request was not successful
if (!httpClientResponseCodeOk(this->httpClient) &&
(!allowMissing || httpClientResponseCode(this->httpClient) != HTTP_RESPONSE_CODE_NOT_FOUND))
if (!httpClientResponseCodeOk(httpClient) &&
(!allowMissing || httpClientResponseCode(httpClient) != HTTP_RESPONSE_CODE_NOT_FOUND))
{
// General error message
String *error = strNewFmt(
"S3 request failed with %u: %s", httpClientResponseCode(this->httpClient),
strPtr(httpClientResponseMessage(this->httpClient)));
"S3 request failed with %u: %s", httpClientResponseCode(httpClient),
strPtr(httpClientResponseMessage(httpClient)));

// Output uri/query
strCat(error, "\n*** URI/Query ***:");
@@ -310,7 +314,7 @@ storageS3Request(
}

// Output response headers
const HttpHeader *responseHeader = httpClientReponseHeader(this->httpClient);
const HttpHeader *responseHeader = httpClientReponseHeader(httpClient);
const StringList *responseHeaderList = httpHeaderList(responseHeader);

if (strLstSize(responseHeaderList) > 0)
@@ -332,7 +336,8 @@ storageS3Request(
}

// On success move the buffer to the calling context
result.responseHeader = httpHeaderMove(httpHeaderDup(httpClientReponseHeader(this->httpClient), NULL), MEM_CONTEXT_OLD());
result.httpClient = httpClient;
result.responseHeader = httpHeaderMove(httpHeaderDup(httpClientReponseHeader(httpClient), NULL), MEM_CONTEXT_OLD());
result.response = bufMove(response, MEM_CONTEXT_OLD());
}
MEM_CONTEXT_TEMP_END();
@@ -490,8 +495,7 @@ storageS3Exists(THIS_VOID, const String *file)

MEM_CONTEXT_TEMP_BEGIN()
{
storageS3Request(this, HTTP_VERB_HEAD_STR, file, NULL, NULL, false, true);
result = httpClientResponseCodeOk(this->httpClient);
result = httpClientResponseCodeOk(storageS3Request(this, HTTP_VERB_HEAD_STR, file, NULL, NULL, true, true).httpClient);
}
MEM_CONTEXT_TEMP_END();

@@ -518,10 +522,10 @@ storageS3Info(THIS_VOID, const String *file, bool followLink)
StorageInfo result = {0};

// Attempt to get file info
StorageS3RequestResult httpResult = storageS3Request(this, HTTP_VERB_HEAD_STR, file, NULL, NULL, false, true);
StorageS3RequestResult httpResult = storageS3Request(this, HTTP_VERB_HEAD_STR, file, NULL, NULL, true, true);

// On success load info into a structure
if (httpClientResponseCodeOk(this->httpClient))
if (httpClientResponseCodeOk(httpResult.httpClient))
{
result.exists = true;
result.size = cvtZToUInt64(strPtr(httpHeaderGet(httpResult.responseHeader, HTTP_HEADER_CONTENT_LENGTH_STR)));
@@ -844,26 +848,11 @@ storageS3Remove(THIS_VOID, const String *file, bool errorOnMissing)
ASSERT(file != NULL);
ASSERT(!errorOnMissing);

storageS3Request(this, HTTP_VERB_DELETE_STR, file, NULL, NULL, false, false);
storageS3Request(this, HTTP_VERB_DELETE_STR, file, NULL, NULL, true, false);

FUNCTION_LOG_RETURN_VOID();
}

/***********************************************************************************************************************************
Get http client
***********************************************************************************************************************************/
HttpClient *
storageS3HttpClient(const StorageS3 *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_LOG_PARAM(STORAGE_S3, this);
FUNCTION_TEST_END();

ASSERT(this != NULL);

FUNCTION_TEST_RETURN(this->httpClient);
}

/***********************************************************************************************************************************
New object
***********************************************************************************************************************************/
@@ -914,14 +903,15 @@ storageS3New(
driver->securityToken = strDup(securityToken);
driver->partSize = partSize;
driver->deleteMax = deleteMax;
driver->host = strNewFmt("%s.%s", strPtr(bucket), strPtr(endPoint));
driver->bucketEndpoint = strNewFmt("%s.%s", strPtr(bucket), strPtr(endPoint));
driver->port = port;

// Force the signing key to be generated on the first run
driver->signingKeyDate = YYYYMMDD_STR;

// Create the http client used to service requests
driver->httpClient = httpClientNew(host == NULL ? driver->host : host, driver->port, timeout, verifyPeer, caFile, caPath);
// Create the http client cache used to service requests
driver->httpClientCache = httpClientCacheNew(
host == NULL ? driver->bucketEndpoint : host, driver->port, timeout, verifyPeer, caFile, caPath);
driver->headerRedactList = strLstAdd(strLstNew(), S3_HEADER_AUTHORIZATION_STR);

this = storageNewP(
@@ -22,6 +22,7 @@ Perform an S3 Request

typedef struct StorageS3RequestResult
{
HttpClient *httpClient;
HttpHeader *responseHeader;
Buffer *response;
} StorageS3RequestResult;
@@ -30,11 +31,6 @@ StorageS3RequestResult storageS3Request(
StorageS3 *this, const String *verb, const String *uri, const HttpQuery *query, const Buffer *body, bool returnContent,
bool allowMissing);

/***********************************************************************************************************************************
Getters
***********************************************************************************************************************************/
HttpClient *storageS3HttpClient(const StorageS3 *this);

/***********************************************************************************************************************************
Macros for function logging
***********************************************************************************************************************************/
@@ -151,7 +151,7 @@ storageWriteS3(THIS_VOID, const Buffer *buffer)
// Continue until the write buffer has been exhausted
do
{
// Copy an many bytes as possible into the part buffer
// Copy as many bytes as possible into the part buffer
size_t bytesNext = bufRemains(this->partBuffer) > bufUsed(buffer) - bytesTotal ?
bufUsed(buffer) - bytesTotal : bufRemains(this->partBuffer);
bufCatSub(this->partBuffer, buffer, bytesTotal, bytesNext);
@@ -208,13 +208,13 @@ storageWriteS3Close(THIS_VOID)
// Finalize the multi-part upload
storageS3Request(
this->storage, HTTP_VERB_POST_STR, this->interface.name,
httpQueryAdd(httpQueryNew(), S3_QUERY_UPLOAD_ID_STR, this->uploadId), xmlDocumentBuf(partList), false, false);
httpQueryAdd(httpQueryNew(), S3_QUERY_UPLOAD_ID_STR, this->uploadId), xmlDocumentBuf(partList), true, false);
}
// Else upload all the data in a single put
else
{
storageS3Request(
this->storage, HTTP_VERB_PUT_STR, this->interface.name, NULL, this->partBuffer, false, false);
this->storage, HTTP_VERB_PUT_STR, this->interface.name, NULL, this->partBuffer, true, false);
}

bufFree(this->partBuffer);

0 comments on commit fdd375b

Please sign in to comment.
You can’t perform that action at this time.