diff --git a/client/client-multi/client-java-core/src/main/java/org/kaaproject/kaa/client/channel/failover/DefaultFailoverStrategy.java b/client/client-multi/client-java-core/src/main/java/org/kaaproject/kaa/client/channel/failover/DefaultFailoverStrategy.java new file mode 100644 index 0000000000..1c67953ca9 --- /dev/null +++ b/client/client-multi/client-java-core/src/main/java/org/kaaproject/kaa/client/channel/failover/DefaultFailoverStrategy.java @@ -0,0 +1,94 @@ +/** + * Copyright 2014-2016 CyberVision, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.kaaproject.kaa.client.channel.failover; + +import org.kaaproject.kaa.client.channel.FailoverDecision; +import org.kaaproject.kaa.client.channel.FailoverDecision.FailoverAction; +import org.kaaproject.kaa.client.channel.FailoverStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * Reference implementation for {@link FailoverStrategy}. + */ +public class DefaultFailoverStrategy implements FailoverStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultFailoverStrategy.class); + + private static final long DEFAULT_BOOTSTRAP_SERVERS_RETRY_PERIOD = 2; + private static final long DEFAULT_OPERATION_SERVERS_RETRY_PERIOD = 2; + private static final long DEFAULT_NO_CONNECTIVITY_RETRY_PERIOD = 5; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS; + + private long bootstrapServersRetryPeriod; + private long operationsServersRetryPeriod; + private long noConnectivityRetryPeriod; + private TimeUnit timeUnit; + + public DefaultFailoverStrategy() { + this(DEFAULT_BOOTSTRAP_SERVERS_RETRY_PERIOD, + DEFAULT_OPERATION_SERVERS_RETRY_PERIOD, + DEFAULT_NO_CONNECTIVITY_RETRY_PERIOD, + DEFAULT_TIME_UNIT); + } + + public DefaultFailoverStrategy(long bootstrapServersRetryPeriod, + long operationsServersRetryPeriod, + long noConnectivityRetryPeriod, + TimeUnit timeUnit) { + this.bootstrapServersRetryPeriod = bootstrapServersRetryPeriod; + this.operationsServersRetryPeriod = operationsServersRetryPeriod; + this.noConnectivityRetryPeriod = noConnectivityRetryPeriod; + this.timeUnit = timeUnit; + } + + @Override + public FailoverDecision onFailover(FailoverStatus failoverStatus) { + LOG.trace("Producing failover decision for failover status: {}", failoverStatus); + switch (failoverStatus) { + case BOOTSTRAP_SERVERS_NA: + return new FailoverDecision(FailoverAction.RETRY, bootstrapServersRetryPeriod, timeUnit); + case CURRENT_BOOTSTRAP_SERVER_NA: + return new FailoverDecision(FailoverAction.USE_NEXT_BOOTSTRAP, bootstrapServersRetryPeriod, timeUnit); + case NO_OPERATION_SERVERS_RECEIVED: + return new FailoverDecision(FailoverAction.USE_NEXT_BOOTSTRAP, bootstrapServersRetryPeriod, timeUnit); + case OPERATION_SERVERS_NA: + return new FailoverDecision(FailoverAction.RETRY, operationsServersRetryPeriod, timeUnit); + case NO_CONNECTIVITY: + return new FailoverDecision(FailoverAction.RETRY, noConnectivityRetryPeriod, timeUnit); + default: + return new FailoverDecision(FailoverAction.NOOP); + } + } + + @Override + public long getBootstrapServersRetryPeriod() { + return bootstrapServersRetryPeriod; + } + + @Override + public long getOperationServersRetryPeriod() { + return operationsServersRetryPeriod; + } + + @Override + public TimeUnit getTimeUnit() { + return timeUnit; + } +} diff --git a/client/client-multi/client-java-core/src/main/java/org/kaaproject/kaa/client/channel/failover/FailoverStrategy.java b/client/client-multi/client-java-core/src/main/java/org/kaaproject/kaa/client/channel/failover/FailoverStrategy.java new file mode 100644 index 0000000000..5257793c6a --- /dev/null +++ b/client/client-multi/client-java-core/src/main/java/org/kaaproject/kaa/client/channel/failover/FailoverStrategy.java @@ -0,0 +1,59 @@ +/** + * Copyright 2014-2016 CyberVision, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.kaaproject.kaa.client.channel.failover; + +import org.kaaproject.kaa.client.channel.FailoverDecision; +import org.kaaproject.kaa.client.channel.FailoverStatus; + +import java.util.concurrent.TimeUnit; + +/** + * Failover strategy is responsible for producing failover decisions based on failover statuses. + */ +public interface FailoverStrategy { + + /** + * Needs to be invoked to determine a decision that resolves the failover. + * + * @param failoverStatus current status of the failover. + * + * @return decision which is meant to resolve the failover. + * + * @see org.kaaproject.kaa.client.channel.FailoverDecision + * @see org.kaaproject.kaa.client.channel.FailoverStatus + */ + FailoverDecision onFailover(FailoverStatus failoverStatus); + + /** + * Use the {@link #getTimeUnit()} method to get current time unit. + * + * @return period of time after which will be made attempt to tweak bootstrap server. + */ + long getBootstrapServersRetryPeriod(); + + /** + * Use the {@link #getTimeUnit()} method to get current time unit. + * + * @return period of time after which will be made attempt to tweak operation server. + */ + long getOperationServersRetryPeriod(); + + /** + * @return time unit used within a scope of current failover strategy. + */ + TimeUnit getTimeUnit(); +} diff --git a/client/client-multi/client-java-core/src/main/java/org/kaaproject/kaa/client/channel/impl/DefaultFailoverManager.java b/client/client-multi/client-java-core/src/main/java/org/kaaproject/kaa/client/channel/impl/DefaultFailoverManager.java index bfd7afb3b7..c028189af9 100644 --- a/client/client-multi/client-java-core/src/main/java/org/kaaproject/kaa/client/channel/impl/DefaultFailoverManager.java +++ b/client/client-multi/client-java-core/src/main/java/org/kaaproject/kaa/client/channel/impl/DefaultFailoverManager.java @@ -17,12 +17,13 @@ package org.kaaproject.kaa.client.channel.impl; import org.kaaproject.kaa.client.channel.FailoverDecision; -import org.kaaproject.kaa.client.channel.FailoverDecision.FailoverAction; import org.kaaproject.kaa.client.channel.FailoverManager; import org.kaaproject.kaa.client.channel.FailoverStatus; import org.kaaproject.kaa.client.channel.KaaChannelManager; import org.kaaproject.kaa.client.channel.ServerType; import org.kaaproject.kaa.client.channel.TransportConnectionInfo; +import org.kaaproject.kaa.client.channel.failover.DefaultFailoverStrategy; +import org.kaaproject.kaa.client.channel.failover.FailoverStrategy; import org.kaaproject.kaa.client.context.ExecutorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,45 +34,34 @@ import java.util.concurrent.TimeUnit; public class DefaultFailoverManager implements FailoverManager { + private static final Logger LOG = LoggerFactory.getLogger(DefaultFailoverManager.class); - // all timeout values are specified in seconds private static final long DEFAULT_FAILURE_RESOLUTION_TIMEOUT = 10; - private static final long DEFAULT_BOOTSTRAP_SERVERS_RETRY_PERIOD = 2; - private static final long DEFAULT_OPERATION_SERVERS_RETRY_PERIOD = 2; - private static final long DEFAULT_NO_CONNECTIVITY_RETRY_PERIOD = 5; - private static final TimeUnit DEFAULT_TIMEUNIT = TimeUnit.SECONDS; + private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS; private long failureResolutionTimeout; - private long bootstrapServersRetryPeriod; - private long operationsServersRetryPeriod; - private long noConnectivityRetryPeriod; - private TimeUnit timeUnit = DEFAULT_TIMEUNIT; + private TimeUnit timeUnit = DEFAULT_TIME_UNIT; + + private FailoverStrategy failoverStrategy; private final KaaChannelManager channelManager; private final ExecutorContext context; private Map resolutionProgressMap = new HashMap<>(); public DefaultFailoverManager(KaaChannelManager channelManager, ExecutorContext context) { - this(channelManager, - context, - DEFAULT_FAILURE_RESOLUTION_TIMEOUT, - DEFAULT_BOOTSTRAP_SERVERS_RETRY_PERIOD, - DEFAULT_OPERATION_SERVERS_RETRY_PERIOD, - DEFAULT_NO_CONNECTIVITY_RETRY_PERIOD, - DEFAULT_TIMEUNIT); + this(channelManager, context, new DefaultFailoverStrategy(), DEFAULT_FAILURE_RESOLUTION_TIMEOUT, DEFAULT_TIME_UNIT); } - public DefaultFailoverManager(KaaChannelManager channelManager, ExecutorContext context, - long failureResolutionTimeout, long bootstrapServersRetryPeriod, - long operationsServersRetryPeriod, - long noConnectivityRetryPeriod, TimeUnit timeUnit) { + public DefaultFailoverManager(KaaChannelManager channelManager, + ExecutorContext context, + FailoverStrategy failoverStrategy, + long failureResolutionTimeout, + TimeUnit timeUnit) { this.channelManager = channelManager; this.context = context; + this.failoverStrategy = failoverStrategy; this.failureResolutionTimeout = failureResolutionTimeout; - this.bootstrapServersRetryPeriod = bootstrapServersRetryPeriod; - this.operationsServersRetryPeriod = operationsServersRetryPeriod; - this.noConnectivityRetryPeriod = noConnectivityRetryPeriod; this.timeUnit = timeUnit; } @@ -176,37 +166,27 @@ public synchronized void onServerConnected(TransportConnectionInfo connectionInf @Override public synchronized FailoverDecision onFailover(FailoverStatus failoverStatus) { - LOG.trace("Applying failover strategy for status: {}", failoverStatus); + AccessPointIdResolution accessPointIdResolution = null; + long resolutionTime = System.currentTimeMillis(); switch (failoverStatus) { case BOOTSTRAP_SERVERS_NA: - AccessPointIdResolution bootstrapResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP); - if (bootstrapResolution != null) { - bootstrapResolution.setResolutionTime(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(bootstrapServersRetryPeriod, timeUnit)); - } - return new FailoverDecision(FailoverAction.RETRY, bootstrapServersRetryPeriod, timeUnit); case CURRENT_BOOTSTRAP_SERVER_NA: - bootstrapResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP); - if (bootstrapResolution != null) { - bootstrapResolution.setResolutionTime(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(bootstrapServersRetryPeriod, timeUnit)); - } - return new FailoverDecision(FailoverAction.USE_NEXT_BOOTSTRAP, bootstrapServersRetryPeriod, timeUnit); + accessPointIdResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP); + resolutionTime += failoverStrategy.getTimeUnit().toMillis(failoverStrategy.getBootstrapServersRetryPeriod()); + break; case NO_OPERATION_SERVERS_RECEIVED: - bootstrapResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP); - if (bootstrapResolution != null) { - bootstrapResolution.setResolutionTime(System.currentTimeMillis()); - } - return new FailoverDecision(FailoverAction.USE_NEXT_BOOTSTRAP, bootstrapServersRetryPeriod, timeUnit); + accessPointIdResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP); + break; case OPERATION_SERVERS_NA: - AccessPointIdResolution operationsResolution = resolutionProgressMap.get(ServerType.OPERATIONS); - if (operationsResolution != null) { - operationsResolution.setResolutionTime(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(operationsServersRetryPeriod, timeUnit)); - } - return new FailoverDecision(FailoverAction.RETRY, operationsServersRetryPeriod, timeUnit); - case NO_CONNECTIVITY: - return new FailoverDecision(FailoverAction.RETRY, noConnectivityRetryPeriod, timeUnit); - default: - return new FailoverDecision(FailoverAction.NOOP); + accessPointIdResolution = resolutionProgressMap.get(ServerType.OPERATIONS); + resolutionTime += failoverStrategy.getTimeUnit().toMillis(failoverStrategy.getOperationServersRetryPeriod()); + break; } + if (accessPointIdResolution != null) { + accessPointIdResolution.setResolutionTime(resolutionTime); + } + + return failoverStrategy.onFailover(failoverStatus); } private void cancelCurrentFailResolution(AccessPointIdResolution accessPointIdResolution) { @@ -220,13 +200,13 @@ private void cancelCurrentFailResolution(AccessPointIdResolution accessPointIdRe static class AccessPointIdResolution { private int accessPointId; - private long resolutionTime; // in milliseconds + private long resolutionTimeMillis; private Future curResolution; public AccessPointIdResolution(int accessPointId, Future curResolution) { this.accessPointId = accessPointId; this.curResolution = curResolution; - this.resolutionTime = Long.MAX_VALUE; + this.resolutionTimeMillis = Long.MAX_VALUE; } public int getAccessPointId() { @@ -242,11 +222,11 @@ public void setCurResolution(Future curResolution) { } public long getResolutionTime() { - return resolutionTime; + return resolutionTimeMillis; } - public void setResolutionTime(long resolutionTime) { - this.resolutionTime = resolutionTime; + public void setResolutionTime(long resolutionTimeMillis) { + this.resolutionTimeMillis = resolutionTimeMillis; } @Override @@ -281,7 +261,7 @@ public int hashCode() { public String toString() { return "AccessPointIdResolution{" + "accessPointId=" + accessPointId + - ", resolutionTime=" + resolutionTime + + ", resolutionTime=" + resolutionTimeMillis + ", curResolution=" + curResolution + '}'; } diff --git a/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/bootstrap/DefaultBootstrapManagerTest.java b/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/bootstrap/DefaultBootstrapManagerTest.java index 33f045329d..f4d38b2bab 100644 --- a/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/bootstrap/DefaultBootstrapManagerTest.java +++ b/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/bootstrap/DefaultBootstrapManagerTest.java @@ -47,6 +47,8 @@ import org.kaaproject.kaa.client.channel.TransportConnectionInfo; import org.kaaproject.kaa.client.channel.TransportProtocolIdConstants; import org.kaaproject.kaa.client.channel.connectivity.ConnectivityChecker; +import org.kaaproject.kaa.client.channel.failover.DefaultFailoverStrategy; +import org.kaaproject.kaa.client.channel.failover.FailoverStrategy; import org.kaaproject.kaa.client.channel.impl.DefaultFailoverManager; import org.kaaproject.kaa.client.context.ExecutorContext; import org.kaaproject.kaa.client.transport.TransportException; @@ -237,8 +239,9 @@ public void testOperationsServerInfoRetrieving() throws TransportException, NoSu ChanelManagerMock channelManager = spy(new ChanelManagerMock()); when(executorContext.getScheduledExecutor()).thenReturn(Executors.newScheduledThreadPool(1)); + FailoverStrategy strategy = new DefaultFailoverStrategy(1, 1, 1, TimeUnit.MILLISECONDS); FailoverManager failoverManager = - spy(new DefaultFailoverManager(channelManager, executorContext, 1, 1, 1, 1, TimeUnit.MILLISECONDS)); + spy(new DefaultFailoverManager(channelManager, executorContext, strategy, 1, TimeUnit.MILLISECONDS)); manager.setChannelManager(channelManager); manager.setFailoverManager(failoverManager); diff --git a/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/DefaultChannelManagerTest.java b/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/DefaultChannelManagerTest.java index 23de388f0a..d5c3f10817 100644 --- a/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/DefaultChannelManagerTest.java +++ b/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/DefaultChannelManagerTest.java @@ -33,6 +33,8 @@ import org.junit.Test; import org.kaaproject.kaa.client.bootstrap.BootstrapManager; import org.kaaproject.kaa.client.channel.connectivity.ConnectivityChecker; +import org.kaaproject.kaa.client.channel.failover.DefaultFailoverStrategy; +import org.kaaproject.kaa.client.channel.failover.FailoverStrategy; import org.kaaproject.kaa.client.channel.impl.ChannelRuntimeException; import org.kaaproject.kaa.client.channel.impl.DefaultChannelManager; import org.kaaproject.kaa.client.channel.impl.DefaultFailoverManager; @@ -184,7 +186,9 @@ public void testBootstrapServerFailed() throws NoSuchAlgorithmException, Invalid ExecutorContext context = Mockito.mock(ExecutorContext.class); Mockito.when(context.getScheduledExecutor()).thenReturn(Executors.newScheduledThreadPool(1)); KaaChannelManager channelManager = new DefaultChannelManager(bootstrapManager, bootststrapServers, context); - FailoverManager failoverManager = Mockito.spy(new DefaultFailoverManager(channelManager, CONTEXT, 1, 1, 1, 1, TimeUnit.MILLISECONDS)); + + FailoverStrategy failoverStrategy = new DefaultFailoverStrategy(1, 1, 1, TimeUnit.MILLISECONDS); + FailoverManager failoverManager = Mockito.spy(new DefaultFailoverManager(channelManager, CONTEXT, failoverStrategy, 1, TimeUnit.MILLISECONDS)); channelManager.setFailoverManager(failoverManager); channelManager.addChannel(channel); diff --git a/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/DefaultOperationsChannelTest.java b/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/DefaultOperationsChannelTest.java index a680f4ffb3..733c667740 100644 --- a/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/DefaultOperationsChannelTest.java +++ b/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/DefaultOperationsChannelTest.java @@ -29,6 +29,8 @@ import org.junit.Assert; import org.junit.Test; import org.kaaproject.kaa.client.AbstractKaaClient; +import org.kaaproject.kaa.client.channel.failover.DefaultFailoverStrategy; +import org.kaaproject.kaa.client.channel.failover.FailoverStrategy; import org.kaaproject.kaa.client.channel.impl.DefaultFailoverManager; import org.kaaproject.kaa.client.channel.impl.channels.DefaultOperationsChannel; import org.kaaproject.kaa.client.context.ExecutorContext; @@ -170,7 +172,8 @@ public void testServerFailed() throws Exception { AbstractHttpClient httpClient = Mockito.mock(AbstractHttpClient.class); ExecutorContext context = Mockito.mock(ExecutorContext.class); Mockito.when(context.getScheduledExecutor()).thenReturn(Executors.newScheduledThreadPool(1)); - FailoverManager flManager = new DefaultFailoverManager(manager, context, 100, 1, 1, 1, TimeUnit.MILLISECONDS); + FailoverStrategy failoverStrategy = new DefaultFailoverStrategy(1, 1, 1, TimeUnit.MILLISECONDS); + FailoverManager flManager = new DefaultFailoverManager(manager, context, failoverStrategy, 100, TimeUnit.MILLISECONDS); FailoverManager failoverManager = Mockito.spy(flManager); Mockito.when( httpClient.executeHttpRequest(Mockito.anyString(), diff --git a/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/impl/DefaultFailoverManagerTest.java b/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/impl/DefaultFailoverManagerTest.java index c30a8fb613..39d3fddf20 100644 --- a/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/impl/DefaultFailoverManagerTest.java +++ b/client/client-multi/client-java-core/src/test/java/org/kaaproject/kaa/client/channel/impl/DefaultFailoverManagerTest.java @@ -22,6 +22,8 @@ import org.kaaproject.kaa.client.channel.KaaChannelManager; import org.kaaproject.kaa.client.channel.ServerType; import org.kaaproject.kaa.client.channel.TransportConnectionInfo; +import org.kaaproject.kaa.client.channel.failover.DefaultFailoverStrategy; +import org.kaaproject.kaa.client.channel.failover.FailoverStrategy; import org.kaaproject.kaa.client.context.ExecutorContext; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -50,7 +52,8 @@ public void setUp() { channelManager = Mockito.mock(KaaChannelManager.class); context = Mockito.mock(ExecutorContext.class); Mockito.when(context.getScheduledExecutor()).thenReturn(Executors.newScheduledThreadPool(1)); - failoverManager = new DefaultFailoverManager(channelManager, context, RESOLUTION_TIMEOUT_MS, BOOTSTRAP_RETRY_PERIOD, 1, 1, TimeUnit.MILLISECONDS); + FailoverStrategy failoverStrategy = new DefaultFailoverStrategy(BOOTSTRAP_RETRY_PERIOD, 1, 1, TimeUnit.MILLISECONDS); + failoverManager = new DefaultFailoverManager(channelManager, context, failoverStrategy, RESOLUTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); resolutionProgressMap = Mockito.spy(new HashMap()); ReflectionTestUtils.setField(failoverManager, "resolutionProgressMap", resolutionProgressMap); }