Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetManager implementation with GetBlobInfo support #276

Merged
merged 6 commits into from
May 10, 2016
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,27 @@ public class RouterConfig {
@Default("2")
public final int routerDeleteSuccessTarget;

/**
* The maximum number of parallel requests issued at a time by the get manager for a get operation on a chunk.
*/
@Config("router.get.request.parallelism")
@Default("2")
public final int routerGetRequestParallelism;

/**
* The minimum number of successful responses required for a get operation on a chunk.
*/
@Config("router.get.success.target")
@Default("1")
public final int routerGetSuccessTarget;

/**
* Indicates whether get operations are allowed to make requests to nodes in remote data centers.
*/
@Config("router.get.cross.dc.enabled")
@Default("true")
public final boolean routerGetCrossDcEnabled;

/**
* Create a RouterConfig instance.
* @param verifiableProperties the properties map to refer to.
Expand All @@ -131,5 +152,8 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
routerMaxSlippedPutAttempts = verifiableProperties.getInt("router.max.slipped.put.attempts", 1);
routerDeleteRequestParallelism = verifiableProperties.getInt("router.delete.request.parallelism", 12);
routerDeleteSuccessTarget = verifiableProperties.getInt("router.delete.success.target", 2);
routerGetRequestParallelism = verifiableProperties.getInt("router.get.request.parallelism", 2);
routerGetSuccessTarget = verifiableProperties.getInt("router.get.success.target", 1);
routerGetCrossDcEnabled = verifiableProperties.getBoolean("router.get.cross.dc.enabled", true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import com.github.ambry.messageformat.BlobOutput;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.messageformat.MessageFormatRecord;
import com.github.ambry.network.Send;
import com.github.ambry.network.BlockingChannel;
import com.github.ambry.network.ByteBufferSend;
import com.github.ambry.network.ChannelOutput;
import com.github.ambry.network.Send;
import com.github.ambry.protocol.DeleteRequest;
import com.github.ambry.protocol.DeleteResponse;
import com.github.ambry.protocol.GetRequest;
Expand All @@ -31,7 +32,6 @@
import com.github.ambry.protocol.RequestOrResponse;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.utils.Crc32;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.coordinator;

import com.github.ambry.network.Send;
package com.github.ambry.network;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -24,7 +22,7 @@
* A byte buffer version of Send that sends a materialized byte buffer. This breaks the contract of Send (only
* materialize onto the network) and so is only suitable for use in tests.
*/
class ByteBufferSend implements Send {
public class ByteBufferSend implements Send {
private final ByteBuffer buffer;

public ByteBufferSend(ByteBuffer byteBuffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
public class PartitionRequestInfo {

private final PartitionId partitionId;
private final ArrayList<BlobId> blobIds;
private final List<BlobId> blobIds;
private long totalIdSize;

private static final int Blob_Id_Count_Size_InBytes = 4;

public PartitionRequestInfo(PartitionId partitionId, ArrayList<BlobId> blobIds) {
public PartitionRequestInfo(PartitionId partitionId, List<BlobId> blobIds) {
this.partitionId = partitionId;
this.blobIds = blobIds;
totalIdSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@
import com.github.ambry.utils.Time;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,7 +41,6 @@ class DeleteManager {
private final HashMap<Integer, DeleteOperation> correlationIdToDeleteOperation;
private final NotificationSystem notificationSystem;
private final Time time;
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final ResponseHandler responseHandler;
private final NonBlockingRouterMetrics routerMetrics;
private final ClusterMap clusterMap;
Expand All @@ -54,7 +51,7 @@ class DeleteManager {
/**
* Used by a {@link DeleteOperation} to associate a {@code CorrelationId} to a {@link DeleteOperation}.
*/
private class DeleteRequestRegistrationCallbackImpl implements DeleteRequestRegistrationCallback {
private class DeleteRequestRegistrationCallbackImpl implements RequestRegistrationCallback<DeleteOperation> {
private List<RequestInfo> requestListToFill;

@Override
Expand Down Expand Up @@ -112,30 +109,28 @@ void submitDeleteBlobOperation(String blobIdString, FutureResult<Void> futureRes
* @param requestListToFill list to be filled with the requests created.
*/
public void poll(List<RequestInfo> requestListToFill) {
Iterator<DeleteOperation> deleteOperationIterator = deleteOperations.iterator();
requestRegistrationCallback.requestListToFill = requestListToFill;
while (deleteOperationIterator.hasNext()) {
DeleteOperation op = deleteOperationIterator.next();
for (DeleteOperation op : deleteOperations) {
op.poll(requestRegistrationCallback);
if (op.isOperationComplete()) {
deleteOperationIterator.remove();
if (op.isOperationComplete() && deleteOperations.remove(op)) {
// In order to ensure that an operation is completed only once, call onComplete() only at the place where the
// operation actually gets removed from the set of operations. See comment within close().
onComplete(op);
}
}
}

/**
* Handles responses received for each of the {@link DeleteOperation} within this delete manager.
* @param responseInfo A response from {@link com.github.ambry.network.NetworkClient}
* @param responseInfo the {@link ResponseInfo} containing the response.
*/
void handleResponse(ResponseInfo responseInfo) {
int correlationId = ((DeleteRequest) responseInfo.getRequest()).getCorrelationId();
DeleteOperation deleteOperation = correlationIdToDeleteOperation.remove(correlationId);
// If it is still an active operation, hand over the response. Otherwise, ignore.
if (deleteOperations.contains(deleteOperation)) {
deleteOperation.handleResponse(responseInfo);
if (deleteOperation.isOperationComplete()) {
deleteOperations.remove(deleteOperation);
if (deleteOperation.isOperationComplete() && deleteOperations.remove(deleteOperation)) {
onComplete(deleteOperation);
}
}
Expand All @@ -144,7 +139,7 @@ void handleResponse(ResponseInfo responseInfo) {
/**
* Called when the delete operation is completed. The {@code DeleteManager} also finishes the delete operation
* by performing the callback and notification.
* @param op The {@lilnk DeleteOperation} that has completed.
* @param op The {@link DeleteOperation} that has completed.
*/
void onComplete(DeleteOperation op) {
if (op.getOperationException() == null) {
Expand All @@ -159,13 +154,13 @@ void onComplete(DeleteOperation op) {
* will have no effect.
*/
void close() {
if (isOpen.compareAndSet(true, false)) {
Iterator<DeleteOperation> iter = deleteOperations.iterator();
while (iter.hasNext()) {
DeleteOperation deleteOperation = iter.next();
NonBlockingRouter.completeOperation(deleteOperation.getFutureResult(), deleteOperation.getCallback(), null,
for (DeleteOperation op : deleteOperations) {
// There is a rare scenario where the operation gets removed from this set and gets completed concurrently by
// the RequestResponseHandler thread when it is in poll() or handleResponse(). In order to avoid the completion
// from happening twice, complete it here only if the remove was successful.
if (deleteOperations.remove(op)) {
NonBlockingRouter.completeOperation(op.getFutureResult(), op.getCallback(), null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this if condition is removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close() can only be called once. See the caller.

new RouterException("Aborted operation because Router is closed.", RouterErrorCode.RouterClosed));
iter.remove();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,21 @@ class DeleteOperation {

/**
* Gets a list of {@link DeleteRequest} for sending to replicas.
* @param requestFillCallback the {@link DeleteRequestRegistrationCallback} to call for every request
* @param requestRegistrationCallback the {@link RequestRegistrationCallback} to call for every request
* that gets created as part of this poll operation.
*/
void poll(DeleteRequestRegistrationCallback requestFillCallback) {
void poll(RequestRegistrationCallback<DeleteOperation> requestRegistrationCallback) {
cleanupExpiredInflightRequests();
checkAndMaybeComplete();
if (isOperationComplete()) {
return;
if (!isOperationComplete()) {
fetchRequests(requestRegistrationCallback);
}
}

/**
* Fetch {@link DeleteRequest}s to send for the operation.
*/
private void fetchRequests(RequestRegistrationCallback<DeleteOperation> requestRegistrationCallback) {
Iterator<ReplicaId> replicaIterator = operationTracker.getReplicaIterator();
while (replicaIterator.hasNext()) {
ReplicaId replica = replicaIterator.next();
Expand All @@ -112,7 +118,7 @@ void poll(DeleteRequestRegistrationCallback requestFillCallback) {
DeleteRequest deleteRequest = createDeleteRequest();
inflightRequestInfos.put(deleteRequest.getCorrelationId(), new InflightRequestInfo(time.milliseconds(), replica));
RequestInfo requestInfo = new RequestInfo(hostname, port, deleteRequest);
requestFillCallback.registerRequestToSend(this, requestInfo);
requestRegistrationCallback.registerRequestToSend(this, requestInfo);
replicaIterator.remove();
}
}
Expand Down Expand Up @@ -178,8 +184,8 @@ void handleResponse(ResponseInfo responseInfo) {
* A wrapper class that is used to check if a request has been expired.
*/
private class InflightRequestInfo {
private final long submissionTime;
private final ReplicaId replica;
final long submissionTime;
final ReplicaId replica;

InflightRequestInfo(long submissionTime, ReplicaId replica) {
this.submissionTime = submissionTime;
Expand Down

This file was deleted.

Loading