Skip to content

Commit

Permalink
Merge branch 'master' of github.com:kaaproject/kaa
Browse files Browse the repository at this point in the history
  • Loading branch information
Yaroslav Zeygerman committed Feb 25, 2015
2 parents a50c9b1 + f63e38a commit 7156062
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 91 deletions.
Expand Up @@ -24,6 +24,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.kaaproject.kaa.client.bootstrap.BootstrapManager;
import org.kaaproject.kaa.client.bootstrap.DefaultBootstrapManager;
Expand Down Expand Up @@ -97,16 +99,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* <p>
* Abstract class that holds general elements of Kaa library.
Expand Down Expand Up @@ -141,9 +133,12 @@ public abstract class AbstractKaaClient implements GenericKaaClient {
private static final Logger LOG = LoggerFactory.getLogger(AbstractKaaClient.class);
private static final long LONG_POLL_TIMEOUT = 60000L;

private boolean isInitialized = false;
private volatile boolean isInitialized = false;

protected final ConfigurationManager configurationManager;
protected final AbstractLogCollector logCollector;

private final ExecutorService lifecycleTasksExecutor = Executors.newSingleThreadExecutor();

private final DefaultNotificationManager notificationManager;
private final DefaultProfileManager profileManager;
Expand All @@ -155,7 +150,6 @@ public abstract class AbstractKaaClient implements GenericKaaClient {
private final EventFamilyFactory eventFamilyFactory;

private final DefaultEndpointRegistrationManager endpointRegistrationManager;
protected final AbstractLogCollector logCollector;

private final Map<TransportType, KaaTransport> transports = new HashMap<TransportType, KaaTransport>();
private final DefaultOperationDataProcessor operationsDataProcessor = new DefaultOperationDataProcessor();
Expand Down Expand Up @@ -239,7 +233,7 @@ public abstract class AbstractKaaClient implements GenericKaaClient {
bootstrapTransport.setBootstrapManager(bootstrapManager);

configurationManager = new ResyncConfigurationManager(properties);

transports.put(TransportType.BOOTSTRAP, bootstrapTransport);
profileTransport.setProfileManager(profileManager);
profileTransport.setClientProperties(this.properties);
Expand All @@ -250,7 +244,8 @@ public abstract class AbstractKaaClient implements GenericKaaClient {
transports.put(TransportType.NOTIFICATION, notificationTransport);
configurationTransport.setConfigurationHashContainer(configurationManager.getConfigurationHashContainer());
configurationTransport.setConfigurationProcessor(configurationManager.getConfigurationProcessor());
//TODO: this should be part of properties and provided by user during SDK generation
// TODO: this should be part of properties and provided by user during
// SDK generation
configurationTransport.setResyncOnly(true);
transports.put(TransportType.CONFIGURATION, configurationTransport);
userTransport.setEndpointRegistrationProcessor(endpointRegistrationManager);
Expand All @@ -273,73 +268,101 @@ public AbstractHttpClient createHttpClient(String url, PrivateKey privateKey, Pu

@Override
public void start() {
try {
if (!isInitialized) {
isInitialized = true;
} else {
LOG.warn("Client is already initialized!");
return;
}
//Load configuration
configurationManager.init();
bootstrapManager.receiveOperationsServerList();
if (stateListener != null) {
stateListener.onStarted();
}
} catch (TransportException e) {
if (stateListener != null) {
stateListener.onStartFailure(new KaaClusterConnectionException(e));
lifecycleTasksExecutor.submit(new Runnable() {
@Override
public void run() {
LOG.debug("Client startup initiated");
try {
if (!isInitialized) {
isInitialized = true;
} else {
LOG.warn("Client is already initialized!");
return;
}
// Load configuration
configurationManager.init();
bootstrapManager.receiveOperationsServerList();
if (stateListener != null) {
stateListener.onStarted();
}
} catch (TransportException e) {
LOG.error("Start failed", e);
if (stateListener != null) {
stateListener.onStartFailure(new KaaClusterConnectionException(e));
}
} catch (KaaRuntimeException e) {
LOG.error("Start failed", e);
if (stateListener != null) {
stateListener.onStartFailure(new KaaException(e));
}
}
}
} catch (KaaRuntimeException e) {
if (stateListener != null) {
stateListener.onStartFailure(new KaaException(e));
}
}
});
}

@Override
public void stop() {
try {
kaaClientState.persist();
channelManager.shutdown();
isInitialized = false;
if (stateListener != null) {
stateListener.onStopped();
}
} catch (Exception e) {
if (stateListener != null) {
stateListener.onStopFailure(new KaaException(e));
lifecycleTasksExecutor.submit(new Runnable() {
@Override
public void run() {
try {
logCollector.stop();
kaaClientState.persist();
channelManager.shutdown();
isInitialized = false;
if (stateListener != null) {
stateListener.onStopped();
}
} catch (Exception e) {
LOG.error("Stop failed", e);
if (stateListener != null) {
stateListener.onStopFailure(new KaaException(e));
}
}
}
}
});
lifecycleTasksExecutor.shutdown();
}

@Override
public void pause() {
try {
kaaClientState.persist();
channelManager.pause();
if (stateListener != null) {
stateListener.onPaused();
}
} catch (Exception e) {
if (stateListener != null) {
stateListener.onPauseFailure(new KaaException(e));
lifecycleTasksExecutor.submit(new Runnable() {
@Override
public void run() {
try {
kaaClientState.persist();
channelManager.pause();
if (stateListener != null) {
stateListener.onPaused();
}
} catch (Exception e) {
LOG.error("Pause failed", e);
if (stateListener != null) {
stateListener.onPauseFailure(new KaaException(e));
}
}
}
}
});
}

@Override
public void resume() {
try {
channelManager.resume();
if (stateListener != null) {
stateListener.onResume();
lifecycleTasksExecutor.submit(new Runnable() {
@Override
public void run() {
try {
channelManager.resume();
if (stateListener != null) {
stateListener.onResume();
}
} catch (Exception e) {
LOG.error("Resume failed", e);
if (stateListener != null) {
stateListener.onResumeFailure(new KaaException(e));
}
}
}
} catch (Exception e) {
if (stateListener != null) {
stateListener.onResumeFailure(new KaaException(e));
}
}
});
}

@Override
Expand All @@ -351,7 +374,7 @@ public void setProfileContainer(ProfileContainer container) {
public void updateProfile() {
this.profileManager.updateProfile();
}

@Override
public void setConfigurationStorage(ConfigurationStorage storage) {
this.configurationManager.setConfigurationStorage(storage);
Expand Down
Expand Up @@ -80,7 +80,6 @@ public void setSchemaProcessor(SchemaProcessor processor) {
public ConfigurationSyncRequest createConfigurationRequest() {
if (clientState != null && hashContainer != null) {
EndpointObjectHash hash = hashContainer.getConfigurationHash();
clientState.setConfigurationHash(hash);
ConfigurationSyncRequest request = new ConfigurationSyncRequest();
if (hash != null) {
request.setConfigurationHash(ByteBuffer.wrap(hash.getData()));
Expand Down
Expand Up @@ -37,7 +37,7 @@ public abstract class AbstractConfigurationManager implements ConfigurationManag
private final KaaClientProperties properties;
protected final ConfigurationDeserializer deserializer = new ConfigurationDeserializer();

private byte[] configurationData;
private volatile byte[] configurationData;
private ConfigurationStorage storage;
private ConfigurationHashContainer container;

Expand Down
Expand Up @@ -133,6 +133,11 @@ public synchronized void onLogResponse(LogSyncResponse logSyncResponse) throws I
processUploadDecision(strategy.isUploadNeeded(storage.getStatus()));
}
}

@Override
public void stop() {
scheduler.shutdown();
}

private void processUploadDecision(LogUploadStrategyDecision decision) {
switch (decision) {
Expand Down
Expand Up @@ -50,5 +50,5 @@ public synchronized void addLogRecord(Log record) {
uploadIfNeeded();
}
}

}
Expand Up @@ -54,4 +54,9 @@ public interface GenericLogCollector {
* User-defined log upload strategy object.
*/
void setStrategy(LogUploadStrategy strategy);

/**
* Stops and/or cleanup resources.
*/
void stop();
}
Expand Up @@ -56,7 +56,6 @@ public class KaaClientPropertiesState implements KaaClientState {
private static final String APP_STATE_SEQ_NUMBER = "APP_STATE_SEQ_NUMBER";
private static final String CONFIG_SEQ_NUMBER = "CONFIG_SEQ_NUMBER";
private static final String NOTIFICATION_SEQ_NUMBER = "NOTIFICATION_SEQ_NUMBER";
private static final String CONFIGURATION_HASH = "CONFIGURATION_HASH";
private static final String PROFILE_HASH = "PROFILE_HASH";
private static final String ENDPOINT_ACCESS_TOKEN = "ENDPOINT_TOKEN";

Expand Down Expand Up @@ -359,11 +358,6 @@ public int getAppStateSeqNumber() {
return Integer.parseInt(state.getProperty(APP_STATE_SEQ_NUMBER, "1"));
}

@Override
public EndpointObjectHash getConfigurationHash() {
return EndpointObjectHash.fromBytes(base64.decodeBase64(state.getProperty(CONFIGURATION_HASH, new String(base64.encodeBase64(new byte[0]), Charsets.UTF_8)).getBytes(Charsets.UTF_8)));
}

@Override
public EndpointObjectHash getProfileHash() {
return EndpointObjectHash.fromBytes(base64.decodeBase64(state.getProperty(PROFILE_HASH, new String(base64.encodeBase64(new byte[0]), Charsets.UTF_8)).getBytes(Charsets.UTF_8)));
Expand All @@ -374,11 +368,6 @@ public void setAppStateSeqNumber(int appStateSeqNumber) {
state.setProperty(APP_STATE_SEQ_NUMBER, Integer.toString(appStateSeqNumber));
}

@Override
public void setConfigurationHash(EndpointObjectHash hash) {
state.setProperty(CONFIGURATION_HASH, new String(base64.encodeBase64(hash.getData()), Charsets.UTF_8));
}

@Override
public void setProfileHash(EndpointObjectHash hash) {
state.setProperty(PROFILE_HASH, new String(base64.encodeBase64(hash.getData()), Charsets.UTF_8));
Expand Down
Expand Up @@ -45,9 +45,6 @@ public interface KaaClientState {
void setNotificationSeqNumber(int notificationSeqNumber);
int getNotificationSeqNumber();

void setConfigurationHash(EndpointObjectHash hash);
EndpointObjectHash getConfigurationHash();

void setProfileHash(EndpointObjectHash hash);
EndpointObjectHash getProfileHash();

Expand Down
Expand Up @@ -85,14 +85,6 @@ public void testProfileHash() throws IOException {
assertEquals(hash, state.getProfileHash());
}

@Test
public void testConfigHash() throws IOException {
KaaClientState state = new KaaClientPropertiesState(new FilePersistentStorage(), CommonsBase64.getInstance(), getProperties());
EndpointObjectHash hash = EndpointObjectHash.fromSHA1(new byte[]{1, 2, 3});
state.setConfigurationHash(hash);
assertEquals(hash, state.getConfigurationHash());
}

@Test
public void testNfSubscription() throws IOException {
KaaClientState state = new KaaClientPropertiesState(new FilePersistentStorage(), CommonsBase64.getInstance(), getProperties());
Expand Down

0 comments on commit 7156062

Please sign in to comment.