NXP-30366: allow storage of S3 blobs in spread out subdirectories (10.10) #4923

Merged
1 commit, merged on Aug 27, 2021
@@ -20,14 +20,17 @@

import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.nuxeo.ecm.blob.s3.S3BlobStoreConfiguration.DELIMITER;
import static org.nuxeo.ecm.core.blob.BlobProviderDescriptor.ALLOW_BYTE_RANGE;
import static org.nuxeo.ecm.core.blob.KeyStrategy.VER_SEP;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Calendar;
import java.util.Collections;
@@ -55,6 +58,9 @@
import org.nuxeo.ecm.core.blob.KeyStrategy;
import org.nuxeo.ecm.core.blob.KeyStrategyDigest;
import org.nuxeo.ecm.core.blob.KeyStrategyDocId;
import org.nuxeo.ecm.core.blob.PathStrategy;
import org.nuxeo.ecm.core.blob.PathStrategyFlat;
import org.nuxeo.ecm.core.blob.PathStrategySubDirs;
import org.nuxeo.ecm.core.blob.binary.BinaryGarbageCollector;
import org.nuxeo.ecm.core.io.download.DownloadHelper;
import org.nuxeo.ecm.core.model.Repository;
@@ -112,6 +118,10 @@ public class S3BlobStore extends AbstractBlobStore {

protected final String bucketPrefix;

protected final PathStrategy pathStrategy;

protected final boolean pathSeparatorIsBackslash;

protected final boolean allowByteRange;

// note, we may choose to not use versions even in a versioned bucket
@@ -136,6 +146,14 @@ public S3BlobStore(String blobProviderId, String name, S3BlobStoreConfiguration
amazonS3 = config.amazonS3;
bucketName = config.bucketName;
bucketPrefix = config.bucketPrefix;
Path p = Paths.get(bucketPrefix);
int subDirsDepth = config.getSubDirsDepth();
if (subDirsDepth == 0) {
pathStrategy = new PathStrategyFlat(p);
} else {
pathStrategy = new PathStrategySubDirs(p, subDirsDepth);
}
pathSeparatorIsBackslash = FileSystems.getDefault().getSeparator().equals("\\");
allowByteRange = config.getBooleanProperty(ALLOW_BYTE_RANGE);
// don't use versions if we use deduplication (including managed case)
useVersion = isBucketVersioningEnabled() && keyStrategy instanceof KeyStrategyDocId;
@@ -180,6 +198,15 @@ protected boolean supportsAsyncDigest(Repository repository) {
return repository.hasCapability(Repository.CAPABILITY_QUERY_BLOB_KEYS);
}

protected String bucketKey(String key) {
String path = pathStrategy.getPathForKey(key).toString();
if (pathSeparatorIsBackslash) {
// correct for our abuse of Path under Windows
path = path.replace("\\", DELIMITER);
}
return path;
}

@Override
protected String writeBlobGeneric(BlobWriteContext blobWriteContext) throws IOException {
Path file;
@@ -241,7 +268,7 @@ protected String writeBlobGeneric(BlobWriteContext blobWriteContext) throws IOEx
/** Writes a file with the given key and returns its version id. */
protected String writeFile(String key, Path file, BlobContext blobContext, String fileTraceSource)
throws IOException {
String bucketKey = bucketPrefix + key;
String bucketKey = bucketKey(key);
long t0 = 0;
if (log.isDebugEnabled()) {
t0 = System.currentTimeMillis();
@@ -352,7 +379,7 @@ protected boolean exists(String bucketKey) {

/** @return object length, or -1 if missing */
protected long lengthOfBlob(String key) {
String bucketKey = bucketPrefix + key;
String bucketKey = bucketKey(key);
try {
logTrace("-->", "getObjectMetadata");
logTrace("hnote right: " + bucketKey);
@@ -377,11 +404,12 @@ public void clear() {
do {
if (list == null) {
logTrace("->", "listObjects");
// use delimiter to avoid useless listing of objects in "subdirectories"
ListObjectsRequest listObjectsRequest = //
new ListObjectsRequest().withBucketName(bucketName)
.withPrefix(bucketPrefix)
.withDelimiter(S3BlobStoreConfiguration.DELIMITER);
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucketName)
.withPrefix(bucketPrefix);
if (config.getSubDirsDepth() == 0) {
// use delimiter to avoid useless listing of objects in "subdirectories"
listObjectsRequest.setDelimiter(DELIMITER);
}
list = amazonS3.listObjects(listObjectsRequest);
} else {
list = amazonS3.listNextBatchOfObjects(list);
@@ -401,10 +429,11 @@ public void clear() {
do {
if (vlist == null) {
logTrace("->", "listVersions");
ListVersionsRequest listVersionsRequest = //
new ListVersionsRequest().withBucketName(bucketName)
.withPrefix(bucketPrefix)
.withDelimiter(S3BlobStoreConfiguration.DELIMITER);
ListVersionsRequest listVersionsRequest = new ListVersionsRequest().withBucketName(bucketName)
.withPrefix(bucketPrefix);
if (config.getSubDirsDepth() == 0) {
listVersionsRequest.setDelimiter(DELIMITER);
}
vlist = amazonS3.listVersions(listVersionsRequest);
} else {
vlist = amazonS3.listNextBatchOfVersions(vlist);
@@ -444,7 +473,7 @@ public boolean readBlob(String key, Path dest) throws IOException {
objectKey = key;
versionId = null;
}
String bucketKey = bucketPrefix + objectKey;
String bucketKey = bucketKey(objectKey);
String debugKey = bucketKey + (versionId == null ? "" : "@" + versionId);
String debugObject = "s3://" + bucketName + "/" + debugKey;
try {
@@ -536,7 +565,7 @@ protected String copyOrMoveBlob(String key, S3BlobStore sourceBlobStore, String
sourceVersionId = sourceKey.substring(seppos + 1);
}
String sourceBucketName = sourceBlobStore.bucketName;
String sourceBucketKey = sourceBlobStore.bucketPrefix + sourceObjectKey;
String sourceBucketKey = sourceBlobStore.bucketKey(sourceObjectKey);

if (key == null) {
// fast digest compute or trigger async digest computation
@@ -553,7 +582,7 @@ protected String copyOrMoveBlob(String key, S3BlobStore sourceBlobStore, String
}
}

String bucketKey = bucketPrefix + key;
String bucketKey = bucketKey(key);

long t0 = 0;
if (log.isDebugEnabled()) {
@@ -726,7 +755,7 @@ public void writeBlobProperties(BlobUpdateContext blobUpdateContext) throws IOEx
objectKey = key.substring(0, seppos);
versionId = key.substring(seppos + 1);
}
String bucketKey = bucketPrefix + objectKey;
String bucketKey = bucketKey(objectKey);
try {
if (blobUpdateContext.updateRetainUntil != null) {
if (versionId == null) {
@@ -795,7 +824,7 @@ public void deleteBlob(String key) {
objectKey = key.substring(0, seppos);
versionId = key.substring(seppos + 1);
}
String bucketKey = bucketPrefix + objectKey;
String bucketKey = bucketKey(objectKey);
try {
if (versionId == null) {
logTrace("->", "deleteObject");
@@ -840,15 +869,22 @@ public void computeToDelete() {
logTrace("->", "listObjects");
do {
if (list == null) {
// use delimiter to avoid useless listing of objects in "subdirectories"
ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName, bucketPrefix, null,
S3BlobStoreConfiguration.DELIMITER, null);
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucketName)
.withPrefix(bucketPrefix);
if (config.getSubDirsDepth() == 0) {
// use delimiter to avoid useless listing of objects in "subdirectories"
listObjectsRequest.setDelimiter(DELIMITER);
}
list = amazonS3.listObjects(listObjectsRequest);
} else {
list = amazonS3.listNextBatchOfObjects(list);
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
String key = summary.getKey().substring(prefixLength);
String path = summary.getKey().substring(prefixLength);
String key = pathStrategy.getKeyForPath(path);
if (key == null) {
continue;
}
if (useDeDuplication) {
if (!((KeyStrategyDigest) keyStrategy).isValidDigest(key)) {
// ignore files that cannot be digests, for safety
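For context only (not part of the diff): a minimal sketch of the key-to-object-key mapping that the new pathStrategy/bucketKey(...) code is expected to produce, assuming PathStrategySubDirs splits a digest key into subDirsDepth two-character segments, as the committed trace files (subdirs/ac/bd/acbd18db4cc2f85cedef654fccc4a4d8) suggest. The class and helper names below are hypothetical.

import java.nio.file.Path;
import java.nio.file.Paths;

class BucketKeySketch {

    // Flat layout (subDirsDepth = 0): the object key is simply prefix + key, as before this change.
    static String flatKey(String prefix, String key) {
        return prefix + key;
    }

    // Spread-out layout (subDirsDepth > 0): two-character "subdirectories" are inserted before the key.
    static String subDirsKey(String prefix, int depth, String key) {
        Path p = Paths.get(prefix);
        for (int i = 0; i < depth; i++) {
            p = p.resolve(key.substring(2 * i, 2 * i + 2));
        }
        // S3BlobStore.bucketKey() applies the same correction when the platform separator is '\'.
        return p.resolve(key).toString().replace("\\", "/");
    }

    public static void main(String[] args) {
        String digest = "acbd18db4cc2f85cedef654fccc4a4d8"; // MD5 of "foo", as used by the tests
        System.out.println(flatKey("subdirs/", digest));        // subdirs/acbd18db4cc2f85cedef654fccc4a4d8
        System.out.println(subDirsKey("subdirs/", 2, digest));  // subdirs/ac/bd/acbd18db4cc2f85cedef654fccc4a4d8
    }
}

The garbage-collection listing relies on the reverse mapping: as the computeToDelete() hunk above shows, pathStrategy.getKeyForPath(path) returning null makes the loop skip object keys that do not match the expected layout.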
@@ -81,6 +81,8 @@ public class S3BlobStoreConfiguration extends CloudBlobStoreConfiguration {

public static final String BUCKET_PREFIX_PROPERTY = "bucket_prefix";

public static final String BUCKET_SUB_DIRS_DEPTH_PROPERTY = "subDirsDepth";

public static final String BUCKET_REGION_PROPERTY = "region";

public static final String AWS_ID_PROPERTY = "awsid";
@@ -287,6 +289,14 @@ protected String getBucketPrefix() {
return value;
}

protected int getSubDirsDepth() {
int d = getIntProperty(BUCKET_SUB_DIRS_DEPTH_PROPERTY);
if (d < 0) {
d = 0;
}
return d;
}

protected AWSCredentialsProvider getAWSCredentialsProvider() {
String awsID = getProperty(AWS_ID_PROPERTY);
String awsSecret = getProperty(AWS_SECRET_PROPERTY);
@@ -0,0 +1,37 @@
/*
* (C) Copyright 2019 Nuxeo (http://nuxeo.com/) and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Florent Guillaume
*/
package org.nuxeo.ecm.blob.s3;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.nuxeo.runtime.test.runner.Deploy;

@Deploy("org.nuxeo.ecm.core.storage.binarymanager.s3.tests:OSGI-INF/test-blob-provider-s3-subdirs.xml")
public class TestS3BlobStoreSubdirs extends TestS3BlobStoreAbstract {

@Test
public void testFlags() {
assertFalse(bp.isTransactional());
assertFalse(bp.isRecordMode());
assertTrue(bs.getKeyStrategy().useDeDuplication());
}

}
@@ -99,6 +99,7 @@ public class TestS3BlobStoreTracing {
protected static final List<String> BLOB_PROVIDER_IDS = Arrays.asList( //
"s3", //
"s3-other", //
"s3-subdirs", //
"s3-sha256-async", //
"s3-nocache", //
"s3-managed", //
@@ -221,6 +222,17 @@ public void testWrite() throws IOException {
checkTrace("trace-write.txt");
}

@Test
public void testWriteSubDirs() throws IOException {
BlobProvider bp = getBlobProvider("s3-subdirs");

logTrace("== Write (subdirs) ==");
BlobContext blobContext = new BlobContext(new StringBlob(FOO), DOCID1, XPATH);
String key1 = bp.writeBlob(blobContext);
assertEquals(FOO_MD5, key1);
checkTrace("trace-write-subdirs.txt");
}

@Test
public void testWriteAlreadyCached() throws IOException {
BlobProvider bp = getBlobProvider("s3");
@@ -356,6 +368,20 @@ public void testRead() throws IOException {
checkTrace("trace-read.txt");
}

@Test
public void testReadSubdirs() throws IOException {
BlobProvider bp = getBlobProvider("s3-subdirs");
BlobContext blobContext = new BlobContext(new StringBlob(FOO), DOCID1, XPATH);
String key1 = bp.writeBlob(blobContext);
clearCache(bp);
clearTrace();

logTrace("== Read (subdirs) ==");
Blob blob = bp.readBlob(blobInfo(key1));
assertEquals(FOO, blob.getString());
checkTrace("trace-read-subdirs.txt");
}

@Test
public void testReadAlreadyCached() throws IOException {
BlobProvider bp = getBlobProvider("s3");
@@ -0,0 +1,15 @@
<?xml version="1.0"?>
<component name="org.nuxeo.ecm.blob.s3.test.subdirs" version="1.0.0">
<extension target="org.nuxeo.ecm.core.blob.BlobManager" point="configuration">
<blobprovider name="test">
<class>org.nuxeo.ecm.blob.s3.S3BlobProvider</class>
<property name="bucket_prefix">base/</property>
<property name="subDirsDepth">2</property>
</blobprovider>
<blobprovider name="other">
<class>org.nuxeo.ecm.blob.s3.S3BlobProvider</class>
<property name="bucket_prefix">other/</property>
<property name="subDirsDepth">3</property>
</blobprovider>
</extension>
</component>
@@ -12,6 +12,12 @@
<property name="bucket_prefix">other/</property>
</blobprovider>

<blobprovider name="s3-subdirs">
<class>org.nuxeo.ecm.blob.s3.S3BlobProvider</class>
<property name="bucket_prefix">subdirs/</property>
<property name="subDirsDepth">2</property>
</blobprovider>

<blobprovider name="s3-sha256-async">
<class>org.nuxeo.ecm.blob.s3.S3BlobProvider</class>
<property name="bucket_prefix">sha256/</property>
@@ -0,0 +1,13 @@
@startuml
participant Nuxeo order 1
participant Cache order 2
participant S3 order 3
== Read (subdirs) ==
Nuxeo <-- Cache: missing
hnote right: acbd18db4cc2f85cedef654fccc4a4d8
Nuxeo <- S3: read 3 bytes
hnote right: subdirs/ac/bd/acbd18db4cc2f85cedef654fccc4a4d8
hnote right of Cache: ${TMP}
Cache --> Cache: rename
hnote right of Cache: acbd18db4cc2f85cedef654fccc4a4d8
@enduml
@@ -0,0 +1,20 @@
@startuml
participant Nuxeo order 1
participant Cache order 2
participant S3 order 3
== Write (subdirs) ==
Nuxeo -> Cache: write 3 bytes
hnote right: bin_${TMP1}.tmp
Cache --> Cache: rename
hnote right of Cache: ${TMP2}
Nuxeo <-- Cache: missing
hnote right: acbd18db4cc2f85cedef654fccc4a4d8
Nuxeo --> S3: getObjectMetadata
hnote right: subdirs/ac/bd/acbd18db4cc2f85cedef654fccc4a4d8
Nuxeo <-- S3: missing
Nuxeo -> S3: write 3 bytes
hnote right: subdirs/ac/bd/acbd18db4cc2f85cedef654fccc4a4d8
hnote right of Cache: ${TMP2}
Cache --> Cache: rename
hnote right of Cache: acbd18db4cc2f85cedef654fccc4a4d8
@enduml
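As a side note, the object key recorded in the two new trace files can be reproduced independently. A small sketch, assuming the s3-subdirs provider uses MD5 digest keys (the tests assert key1 equals FOO_MD5), bucket_prefix "subdirs/" and subDirsDepth 2, and that commons-codec is available on the classpath:

import org.apache.commons.codec.digest.DigestUtils;

public class SubdirsTraceCheck {
    public static void main(String[] args) {
        String digest = DigestUtils.md5Hex("foo"); // acbd18db4cc2f85cedef654fccc4a4d8, i.e. FOO_MD5
        StringBuilder objectKey = new StringBuilder("subdirs/"); // bucket_prefix of the s3-subdirs provider
        int depth = 2; // subDirsDepth of the s3-subdirs provider
        for (int i = 0; i < depth; i++) {
            objectKey.append(digest, 2 * i, 2 * i + 2).append('/');
        }
        objectKey.append(digest);
        // Matches the hnote lines in trace-read-subdirs.txt and trace-write-subdirs.txt
        System.out.println(objectKey); // subdirs/ac/bd/acbd18db4cc2f85cedef654fccc4a4d8
    }
}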