Skip to content
This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Commit

Permalink
Check supported server protocols during endpoint discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 committed Mar 22, 2017
1 parent 0f14017 commit 3305233
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 385 deletions.
16 changes: 9 additions & 7 deletions lib/src/main/java/com/uber/cherami/client/CheramiClient.java
Expand Up @@ -40,6 +40,7 @@
import com.uber.cherami.ListConsumerGroupResult;
import com.uber.cherami.ListDestinationsRequest;
import com.uber.cherami.ListDestinationsResult;
import com.uber.cherami.ReadConsumerGroupHostsResult;
import com.uber.cherami.ReadConsumerGroupRequest;
import com.uber.cherami.ReadDestinationRequest;
import com.uber.cherami.ReadPublisherOptionsResult;
Expand All @@ -58,24 +59,24 @@
*
* Examples:
*
* Prod client that uses Hyperbahn for discovery:
* Prod client that uses auto service discovery:
*
* <code>
* MetricsClient client = new M3Client(m3Scope);
* MetricsClient client = new FooBarMetricsClient();
* ClientOptions options = new ClientOptions().setMetricsClient(client);
* CheramiClient client = CheramiClient.Builder().setClientOptions(options).build();
* </code>
*
* Staging client that uses hyperbahn for discovery: <code>
* Staging client that uses auto service discovery: <code>
* ClientOptions options = new ClientOptions();
* options.setMetricsClient(new M3Client(m3Scope));
* options.setMetricsClient(new FooBarMetricsClient());
* options.setDeploymentStr("staging");
* CheramiClient client = CheramiClient.Builder().setClientOptions(options).build();
* </code>
*
* Staging client that uses a specific ip:port: <code>
* ClientOptions options = new ClientOptions();
* options.setMetricsClient(new M3Client(m3Scope));
* options.setMetricsClient(new FooBarMetricsClient());
* options.setDeploymentStr("staging");
* CheramiClient client = CheramiClient.Builder(host, port).setClientOptions(options).build();
* </code>
Expand Down Expand Up @@ -377,7 +378,8 @@ ReadPublisherOptionsResult readPublisherOptions(String path)
* Destination path.
* @param consumerGroupName
* Consumer group name that needs to receive messages.
* @return List of host addresses, representing the streaming endpoints.
* @return ReadConsumerGroupHostsResult containing of the
* [protocol,hostAddrs] tuple.
* @throws BadRequestError
* If the request is bad.
* @throws EntityNotExistsError
Expand All @@ -387,7 +389,7 @@ ReadPublisherOptionsResult readPublisherOptions(String path)
* @throws IOException
* Any other network I/O error.
*/
List<HostAddress> readConsumerGroupHosts(String path, String consumerGroupName)
ReadConsumerGroupHostsResult readConsumerGroupHosts(String path, String consumerGroupName)
throws BadRequestError, EntityNotExistsError, EntityDisabledError, IOException;

/**
Expand Down
Expand Up @@ -77,6 +77,7 @@
import com.uber.cherami.ListDestinationsRequest;
import com.uber.cherami.ListDestinationsResult;
import com.uber.cherami.ReadConsumerGroupHostsRequest;
import com.uber.cherami.ReadConsumerGroupHostsResult;
import com.uber.cherami.ReadConsumerGroupRequest;
import com.uber.cherami.ReadDestinationHostsRequest;
import com.uber.cherami.ReadDestinationRequest;
Expand Down Expand Up @@ -746,7 +747,7 @@ public ReadPublisherOptionsResult readPublisherOptions(String path)
}

@Override
public List<HostAddress> readConsumerGroupHosts(String path, String consumerGroupName)
public ReadConsumerGroupHostsResult readConsumerGroupHosts(String path, String consumerGroupName)
throws BadRequestError, EntityNotExistsError, EntityDisabledError, IOException {

if (path.isEmpty()) {
Expand All @@ -766,7 +767,7 @@ public List<HostAddress> readConsumerGroupHosts(String path, String consumerGrou
thriftResponse = doRemoteCall(thriftRequest);
readConsumerGroupHosts_result result = thriftResponse.getBody(readConsumerGroupHosts_result.class);
if (thriftResponse.getResponseCode() == ResponseCode.OK) {
return result.getSuccess().getHostAddresses();
return result.getSuccess();
}
if (result != null && result.isSetRequestError()) {
throw result.getRequestError();
Expand Down
53 changes: 40 additions & 13 deletions lib/src/main/java/com/uber/cherami/client/CheramiConsumerImpl.java
Expand Up @@ -39,6 +39,9 @@
import com.uber.cherami.EntityDisabledError;
import com.uber.cherami.EntityNotExistsError;
import com.uber.cherami.HostAddress;
import com.uber.cherami.HostProtocol;
import com.uber.cherami.Protocol;
import com.uber.cherami.ReadConsumerGroupHostsResult;
import com.uber.cherami.client.ConnectionManager.ConnectionFactory;
import com.uber.cherami.client.ConnectionManager.ConnectionManagerClosedException;
import com.uber.cherami.client.ConnectionManager.EndpointFinder;
Expand All @@ -58,9 +61,6 @@ public class CheramiConsumerImpl implements CheramiConsumer, Reconfigurable {
private static final String OUTPUT_SERVICE_NAME = "cherami-outputhost";
private static final String OPEN_CONSUMER_API = "open_consumer_stream";

private static final int TCHANNEL_STREAMING_PORT = 4254;
private static final int WEBSOCKET_STREAMING_PORT = 6190;

private final String path;
private final String consumerGroupName;

Expand Down Expand Up @@ -243,10 +243,40 @@ private Pair<OutputHostConnection, DeliveryID> getAcknowledger(String token) {
return new Pair<OutputHostConnection, DeliveryID>(connMap.get(id.getAcknowledgerID()), id);
}

private EndpointsInfo toEndpointsInfo(ReadConsumerGroupHostsResult result) throws IOException {
int rpcPort = 0;
List<HostAddress> streamingEndpoints = null;

for (HostProtocol protocol : result.getHostProtocols()) {
if (protocol.getProtocol() == Protocol.TCHANNEL) {
if (protocol.getHostAddresses().size() > 0) {
// there is an implicit assumption that the TChannel
// endpoint must be supported by all output hosts. The
// tchannel/rpc port is used for sending ack/nacks.
rpcPort = protocol.getHostAddresses().get(0).getPort();
}
continue;
}
if (protocol.getProtocol() == Protocol.WS) {
// these endpoints must be used for data streaming
streamingEndpoints = protocol.getHostAddresses();
}
}

if (streamingEndpoints == null || streamingEndpoints.size() == 0) {
throw new IOException("readConsumerGroupHosts failed to return a websocket streaming endpoint");
}
if (rpcPort == 0) {
throw new IOException("readConsumerGroupHosts failed to return a rpc/tChannel endpoint");
}

return new EndpointsInfo(rpcPort, streamingEndpoints, ChecksumOption.CRC32IEEE);
}

private EndpointsInfo findConsumeEndpoints() throws IOException {
try {
List<HostAddress> hostAddrs = client.readConsumerGroupHosts(path, consumerGroupName);
return new EndpointsInfo(hostAddrs, ChecksumOption.CRC32IEEE);
ReadConsumerGroupHostsResult result = client.readConsumerGroupHosts(path, consumerGroupName);
return toEndpointsInfo(result);
} catch (EntityNotExistsError | EntityDisabledError e) {
throw new IllegalStateException(e);
} catch (BadRequestError e) {
Expand All @@ -258,14 +288,11 @@ private EndpointsInfo findConsumeEndpoints() throws IOException {
}
}

private OutputHostConnection newOutputHostConnection(String host, int port, ChecksumOption checksumOption)
private OutputHostConnection newOutputHostConnection(String host, int dataPort, int rpcPort,
ChecksumOption checksumOption)
throws IOException, InterruptedException {
// TODO: This will be gone once the server side is updated to not
// support TChannel streaming at all, which means
// the only port will be for websockets
int dataPort = (port == TCHANNEL_STREAMING_PORT) ? WEBSOCKET_STREAMING_PORT : port;
String wsUrl = String.format("ws://%s:%d/%s", host, dataPort, OPEN_CONSUMER_API);
OutputHostConnection connection = new OutputHostConnection(wsUrl, host, port, path, consumerGroupName,
OutputHostConnection connection = new OutputHostConnection(wsUrl, host, rpcPort, path, consumerGroupName,
subChannel, options, this, deliveryQueue, metricsReporter);
connection.open();
return connection;
Expand All @@ -274,9 +301,9 @@ private OutputHostConnection newOutputHostConnection(String host, int port, Chec
private ConnectionManager<OutputHostConnection> newConnectionManager() {
ConnectionFactory<OutputHostConnection> factory = new ConnectionFactory<OutputHostConnection>() {
@Override
public OutputHostConnection create(String host, int port, ChecksumOption checksumOption)
public OutputHostConnection create(String host, int dataPort, int rpcPort, ChecksumOption checksumOption)
throws IOException, InterruptedException {
return newOutputHostConnection(host, port, checksumOption);
return newOutputHostConnection(host, dataPort, rpcPort, checksumOption);
}
};
EndpointFinder finder = new EndpointFinder() {
Expand Down
52 changes: 39 additions & 13 deletions lib/src/main/java/com/uber/cherami/client/CheramiPublisherImpl.java
Expand Up @@ -38,6 +38,9 @@
import com.uber.cherami.ChecksumOption;
import com.uber.cherami.EntityDisabledError;
import com.uber.cherami.EntityNotExistsError;
import com.uber.cherami.HostAddress;
import com.uber.cherami.HostProtocol;
import com.uber.cherami.Protocol;
import com.uber.cherami.PutMessage;
import com.uber.cherami.ReadPublisherOptionsResult;
import com.uber.cherami.client.ConnectionManager.ConnectionFactory;
Expand All @@ -60,8 +63,6 @@ public class CheramiPublisherImpl implements CheramiPublisher, Reconfigurable {

private static final String OPEN_PUBLISHER_ENDPOINT = "open_publisher_stream";

private static final int CHERAMI_INPUTHOST_SERVER_PORT = 6189;

private final String path;
private final long msgIdPrefix;
private final AtomicLong msgCounter;
Expand Down Expand Up @@ -200,10 +201,40 @@ private PutMessageRequest createMessageRequest(PublisherMessage message, String
return new PutMessageRequest(msg, new CheramiFuture<SendReceipt>());
}

private EndpointsInfo toEndpointsInfo(ReadPublisherOptionsResult result) throws IOException {
int rpcPort = 0;
List<HostAddress> streamingEndpoints = null;

for (HostProtocol protocol : result.getHostProtocols()) {
if (protocol.getProtocol() == Protocol.TCHANNEL) {
if (protocol.getHostAddresses().size() > 0) {
// there is an implicit assumption that the TChannel
// endpoint will be supported by all input hosts. The
// tchannel/rpc port is used for sending ack/nacks.
rpcPort = protocol.getHostAddresses().get(0).getPort();
}
continue;
}
if (protocol.getProtocol() == Protocol.WS) {
// these endpoints must be used for data streaming
streamingEndpoints = protocol.getHostAddresses();
}
}

if (streamingEndpoints == null || streamingEndpoints.size() == 0) {
throw new IOException("readPublisherOptions failed to return a websocket streaming endpoint");
}
if (rpcPort == 0) {
logger.warn("readPublisherOptions failed to return a rpc/tChannel endpoint");
}

return new EndpointsInfo(rpcPort, streamingEndpoints, result.getChecksumOption());
}

private EndpointsInfo findPublishEndpoints() throws IOException {
try {
ReadPublisherOptionsResult result = client.readPublisherOptions(path);
return new EndpointsInfo(result.getHostAddresses(), result.getChecksumOption());
return toEndpointsInfo(result);
} catch (EntityNotExistsError | EntityDisabledError e) {
throw new IllegalStateException(e);
} catch (BadRequestError e) {
Expand Down Expand Up @@ -234,9 +265,9 @@ private List<InputHostConnection> getAvailableConnections() {
private ConnectionManager<InputHostConnection> newConnectionManager() {
ConnectionFactory<InputHostConnection> factory = new ConnectionFactory<InputHostConnection>() {
@Override
public InputHostConnection create(String host, int port, ChecksumOption checksumOption)
public InputHostConnection create(String host, int dataPort, int rpcPort, ChecksumOption checksumOption)
throws IOException, InterruptedException {
return newInputHostConnection(host, port, checksumOption);
return newInputHostConnection(host, dataPort, rpcPort, checksumOption);
}
};
EndpointFinder finder = new EndpointFinder() {
Expand All @@ -248,15 +279,10 @@ public EndpointsInfo find() throws IOException {
return new ConnectionManager<InputHostConnection>("publisher", factory, finder);
}

private InputHostConnection newInputHostConnection(String host, int port, ChecksumOption checksumOption)
private InputHostConnection newInputHostConnection(String host, int dataPort, int rpcPort,
ChecksumOption checksumOption)
throws IOException, InterruptedException {
// TODO: This will be gone once the server side is updated to not
// support TChannel streaming at all, which means
// the only port will be for websockets
if (port == 4240) {
port = CHERAMI_INPUTHOST_SERVER_PORT;
}
String serverUrl = String.format("ws://%s:%d/%s", host, port, OPEN_PUBLISHER_ENDPOINT);
String serverUrl = String.format("ws://%s:%d/%s", host, dataPort, OPEN_PUBLISHER_ENDPOINT);
InputHostConnection connection = new InputHostConnection(serverUrl, path, checksumOption, options, this,
messageQueue, metricsReporter);
connection.open();
Expand Down

0 comments on commit 3305233

Please sign in to comment.