Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into feature/KAA-876
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvayka committed Apr 6, 2016
2 parents 64cd7dc + c921a37 commit 57ddd99
Show file tree
Hide file tree
Showing 42 changed files with 867 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.kaaproject.kaa.client.channel.BootstrapTransport;
import org.kaaproject.kaa.client.channel.ConfigurationTransport;
import org.kaaproject.kaa.client.channel.EventTransport;
import org.kaaproject.kaa.client.channel.FailoverManager;
import org.kaaproject.kaa.client.channel.failover.FailoverManager;
import org.kaaproject.kaa.client.channel.KaaChannelManager;
import org.kaaproject.kaa.client.channel.KaaDataChannel;
import org.kaaproject.kaa.client.channel.KaaInternalChannelManager;
Expand All @@ -42,9 +42,10 @@
import org.kaaproject.kaa.client.channel.TransportConnectionInfo;
import org.kaaproject.kaa.client.channel.TransportProtocolId;
import org.kaaproject.kaa.client.channel.UserTransport;
import org.kaaproject.kaa.client.channel.failover.strategies.FailoverStrategy;
import org.kaaproject.kaa.client.channel.impl.DefaultBootstrapDataProcessor;
import org.kaaproject.kaa.client.channel.impl.DefaultChannelManager;
import org.kaaproject.kaa.client.channel.impl.DefaultFailoverManager;
import org.kaaproject.kaa.client.channel.failover.DefaultFailoverManager;
import org.kaaproject.kaa.client.channel.impl.DefaultOperationDataProcessor;
import org.kaaproject.kaa.client.channel.impl.channels.DefaultBootstrapChannel;
import org.kaaproject.kaa.client.channel.impl.channels.DefaultOperationTcpChannel;
Expand Down Expand Up @@ -574,6 +575,11 @@ public void setLogDeliveryListener(LogDeliveryListener listener) {
logCollector.setLogDeliveryListener(listener);
}

@Override
public void setFailoverStrategy(FailoverStrategy failoverStrategy) {
failoverManager.setFailoverStrategy(failoverStrategy);
}

protected TransportContext buildTransportContext(KaaClientProperties properties, KaaClientState kaaClientState) {
BootstrapTransport bootstrapTransport = buildBootstrapTransport(properties, kaaClientState);
ProfileTransport profileTransport = buildProfileTransport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.kaaproject.kaa.client.channel.KaaChannelManager;
import org.kaaproject.kaa.client.channel.KaaDataChannel;
import org.kaaproject.kaa.client.channel.failover.strategies.FailoverStrategy;
import org.kaaproject.kaa.client.configuration.base.ConfigurationListener;
import org.kaaproject.kaa.client.configuration.storage.ConfigurationStorage;
import org.kaaproject.kaa.client.event.EndpointAccessToken;
Expand Down Expand Up @@ -599,4 +600,11 @@ public interface GenericKaaClient {
* @see org.kaaproject.kaa.client.logging.LogDeliveryListener
*/
void setLogDeliveryListener(LogDeliveryListener listener);

/**
* @param failoverStrategy strategy that will be used to resolve failovers.
*
* @see org.kaaproject.kaa.client.channel.failover.strategies.FailoverStrategy
*/
void setFailoverStrategy(FailoverStrategy failoverStrategy);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.List;

import org.kaaproject.kaa.client.channel.BootstrapTransport;
import org.kaaproject.kaa.client.channel.FailoverManager;
import org.kaaproject.kaa.client.channel.failover.FailoverManager;
import org.kaaproject.kaa.client.channel.KaaInternalChannelManager;
import org.kaaproject.kaa.client.channel.TransportProtocolId;
import org.kaaproject.kaa.client.transport.TransportException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.util.concurrent.TimeUnit;

import org.kaaproject.kaa.client.channel.BootstrapTransport;
import org.kaaproject.kaa.client.channel.FailoverDecision;
import org.kaaproject.kaa.client.channel.FailoverManager;
import org.kaaproject.kaa.client.channel.FailoverStatus;
import org.kaaproject.kaa.client.channel.failover.FailoverDecision;
import org.kaaproject.kaa.client.channel.failover.FailoverManager;
import org.kaaproject.kaa.client.channel.failover.FailoverStatus;
import org.kaaproject.kaa.client.channel.GenericTransportInfo;
import org.kaaproject.kaa.client.channel.KaaInternalChannelManager;
import org.kaaproject.kaa.client.channel.ServerType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;

import org.kaaproject.kaa.client.channel.failover.FailoverManager;
import org.kaaproject.kaa.client.channel.impl.channels.DefaultBootstrapChannel;
import org.kaaproject.kaa.client.channel.impl.channels.DefaultOperationHttpChannel;
import org.kaaproject.kaa.client.channel.impl.channels.DefaultOperationsChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
* limitations under the License.
*/

package org.kaaproject.kaa.client.channel.impl;
package org.kaaproject.kaa.client.channel.failover;

import org.kaaproject.kaa.client.channel.FailoverDecision;
import org.kaaproject.kaa.client.channel.FailoverDecision.FailoverAction;
import org.kaaproject.kaa.client.channel.FailoverManager;
import org.kaaproject.kaa.client.channel.FailoverStatus;
import org.kaaproject.kaa.client.channel.KaaChannelManager;
import org.kaaproject.kaa.client.channel.ServerType;
import org.kaaproject.kaa.client.channel.TransportConnectionInfo;
import org.kaaproject.kaa.client.channel.failover.strategies.DefaultFailoverStrategy;
import org.kaaproject.kaa.client.channel.failover.strategies.FailoverStrategy;
import org.kaaproject.kaa.client.context.ExecutorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,45 +31,34 @@
import java.util.concurrent.TimeUnit;

public class DefaultFailoverManager implements FailoverManager {

private static final Logger LOG = LoggerFactory.getLogger(DefaultFailoverManager.class);

// all timeout values are specified in seconds
private static final long DEFAULT_FAILURE_RESOLUTION_TIMEOUT = 10;
private static final long DEFAULT_BOOTSTRAP_SERVERS_RETRY_PERIOD = 2;
private static final long DEFAULT_OPERATION_SERVERS_RETRY_PERIOD = 2;
private static final long DEFAULT_NO_CONNECTIVITY_RETRY_PERIOD = 5;
private static final TimeUnit DEFAULT_TIMEUNIT = TimeUnit.SECONDS;
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;

private long failureResolutionTimeout;
private long bootstrapServersRetryPeriod;
private long operationsServersRetryPeriod;
private long noConnectivityRetryPeriod;
private TimeUnit timeUnit = DEFAULT_TIMEUNIT;
private TimeUnit timeUnit = DEFAULT_TIME_UNIT;

private FailoverStrategy failoverStrategy;

private final KaaChannelManager channelManager;
private final ExecutorContext context;
private Map<ServerType, AccessPointIdResolution> resolutionProgressMap = new HashMap<>();

public DefaultFailoverManager(KaaChannelManager channelManager, ExecutorContext context) {
this(channelManager,
context,
DEFAULT_FAILURE_RESOLUTION_TIMEOUT,
DEFAULT_BOOTSTRAP_SERVERS_RETRY_PERIOD,
DEFAULT_OPERATION_SERVERS_RETRY_PERIOD,
DEFAULT_NO_CONNECTIVITY_RETRY_PERIOD,
DEFAULT_TIMEUNIT);
this(channelManager, context, new DefaultFailoverStrategy(), DEFAULT_FAILURE_RESOLUTION_TIMEOUT, DEFAULT_TIME_UNIT);
}

public DefaultFailoverManager(KaaChannelManager channelManager, ExecutorContext context,
long failureResolutionTimeout, long bootstrapServersRetryPeriod,
long operationsServersRetryPeriod,
long noConnectivityRetryPeriod, TimeUnit timeUnit) {
public DefaultFailoverManager(KaaChannelManager channelManager,
ExecutorContext context,
FailoverStrategy failoverStrategy,
long failureResolutionTimeout,
TimeUnit timeUnit) {
this.channelManager = channelManager;
this.context = context;
this.failoverStrategy = failoverStrategy;
this.failureResolutionTimeout = failureResolutionTimeout;
this.bootstrapServersRetryPeriod = bootstrapServersRetryPeriod;
this.operationsServersRetryPeriod = operationsServersRetryPeriod;
this.noConnectivityRetryPeriod = noConnectivityRetryPeriod;
this.timeUnit = timeUnit;
}

Expand Down Expand Up @@ -158,6 +145,8 @@ public synchronized void onServerConnected(TransportConnectionInfo connectionInf
return;
}

failoverStrategy.onRecover(connectionInfo);

AccessPointIdResolution accessPointIdResolution = resolutionProgressMap.get(connectionInfo.getServerType());
if (accessPointIdResolution == null) {
LOG.trace("Server hasn't been set yet (failover resolution has happened), so a new server: {} can't be connected", connectionInfo);
Expand All @@ -174,41 +163,40 @@ public synchronized void onServerConnected(TransportConnectionInfo connectionInf
}
}

@Override
public void setFailoverStrategy(FailoverStrategy failoverStrategy) {
if (failoverStrategy == null) {
throw new IllegalArgumentException("Failover strategy can't be null");
}

this.failoverStrategy = failoverStrategy;
}

@Override
public synchronized FailoverDecision onFailover(FailoverStatus failoverStatus) {
LOG.trace("Applying failover strategy for status: {}", failoverStatus);
AccessPointIdResolution accessPointIdResolution = null;
long resolutionTime = System.currentTimeMillis();
switch (failoverStatus) {
case ENDPOINT_VERIFICATION_FAILED:
return new FailoverDecision(FailoverAction.STOP_APP);
case BOOTSTRAP_SERVERS_NA:
AccessPointIdResolution bootstrapResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP);
if (bootstrapResolution != null) {
bootstrapResolution.setResolutionTime(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(bootstrapServersRetryPeriod, timeUnit));
}
return new FailoverDecision(FailoverAction.RETRY, bootstrapServersRetryPeriod, timeUnit);
case CURRENT_BOOTSTRAP_SERVER_NA:
bootstrapResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP);
if (bootstrapResolution != null) {
bootstrapResolution.setResolutionTime(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(bootstrapServersRetryPeriod, timeUnit));
}
return new FailoverDecision(FailoverAction.USE_NEXT_BOOTSTRAP, bootstrapServersRetryPeriod, timeUnit);
accessPointIdResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP);
resolutionTime += failoverStrategy.getTimeUnit().toMillis(failoverStrategy.getBootstrapServersRetryPeriod());
break;
case NO_OPERATION_SERVERS_RECEIVED:
bootstrapResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP);
if (bootstrapResolution != null) {
bootstrapResolution.setResolutionTime(System.currentTimeMillis());
}
return new FailoverDecision(FailoverAction.USE_NEXT_BOOTSTRAP, bootstrapServersRetryPeriod, timeUnit);
accessPointIdResolution = resolutionProgressMap.get(ServerType.BOOTSTRAP);
break;
case OPERATION_SERVERS_NA:
AccessPointIdResolution operationsResolution = resolutionProgressMap.get(ServerType.OPERATIONS);
if (operationsResolution != null) {
operationsResolution.setResolutionTime(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(operationsServersRetryPeriod, timeUnit));
}
return new FailoverDecision(FailoverAction.RETRY, operationsServersRetryPeriod, timeUnit);
case NO_CONNECTIVITY:
return new FailoverDecision(FailoverAction.RETRY, noConnectivityRetryPeriod, timeUnit);
default:
return new FailoverDecision(FailoverAction.NOOP);
accessPointIdResolution = resolutionProgressMap.get(ServerType.OPERATIONS);
resolutionTime += failoverStrategy.getTimeUnit().toMillis(failoverStrategy.getOperationServersRetryPeriod());
break;
}
if (accessPointIdResolution != null) {
accessPointIdResolution.setResolutionTime(resolutionTime);
}

return failoverStrategy.onFailover(failoverStatus);
}

private void cancelCurrentFailResolution(AccessPointIdResolution accessPointIdResolution) {
Expand All @@ -222,13 +210,13 @@ private void cancelCurrentFailResolution(AccessPointIdResolution accessPointIdRe

static class AccessPointIdResolution {
private int accessPointId;
private long resolutionTime; // in milliseconds
private long resolutionTimeMillis;
private Future<?> curResolution;

public AccessPointIdResolution(int accessPointId, Future<?> curResolution) {
this.accessPointId = accessPointId;
this.curResolution = curResolution;
this.resolutionTime = Long.MAX_VALUE;
this.resolutionTimeMillis = Long.MAX_VALUE;
}

public int getAccessPointId() {
Expand All @@ -244,11 +232,11 @@ public void setCurResolution(Future<?> curResolution) {
}

public long getResolutionTime() {
return resolutionTime;
return resolutionTimeMillis;
}

public void setResolutionTime(long resolutionTime) {
this.resolutionTime = resolutionTime;
public void setResolutionTime(long resolutionTimeMillis) {
this.resolutionTimeMillis = resolutionTimeMillis;
}

@Override
Expand Down Expand Up @@ -283,7 +271,7 @@ public int hashCode() {
public String toString() {
return "AccessPointIdResolution{" +
"accessPointId=" + accessPointId +
", resolutionTime=" + resolutionTime +
", resolutionTime=" + resolutionTimeMillis +
", curResolution=" + curResolution +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
* limitations under the License.
*/

package org.kaaproject.kaa.client.channel;
package org.kaaproject.kaa.client.channel.failover;

import java.util.concurrent.TimeUnit;

/**
* Class that describes a decision which is made by a failover manager,
* which corresponds to a failover strategy
* @see org.kaaproject.kaa.client.channel.FailoverManager
* @see FailoverManager
*/
public class FailoverDecision {
private final FailoverAction action;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* limitations under the License.
*/

package org.kaaproject.kaa.client.channel;
package org.kaaproject.kaa.client.channel.failover;

import org.kaaproject.kaa.client.channel.TransportConnectionInfo;
import org.kaaproject.kaa.client.channel.failover.strategies.FailoverStrategy;

/**
* Manager is responsible for managing current server's failover/connection events
Expand Down Expand Up @@ -60,8 +63,15 @@ public interface FailoverManager {
*
* @return decision which is meant to resolve the failover.
*
* @see org.kaaproject.kaa.client.channel.FailoverDecision
* @see org.kaaproject.kaa.client.channel.FailoverStatus
* @see FailoverDecision
* @see FailoverStatus
*/
FailoverDecision onFailover(FailoverStatus failoverStatus);

/**
* @param failoverStrategy strategy that will be used to resolve failovers.
*
* @see org.kaaproject.kaa.client.channel.failover.strategies.FailoverStrategy
*/
void setFailoverStrategy(FailoverStrategy failoverStrategy);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.kaaproject.kaa.client.channel;
package org.kaaproject.kaa.client.channel.failover;

/**
* Enum which describes status of the current failover state. Managed by
Expand Down

0 comments on commit 57ddd99

Please sign in to comment.