Skip to content

Commit

Permalink
KAA-1279: Fixed about 200 warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
sashadidukh committed Sep 30, 2016
1 parent 2647c68 commit 3948511
Show file tree
Hide file tree
Showing 37 changed files with 335 additions and 240 deletions.
Expand Up @@ -43,7 +43,7 @@
import java.util.concurrent.TimeUnit;

/**
* Service class to store base endpoint configuration
* Service class to store base endpoint configuration.
*/
public class KaaClientProperties extends Properties {
public static final String KAA_CLIENT_PROPERTIES_FILE = "kaaClientPropertiesFile";
Expand Down Expand Up @@ -118,8 +118,8 @@ public byte[] getPropertiesHash() {
updateDigest(digest, SDK_TOKEN);

propertiesHash = digest.digest();
} catch (NoSuchAlgorithmException e) {
LOG.warn("Failed to calculate hash for SDK properties: {}", e);
} catch (NoSuchAlgorithmException ex) {
LOG.warn("Failed to calculate hash for SDK properties: {}", ex);
}
}

Expand All @@ -141,8 +141,8 @@ public String getCommitHash() {
return getProperty(BUILD_COMMIT_HASH);
}

public Map<TransportProtocolId, List<TransportConnectionInfo>> getBootstrapServers() throws InvalidKeySpecException,
NoSuchAlgorithmException {
public Map<TransportProtocolId, List<TransportConnectionInfo>> getBootstrapServers()
throws InvalidKeySpecException, NoSuchAlgorithmException {
return parseBootstrapServers(getProperty(KaaClientProperties.BOOTSTRAP_SERVERS));
}

Expand All @@ -162,8 +162,8 @@ public TimeUnit getPollUnit() {
return TimeUnit.valueOf(getProperty(KaaClientProperties.TRANSPORT_POLL_UNIT));
}

private Map<TransportProtocolId, List<TransportConnectionInfo>> parseBootstrapServers(String serversStr)
throws InvalidKeySpecException, NoSuchAlgorithmException {
private Map<TransportProtocolId, List<TransportConnectionInfo>> parseBootstrapServers(
String serversStr) throws InvalidKeySpecException, NoSuchAlgorithmException {
Map<TransportProtocolId, List<TransportConnectionInfo>> servers = new HashMap<>();
String[] serversSplit = serversStr.split(";");

Expand All @@ -172,10 +172,11 @@ private Map<TransportProtocolId, List<TransportConnectionInfo>> parseBootstrapSe
String[] tokens = server.split(":");
ProtocolMetaData md = new ProtocolMetaData();
md.setAccessPointId(Integer.valueOf(tokens[0]));
md.setProtocolVersionInfo(new ProtocolVersionPair(Integer.valueOf(tokens[1]), Integer.valueOf(tokens[2])));
md.setProtocolVersionInfo(new ProtocolVersionPair(Integer.valueOf(tokens[1]),
Integer.valueOf(tokens[2])));
md.setConnectionInfo(ByteBuffer.wrap(getBase64().decodeBase64(tokens[3])));
TransportProtocolId key = new TransportProtocolId(md.getProtocolVersionInfo().getId(), md.getProtocolVersionInfo()
.getVersion());
TransportProtocolId key = new TransportProtocolId(md.getProtocolVersionInfo().getId(),
md.getProtocolVersionInfo().getVersion());
List<TransportConnectionInfo> serverList = servers.get(key);
if (serverList == null) {
serverList = new ArrayList<TransportConnectionInfo>();
Expand Down
Expand Up @@ -19,7 +19,7 @@
import org.kaaproject.kaa.client.exceptions.KaaException;

/**
* Notifies about Kaa client state changes and errors
* Notifies about Kaa client state changes and errors.
*
* @author Andrew Shvayka
*/
Expand Down
Expand Up @@ -19,7 +19,6 @@
/**
* Demultiplexer is responsible for deserializing of response data and notifying
* appropriate services.
*
* Required in user implementation of any kind of data channel.
*
* @author Yaroslav Zeygerman
Expand All @@ -35,7 +34,7 @@ public interface KaaDataDemultiplexer {
void processResponse(byte[] response) throws Exception;

/**
* Routines to be executed before response will be processed
* Routines to be executed before response will be processed.
*/
void preProcess();

Expand Down
Expand Up @@ -19,7 +19,7 @@
import org.kaaproject.kaa.client.channel.connectivity.ConnectivityChecker;

/**
* @author Yaroslav Zeygerman
* @author Yaroslav Zeygerman.
* @see KaaDataChannel
*/
public interface KaaInternalChannelManager extends KaaChannelManager {
Expand Down
Expand Up @@ -34,16 +34,19 @@ public class DefaultBootstrapDataProcessor implements KaaDataMultiplexer, KaaDat

private static final Logger LOG = LoggerFactory.getLogger(DefaultBootstrapDataProcessor.class);

private final AvroByteArrayConverter<SyncRequest> requestConverter = new AvroByteArrayConverter<>(SyncRequest.class);
private final AvroByteArrayConverter<SyncResponse> responseConverter = new AvroByteArrayConverter<>(SyncResponse.class);
private final AvroByteArrayConverter<SyncRequest> requestConverter =
new AvroByteArrayConverter<>(SyncRequest.class);
private final AvroByteArrayConverter<SyncResponse> responseConverter =
new AvroByteArrayConverter<>(SyncResponse.class);
private BootstrapTransport transport;

public void setBootstrapTransport(BootstrapTransport transport) {
this.transport = transport;
}

@Override
public synchronized byte[] compileRequest(Map<TransportType, ChannelDirection> types) throws IOException {
public synchronized byte[] compileRequest(Map<TransportType, ChannelDirection> types)
throws IOException {
if (transport != null) {
SyncRequest request = transport.createResolveRequest();
LOG.trace("Created Resolve request {}", request);
Expand Down
Expand Up @@ -52,13 +52,13 @@ public class DefaultChannelManager implements KaaInternalChannelManager {
public static final Logger LOG = LoggerFactory // NOSONAR
.getLogger(DefaultChannelManager.class);
private final List<KaaDataChannel> channels = new LinkedList<>();
private final Map<TransportType, KaaDataChannel> upChannels = new HashMap<TransportType, KaaDataChannel>();
private final Map<TransportType, KaaDataChannel> upChannels = new HashMap<>();
private final BootstrapManager bootstrapManager;
private final Map<TransportProtocolId, TransportConnectionInfo> lastServers = new HashMap<>();
private final Map<TransportProtocolId, List<TransportConnectionInfo>> bootststrapServers;
private final Map<TransportProtocolId, TransportConnectionInfo> lastBSServers = new HashMap<>();
private final Map<String, BlockingQueue<SyncTask>> syncTaskQueueMap = new ConcurrentHashMap<String, BlockingQueue<SyncTask>>();
private final Map<String, SyncWorker> syncWorkers = new HashMap<String, DefaultChannelManager.SyncWorker>();
private final Map<TransportProtocolId, TransportConnectionInfo> lastBsServers = new HashMap<>();
private final Map<String, BlockingQueue<SyncTask>> syncTaskQueueMap = new ConcurrentHashMap<>();
private final Map<String, SyncWorker> syncWorkers = new HashMap<>();
private FailureListener failureListener;
private FailoverManager failoverManager;
private ExecutorContext executorContext;
Expand All @@ -73,7 +73,8 @@ public class DefaultChannelManager implements KaaInternalChannelManager {
private KaaDataDemultiplexer bootstrapDemultiplexer;

public DefaultChannelManager(BootstrapManager manager, Map<TransportProtocolId,
List<TransportConnectionInfo>> bootststrapServers, ExecutorContext executorContext, FailureListener failureListener) {
List<TransportConnectionInfo>> bootststrapServers, ExecutorContext executorContext,
FailureListener failureListener) {
if (manager == null || bootststrapServers == null || bootststrapServers.isEmpty()) {
throw new ChannelRuntimeException("Failed to create channel manager");
}
Expand All @@ -85,7 +86,8 @@ public DefaultChannelManager(BootstrapManager manager, Map<TransportProtocolId,

private boolean useChannelForType(KaaDataChannel channel, TransportType type) {
ChannelDirection direction = channel.getSupportedTransportTypes().get(type);
if (direction != null && (direction.equals(ChannelDirection.BIDIRECTIONAL) || direction.equals(ChannelDirection.UP))) {
if (direction != null && (direction.equals(ChannelDirection.BIDIRECTIONAL)
|| direction.equals(ChannelDirection.UP))) {
upChannels.put(type, channel);
return true;
}
Expand Down Expand Up @@ -130,7 +132,8 @@ private void addChannelToList(KaaDataChannel channel) {
server = lastServers.get(channel.getTransportProtocolId());
}
if (server != null) {
LOG.debug("Applying server {} for channel [{}] type {}", server, channel.getId(), channel.getTransportProtocolId());
LOG.debug("Applying server {} for channel [{}] type {}",
server, channel.getId(), channel.getTransportProtocolId());
channel.setServer(server);
if (failoverManager != null) {
failoverManager.onServerChanged(server);
Expand All @@ -147,22 +150,24 @@ private void addChannelToList(KaaDataChannel channel) {
channel.getTransportProtocolId());
}
} else {
LOG.debug("list of services is empty for channel [{}] type {}", channel.getId(), channel.getTransportProtocolId());
LOG.debug("list of services is empty for channel [{}] type {}",
channel.getId(), channel.getTransportProtocolId());
}
}
}
}

@Override
public synchronized void setChannel(TransportType transport, KaaDataChannel channel) throws KaaInvalidChannelException {
public synchronized void setChannel(TransportType transport, KaaDataChannel channel)
throws KaaInvalidChannelException {
if (isShutdown) {
LOG.warn("Can't set a channel. Channel manager is down");
return;
}
if (channel != null) {
if (!useChannelForType(channel, transport)) {
throw new KaaInvalidChannelException("Unsupported transport type " + transport.toString() + " for channel \""
+ channel.getId() + "\"");
throw new KaaInvalidChannelException("Unsupported transport type " + transport.toString()
+ " for channel \"" + channel.getId() + "\"");
}
if (isPaused) {
channel.pause();
Expand Down Expand Up @@ -298,7 +303,8 @@ public synchronized void onTransportConnectionInfoUpdated(TransportConnectionInf
}

@Override
public synchronized void onServerFailed(final TransportConnectionInfo server, FailoverStatus status) {
public synchronized void onServerFailed(final TransportConnectionInfo server,
FailoverStatus status) {
if (isShutdown) {
LOG.warn("Can't process server failure. Channel manager is down");
return;
Expand All @@ -308,15 +314,16 @@ public synchronized void onServerFailed(final TransportConnectionInfo server, Fa
final TransportConnectionInfo nextConnectionInfo = getNextBootstrapServer(server);
if (nextConnectionInfo != null) {
LOG.trace("Using next bootstrap service");
FailoverDecision decision = failoverManager.onFailover(FailoverStatus.CURRENT_BOOTSTRAP_SERVER_NA);
FailoverDecision decision = failoverManager.onFailover(
FailoverStatus.CURRENT_BOOTSTRAP_SERVER_NA);
switch (decision.getAction()) {
case NOOP:
LOG.warn("No operation is performed according to failover strategy decision");
break;
case RETRY:
long retryPeriod = decision.getRetryPeriod();
LOG.warn("Attempt to reconnect to the current bootstrap service will be made in {} ms, " +
"according to failover strategy decision", retryPeriod);
LOG.warn("Attempt to reconnect to the current bootstrap service will be made in {} ms, "
+ "according to failover strategy decision", retryPeriod);
executorContext.getScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
Expand All @@ -326,8 +333,8 @@ public void run() {
break;
case USE_NEXT_BOOTSTRAP:
retryPeriod = decision.getRetryPeriod();
LOG.warn("Attempt to connect to the next bootstrap service will be made in {} ms, " +
"according to failover strategy decision", retryPeriod);
LOG.warn("Attempt to connect to the next bootstrap service will be made in {} ms, "
+ "according to failover strategy decision", retryPeriod);
executorContext.getScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
Expand All @@ -351,8 +358,8 @@ public void run() {
break;
case RETRY:
long retryPeriod = decision.getRetryPeriod();
LOG.warn("Attempt to reconnect to first bootstrap service will be made in {} ms, " +
"according to failover strategy decision", retryPeriod);
LOG.warn("Attempt to reconnect to first bootstrap service will be made in {} ms, "
+ "according to failover strategy decision", retryPeriod);
executorContext.getScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -380,12 +387,12 @@ public synchronized void clearChannelList() {
}

private TransportConnectionInfo getCurrentBootstrapServer(TransportProtocolId type) {
TransportConnectionInfo bsi = lastBSServers.get(type);
TransportConnectionInfo bsi = lastBsServers.get(type);
if (bsi == null) {
List<TransportConnectionInfo> serverList = bootststrapServers.get(type);
if (serverList != null && !serverList.isEmpty()) {
bsi = serverList.get(0);
lastBSServers.put(type, bsi);
lastBsServers.put(type, bsi);
}
}

Expand All @@ -395,15 +402,16 @@ private TransportConnectionInfo getCurrentBootstrapServer(TransportProtocolId ty
private TransportConnectionInfo getNextBootstrapServer(TransportConnectionInfo currentServer) {
TransportConnectionInfo bsi = null;

List<TransportConnectionInfo> serverList = bootststrapServers.get(currentServer.getTransportId());
List<TransportConnectionInfo> serverList = bootststrapServers.get(
currentServer.getTransportId());
int serverIndex = serverList.indexOf(currentServer);

if (serverIndex >= 0) {
if (++serverIndex == serverList.size()) {
serverIndex = 0;
}
bsi = serverList.get(serverIndex);
lastBSServers.put(currentServer.getTransportId(), bsi);
lastBsServers.put(currentServer.getTransportId(), bsi);
}

return bsi;
Expand Down Expand Up @@ -531,20 +539,22 @@ public void run() {
task = SyncTask.merge(task, additionalTasks);
}
if (task.isAll()) {
LOG.debug("[{}] Going to invoke syncAll method for types {}", channel.getId(), task.getTypes());
LOG.debug("[{}] Going to invoke syncAll method for types {}",
channel.getId(), task.getTypes());
channel.syncAll();
} else if (task.isAckOnly()) {
LOG.debug("[{}] Going to invoke syncAck method for types {}", channel.getId(), task.getTypes());
LOG.debug("[{}] Going to invoke syncAck method for types {}",
channel.getId(), task.getTypes());
channel.syncAck(task.getTypes());
} else {
LOG.debug("[{}] Going to invoke sync method", channel.getId());
channel.sync(task.getTypes());
}
} catch (InterruptedException e) {
} catch (InterruptedException ex) {
if (stop) {
LOG.debug("[{}] Worker is interrupted.", channel.getId());
} else {
LOG.warn("[{}] Worker is interrupted.", channel.getId(), e);
LOG.warn("[{}] Worker is interrupted.", channel.getId(), ex);
}
}
}
Expand Down
Expand Up @@ -46,8 +46,10 @@ public class DefaultOperationDataProcessor implements KaaDataMultiplexer, KaaDat

private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationDataProcessor.class);
private final AtomicInteger requestsCounter = new AtomicInteger(0);
private final AvroByteArrayConverter<SyncRequest> requestConverter = new AvroByteArrayConverter<>(SyncRequest.class);
private final AvroByteArrayConverter<SyncResponse> responseConverter = new AvroByteArrayConverter<>(SyncResponse.class);
private final AvroByteArrayConverter<SyncRequest> requestConverter =
new AvroByteArrayConverter<>(SyncRequest.class);
private final AvroByteArrayConverter<SyncResponse> responseConverter =
new AvroByteArrayConverter<>(SyncResponse.class);
private final KaaClientState state;
private MetaDataTransport metaDataTransport;
private ConfigurationTransport configurationTransport;
Expand All @@ -71,7 +73,8 @@ public synchronized void setMetaDataTransport(MetaDataTransport metaDataTranspor
this.metaDataTransport = metaDataTransport;
}

public synchronized void setConfigurationTransport(ConfigurationTransport configurationTransport) {
public synchronized void setConfigurationTransport(
ConfigurationTransport configurationTransport) {
this.configurationTransport = configurationTransport;
}

Expand Down Expand Up @@ -103,7 +106,8 @@ public synchronized void processResponse(byte[] response) throws Exception {

LOG.info("Received Sync response: {}", syncResponse);
if (syncResponse.getConfigurationSyncResponse() != null && configurationTransport != null) {
configurationTransport.onConfigurationResponse(syncResponse.getConfigurationSyncResponse());
configurationTransport.onConfigurationResponse(
syncResponse.getConfigurationSyncResponse());
}
if (eventTransport != null) {
eventTransport.onSyncResposeIdReceived(syncResponse.getRequestId());
Expand All @@ -127,7 +131,8 @@ public synchronized void processResponse(byte[] response) throws Exception {
logTransport.onLogResponse(syncResponse.getLogSyncResponse());
}

boolean needProfileResync = syncResponse.getStatus() == SyncResponseResultType.PROFILE_RESYNC;
boolean needProfileResync = syncResponse.getStatus() == SyncResponseResultType
.PROFILE_RESYNC;
state.setIfNeedProfileResync(needProfileResync);
if (needProfileResync) {
LOG.info("Going to resync profile...");
Expand All @@ -140,7 +145,8 @@ public synchronized void processResponse(byte[] response) throws Exception {
}

@Override
public synchronized byte[] compileRequest(Map<TransportType, ChannelDirection> types) throws Exception {
public synchronized byte[] compileRequest(Map<TransportType, ChannelDirection> types)
throws Exception {
if (types != null) {
SyncRequest request = new SyncRequest();
request.setRequestId(requestsCounter.incrementAndGet());
Expand All @@ -153,22 +159,26 @@ public synchronized byte[] compileRequest(Map<TransportType, ChannelDirection> t
switch (type.getKey()) {
case CONFIGURATION:
if (configurationTransport != null) {
request.setConfigurationSyncRequest(configurationTransport.createConfigurationRequest());
request.setConfigurationSyncRequest(
configurationTransport.createConfigurationRequest());
}
break;
case EVENT:
if (isDownDirection) {
request.setEventSyncRequest(new EventSyncRequest());
} else if (eventTransport != null) {
request.setEventSyncRequest(eventTransport.createEventRequest(request.getRequestId()));
request.setEventSyncRequest(
eventTransport.createEventRequest(request.getRequestId()));
}
break;
case NOTIFICATION:
if (notificationTransport != null) {
if (isDownDirection) {
request.setNotificationSyncRequest(notificationTransport.createEmptyNotificationRequest());
request.setNotificationSyncRequest(
notificationTransport.createEmptyNotificationRequest());
} else {
request.setNotificationSyncRequest(notificationTransport.createNotificationRequest());
request.setNotificationSyncRequest(
notificationTransport.createNotificationRequest());
}
}
break;
Expand Down

0 comments on commit 3948511

Please sign in to comment.