Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce PutBlobOptions in router API #983

Merged
merged 2 commits into from Jun 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -37,8 +37,8 @@ public class CallbackUtils {
* {@code failureCallback} will be called.
* @return the managed {@link Callback}.
*/
public static <T> Callback<T> chainCallback(AsyncOperationTracker asyncOperationTracker,
Callback<?> failureCallback, ThrowingConsumer<? super T> successAction) {
public static <T> Callback<T> chainCallback(AsyncOperationTracker asyncOperationTracker, Callback<?> failureCallback,
ThrowingConsumer<? super T> successAction) {
asyncOperationTracker.markOperationStart();
return (result, exception) -> {
try {
Expand Down
@@ -0,0 +1,26 @@
/*
* Copyright 2018 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
*/

package com.github.ambry.router;

/**
* Represents any options associated with a putBlob request.
*/
public class PutBlobOptions {
@Override
public String toString() {
return "PutBlobOptions{}";
}
}
@@ -0,0 +1,28 @@
/*
* Copyright 2018 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
*/

package com.github.ambry.router;

/**
* A builder for {@link PutBlobOptions} objects.
*/
public class PutBlobOptionsBuilder {
/**
* @return the {@link PutBlobOptions} built.
*/
public PutBlobOptions build() {
return new PutBlobOptions();
}
}
9 changes: 6 additions & 3 deletions ambry-api/src/main/java/com.github.ambry/router/Router.java
Expand Up @@ -43,11 +43,12 @@ public interface Router extends Closeable {
* channel is consumed fully, and the size of the blob is the number of bytes read from it.
* @param usermetadata Optional user metadata about the blob. This can be null.
* @param channel The {@link ReadableStreamChannel} that contains the content of the blob.
* @param options The {@link PutBlobOptions} associated with the request. This cannot be null.
* @param callback The {@link Callback} which will be invoked on the completion of the request .
* @return A future that would contain the BlobId eventually.
*/
Future<String> putBlob(BlobProperties blobProperties, byte[] usermetadata, ReadableStreamChannel channel,
Callback<String> callback);
PutBlobOptions options, Callback<String> callback);

/**
* Requests for a blob to be deleted asynchronously and invokes the {@link Callback} when the request completes.
Expand Down Expand Up @@ -101,10 +102,12 @@ default Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options) {
* channel is consumed fully, and the size of the blob is the number of bytes read from it.
* @param usermetadata Optional user metadata about the blob. This can be null.
* @param channel The {@link ReadableStreamChannel} that contains the content of the blob.
* @param options The {@link PutBlobOptions} associated with the request. This cannot be null.
* @return A future that would contain the BlobId eventually.
*/
default Future<String> putBlob(BlobProperties blobProperties, byte[] usermetadata, ReadableStreamChannel channel) {
return putBlob(blobProperties, usermetadata, channel, null);
default Future<String> putBlob(BlobProperties blobProperties, byte[] usermetadata, ReadableStreamChannel channel,
PutBlobOptions options) {
return putBlob(blobProperties, usermetadata, channel, options, null);
}

/**
Expand Down
Expand Up @@ -29,7 +29,7 @@ public enum RouterErrorCode {
*/
InvalidBlobId, /**
* Caller passed in an illegal argument for
* {@link Router#putBlob(com.github.ambry.messageformat.BlobProperties, byte[], ReadableStreamChannel)}
* {@link Router#putBlob(com.github.ambry.messageformat.BlobProperties, byte[], ReadableStreamChannel, PutBlobOptions)}
* operation (and its variant).
*/
InvalidPutArgument, /**
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.github.ambry.router.GetBlobOptions;
import com.github.ambry.router.GetBlobOptionsBuilder;
import com.github.ambry.router.GetBlobResult;
import com.github.ambry.router.PutBlobOptionsBuilder;
import com.github.ambry.router.ReadableStreamChannel;
import com.github.ambry.router.Router;
import com.github.ambry.router.RouterException;
Expand Down Expand Up @@ -104,7 +105,7 @@ public void handlePost(RestRequest restRequest, RestResponseChannel restResponse
restRequest.setArg(RestUtils.InternalKeys.TARGET_CONTAINER_KEY, Container.UNKNOWN_CONTAINER);
BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs());
byte[] usermetadata = RestUtils.buildUserMetadata(restRequest.getArgs());
router.putBlob(blobProperties, usermetadata, restRequest,
router.putBlob(blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(),
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties));
} catch (RestServiceException e) {
handleResponse(restRequest, restResponseChannel, null, e);
Expand Down
@@ -0,0 +1,34 @@
/*
* Copyright 2018 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
*/

package com.github.ambry.router;

import org.junit.Test;

import static org.junit.Assert.*;


/**
* Tests for {@link PutBlobOptions}.
*/
public class PutBlobOptionsTest {
/**
* Test {@link PutBlobOptions#toString()}.
*/
@Test
public void testToString() {
assertEquals("Unexpected toString() result", "PutBlobOptions{}", new PutBlobOptions().toString());
}
}
Expand Up @@ -21,7 +21,6 @@
import com.github.ambry.commons.ByteBufferReadableStreamChannel;
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.protocol.GetOption;
import com.github.ambry.rest.BlobStorageService;
import com.github.ambry.rest.ResponseStatus;
Expand All @@ -42,12 +41,8 @@
import com.github.ambry.router.RouterException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.GregorianCalendar;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Expand Up @@ -204,8 +204,10 @@ public FrontendMetrics(MetricRegistry metricRegistry) {
new AsyncOperationTracker.Metrics(PostBlobHandler.class, "postSecurityProcessRequest", metricRegistry);
postSecurityPostProcessRequestMetrics =
new AsyncOperationTracker.Metrics(PostBlobHandler.class, "postSecurityPostProcessRequest", metricRegistry);
postRouterPutBlobMetrics = new AsyncOperationTracker.Metrics(PostBlobHandler.class, "postRouterPutBlob", metricRegistry);
postIdConversionMetrics = new AsyncOperationTracker.Metrics(PostBlobHandler.class, "postIdConversion", metricRegistry);
postRouterPutBlobMetrics =
new AsyncOperationTracker.Metrics(PostBlobHandler.class, "postRouterPutBlob", metricRegistry);
postIdConversionMetrics =
new AsyncOperationTracker.Metrics(PostBlobHandler.class, "postIdConversion", metricRegistry);
postSecurityProcessResponseMetrics =
new AsyncOperationTracker.Metrics(PostBlobHandler.class, "postSecurityProcessResponse", metricRegistry);

Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.github.ambry.rest.RestUtils;
import com.github.ambry.router.Callback;
import com.github.ambry.router.CallbackUtils;
import com.github.ambry.router.PutBlobOptionsBuilder;
import com.github.ambry.router.ReadableStreamChannel;
import com.github.ambry.router.Router;
import com.github.ambry.utils.AsyncOperationTracker;
Expand Down Expand Up @@ -162,7 +163,7 @@ private Callback<Void> securityProcessRequestCallback(BlobInfo blobInfo) {
private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.postSecurityPostProcessRequestMetrics,
securityCheckResult -> router.putBlob(blobInfo.getBlobProperties(), blobInfo.getUserMetadata(), restRequest,
routerPutBlobCallback(blobInfo)));
new PutBlobOptionsBuilder().build(), routerPutBlobCallback(blobInfo)));
}

/**
Expand Down
Expand Up @@ -53,6 +53,8 @@
import com.github.ambry.router.GetBlobOptions;
import com.github.ambry.router.GetBlobResult;
import com.github.ambry.router.InMemoryRouter;
import com.github.ambry.router.PutBlobOptions;
import com.github.ambry.router.PutBlobOptionsBuilder;
import com.github.ambry.router.ReadableStreamChannel;
import com.github.ambry.router.Router;
import com.github.ambry.router.RouterErrorCode;
Expand Down Expand Up @@ -608,7 +610,8 @@ public void oldStyleUserMetadataTest() throws Exception {
new BlobProperties(0, "userMetadataTestOldStyleServiceID", Account.UNKNOWN_ACCOUNT_ID,
Container.UNKNOWN_CONTAINER_ID, false);
byte[] usermetadata = TestUtils.getRandomBytes(25);
String blobId = router.putBlob(blobProperties, usermetadata, new ByteBufferReadableStreamChannel(content)).get();
String blobId = router.putBlob(blobProperties, usermetadata, new ByteBufferReadableStreamChannel(content),
new PutBlobOptionsBuilder().build()).get();

RestUtils.SubResource[] subResources = {RestUtils.SubResource.UserMetadata, RestUtils.SubResource.BlobInfo};
for (RestUtils.SubResource subResource : subResources) {
Expand Down Expand Up @@ -2470,7 +2473,7 @@ public Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Call

@Override
public Future<String> putBlob(BlobProperties blobProperties, byte[] usermetadata, ReadableStreamChannel channel,
Callback<String> callback) {
PutBlobOptions options, Callback<String> callback) {
return completeOperation(UtilsTest.getRandomString(10), callback, OpType.PutBlob);
}

Expand Down
Expand Up @@ -177,12 +177,13 @@ public void onCompletion(GetBlobResultInternal internalResult, Exception excepti
* channel is consumed fully, and the size of the blob is the number of bytes read from it.
* @param userMetadata Optional user metadata about the blob. This can be null.
* @param channel The {@link ReadableStreamChannel} that contains the content of the blob.
* @param options The {@link PutBlobOptions} associated with the request. This cannot be null.
* @param callback The {@link Callback} which will be invoked on the completion of the request .
* @return A future that would contain the BlobId eventually.
*/
@Override
public Future<String> putBlob(BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel,
Callback<String> callback) {
PutBlobOptions options, Callback<String> callback) {
if (blobProperties == null || channel == null) {
throw new IllegalArgumentException("blobProperties or channel must not be null");
}
Expand Down
Expand Up @@ -161,7 +161,8 @@ networkClientFactory, new LoggingNotificationSystem(), mockClusterMap, kms, cryp
putContent = new byte[BLOB_SIZE];
random.nextBytes(putContent);
ReadableStreamChannel putChannel = new ByteBufferReadableStreamChannel(ByteBuffer.wrap(putContent));
String blobIdStr = router.putBlob(blobProperties, userMetadata, putChannel).get();
String blobIdStr =
router.putBlob(blobProperties, userMetadata, putChannel, new PutBlobOptionsBuilder().build()).get();
blobId = RouterUtils.getBlobIdFromString(blobIdStr, mockClusterMap);
networkClient = networkClientFactory.getNetworkClient();
router.close();
Expand Down
Expand Up @@ -219,7 +219,7 @@ private void doPut() throws Exception {
putContent = new byte[blobSize];
random.nextBytes(putContent);
ReadableStreamChannel putChannel = new ByteBufferReadableStreamChannel(ByteBuffer.wrap(putContent));
blobIdStr = router.putBlob(blobProperties, userMetadata, putChannel).get();
blobIdStr = router.putBlob(blobProperties, userMetadata, putChannel, new PutBlobOptionsBuilder().build()).get();
blobId = RouterUtils.getBlobIdFromString(blobIdStr, mockClusterMap);
}

Expand Down
Expand Up @@ -156,7 +156,8 @@ public void testRangeRequest() throws Exception {
private void testGetSuccess(int blobSize, GetBlobOptions options) throws Exception {
router = getNonBlockingRouter();
setOperationParams(blobSize, options);
String blobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel).get();
String blobId =
router.putBlob(putBlobProperties, putUserMetadata, putChannel, new PutBlobOptionsBuilder().build()).get();
getBlobAndCompareContent(blobId);
// Test GetBlobInfoOperation, regardless of options passed in.
this.options = new GetBlobOptionsBuilder().operationType(GetBlobOptions.OperationType.BlobInfo).build();
Expand Down Expand Up @@ -227,7 +228,8 @@ private void testBadCallback(Callback<GetBlobResult> getBlobCallback, CountDownL
router = getNonBlockingRouter();
setOperationParams(chunkSize * 6 + 11, new GetBlobOptionsBuilder().build());
final CountDownLatch getBlobInfoCallbackCalled = new CountDownLatch(1);
String blobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel).get();
String blobId =
router.putBlob(putBlobProperties, putUserMetadata, putChannel, new PutBlobOptionsBuilder().build()).get();
List<Future<GetBlobResult>> getBlobInfoFutures = new ArrayList<>();
List<Future<GetBlobResult>> getBlobDataFutures = new ArrayList<>();
GetBlobOptions infoOptions =
Expand Down Expand Up @@ -265,7 +267,7 @@ public void onCompletion(GetBlobResult result, Exception exception) {

// Test that GetManager is still operational
setOperationParams(chunkSize, new GetBlobOptionsBuilder().build());
blobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel).get();
blobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel, new PutBlobOptionsBuilder().build()).get();
getBlobAndCompareContent(blobId);
this.options = infoOptions;
getBlobAndCompareContent(blobId);
Expand All @@ -281,7 +283,8 @@ public void onCompletion(GetBlobResult result, Exception exception) {
public void testFailureOnAllPollThatSends() throws Exception {
router = getNonBlockingRouter();
setOperationParams(chunkSize, new GetBlobOptionsBuilder().build());
String blobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel).get();
String blobId =
router.putBlob(putBlobProperties, putUserMetadata, putChannel, new PutBlobOptionsBuilder().build()).get();
mockSelectorState.set(MockSelectorState.ThrowExceptionOnSend);
Future future;
try {
Expand Down
Expand Up @@ -189,7 +189,7 @@ public Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Call

@Override
public Future<String> putBlob(BlobProperties blobProperties, byte[] usermetadata, ReadableStreamChannel channel,
Callback<String> callback) {
PutBlobOptions options, Callback<String> callback) {
FutureResult<String> futureResult = new FutureResult<>();
handlePrechecks(futureResult, callback);
PostData postData = new PostData(blobProperties, usermetadata, channel, futureResult, callback);
Expand Down