Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PutManager initial implementation #255

Merged
merged 23 commits into from
Apr 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public class RouterConfig {

/**
* Number of background threads to perform coordinator operations in CoordinatorBackedRouter.
* Number of independent scaling units for the router.
*/
@Config("router.scaling.unit.count")
@Default("1")
Expand Down Expand Up @@ -61,13 +61,41 @@ public class RouterConfig {
@Default("1000")
public final int routerConnectionCheckoutTimeoutMs;

/**
* Timeout for requests issued by the router to the network layer.
*/
@Config("router.request.timeout.ms")
@Default("2000")
public final int routerRequestTimeoutMs;

/**
* The max chunk size to be used for put operations.
*/
@Config("router.max.put.chunk.size.bytes")
@Default("4*1024*1024")
public final int routerMaxPutChunkSizeBytes;

/**
* The maximum number of parallel requests issued at a time by the put manager for a chunk.
*/
@Config("router.put.request.parallelism")
@Default("3")
public final int routerPutRequestParallelism;

/**
* The minimum number of successful responses required for a put operation.
*/
@Config("router.put.success.target")
@Default("2")
public final int routerPutSuccessTarget;

/**
* The maximum number of times to retry putting any chunk of a put operation
*/
@Config("router.max.slipped.put.attempts")
@Default("1")
public final int routerMaxSlippedPutAttempts;

/**
* Create a RouterConfig instance.
* @param verifiableProperties the properties map to refer to.
Expand All @@ -82,6 +110,10 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
verifiableProperties.getIntInRange("router.scaling.unit.max.connections.per.port.ssl", 2, 1, 20);
routerConnectionCheckoutTimeoutMs =
verifiableProperties.getIntInRange("router.connection.checkout.timeout.ms", 1000, 1, 5000);
routerRequestTimeoutMs = verifiableProperties.getInt("router.request.timeout.ms", 2000);
routerMaxPutChunkSizeBytes = verifiableProperties.getInt("router.max.put.chunk.size.bytes", 4 * 1024 * 1024);
routerPutRequestParallelism = verifiableProperties.getInt("router.put.request.parallelism", 3);
routerPutSuccessTarget = verifiableProperties.getInt("router.put.success.target", 2);
routerMaxSlippedPutAttempts = verifiableProperties.getInt("router.max.slipped.put.attempts", 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public static RestServiceErrorCode getRestServiceErrorCode(RouterErrorCode route
case BlobTooLarge:
case InvalidBlobId:
case InvalidPutArgument:
case BadInputChannel:
return BadRequest;
case BlobDeleted:
case BlobExpired:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public enum RouterErrorCode {
* Blob is too large. Cannot store blob of such size.
*/
BlobTooLarge,
/**
* Unexpected error reading from the input channel for puts.
*/
BadInputChannel,
/**
* Insufficient capacity available in Ambry for object to be stored.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,17 @@ public RouterException(Throwable e, RouterErrorCode errorCode) {
public RouterErrorCode getErrorCode() {
return this.errorCode;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

RouterException that = (RouterException) o;
return this.errorCode == that.errorCode && this.getCause() == that.getCause();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ReplicaEventType;
import com.github.ambry.clustermap.ReplicaId;

import com.github.ambry.network.ConnectionPoolTimeoutException;
import java.io.IOException;
import java.net.SocketException;


/**
* ResponseHandler can be used by components like the Coordinator whenever an operation encounters an error or
* an exception to delegate the responsibility of conveying appropriate replica related errors to the cluster map.
* ResponseHandler can be used by components whenever an operation encounters an error or an exception, to delegate
* the responsibility of conveying appropriate replica related errors to the cluster map.
* It can also be used to convey the information that a replica related operation was successful.
* The cluster map uses this information to set soft states and dynamically handle failures.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public static BlobData deserializeBlobRecord(CrcInputStream crcStream)
* |(2 bytes)| (4 bytes) | | | ...... |
* | | | | | |
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* version - The version of the blob property record
* version - The version of the metadata content record
*
* no of keys - total number of keys
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.messageformat;

import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreKeyFactory;
import com.github.ambry.utils.ByteBufferInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;


/**
* A class to serialize and deserialize MetadataContent which forms the content of a Metadata Blob.
*/
public class MetadataContentSerDe {
/**
* Serialize the input list of keys that form the metadata content.
* @param keys the input list of keys that form the metadata content.
* @return a ByteBuffer containing the serialized output.
*/
public static ByteBuffer serializeMetadataContent(List<StoreKey> keys) {
int bufSize =
MessageFormatRecord.Metadata_Content_Format_V1.getMetadataContentSize(keys.get(0).sizeInBytes(), keys.size());
ByteBuffer outputBuf = ByteBuffer.allocate(bufSize);
MessageFormatRecord.Metadata_Content_Format_V1.serializeMetadataContentRecord(outputBuf, keys);
return outputBuf;
}

/**
* Deserialize the serialized metadata content in the input ByteBuffer using the given {@link StoreKeyFactory} as a
* reference.
* @param buf ByteBuffer containing the serialized metadata content.
* @param storeKeyFactory the {@link StoreKeyFactory} to use to deserialize the content.
* @return a list of {@link StoreKey} containing the deserialized output.
* @throws IOException if an IOException is encountered during deserialization.
* @throws MessageFormatException if an unknown version is encountered in the header of the serialized input.
*/
public static List<StoreKey> deserializeMetadataContentRecord(ByteBuffer buf, StoreKeyFactory storeKeyFactory)
throws IOException, MessageFormatException {
int version = buf.getShort();
switch (version) {
case MessageFormatRecord.Message_Header_Version_V1:
return MessageFormatRecord.Metadata_Content_Format_V1
.deserializeMetadataContentRecord(new DataInputStream(new ByteBufferInputStream(buf)), storeKeyFactory);
default:
throw new MessageFormatException("Unknown version encountered for MetadataContent: " + version,
MessageFormatErrorCodes.Unknown_Format_Version);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ class ConnectionTracker {
/**
* Instantiates a ConnectionTracker
* @param maxConnectionsPerPortPlainText the connection pool limit for plain text connections to a (host, port)
* @param maxConnectionsPerPortPlainSsl the connection pool limit for ssl connections to a (host, port)
* @param maxConnectionsPerPortSsl the connection pool limit for ssl connections to a (host, port)
*/
ConnectionTracker(int maxConnectionsPerPortPlainText, int maxConnectionsPerPortPlainSsl) {
ConnectionTracker(int maxConnectionsPerPortPlainText, int maxConnectionsPerPortSsl) {
hostPortToPoolManager = new HashMap<String, HostPortPoolManager>();
connectionIdToPoolManager = new HashMap<String, HostPortPoolManager>();
totalManagedConnectionsCount = 0;
this.maxConnectionsPerPortPlainText = maxConnectionsPerPortPlainText;
this.maxConnectionsPerPortSsl = maxConnectionsPerPortPlainSsl;
this.maxConnectionsPerPortSsl = maxConnectionsPerPortSsl;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,17 @@ public class NetworkClient implements Closeable {
/**
* Instantiates a NetworkClient.
* @param selector the {@link Selector} for this NetworkClient
* @param connectionTracker the {@link ConnectionTracker} for this NetworkClient
* @param maxConnectionsPerPortPlainText the maximum number of connections per node per plain text port
* @param maxConnectionsPerPortSsl the maximum number of connections per node per ssl port
* @param networkConfig the {@link NetworkConfig} for this NetworkClient
* @param checkoutTimeoutMs the maximum time a request should remain in this NetworkClient's pending queue waiting
* for an available connection to its destination.
* @param time The Time instance to use.
*/
public NetworkClient(Selector selector, ConnectionTracker connectionTracker, NetworkConfig networkConfig,
int checkoutTimeoutMs, Time time) {
public NetworkClient(Selector selector, NetworkConfig networkConfig, int maxConnectionsPerPortPlainText,
int maxConnectionsPerPortSsl, int checkoutTimeoutMs, Time time) {
this.selector = selector;
this.connectionTracker = connectionTracker;
this.connectionTracker = new ConnectionTracker(maxConnectionsPerPortPlainText, maxConnectionsPerPortSsl);
this.networkConfig = networkConfig;
this.checkoutTimeoutMs = checkoutTimeoutMs;
this.time = time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
* A factory class used to get new instances of a {@link NetworkClient}
*/
public class NetworkClientFactory {
private final NetworkMetrics networkMetrics;
private final NetworkConfig networkConfig;
private final SSLFactory sslFactory;
protected final NetworkMetrics networkMetrics;
protected final NetworkConfig networkConfig;
protected final SSLFactory sslFactory;
private final int maxConnectionsPerPortPlainText;
private final int maxConnectionsPerPortSsl;
private final int connectionCheckoutTimeoutMs;
Expand Down Expand Up @@ -58,9 +58,8 @@ public NetworkClientFactory(NetworkMetrics networkMetrics, NetworkConfig network
public NetworkClient getNetworkClient()
throws IOException {
Selector selector = new Selector(networkMetrics, time, sslFactory);
ConnectionTracker connectionTracker =
new ConnectionTracker(maxConnectionsPerPortPlainText, maxConnectionsPerPortSsl);
return new NetworkClient(selector, connectionTracker, networkConfig, connectionCheckoutTimeoutMs, time);
return new NetworkClient(selector, networkConfig, maxConnectionsPerPortPlainText, maxConnectionsPerPortSsl,
connectionCheckoutTimeoutMs, time);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ public NetworkClientTest()
VerifiableProperties vprops = new VerifiableProperties(props);
NetworkConfig networkConfig = new NetworkConfig(vprops);
selector = new MockSelector();
ConnectionTracker connectionTracker = new ConnectionTracker(MAX_PORTS_PLAIN_TEXT, MAX_PORTS_SSL);
time = new MockTime();
networkClient = new NetworkClient(selector, connectionTracker, networkConfig, CHECKOUT_TIMEOUT_MS, time);
networkClient =
new NetworkClient(selector, networkConfig, MAX_PORTS_PLAIN_TEXT, MAX_PORTS_SSL, CHECKOUT_TIMEOUT_MS, time);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,34 @@
package com.github.ambry.router;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.commons.ResponseHandler;
import com.github.ambry.config.RouterConfig;
import com.github.ambry.network.RequestInfo;
import java.nio.ByteBuffer;
import com.github.ambry.network.ResponseInfo;
import com.github.ambry.notification.NotificationSystem;
import com.github.ambry.utils.Time;
import java.util.List;


/**
* DeleteManager handles Delete operations. This is just a template for now.
*/
class DeleteManager {
public DeleteManager(ClusterMap clusterMap) {
/**
* Create a DeleteManager
* @param clusterMap The {@link ClusterMap} of the cluster.
* @param responseHandler The {@link ResponseHandler} used to notify failures for failure detection.
* @param notificationSystem The {@link NotificationSystem} used for notifying blob creations.
* @param routerConfig The {@link RouterConfig} containing the configs for the PutManager.
* @param routerMetrics The {@link NonBlockingRouterMetrics} to be used for reporting metrics.
* @param time The {@link Time} instance to use.
*/
DeleteManager(ClusterMap clusterMap, ResponseHandler responseHandler, NotificationSystem notificationSystem,
RouterConfig routerConfig, NonBlockingRouterMetrics routerMetrics, Time time) {
//@todo
}

public FutureResult<Void> submitDeleteBlobOperation(long operationId, String blobId, FutureResult<Void> futureResult,
public FutureResult<Void> submitDeleteBlobOperation(String blobId, FutureResult<Void> futureResult,
Callback<Void> callback) {
//@todo
return null;
Expand All @@ -37,7 +51,11 @@ public void poll(List<RequestInfo> requests) {
//@todo
}

void handleResponse(ByteBuffer response) {
void handleResponse(ResponseInfo responseInfo) {
// @todo
}

void close() {
// @todo
}
}
26 changes: 21 additions & 5 deletions ambry-router/src/main/java/com.github.ambry.router/GetManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package com.github.ambry.router;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.commons.ResponseHandler;
import com.github.ambry.config.RouterConfig;
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.network.RequestInfo;
import java.nio.ByteBuffer;
import com.github.ambry.network.ResponseInfo;
import com.github.ambry.utils.Time;
import java.util.List;


Expand All @@ -25,16 +28,25 @@
* These methods have to be thread safe.
*/
class GetManager {
GetManager(ClusterMap clusterMap) {
/**
* Create a GetManager
* @param clusterMap The {@link ClusterMap} of the cluster.
* @param responseHandler The {@link ResponseHandler} used to notify failures for failure detection.
* @param routerConfig The {@link RouterConfig} containing the configs for the PutManager.
* @param routerMetrics The {@link NonBlockingRouterMetrics} to be used for reporting metrics.
* @param time The {@link Time} instance to use.
*/
GetManager(ClusterMap clusterMap, ResponseHandler responseHandler, RouterConfig routerConfig,
NonBlockingRouterMetrics routerMetrics, Time time) {
//@todo
}

void submitGetBlobOperation(long operationId, String blobId, FutureResult<ReadableStreamChannel> futureResult,
void submitGetBlobOperation(String blobId, FutureResult<ReadableStreamChannel> futureResult,
Callback<ReadableStreamChannel> callback) {
//@todo
}

void submitGetBlobInfoOperation(long operationId, String blobId, FutureResult<BlobInfo> futureResult,
void submitGetBlobInfoOperation(String blobId, FutureResult<BlobInfo> futureResult,
Callback<BlobInfo> callback) {
//@todo
}
Expand All @@ -43,7 +55,11 @@ void poll(List<RequestInfo> requests) {
//@todo
}

void handleResponse(ByteBuffer response) {
void handleResponse(ResponseInfo responseInfo) {
// @todo
}

void close() {
// @todo
}
}
Expand Down
Loading