Skip to content

Commit

Permalink
NXP-30366: allow storage of S3 blobs in spread out subdirectories
Browse files Browse the repository at this point in the history
  • Loading branch information
efge committed Aug 27, 2021
1 parent 3adca75 commit 54c86ad
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 20 deletions.
Expand Up @@ -20,14 +20,17 @@

import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.nuxeo.ecm.blob.s3.S3BlobStoreConfiguration.DELIMITER;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.ALLOW_BYTE_RANGE;
import static org.nuxeo.ecm.core.blob.KeyStrategy.VER_SEP;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Calendar;
import java.util.Collections;
Expand Down Expand Up @@ -55,6 +58,9 @@
import org.nuxeo.ecm.core.blob.KeyStrategy;
import org.nuxeo.ecm.core.blob.KeyStrategyDigest;
import org.nuxeo.ecm.core.blob.KeyStrategyDocId;
import org.nuxeo.ecm.core.blob.PathStrategy;
import org.nuxeo.ecm.core.blob.PathStrategyFlat;
import org.nuxeo.ecm.core.blob.PathStrategySubDirs;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.io.download.DownloadHelper;
import org.nuxeo.ecm.core.model.Repository;
Expand Down Expand Up @@ -112,6 +118,10 @@ public class S3BlobStore extends AbstractBlobStore {

protected final String bucketPrefix;

protected final PathStrategy pathStrategy;

protected final boolean pathSeparatorIsBackslash;

protected final boolean allowByteRange;

// note, we may choose to not use versions even in a versioned bucket
Expand All @@ -136,6 +146,14 @@ public S3BlobStore(String blobProviderId, String name, S3BlobStoreConfiguration
amazonS3 = config.amazonS3;
bucketName = config.bucketName;
bucketPrefix = config.bucketPrefix;
Path p = Paths.get(bucketPrefix);
int subDirsDepth = config.getSubDirsDepth();
if (subDirsDepth == 0) {
pathStrategy = new PathStrategyFlat(p);
} else {
pathStrategy = new PathStrategySubDirs(p, subDirsDepth);
}
pathSeparatorIsBackslash = FileSystems.getDefault().getSeparator().equals("\\");
allowByteRange = config.getBooleanProperty(ALLOW_BYTE_RANGE);
// don't use versions if we use deduplication (including managed case)
useVersion = isBucketVersioningEnabled() && keyStrategy instanceof KeyStrategyDocId;
Expand Down Expand Up @@ -180,6 +198,15 @@ protected boolean supportsAsyncDigest(Repository repository) {
return repository.hasCapability(Repository.CAPABILITY_QUERY_BLOB_KEYS);
}

/**
 * Computes the S3 object key under which the blob with the given key is stored.
 * <p>
 * The location is delegated to {@code pathStrategy}, which the constructor builds from the bucket prefix
 * (flat, or spread over sub-directories when a sub-directory depth is configured).
 *
 * @param key the blob key (without version id)
 * @return the object key to use in the bucket, with {@code DELIMITER} as separator
 */
protected String bucketKey(String key) {
    Path location = pathStrategy.getPathForKey(key);
    String objectKey = location.toString();
    // Path is (ab)used to build the key, so on Windows it is rendered with backslashes:
    // convert those back to the S3 delimiter
    return pathSeparatorIsBackslash ? objectKey.replace("\\", DELIMITER) : objectKey;
}

@Override
protected String writeBlobGeneric(BlobWriteContext blobWriteContext) throws IOException {
Path file;
Expand Down Expand Up @@ -241,7 +268,7 @@ protected String writeBlobGeneric(BlobWriteContext blobWriteContext) throws IOEx
/** Writes a file with the given key and returns its version id. */
protected String writeFile(String key, Path file, BlobContext blobContext, String fileTraceSource)
throws IOException {
String bucketKey = bucketPrefix + key;
String bucketKey = bucketKey(key);
long t0 = 0;
if (log.isDebugEnabled()) {
t0 = System.currentTimeMillis();
Expand Down Expand Up @@ -352,7 +379,7 @@ protected boolean exists(String bucketKey) {

/** @return object length, or -1 if missing */
protected long lengthOfBlob(String key) {
String bucketKey = bucketPrefix + key;
String bucketKey = bucketKey(key);
try {
logTrace("-->", "getObjectMetadata");
logTrace("hnote right: " + bucketKey);
Expand All @@ -377,11 +404,12 @@ public void clear() {
do {
if (list == null) {
logTrace("->", "listObjects");
// use delimiter to avoid useless listing of objects in "subdirectories"
ListObjectsRequest listObjectsRequest = //
new ListObjectsRequest().withBucketName(bucketName)
.withPrefix(bucketPrefix)
.withDelimiter(S3BlobStoreConfiguration.DELIMITER);
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucketName)
.withPrefix(bucketPrefix);
if (config.getSubDirsDepth() == 0) {
// use delimiter to avoid useless listing of objects in "subdirectories"
listObjectsRequest.setDelimiter(DELIMITER);
}
list = amazonS3.listObjects(listObjectsRequest);
} else {
list = amazonS3.listNextBatchOfObjects(list);
Expand All @@ -401,10 +429,11 @@ public void clear() {
do {
if (vlist == null) {
logTrace("->", "listVersions");
ListVersionsRequest listVersionsRequest = //
new ListVersionsRequest().withBucketName(bucketName)
.withPrefix(bucketPrefix)
.withDelimiter(S3BlobStoreConfiguration.DELIMITER);
ListVersionsRequest listVersionsRequest = new ListVersionsRequest().withBucketName(bucketName)
.withPrefix(bucketPrefix);
if (config.getSubDirsDepth() == 0) {
listVersionsRequest.setDelimiter(DELIMITER);
}
vlist = amazonS3.listVersions(listVersionsRequest);
} else {
vlist = amazonS3.listNextBatchOfVersions(vlist);
Expand Down Expand Up @@ -444,7 +473,7 @@ public boolean readBlob(String key, Path dest) throws IOException {
objectKey = key;
versionId = null;
}
String bucketKey = bucketPrefix + objectKey;
String bucketKey = bucketKey(objectKey);
String debugKey = bucketKey + (versionId == null ? "" : "@" + versionId);
String debugObject = "s3://" + bucketName + "/" + debugKey;
try {
Expand Down Expand Up @@ -536,7 +565,7 @@ protected String copyOrMoveBlob(String key, S3BlobStore sourceBlobStore, String
sourceVersionId = sourceKey.substring(seppos + 1);
}
String sourceBucketName = sourceBlobStore.bucketName;
String sourceBucketKey = sourceBlobStore.bucketPrefix + sourceObjectKey;
String sourceBucketKey = sourceBlobStore.bucketKey(sourceObjectKey);

if (key == null) {
// fast digest compute or trigger async digest computation
Expand All @@ -553,7 +582,7 @@ protected String copyOrMoveBlob(String key, S3BlobStore sourceBlobStore, String
}
}

String bucketKey = bucketPrefix + key;
String bucketKey = bucketKey(key);

long t0 = 0;
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -726,7 +755,7 @@ public void writeBlobProperties(BlobUpdateContext blobUpdateContext) throws IOEx
objectKey = key.substring(0, seppos);
versionId = key.substring(seppos + 1);
}
String bucketKey = bucketPrefix + objectKey;
String bucketKey = bucketKey(objectKey);
try {
if (blobUpdateContext.updateRetainUntil != null) {
if (versionId == null) {
Expand Down Expand Up @@ -795,7 +824,7 @@ public void deleteBlob(String key) {
objectKey = key.substring(0, seppos);
versionId = key.substring(seppos + 1);
}
String bucketKey = bucketPrefix + objectKey;
String bucketKey = bucketKey(objectKey);
try {
if (versionId == null) {
logTrace("->", "deleteObject");
Expand Down Expand Up @@ -840,15 +869,22 @@ public void computeToDelete() {
logTrace("->", "listObjects");
do {
if (list == null) {
// use delimiter to avoid useless listing of objects in "subdirectories"
ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName, bucketPrefix, null,
S3BlobStoreConfiguration.DELIMITER, null);
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucketName)
.withPrefix(bucketPrefix);
if (config.getSubDirsDepth() == 0) {
// use delimiter to avoid useless listing of objects in "subdirectories"
listObjectsRequest.setDelimiter(DELIMITER);
}
list = amazonS3.listObjects(listObjectsRequest);
} else {
list = amazonS3.listNextBatchOfObjects(list);
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
String key = summary.getKey().substring(prefixLength);
String path = summary.getKey().substring(prefixLength);
String key = pathStrategy.getKeyForPath(path);
if (key == null) {
continue;
}
if (useDeDuplication) {
if (!((KeyStrategyDigest) keyStrategy).isValidDigest(key)) {
// ignore files that cannot be digests, for safety
Expand Down
Expand Up @@ -81,6 +81,8 @@ public class S3BlobStoreConfiguration extends CloudBlobStoreConfiguration {

public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";

public static final String BUCKET_SUB_DIRS_DEPTH_PROPERTY = "subDirsDepth";

public static final String BUCKET_REGION_PROPERTY = "region";

public static final String AWS_ID_PROPERTY = "awsid";
Expand Down Expand Up @@ -287,6 +289,14 @@ protected String getBucketPrefix() {
return value;
}

/**
 * Gets the configured depth of sub-directories used to spread out the blobs in the bucket.
 *
 * @return the value of the {@code subDirsDepth} property, or 0 if that value is negative
 */
protected int getSubDirsDepth() {
    // a negative depth is meaningless, treat it as 0
    return Math.max(getIntProperty(BUCKET_SUB_DIRS_DEPTH_PROPERTY), 0);
}

protected AWSCredentialsProvider getAWSCredentialsProvider() {
String awsID = getProperty(AWS_ID_PROPERTY);
String awsSecret = getProperty(AWS_SECRET_PROPERTY);
Expand Down
@@ -0,0 +1,37 @@
/*
* (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Florent Guillaume
*/
package org.nuxeo.ecm.blob.s3;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.nuxeo.runtime.test.runner.Deploy;

/**
 * S3 blob store test deployed with the blob provider contribution from
 * {@code test-blob-provider-s3-subdirs.xml}.
 * <p>
 * NOTE(review): the test cases inherited from {@link TestS3BlobStoreAbstract} are presumably run against
 * that sub-directories configuration; confirm in the abstract class.
 */
@Deploy("org.nuxeo.ecm.core.storage.binarymanager.s3.tests:OSGI-INF/test-blob-provider-s3-subdirs.xml")
public class TestS3BlobStoreSubdirs extends TestS3BlobStoreAbstract {

/**
 * Checks the flags expected for this configuration: the blob provider is neither transactional nor in record
 * mode, and the blob store's key strategy uses de-duplication.
 */
@Test
public void testFlags() {
assertFalse(bp.isTransactional());
assertFalse(bp.isRecordMode());
assertTrue(bs.getKeyStrategy().useDeDuplication());
}

}
Expand Up @@ -99,6 +99,7 @@ public class TestS3BlobStoreTracing {
protected static final List<String> BLOB_PROVIDER_IDS = Arrays.asList( //
"s3", //
"s3-other", //
"s3-subdirs", //
"s3-sha256-async", //
"s3-nocache", //
"s3-managed", //
Expand Down Expand Up @@ -221,6 +222,17 @@ public void testWrite() throws IOException {
checkTrace("trace-write.txt");
}

/**
 * Writes a blob through the {@code s3-subdirs} blob provider and checks both the returned key and the
 * recorded trace against {@code trace-write-subdirs.txt}.
 */
@Test
public void testWriteSubDirs() throws IOException {
BlobProvider bp = getBlobProvider("s3-subdirs");

logTrace("== Write (subdirs) ==");
BlobContext blobContext = new BlobContext(new StringBlob(FOO), DOCID1, XPATH);
String key1 = bp.writeBlob(blobContext);
// the key returned to the caller is expected to be FOO_MD5 itself, without any sub-directory part
assertEquals(FOO_MD5, key1);
checkTrace("trace-write-subdirs.txt");
}

@Test
public void testWriteAlreadyCached() throws IOException {
BlobProvider bp = getBlobProvider("s3");
Expand Down Expand Up @@ -356,6 +368,20 @@ public void testRead() throws IOException {
checkTrace("trace-read.txt");
}

/**
 * Reads back a blob previously written through the {@code s3-subdirs} blob provider and checks both its
 * content and the recorded trace against {@code trace-read-subdirs.txt}.
 */
@Test
public void testReadSubdirs() throws IOException {
BlobProvider bp = getBlobProvider("s3-subdirs");
BlobContext blobContext = new BlobContext(new StringBlob(FOO), DOCID1, XPATH);
String key1 = bp.writeBlob(blobContext);
// presumably so that the read below cannot be served from the local cache and only the read is traced
clearCache(bp);
clearTrace();

logTrace("== Read (subdirs) ==");
Blob blob = bp.readBlob(blobInfo(key1));
assertEquals(FOO, blob.getString());
checkTrace("trace-read-subdirs.txt");
}

@Test
public void testReadAlreadyCached() throws IOException {
BlobProvider bp = getBlobProvider("s3");
Expand Down
@@ -0,0 +1,15 @@
<?xml version="1.0"?>
<!--
  Test contribution registering two S3 blob providers that set subDirsDepth:
  "test" uses prefix base/ with a depth of 2, "other" uses prefix other/ with a depth of 3.
-->
<component name="org.nuxeo.ecm.blob.s3.test.subdirs" version="1.0.0">
<extension target="org.nuxeo.ecm.core.blob.BlobManager" point="configuration">
<blobprovider name="test">
<class>org.nuxeo.ecm.blob.s3.S3BlobProvider</class>
<property name="bucket_prefix">base/</property>
<property name="subDirsDepth">2</property>
</blobprovider>
<blobprovider name="other">
<class>org.nuxeo.ecm.blob.s3.S3BlobProvider</class>
<property name="bucket_prefix">other/</property>
<property name="subDirsDepth">3</property>
</blobprovider>
</extension>
</component>
Expand Up @@ -12,6 +12,12 @@
<property name="bucket_prefix">other/</property>
</blobprovider>

<!-- S3 blob provider with prefix subdirs/ and a subDirsDepth of 2, used by the "(subdirs)" tracing tests -->
<blobprovider name="s3-subdirs">
<class>org.nuxeo.ecm.blob.s3.S3BlobProvider</class>
<property name="bucket_prefix">subdirs/</property>
<property name="subDirsDepth">2</property>
</blobprovider>

<blobprovider name="s3-sha256-async">
<class>org.nuxeo.ecm.blob.s3.S3BlobProvider</class>
<property name="bucket_prefix">sha256/</property>
Expand Down
@@ -0,0 +1,13 @@
@startuml
participant Nuxeo order 1
participant Cache order 2
participant S3 order 3
== Read (subdirs) ==
Nuxeo <-- Cache: missing
hnote right: acbd18db4cc2f85cedef654fccc4a4d8
Nuxeo <- S3: read 3 bytes
hnote right: subdirs/ac/bd/acbd18db4cc2f85cedef654fccc4a4d8
hnote right of Cache: ${TMP}
Cache --> Cache: rename
hnote right of Cache: acbd18db4cc2f85cedef654fccc4a4d8
@enduml
@@ -0,0 +1,20 @@
@startuml
participant Nuxeo order 1
participant Cache order 2
participant S3 order 3
== Write (subdirs) ==
Nuxeo -> Cache: write 3 bytes
hnote right: bin_${TMP1}.tmp
Cache --> Cache: rename
hnote right of Cache: ${TMP2}
Nuxeo <-- Cache: missing
hnote right: acbd18db4cc2f85cedef654fccc4a4d8
Nuxeo --> S3: getObjectMetadata
hnote right: subdirs/ac/bd/acbd18db4cc2f85cedef654fccc4a4d8
Nuxeo <-- S3: missing
Nuxeo -> S3: write 3 bytes
hnote right: subdirs/ac/bd/acbd18db4cc2f85cedef654fccc4a4d8
hnote right of Cache: ${TMP2}
Cache --> Cache: rename
hnote right of Cache: acbd18db4cc2f85cedef654fccc4a4d8
@enduml

0 comments on commit 54c86ad

Please sign in to comment.