Skip to content

Commit

Permalink
KAA-963 Add ability to handle new device management codes
Browse files Browse the repository at this point in the history
  • Loading branch information
abohomol committed Apr 12, 2016
1 parent c4821a6 commit 0606464
Show file tree
Hide file tree
Showing 17 changed files with 86 additions and 59 deletions.
Expand Up @@ -22,6 +22,7 @@
import org.kaaproject.kaa.client.channel.failover.FailoverManager;
import org.kaaproject.kaa.client.channel.KaaInternalChannelManager;
import org.kaaproject.kaa.client.channel.TransportProtocolId;
import org.kaaproject.kaa.client.channel.failover.FailoverStatus;
import org.kaaproject.kaa.client.transport.TransportException;
import org.kaaproject.kaa.common.endpoint.gen.ProtocolMetaData;

Expand All @@ -45,7 +46,7 @@ public interface BootstrapManager {
* @param transportId of the transport protocol.
* @see TransportProtocolId
*/
void useNextOperationsServer(TransportProtocolId transportId);
void useNextOperationsServer(TransportProtocolId transportId, FailoverStatus status);

/**
* Update the Channel Manager with endpoint's properties retrieved by its DNS.
Expand Down
Expand Up @@ -75,7 +75,7 @@ public void receiveOperationsServerList() throws TransportException {
}

@Override
public void useNextOperationsServer(TransportProtocolId transportId) {
public void useNextOperationsServer(TransportProtocolId transportId, FailoverStatus status) {
if (mappedOperationServerList != null && !mappedOperationServerList.isEmpty()) {
if (mappedIterators.get(transportId).hasNext()) {
ProtocolMetaData nextOperationsServer = mappedIterators.get(transportId).next();
Expand All @@ -90,8 +90,7 @@ public void useNextOperationsServer(TransportProtocolId transportId) {
}
} else {
LOG.warn("Failed to find server for channel [{}]", transportId);
FailoverDecision decision = failoverManager.onFailover(FailoverStatus.OPERATION_SERVERS_NA);
applyDecision(decision);
resolveFailoverStatus(status);
}
} else {
throw new BootstrapRuntimeException("Operations Server list is empty");
Expand Down Expand Up @@ -153,8 +152,7 @@ public synchronized void onProtocolListUpdated(List<ProtocolMetaData> list) {

if (operationsServerList == null || operationsServerList.isEmpty()) {
LOG.trace("Received empty operations server list");
FailoverDecision decision = failoverManager.onFailover(FailoverStatus.NO_OPERATION_SERVERS_RECEIVED);
applyDecision(decision);
resolveFailoverStatus(FailoverStatus.NO_OPERATION_SERVERS_RECEIVED);
return;
}

Expand Down Expand Up @@ -185,7 +183,8 @@ public synchronized void onProtocolListUpdated(List<ProtocolMetaData> list) {
}
}

private void applyDecision(FailoverDecision decision) {
private void resolveFailoverStatus(FailoverStatus status) {
FailoverDecision decision = failoverManager.onFailover(status);
switch (decision.getAction()) {
case NOOP:
LOG.warn("No operation is performed according to failover strategy decision");
Expand All @@ -208,7 +207,7 @@ public void run() {
case USE_NEXT_BOOTSTRAP:
LOG.warn("Trying to switch to the next bootstrap server according to failover strategy decision");
retryPeriod = decision.getRetryPeriod();
failoverManager.onServerFailed(channelManager.getActiveServer(TransportType.BOOTSTRAP));
failoverManager.onServerFailed(channelManager.getActiveServer(TransportType.BOOTSTRAP), status);
executorContext.getScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import java.util.List;

import org.kaaproject.kaa.client.channel.failover.FailoverManager;
import org.kaaproject.kaa.client.channel.failover.FailoverStatus;
import org.kaaproject.kaa.client.channel.impl.channels.DefaultBootstrapChannel;
import org.kaaproject.kaa.client.channel.impl.channels.DefaultOperationHttpChannel;
import org.kaaproject.kaa.client.channel.impl.channels.DefaultOperationsChannel;
Expand Down Expand Up @@ -87,7 +88,7 @@
* Call to {@link #clearChannelList()} removes <b>all</b> existing channels.<br>
* <br>
* If physical connection to remote server failed, call
* {@link #onServerFailed(TransportConnectionInfo)} to switch to another
* {@link #onServerFailed(TransportConnectionInfo, FailoverStatus)} to switch to another
* available server.
*
* @author Yaroslav Zeygerman
Expand Down Expand Up @@ -170,7 +171,7 @@ public interface KaaChannelManager {
* @see TransportConnectionInfo
*
*/
void onServerFailed(TransportConnectionInfo server);
void onServerFailed(TransportConnectionInfo server, FailoverStatus status);

/**
* Clears the list of channels.
Expand Down
Expand Up @@ -64,7 +64,7 @@ public DefaultFailoverManager(KaaChannelManager channelManager,
}

@Override
public synchronized void onServerFailed(final TransportConnectionInfo connectionInfo) {
public synchronized void onServerFailed(final TransportConnectionInfo connectionInfo, FailoverStatus status) {
if (connectionInfo == null) {
LOG.warn("Server failed, but connection info is null, can't resolve");
return;
Expand Down Expand Up @@ -99,7 +99,7 @@ public void run() {
}
}, failureResolutionTimeout, timeUnit);

channelManager.onServerFailed(connectionInfo);
channelManager.onServerFailed(connectionInfo, status);

long updatedResolutionTime = currentAccessPointIdResolution != null ? currentAccessPointIdResolution.getResolutionTime() : currentResolutionTime;

Expand Down
Expand Up @@ -27,12 +27,13 @@ public interface FailoverManager {
/**
* Needs to be invoked when a server fail occurs.
*
* @param connectionInfo
* the connection information of the failed server.
* @param connectionInfo the connection information of the failed server.
* @param status
*
*
* @see org.kaaproject.kaa.client.channel.TransportConnectionInfo
*/
void onServerFailed(TransportConnectionInfo connectionInfo);
void onServerFailed(TransportConnectionInfo connectionInfo, FailoverStatus status);

/**
* Needs to be invoked as soon as current server is changed.
Expand Down
Expand Up @@ -22,6 +22,7 @@
*/
public enum FailoverStatus {
ENDPOINT_VERIFICATION_FAILED,
CREDENTIALS_REVOKED,
BOOTSTRAP_SERVERS_NA,
CURRENT_BOOTSTRAP_SERVER_NA,
OPERATION_SERVERS_NA,
Expand Down
Expand Up @@ -73,6 +73,9 @@ public FailoverDecision onFailover(FailoverStatus failoverStatus) {
return new FailoverDecision(FailoverAction.RETRY, operationsServersRetryPeriod, timeUnit);
case NO_CONNECTIVITY:
return new FailoverDecision(FailoverAction.RETRY, noConnectivityRetryPeriod, timeUnit);
case ENDPOINT_VERIFICATION_FAILED:
case CREDENTIALS_REVOKED:
return new FailoverDecision(FailoverAction.STOP_APP);
default:
return new FailoverDecision(FailoverAction.NOOP);
}
Expand Down
Expand Up @@ -285,7 +285,7 @@ public synchronized void onTransportConnectionInfoUpdated(TransportConnectionInf
}

@Override
public synchronized void onServerFailed(final TransportConnectionInfo server) {
public synchronized void onServerFailed(final TransportConnectionInfo server, FailoverStatus status) {
if (isShutdown) {
LOG.warn("Can't process server failure. Channel manager is down");
return;
Expand Down Expand Up @@ -331,7 +331,7 @@ public void run() {
}
} else {
LOG.trace("Can't find next bootstrap server");
FailoverDecision decision = failoverManager.onFailover(FailoverStatus.BOOTSTRAP_SERVERS_NA);
FailoverDecision decision = failoverManager.onFailover(status);
switch (decision.getAction()) {
case NOOP:
LOG.warn("No operation is performed according to failover strategy decision");
Expand All @@ -356,7 +356,7 @@ public void run() {
}
}
} else {
bootstrapManager.useNextOperationsServer(server.getTransportId());
bootstrapManager.useNextOperationsServer(server.getTransportId(), status);
}
}

Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.kaaproject.kaa.client.channel.TransportProtocolId;
import org.kaaproject.kaa.client.channel.TransportProtocolIdConstants;
import org.kaaproject.kaa.client.channel.connectivity.ConnectivityChecker;
import org.kaaproject.kaa.client.channel.failover.FailoverStatus;
import org.kaaproject.kaa.client.persistence.KaaClientState;
import org.kaaproject.kaa.client.transport.AbstractHttpClient;
import org.kaaproject.kaa.common.TransportType;
Expand All @@ -44,7 +45,8 @@ public abstract class AbstractHttpChannel implements KaaDataChannel {
public static final Logger LOG = LoggerFactory // NOSONAR
.getLogger(AbstractHttpChannel.class);

private static final int UNATHORIZED_HTTP_STATUS = 401;
private static final int UNAUTHORIZED_HTTP_STATUS = 401;
private static final int FORBIDDEN_HTTP_STATUS = 403;

private IPTransportInfo currentServer;
private final AbstractKaaClient client;
Expand Down Expand Up @@ -241,17 +243,22 @@ protected void connectionFailed(boolean failed) {
}

protected void connectionFailed(boolean failed, int status) {
FailoverStatus failoverStatus = FailoverStatus.OPERATION_SERVERS_NA;
switch (status) {
case UNATHORIZED_HTTP_STATUS:
case UNAUTHORIZED_HTTP_STATUS:
state.clean();
failoverStatus = FailoverStatus.ENDPOINT_VERIFICATION_FAILED;
break;
case FORBIDDEN_HTTP_STATUS:
failoverStatus = FailoverStatus.CREDENTIALS_REVOKED;
break;
default:
break;
}

lastConnectionFailed = failed;
if (failed) {
failoverManager.onServerFailed(currentServer);
failoverManager.onServerFailed(currentServer, failoverStatus);
} else {
failoverManager.onServerConnected(currentServer);
}
Expand Down
Expand Up @@ -119,14 +119,14 @@ public void onMessage(ConnAck message) {

if (message.getReturnCode() != ReturnCode.ACCEPTED) {
LOG.error("Connection for channel [{}] was rejected: {}", getId(), message.getReturnCode());
if (message.getReturnCode() == ReturnCode.REFUSE_BAD_CREDENTIALS) {
LOG.info("Cleaning client state");
state.clean();
}
if (message.getReturnCode() != ReturnCode.REFUSE_VERIFICATION_FAILED) {
onServerFailed();
} else {

LOG.info("Cleaning client state");
state.clean();

if (message.getReturnCode() == ReturnCode.REFUSE_VERIFICATION_FAILED) {
onServerFailed(FailoverStatus.ENDPOINT_VERIFICATION_FAILED);
} else {
onServerFailed();
}
}
}
Expand Down Expand Up @@ -183,11 +183,18 @@ public void onMessage(SyncResponse message) {
@Override
public void onMessage(Disconnect message) {
LOG.info("Disconnect message (reason={}) received for channel [{}]", message.getReason(), getId());
if (!message.getReason().equals(DisconnectReason.NONE)) {
LOG.error("Server error occurred: {}", message.getReason());
onServerFailed();
} else {
closeConnection();
switch (message.getReason()) {
case NONE:
closeConnection();
break;
case CREDENTIALS_REVOKED:
LOG.error("Endpoint credentials been revoked");
onServerFailed(FailoverStatus.CREDENTIALS_REVOKED);
break;
default:
LOG.error("Server error occurred: {}", message.getReason());
onServerFailed();
break;
}
}
};
Expand Down Expand Up @@ -393,7 +400,7 @@ private void onServerFailed(FailoverStatus status) {
break;
}
} else {
failoverManager.onServerFailed(currentServer);
failoverManager.onServerFailed(currentServer, status);
}
}

Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.kaaproject.kaa.client.channel.TransportProtocolId;
import org.kaaproject.kaa.client.channel.TransportProtocolIdConstants;
import org.kaaproject.kaa.client.channel.connectivity.ConnectivityChecker;
import org.kaaproject.kaa.client.channel.failover.FailoverStatus;
import org.kaaproject.kaa.client.channel.impl.channels.polling.CancelableCommandRunnable;
import org.kaaproject.kaa.client.channel.impl.channels.polling.CancelableRunnable;
import org.kaaproject.kaa.client.channel.impl.channels.polling.CancelableScheduledFuture;
Expand Down Expand Up @@ -206,7 +207,7 @@ public void onServerError(TransportConnectionInfo info) {
synchronized (this) {
stopPollScheduler(false);
}
failoverManager.onServerFailed(info);
failoverManager.onServerFailed(info, FailoverStatus.NO_CONNECTIVITY);
} else {
LOG.debug("Channel [{}] connection aborted", getId());
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.kaaproject.kaa.client.channel.KaaChannelManager;
import org.kaaproject.kaa.client.channel.LogTransport;
import org.kaaproject.kaa.client.channel.TransportConnectionInfo;
import org.kaaproject.kaa.client.channel.failover.FailoverStatus;
import org.kaaproject.kaa.client.context.ExecutorContext;
import org.kaaproject.kaa.client.logging.future.RecordFuture;
import org.kaaproject.kaa.client.logging.memory.MemLogStorage;
Expand Down Expand Up @@ -305,7 +306,7 @@ private class DefaultLogUploadController implements LogFailoverCommand {
public void switchAccessPoint() {
TransportConnectionInfo server = channelManager.getActiveServer(TransportType.LOGGING);
if (server != null) {
failoverManager.onServerFailed(server);
failoverManager.onServerFailed(server, FailoverStatus.OPERATION_SERVERS_NA);
} else {
LOG.warn("Failed to switch Operation server. No channel is used for logging transport");
}
Expand Down
Expand Up @@ -547,8 +547,8 @@ public void setAttachedToUser(boolean isAttached) {

@Override
public void clean() {
state.setProperty(IS_REGISTERED, Boolean.FALSE.toString());
state.setProperty(NEED_PROFILE_RESYNC, Boolean.FALSE.toString());
setRegistered(false);
setIfNeedProfileResync(false);
saveFileDelete(stateFileLocation);
saveFileDelete(stateFileLocation + "_bckp");
hasUpdate = true;
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.kaaproject.kaa.client.channel.TransportConnectionInfo;
import org.kaaproject.kaa.client.channel.TransportProtocolIdConstants;
import org.kaaproject.kaa.client.channel.connectivity.ConnectivityChecker;
import org.kaaproject.kaa.client.channel.failover.FailoverStatus;
import org.kaaproject.kaa.client.channel.failover.strategies.DefaultFailoverStrategy;
import org.kaaproject.kaa.client.channel.failover.strategies.FailoverStrategy;
import org.kaaproject.kaa.client.channel.failover.DefaultFailoverManager;
Expand Down Expand Up @@ -99,7 +100,7 @@ public KaaDataChannel getChannel(String id) {
}

@Override
public void onServerFailed(TransportConnectionInfo server) {
public void onServerFailed(TransportConnectionInfo server, FailoverStatus status) {

}

Expand Down Expand Up @@ -201,7 +202,7 @@ public void testReceiveOperationsServerList() throws TransportException {
boolean exception = false;
try {
manager.receiveOperationsServerList();
manager.useNextOperationsServer(TransportProtocolIdConstants.HTTP_TRANSPORT_ID);
manager.useNextOperationsServer(TransportProtocolIdConstants.HTTP_TRANSPORT_ID, FailoverStatus.NO_CONNECTIVITY);
} catch (BootstrapRuntimeException e) {
exception = true;
}
Expand All @@ -219,7 +220,7 @@ public void testOperationsServerInfoRetrieving() throws TransportException, NoSu

boolean exception = false;
try {
manager.useNextOperationsServer(TransportProtocolIdConstants.HTTP_TRANSPORT_ID);
manager.useNextOperationsServer(TransportProtocolIdConstants.HTTP_TRANSPORT_ID, FailoverStatus.NO_CONNECTIVITY);
} catch (BootstrapRuntimeException e) {
exception = true;
}
Expand Down Expand Up @@ -247,7 +248,7 @@ public void testOperationsServerInfoRetrieving() throws TransportException, NoSu
manager.setFailoverManager(failoverManager);
manager.setTransport(transport);
manager.onProtocolListUpdated(list);
manager.useNextOperationsServer(TransportProtocolIdConstants.HTTP_TRANSPORT_ID);
manager.useNextOperationsServer(TransportProtocolIdConstants.HTTP_TRANSPORT_ID, FailoverStatus.NO_CONNECTIVITY);
assertTrue(channelManager.isServerUpdated());
assertEquals("http://localhost:9889", channelManager.getReceivedUrl());

Expand Down
Expand Up @@ -165,8 +165,9 @@ public void testOperationServerFailed() throws NoSuchAlgorithmException, Invalid
ServerType.OPERATIONS, TransportProtocolIdConstants.HTTP_TRANSPORT_ID, "localhost", 9999, KeyUtil.generateKeyPair().getPublic());
channelManager.onTransportConnectionInfoUpdated(opServer);

channelManager.onServerFailed(opServer);
Mockito.verify(bootstrapManager, Mockito.times(1)).useNextOperationsServer(TransportProtocolIdConstants.HTTP_TRANSPORT_ID);
channelManager.onServerFailed(opServer, FailoverStatus.NO_CONNECTIVITY);
Mockito.verify(bootstrapManager, Mockito.times(1))
.useNextOperationsServer(TransportProtocolIdConstants.HTTP_TRANSPORT_ID, FailoverStatus.NO_CONNECTIVITY);
}

@Test
Expand Down Expand Up @@ -197,7 +198,8 @@ public void testBootstrapServerFailed() throws NoSuchAlgorithmException, Invalid

Mockito.verify(failoverManager, Mockito.times(1)).onServerChanged(Mockito.any(TransportConnectionInfo.class));

channelManager.onServerFailed(bootststrapServers.get(TransportProtocolIdConstants.HTTP_TRANSPORT_ID).get(0));
channelManager.onServerFailed(bootststrapServers.get(TransportProtocolIdConstants.HTTP_TRANSPORT_ID).get(0),
FailoverStatus.CURRENT_BOOTSTRAP_SERVER_NA);
new Thread(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -229,7 +231,8 @@ public void testSingleBootstrapServerFailed() throws NoSuchAlgorithmException, I

Mockito.verify(failoverManager, Mockito.times(1)).onServerChanged(Mockito.any(TransportConnectionInfo.class));

channelManager.onServerFailed(bootststrapServers.get(TransportProtocolIdConstants.HTTP_TRANSPORT_ID).get(0));
channelManager.onServerFailed(bootststrapServers.get(TransportProtocolIdConstants.HTTP_TRANSPORT_ID).get(0),
FailoverStatus.CURRENT_BOOTSTRAP_SERVER_NA);
}

@Test
Expand Down Expand Up @@ -396,7 +399,7 @@ public void testShutdown() throws NoSuchAlgorithmException, InvalidKeySpecExcept
channelManager.addChannel(channel);

channelManager.shutdown();
channelManager.onServerFailed(null);
channelManager.onServerFailed(null, FailoverStatus.BOOTSTRAP_SERVERS_NA);
channelManager.onTransportConnectionInfoUpdated(null);
channelManager.addChannel(null);
channelManager.setChannel(null, null);
Expand Down

0 comments on commit 0606464

Please sign in to comment.