Skip to content

Commit

Permalink
Specify CommandDispatcher timeout as a Duration.
Browse files Browse the repository at this point in the history
  • Loading branch information
pferraro committed Aug 24, 2018
1 parent 2df5c44 commit 60aa086
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 12 deletions.
Expand Up @@ -22,6 +22,7 @@
package org.wildfly.clustering.server.dispatcher; package org.wildfly.clustering.server.dispatcher;


import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
Expand Down Expand Up @@ -68,12 +69,12 @@ public boolean needMoreResponses() {
private final MessageDispatcher dispatcher; private final MessageDispatcher dispatcher;
private final CommandMarshaller<C> marshaller; private final CommandMarshaller<C> marshaller;
private final Group<Address> group; private final Group<Address> group;
private final long timeout; private final Duration timeout;
private final CommandDispatcher<C> localDispatcher; private final CommandDispatcher<C> localDispatcher;
private final Runnable closeTask; private final Runnable closeTask;
private final Address localAddress; private final Address localAddress;


public ChannelCommandDispatcher(MessageDispatcher dispatcher, CommandMarshaller<C> marshaller, Group<Address> group, long timeout, CommandDispatcher<C> localDispatcher, Runnable closeTask) { public ChannelCommandDispatcher(MessageDispatcher dispatcher, CommandMarshaller<C> marshaller, Group<Address> group, Duration timeout, CommandDispatcher<C> localDispatcher, Runnable closeTask) {
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.marshaller = marshaller; this.marshaller = marshaller;
this.group = group; this.group = group;
Expand Down Expand Up @@ -145,7 +146,7 @@ private <R> Buffer createBuffer(Command<R, ? super C> command) {
} }


private RequestOptions createRequestOptions() { private RequestOptions createRequestOptions() {
return new RequestOptions(ResponseMode.GET_ALL, this.timeout, false, FILTER, Message.Flag.DONT_BUNDLE, Message.Flag.OOB); return new RequestOptions(ResponseMode.GET_ALL, this.timeout.toMillis(), false, FILTER, Message.Flag.DONT_BUNDLE, Message.Flag.OOB);
} }


private static class PruneCancellationTask<T> implements BiConsumer<T, Throwable> { private static class PruneCancellationTask<T> implements BiConsumer<T, Throwable> {
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.io.DataInputStream; import java.io.DataInputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
Expand Down Expand Up @@ -95,7 +96,7 @@ private static ThreadFactory createThreadFactory(Class<?> targetClass) {
private final AtomicReference<View> view = new AtomicReference<>(); private final AtomicReference<View> view = new AtomicReference<>();
private final MarshallingContext marshallingContext; private final MarshallingContext marshallingContext;
private final MessageDispatcher dispatcher; private final MessageDispatcher dispatcher;
private final long timeout; private final Duration timeout;


@SuppressWarnings("resource") @SuppressWarnings("resource")
public ChannelCommandDispatcherFactory(ChannelCommandDispatcherFactoryConfiguration config) { public ChannelCommandDispatcherFactory(ChannelCommandDispatcherFactoryConfiguration config) {
Expand All @@ -118,7 +119,7 @@ public ChannelCommandDispatcherFactory(ChannelCommandDispatcherFactoryConfigurat
public void run() { public void run() {
this.executorService.shutdownNow(); this.executorService.shutdownNow();
try { try {
this.executorService.awaitTermination(this.timeout, TimeUnit.MILLISECONDS); this.executorService.awaitTermination(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
Expand Down Expand Up @@ -204,7 +205,7 @@ private void unregister(GroupListener listener) {
if (executor != null) { if (executor != null) {
executor.shutdownNow(); executor.shutdownNow();
try { try {
executor.awaitTermination(this.timeout, TimeUnit.MILLISECONDS); executor.awaitTermination(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
Expand Down
Expand Up @@ -21,6 +21,8 @@
*/ */
package org.wildfly.clustering.server.dispatcher; package org.wildfly.clustering.server.dispatcher;


import java.time.Duration;

import org.jgroups.JChannel; import org.jgroups.JChannel;
import org.wildfly.clustering.jgroups.spi.ChannelFactory; import org.wildfly.clustering.jgroups.spi.ChannelFactory;
import org.wildfly.clustering.marshalling.jboss.MarshallingContext; import org.wildfly.clustering.marshalling.jboss.MarshallingContext;
Expand All @@ -33,5 +35,5 @@ public interface ChannelCommandDispatcherFactoryConfiguration {
ChannelFactory getChannelFactory(); ChannelFactory getChannelFactory();
JChannel getChannel(); JChannel getChannel();
MarshallingContext getMarshallingContext(); MarshallingContext getMarshallingContext();
long getTimeout(); Duration getTimeout();
} }
Expand Up @@ -21,7 +21,7 @@
*/ */
package org.wildfly.clustering.server.dispatcher; package org.wildfly.clustering.server.dispatcher;


import java.util.concurrent.TimeUnit; import java.time.Duration;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
Expand Down Expand Up @@ -93,7 +93,7 @@ public MarshallingConfiguration apply(MarshallingConfigurationContext context) {
private volatile SupplierDependency<JChannel> channel; private volatile SupplierDependency<JChannel> channel;
private volatile SupplierDependency<Module> module; private volatile SupplierDependency<Module> module;
private volatile Supplier<ModuleLoader> loader; private volatile Supplier<ModuleLoader> loader;
private volatile long timeout = TimeUnit.MINUTES.toMillis(1); private volatile Duration timeout = Duration.ofMinutes(1);


public ChannelCommandDispatcherFactoryServiceConfigurator(ServiceName name, String group) { public ChannelCommandDispatcherFactoryServiceConfigurator(ServiceName name, String group) {
super(name); super(name);
Expand Down Expand Up @@ -122,8 +122,8 @@ public ServiceBuilder<?> build(ServiceTarget target) {
return builder.setInstance(service).setInitialMode(ServiceController.Mode.PASSIVE); return builder.setInstance(service).setInitialMode(ServiceController.Mode.PASSIVE);
} }


public ChannelCommandDispatcherFactoryServiceConfigurator timeout(long value, TimeUnit unit) { public ChannelCommandDispatcherFactoryServiceConfigurator timeout(Duration timeout) {
this.timeout = unit.toMillis(value); this.timeout = timeout;
return this; return this;
} }


Expand All @@ -148,7 +148,7 @@ public MarshallingContext getMarshallingContext() {
} }


@Override @Override
public long getTimeout() { public Duration getTimeout() {
return this.timeout; return this.timeout;
} }


Expand Down

0 comments on commit 60aa086

Please sign in to comment.