Skip to content

Commit

Permalink
Loosen context type constraints between dispatcher and commands.
Browse files Browse the repository at this point in the history
  • Loading branch information
pferraro committed Nov 23, 2016
1 parent b9a4700 commit 5877899
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 21 deletions.
Expand Up @@ -43,7 +43,7 @@ public interface CommandDispatcher<C> extends AutoCloseable {
* @return the result of the command execution
* @throws CommandDispatcherException if the command could not be sent
*/
<R> CommandResponse<R> executeOnNode(Command<R, C> command, Node node) throws CommandDispatcherException;
<R> CommandResponse<R> executeOnNode(Command<R, ? super C> command, Node node) throws CommandDispatcherException;

/**
* Execute the specified command on all nodes in the group, excluding the specified nodes
Expand All @@ -54,7 +54,7 @@ public interface CommandDispatcher<C> extends AutoCloseable {
* @return a map of command execution results per node
* @throws CommandDispatcherException if the command could not be broadcast
*/
<R> Map<Node, CommandResponse<R>> executeOnCluster(Command<R, C> command, Node... excludedNodes) throws CommandDispatcherException;
<R> Map<Node, CommandResponse<R>> executeOnCluster(Command<R, ? super C> command, Node... excludedNodes) throws CommandDispatcherException;

/**
* Submits the specified command on the specified node for execution.
Expand All @@ -65,7 +65,7 @@ public interface CommandDispatcher<C> extends AutoCloseable {
* @return the result of the command execution
* @throws CommandDispatcherException if the command could not be sent
*/
<R> Future<R> submitOnNode(Command<R, C> command, Node node) throws CommandDispatcherException;
<R> Future<R> submitOnNode(Command<R, ? super C> command, Node node) throws CommandDispatcherException;

/**
* Submits the specified command on all nodes in the group, excluding the specified nodes.
Expand All @@ -76,7 +76,7 @@ public interface CommandDispatcher<C> extends AutoCloseable {
* @return a map of command execution results per node.
* @throws CommandDispatcherException if the command could not be broadcast
*/
<R> Map<Node, Future<R>> submitOnCluster(Command<R, C> command, Node... excludedNodes) throws CommandDispatcherException;
<R> Map<Node, Future<R>> submitOnCluster(Command<R, ? super C> command, Node... excludedNodes) throws CommandDispatcherException;

/**
* Closes any resources used by this dispatcher.
Expand Down
Expand Up @@ -94,7 +94,7 @@ public void close() {
}

@Override
public <R> Map<Node, CommandResponse<R>> executeOnCluster(Command<R, C> command, Node... excludedNodes) throws CommandDispatcherException {
public <R> Map<Node, CommandResponse<R>> executeOnCluster(Command<R, ? super C> command, Node... excludedNodes) throws CommandDispatcherException {
Message message = this.createMessage(command);
RequestOptions options = this.createRequestOptions(excludedNodes);
try {
Expand All @@ -116,7 +116,7 @@ public <R> Map<Node, CommandResponse<R>> executeOnCluster(Command<R, C> command,
}

@Override
public <R> Map<Node, Future<R>> submitOnCluster(Command<R, C> command, Node... excludedNodes) throws CommandDispatcherException {
public <R> Map<Node, Future<R>> submitOnCluster(Command<R, ? super C> command, Node... excludedNodes) throws CommandDispatcherException {
Map<Node, Future<R>> results = new ConcurrentHashMap<>();
FutureListener<RspList<R>> listener = future -> {
try {
Expand All @@ -129,8 +129,10 @@ public <R> Map<Node, Future<R>> submitOnCluster(Command<R, C> command, Node... e
Thread.currentThread().interrupt();
}
};
Message message = this.createMessage(command);
RequestOptions options = this.createRequestOptions(excludedNodes);
try {
Future<? extends Map<Address, Rsp<R>>> futureResponses = this.dispatcher.castMessageWithFuture(null, this.createMessage(command), this.createRequestOptions(excludedNodes), listener);
Future<? extends Map<Address, Rsp<R>>> futureResponses = this.dispatcher.castMessageWithFuture(null, message, options, listener);
Set<Node> excluded = (excludedNodes != null) ? new HashSet<>(Arrays.asList(excludedNodes)) : Collections.<Node>emptySet();
for (Address address: this.dispatcher.getChannel().getView().getMembers()) {
Node node = this.factory.createNode(address);
Expand Down Expand Up @@ -181,7 +183,7 @@ public boolean isDone() {
}

@Override
public <R> CommandResponse<R> executeOnNode(Command<R, C> command, Node node) throws CommandDispatcherException {
public <R> CommandResponse<R> executeOnNode(Command<R, ? super C> command, Node node) throws CommandDispatcherException {
// Bypass MessageDispatcher if target node is local
if (this.isLocal(node)) {
return this.localDispatcher.executeOnNode(command, node);
Expand All @@ -203,7 +205,7 @@ public <R> CommandResponse<R> executeOnNode(Command<R, C> command, Node node) th
}

@Override
public <R> Future<R> submitOnNode(Command<R, C> command, Node node) throws CommandDispatcherException {
public <R> Future<R> submitOnNode(Command<R, ? super C> command, Node node) throws CommandDispatcherException {
// Bypass MessageDispatcher if target node is local
if (this.isLocal(node)) {
return this.localDispatcher.submitOnNode(command, node);
Expand All @@ -217,11 +219,11 @@ public <R> Future<R> submitOnNode(Command<R, C> command, Node node) throws Comma
}
}

private <R> Message createMessage(Command<R, C> command) {
private <R> Message createMessage(Command<R, ? super C> command) {
return this.createMessage(command, null);
}

private <R> Message createMessage(Command<R, C> command, Node node) {
private <R> Message createMessage(Command<R, ? super C> command, Node node) {
try {
return new Message(getAddress(node), this.getLocalAddress(), this.marshaller.marshal(command));
} catch (IOException e) {
Expand Down
Expand Up @@ -46,7 +46,7 @@ public CommandDispatcherMarshaller(MarshallingContext context, Object id) {
}

@Override
public <R> byte[] marshal(Command<R, C> command) throws IOException {
public <R> byte[] marshal(Command<R, ? super C> command) throws IOException {
int version = this.context.getCurrentVersion();
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try (DataOutputStream output = new DataOutputStream(bytes)) {
Expand Down
Expand Up @@ -38,5 +38,5 @@ public interface CommandMarshaller<C> {
* @return a serialized command.
* @throws IOException if marshalling fails.
*/
<R> byte[] marshal(Command<R, C> command) throws IOException;
<R> byte[] marshal(Command<R, ? super C> command) throws IOException;
}
Expand Up @@ -73,7 +73,7 @@ public LocalCommandDispatcher(Node node, C context, ExecutorService executor) {
}

@Override
public <R> CommandResponse<R> executeOnNode(Command<R, C> command, Node node) {
public <R> CommandResponse<R> executeOnNode(Command<R, ? super C> command, Node node) {
if (!this.node.equals(node)) {
throw new UnreachableException((Address) null);
}
Expand All @@ -85,7 +85,7 @@ public <R> CommandResponse<R> executeOnNode(Command<R, C> command, Node node) {
}

@Override
public <R> Map<Node, CommandResponse<R>> executeOnCluster(Command<R, C> command, Node... excludedNodes) {
public <R> Map<Node, CommandResponse<R>> executeOnCluster(Command<R, ? super C> command, Node... excludedNodes) {
Map<Node, CommandResponse<R>> results = new HashMap<>();
if ((excludedNodes == null) || (excludedNodes.length == 0) || !Arrays.asList(excludedNodes).contains(this.node)) {
results.put(this.node, this.executeOnNode(command, this.node));
Expand All @@ -94,7 +94,7 @@ public <R> Map<Node, CommandResponse<R>> executeOnCluster(Command<R, C> command,
}

@Override
public <R> Future<R> submitOnNode(final Command<R, C> command, Node node) {
public <R> Future<R> submitOnNode(final Command<R, ? super C> command, Node node) {
Callable<R> task = new Callable<R>() {
@Override
public R call() throws Exception {
Expand All @@ -105,7 +105,7 @@ public R call() throws Exception {
}

@Override
public <R> Map<Node, Future<R>> submitOnCluster(Command<R, C> command, Node... excludedNodes) {
public <R> Map<Node, Future<R>> submitOnCluster(Command<R, ? super C> command, Node... excludedNodes) {
Map<Node, Future<R>> results = new HashMap<>();
if ((excludedNodes == null) || (excludedNodes.length == 0) || !Arrays.asList(excludedNodes).contains(this.node)) {
results.put(this.node, this.submitOnNode(command, this.node));
Expand Down
Expand Up @@ -36,22 +36,22 @@ public void destroy() {
}

@Override
public <R> CommandResponse<R> executeOnNode(Command<R, Node> command, Node node) throws CommandDispatcherException {
public <R> CommandResponse<R> executeOnNode(Command<R, ? super Node> command, Node node) throws CommandDispatcherException {
return this.dispatcher.executeOnNode(command, node);
}

@Override
public <R> Map<Node, CommandResponse<R>> executeOnCluster(Command<R, Node> command, Node... excludedNodes) throws CommandDispatcherException {
public <R> Map<Node, CommandResponse<R>> executeOnCluster(Command<R, ? super Node> command, Node... excludedNodes) throws CommandDispatcherException {
return this.dispatcher.executeOnCluster(command, excludedNodes);
}

@Override
public <R> Future<R> submitOnNode(Command<R, Node> command, Node node) throws CommandDispatcherException {
public <R> Future<R> submitOnNode(Command<R, ? super Node> command, Node node) throws CommandDispatcherException {
return this.dispatcher.submitOnNode(command, node);
}

@Override
public <R> Map<Node, Future<R>> submitOnCluster(Command<R, Node> command, Node... excludedNodes) throws CommandDispatcherException {
public <R> Map<Node, Future<R>> submitOnCluster(Command<R, ? super Node> command, Node... excludedNodes) throws CommandDispatcherException {
return this.dispatcher.submitOnCluster(command, excludedNodes);
}

Expand Down

0 comments on commit 5877899

Please sign in to comment.