Skip to content

Commit

Permalink
KAA-896: add ability to set failover strategy for failover manager
Browse files Browse the repository at this point in the history
  • Loading branch information
abohomol committed Mar 30, 2016
1 parent 4c3198b commit 9cbb274
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 59 deletions.
@@ -0,0 +1,94 @@
/**
* Copyright 2014-2016 CyberVision, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kaaproject.kaa.client.channel.failover;

import org.kaaproject.kaa.client.channel.FailoverDecision;
import org.kaaproject.kaa.client.channel.FailoverDecision.FailoverAction;
import org.kaaproject.kaa.client.channel.FailoverStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
* Reference implementation for {@link FailoverStrategy}.
*/
public class DefaultFailoverStrategy implements FailoverStrategy {

private static final Logger LOG = LoggerFactory.getLogger(DefaultFailoverStrategy.class);

private static final long DEFAULT_BOOTSTRAP_SERVERS_RETRY_PERIOD = 2;
private static final long DEFAULT_OPERATION_SERVERS_RETRY_PERIOD = 2;
private static final long DEFAULT_NO_CONNECTIVITY_RETRY_PERIOD = 5;
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;

private long bootstrapServersRetryPeriod;
private long operationsServersRetryPeriod;
private long noConnectivityRetryPeriod;
private TimeUnit timeUnit;

public DefaultFailoverStrategy() {
this(DEFAULT_BOOTSTRAP_SERVERS_RETRY_PERIOD,
DEFAULT_OPERATION_SERVERS_RETRY_PERIOD,
DEFAULT_NO_CONNECTIVITY_RETRY_PERIOD,
DEFAULT_TIME_UNIT);
}

public DefaultFailoverStrategy(long bootstrapServersRetryPeriod,
long operationsServersRetryPeriod,
long noConnectivityRetryPeriod,
TimeUnit timeUnit) {
this.bootstrapServersRetryPeriod = bootstrapServersRetryPeriod;
this.operationsServersRetryPeriod = operationsServersRetryPeriod;
this.noConnectivityRetryPeriod = noConnectivityRetryPeriod;
this.timeUnit = timeUnit;
}

@Override
public FailoverDecision onFailover(FailoverStatus failoverStatus) {
LOG.trace("Producing failover decision for failover status: {}", failoverStatus);
switch (failoverStatus) {
case BOOTSTRAP_SERVERS_NA:
return new FailoverDecision(FailoverAction.RETRY, bootstrapServersRetryPeriod, timeUnit);
case CURRENT_BOOTSTRAP_SERVER_NA:
return new FailoverDecision(FailoverAction.USE_NEXT_BOOTSTRAP, bootstrapServersRetryPeriod, timeUnit);
case NO_OPERATION_SERVERS_RECEIVED:
return new FailoverDecision(FailoverAction.USE_NEXT_BOOTSTRAP, bootstrapServersRetryPeriod, timeUnit);
case OPERATION_SERVERS_NA:
return new FailoverDecision(FailoverAction.RETRY, operationsServersRetryPeriod, timeUnit);
case NO_CONNECTIVITY:
return new FailoverDecision(FailoverAction.RETRY, noConnectivityRetryPeriod, timeUnit);
default:
return new FailoverDecision(FailoverAction.NOOP);
}
}

@Override
public long getBootstrapServersRetryPeriod() {
return bootstrapServersRetryPeriod;
}

@Override
public long getOperationServersRetryPeriod() {
return operationsServersRetryPeriod;
}

@Override
public TimeUnit getTimeUnit() {
return timeUnit;
}
}
@@ -0,0 +1,59 @@
/**
* Copyright 2014-2016 CyberVision, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kaaproject.kaa.client.channel.failover;

import org.kaaproject.kaa.client.channel.FailoverDecision;
import org.kaaproject.kaa.client.channel.FailoverStatus;

import java.util.concurrent.TimeUnit;

/**
* Failover strategy is responsible for producing failover decisions based on failover statuses.
*/
public interface FailoverStrategy {

/**
* Needs to be invoked to determine a decision that resolves the failover.
*
* @param failoverStatus current status of the failover.
*
* @return decision which is meant to resolve the failover.
*
* @see org.kaaproject.kaa.client.channel.FailoverDecision
* @see org.kaaproject.kaa.client.channel.FailoverStatus
*/
FailoverDecision onFailover(FailoverStatus failoverStatus);

/**
* Use the {@link #getTimeUnit()} method to get current time unit.
*
* @return period of time after which will be made attempt to tweak bootstrap server.
*/
long getBootstrapServersRetryPeriod();

/**
* Use the {@link #getTimeUnit()} method to get current time unit.
*
* @return period of time after which will be made attempt to tweak operation server.
*/
long getOperationServersRetryPeriod();

/**
* @return time unit used within a scope of current failover strategy.
*/
TimeUnit getTimeUnit();
}
Expand Up @@ -17,12 +17,13 @@
package org.kaaproject.kaa.client.channel.impl;

import org.kaaproject.kaa.client.channel.FailoverDecision;
import org.kaaproject.kaa.client.channel.FailoverDecision.FailoverAction;
import org.kaaproject.kaa.client.channel.FailoverManager;
import org.kaaproject.kaa.client.channel.FailoverStatus;
import org.kaaproject.kaa.client.channel.KaaChannelManager;
import org.kaaproject.kaa.client.channel.ServerType;
import org.kaaproject.kaa.client.channel.TransportConnectionInfo;
import org.kaaproject.kaa.client.channel.failover.DefaultFailoverStrategy;
import org.kaaproject.kaa.client.channel.failover.FailoverStrategy;
import org.kaaproject.kaa.client.context.ExecutorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,45 +34,34 @@
import java.util.concurrent.TimeUnit;

public class DefaultFailoverManager implements FailoverManager {

private static final Logger LOG = LoggerFactory.getLogger(DefaultFailoverManager.class);

// all timeout values are specified in seconds
private static final long DEFAULT_FAILURE_RESOLUTION_TIMEOUT = 10;
private static final long DEFAULT_BOOTSTRAP_SERVERS_RETRY_PERIOD = 2;
private static final long DEFAULT_OPERATION_SERVERS_RETRY_PERIOD = 2;
private static final long DEFAULT_NO_CONNECTIVITY_RETRY_PERIOD = 5;
private static final TimeUnit DEFAULT_TIMEUNIT = TimeUnit.SECONDS;
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;

private long failureResolutionTimeout;
private long bootstrapServersRetryPeriod;
private long operationsServersRetryPeriod;
private long noConnectivityRetryPeriod;
private TimeUnit timeUnit = DEFAULT_TIMEUNIT;
private TimeUnit timeUnit = DEFAULT_TIME_UNIT;

private FailoverStrategy failoverStrategy;

private final KaaChannelManager channelManager;
private final ExecutorContext context;
private Map<ServerType, AccessPointIdResolution> resolutionProgressMap = new HashMap<>();

public DefaultFailoverManager(KaaChannelManager channelManager, ExecutorContext context) {
this(channelManager,
context,
DEFAULT_FAILURE_RESOLUTION_TIMEOUT,
DEFAULT_BOOTSTRAP_SERVERS_RETRY_PERIOD,
DEFAULT_OPERATION_SERVERS_RETRY_PERIOD,
DEFAULT_NO_CONNECTIVITY_RETRY_PERIOD,
DEFAULT_TIMEUNIT);
this(channelManager, context, new DefaultFailoverStrategy(), DEFAULT_FAILURE_RESOLUTION_TIMEOUT, DEFAULT_TIME_UNIT);
}

public DefaultFailoverManager(KaaChannelManager channelManager, ExecutorContext context,
long failureResolutionTimeout, long bootstrapServersRetryPeriod,
long operationsServersRetryPeriod,
long noConnectivityRetryPeriod, TimeUnit timeUnit) {
public DefaultFailoverManager(KaaChannelManager channelManager,
ExecutorContext context,
FailoverStrategy failoverStrategy,
long failureResolutionTimeout,
TimeUnit timeUnit) {
this.channelManager = channelManager;
this.context = context;
this.failoverStrategy = failoverStrategy;
this.failureResolutionTimeout = failureResolutionTimeout;
this.bootstrapServersRetryPeriod = bootstrapServersRetryPeriod;
this.operationsServersRetryPeriod = operationsServersRetryPeriod;
this.noConnectivityRetryPeriod = noConnectivityRetryPeriod;
this.timeUnit = timeUnit;
}

Expand Down Expand Up @@ -176,37 +166,27 @@ public synchronized void onServerConnected(TransportConnectionInfo connectionInf

@Override
public synchronized FailoverDecision onFailover(FailoverStatus failoverStatus) {
LOG.trace("Applying failover strategy for status: {}", failoverStatus);
AccessPointIdResolution accessPointIdResolution = null;
long resolutionTime = System.currentTimeMillis();
switch (failoverStatus) {
case BOOTSTRAP_SERVERS_NA:
AccessPointIdResolution bootstrapResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP);
if (bootstrapResolution != null) {
bootstrapResolution.setResolutionTime(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(bootstrapServersRetryPeriod, timeUnit));
}
return new FailoverDecision(FailoverAction.RETRY, bootstrapServersRetryPeriod, timeUnit);
case CURRENT_BOOTSTRAP_SERVER_NA:
bootstrapResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP);
if (bootstrapResolution != null) {
bootstrapResolution.setResolutionTime(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(bootstrapServersRetryPeriod, timeUnit));
}
return new FailoverDecision(FailoverAction.USE_NEXT_BOOTSTRAP, bootstrapServersRetryPeriod, timeUnit);
accessPointIdResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP);
resolutionTime += failoverStrategy.getTimeUnit().toMillis(failoverStrategy.getBootstrapServersRetryPeriod());
break;
case NO_OPERATION_SERVERS_RECEIVED:
bootstrapResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP);
if (bootstrapResolution != null) {
bootstrapResolution.setResolutionTime(System.currentTimeMillis());
}
return new FailoverDecision(FailoverAction.USE_NEXT_BOOTSTRAP, bootstrapServersRetryPeriod, timeUnit);
accessPointIdResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP);
break;
case OPERATION_SERVERS_NA:
AccessPointIdResolution operationsResolution = resolutionProgressMap.get(ServerType.OPERATIONS);
if (operationsResolution != null) {
operationsResolution.setResolutionTime(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(operationsServersRetryPeriod, timeUnit));
}
return new FailoverDecision(FailoverAction.RETRY, operationsServersRetryPeriod, timeUnit);
case NO_CONNECTIVITY:
return new FailoverDecision(FailoverAction.RETRY, noConnectivityRetryPeriod, timeUnit);
default:
return new FailoverDecision(FailoverAction.NOOP);
accessPointIdResolution = resolutionProgressMap.get(ServerType.OPERATIONS);
resolutionTime += failoverStrategy.getTimeUnit().toMillis(failoverStrategy.getOperationServersRetryPeriod());
break;
}
if (accessPointIdResolution != null) {
accessPointIdResolution.setResolutionTime(resolutionTime);
}

return failoverStrategy.onFailover(failoverStatus);
}

private void cancelCurrentFailResolution(AccessPointIdResolution accessPointIdResolution) {
Expand All @@ -220,13 +200,13 @@ private void cancelCurrentFailResolution(AccessPointIdResolution accessPointIdRe

static class AccessPointIdResolution {
private int accessPointId;
private long resolutionTime; // in milliseconds
private long resolutionTimeMillis;
private Future<?> curResolution;

public AccessPointIdResolution(int accessPointId, Future<?> curResolution) {
this.accessPointId = accessPointId;
this.curResolution = curResolution;
this.resolutionTime = Long.MAX_VALUE;
this.resolutionTimeMillis = Long.MAX_VALUE;
}

public int getAccessPointId() {
Expand All @@ -242,11 +222,11 @@ public void setCurResolution(Future<?> curResolution) {
}

public long getResolutionTime() {
return resolutionTime;
return resolutionTimeMillis;
}

public void setResolutionTime(long resolutionTime) {
this.resolutionTime = resolutionTime;
public void setResolutionTime(long resolutionTimeMillis) {
this.resolutionTimeMillis = resolutionTimeMillis;
}

@Override
Expand Down Expand Up @@ -281,7 +261,7 @@ public int hashCode() {
public String toString() {
return "AccessPointIdResolution{" +
"accessPointId=" + accessPointId +
", resolutionTime=" + resolutionTime +
", resolutionTime=" + resolutionTimeMillis +
", curResolution=" + curResolution +
'}';
}
Expand Down
Expand Up @@ -47,6 +47,8 @@
import org.kaaproject.kaa.client.channel.TransportConnectionInfo;
import org.kaaproject.kaa.client.channel.TransportProtocolIdConstants;
import org.kaaproject.kaa.client.channel.connectivity.ConnectivityChecker;
import org.kaaproject.kaa.client.channel.failover.DefaultFailoverStrategy;
import org.kaaproject.kaa.client.channel.failover.FailoverStrategy;
import org.kaaproject.kaa.client.channel.impl.DefaultFailoverManager;
import org.kaaproject.kaa.client.context.ExecutorContext;
import org.kaaproject.kaa.client.transport.TransportException;
Expand Down Expand Up @@ -237,8 +239,9 @@ public void testOperationsServerInfoRetrieving() throws TransportException, NoSu

ChanelManagerMock channelManager = spy(new ChanelManagerMock());
when(executorContext.getScheduledExecutor()).thenReturn(Executors.newScheduledThreadPool(1));
FailoverStrategy strategy = new DefaultFailoverStrategy(1, 1, 1, TimeUnit.MILLISECONDS);
FailoverManager failoverManager =
spy(new DefaultFailoverManager(channelManager, executorContext, 1, 1, 1, 1, TimeUnit.MILLISECONDS));
spy(new DefaultFailoverManager(channelManager, executorContext, strategy, 1, TimeUnit.MILLISECONDS));

manager.setChannelManager(channelManager);
manager.setFailoverManager(failoverManager);
Expand Down
Expand Up @@ -33,6 +33,8 @@
import org.junit.Test;
import org.kaaproject.kaa.client.bootstrap.BootstrapManager;
import org.kaaproject.kaa.client.channel.connectivity.ConnectivityChecker;
import org.kaaproject.kaa.client.channel.failover.DefaultFailoverStrategy;
import org.kaaproject.kaa.client.channel.failover.FailoverStrategy;
import org.kaaproject.kaa.client.channel.impl.ChannelRuntimeException;
import org.kaaproject.kaa.client.channel.impl.DefaultChannelManager;
import org.kaaproject.kaa.client.channel.impl.DefaultFailoverManager;
Expand Down Expand Up @@ -184,7 +186,9 @@ public void testBootstrapServerFailed() throws NoSuchAlgorithmException, Invalid
ExecutorContext context = Mockito.mock(ExecutorContext.class);
Mockito.when(context.getScheduledExecutor()).thenReturn(Executors.newScheduledThreadPool(1));
KaaChannelManager channelManager = new DefaultChannelManager(bootstrapManager, bootststrapServers, context);
FailoverManager failoverManager = Mockito.spy(new DefaultFailoverManager(channelManager, CONTEXT, 1, 1, 1, 1, TimeUnit.MILLISECONDS));

FailoverStrategy failoverStrategy = new DefaultFailoverStrategy(1, 1, 1, TimeUnit.MILLISECONDS);
FailoverManager failoverManager = Mockito.spy(new DefaultFailoverManager(channelManager, CONTEXT, failoverStrategy, 1, TimeUnit.MILLISECONDS));
channelManager.setFailoverManager(failoverManager);

channelManager.addChannel(channel);
Expand Down
Expand Up @@ -29,6 +29,8 @@
import org.junit.Assert;
import org.junit.Test;
import org.kaaproject.kaa.client.AbstractKaaClient;
import org.kaaproject.kaa.client.channel.failover.DefaultFailoverStrategy;
import org.kaaproject.kaa.client.channel.failover.FailoverStrategy;
import org.kaaproject.kaa.client.channel.impl.DefaultFailoverManager;
import org.kaaproject.kaa.client.channel.impl.channels.DefaultOperationsChannel;
import org.kaaproject.kaa.client.context.ExecutorContext;
Expand Down Expand Up @@ -170,7 +172,8 @@ public void testServerFailed() throws Exception {
AbstractHttpClient httpClient = Mockito.mock(AbstractHttpClient.class);
ExecutorContext context = Mockito.mock(ExecutorContext.class);
Mockito.when(context.getScheduledExecutor()).thenReturn(Executors.newScheduledThreadPool(1));
FailoverManager flManager = new DefaultFailoverManager(manager, context, 100, 1, 1, 1, TimeUnit.MILLISECONDS);
FailoverStrategy failoverStrategy = new DefaultFailoverStrategy(1, 1, 1, TimeUnit.MILLISECONDS);
FailoverManager flManager = new DefaultFailoverManager(manager, context, failoverStrategy, 100, TimeUnit.MILLISECONDS);
FailoverManager failoverManager = Mockito.spy(flManager);
Mockito.when(
httpClient.executeHttpRequest(Mockito.anyString(),
Expand Down
Expand Up @@ -22,6 +22,8 @@
import org.kaaproject.kaa.client.channel.KaaChannelManager;
import org.kaaproject.kaa.client.channel.ServerType;
import org.kaaproject.kaa.client.channel.TransportConnectionInfo;
import org.kaaproject.kaa.client.channel.failover.DefaultFailoverStrategy;
import org.kaaproject.kaa.client.channel.failover.FailoverStrategy;
import org.kaaproject.kaa.client.context.ExecutorContext;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
Expand Down Expand Up @@ -50,7 +52,8 @@ public void setUp() {
channelManager = Mockito.mock(KaaChannelManager.class);
context = Mockito.mock(ExecutorContext.class);
Mockito.when(context.getScheduledExecutor()).thenReturn(Executors.newScheduledThreadPool(1));
failoverManager = new DefaultFailoverManager(channelManager, context, RESOLUTION_TIMEOUT_MS, BOOTSTRAP_RETRY_PERIOD, 1, 1, TimeUnit.MILLISECONDS);
FailoverStrategy failoverStrategy = new DefaultFailoverStrategy(BOOTSTRAP_RETRY_PERIOD, 1, 1, TimeUnit.MILLISECONDS);
failoverManager = new DefaultFailoverManager(channelManager, context, failoverStrategy, RESOLUTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
resolutionProgressMap = Mockito.spy(new HashMap<ServerType, DefaultFailoverManager.AccessPointIdResolution>());
ReflectionTestUtils.setField(failoverManager, "resolutionProgressMap", resolutionProgressMap);
}
Expand Down

0 comments on commit 9cbb274

Please sign in to comment.