Skip to content

Commit

Permalink
KAA-1279: Fixed about 50 warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
sashadidukh committed Sep 30, 2016
1 parent 79a10d7 commit 210430b
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 42 deletions.
Expand Up @@ -17,7 +17,7 @@
package org.kaaproject.kaa.client.configuration; package org.kaaproject.kaa.client.configuration;


/** /**
* Notifies subscribers when all deltas have been already processed * Notifies subscribers when all deltas have been already processed.
* *
* @author Yaroslav Zeygerman * @author Yaroslav Zeygerman
*/ */
Expand Down
Expand Up @@ -18,7 +18,6 @@


/** /**
* Interface for configuration processing is finished observers. * Interface for configuration processing is finished observers.
*
* Receiver can be subscribed/unsubscribed via {@link ConfigurationProcessedObservable} * Receiver can be subscribed/unsubscribed via {@link ConfigurationProcessedObservable}
*/ */
public interface ConfigurationProcessedObserver { public interface ConfigurationProcessedObserver {
Expand Down
Expand Up @@ -21,7 +21,6 @@


/** /**
* Interface for a configuration processor. * Interface for a configuration processor.
*
* Receives and decodes the raw configuration data * Receives and decodes the raw configuration data
* *
* @author Yaroslav Zeygerman * @author Yaroslav Zeygerman
Expand Down
Expand Up @@ -17,22 +17,22 @@
package org.kaaproject.kaa.client.configuration; package org.kaaproject.kaa.client.configuration;


/** /**
* Sends notifications with decoded configuration * Sends notifications with decoded configuration.
* *
* @author Yaroslav Zeygerman * @author Yaroslav Zeygerman
*/ */
public interface DecodedDeltaObservable { public interface DecodedDeltaObservable {


/** /**
* Subscribes new receiver for decoded data updates * Subscribes new receiver for decoded data updates.
* *
* @param receiver receiver to get decoded configuration updates * @param receiver receiver to get decoded configuration updates
* @see GenericDeltaReceiver * @see GenericDeltaReceiver
*/ */
void subscribeForUpdates(GenericDeltaReceiver receiver); void subscribeForUpdates(GenericDeltaReceiver receiver);


/** /**
* Unsubscribes receiver from decoded data updates * Unsubscribes receiver from decoded data updates.
* *
* @param receiver receiver to be unsubscribed from configuration updates * @param receiver receiver to be unsubscribed from configuration updates
* @see GenericDeltaReceiver * @see GenericDeltaReceiver
Expand Down
Expand Up @@ -28,34 +28,38 @@
import java.util.List; import java.util.List;


/** /**
* Implementation of {@link ConfigurationProcessor} using avro decoding mechanisms * Implementation of {@link ConfigurationProcessor} using avro decoding mechanisms.
* *
* @author Yaroslav Zeygerman * @author Yaroslav Zeygerman
*/ */
public class DefaultConfigurationProcessor implements public class DefaultConfigurationProcessor implements
ConfigurationProcessor, DecodedDeltaObservable, ConfigurationProcessor, DecodedDeltaObservable,
SchemaUpdatesReceiver, ConfigurationProcessedObservable { SchemaUpdatesReceiver, ConfigurationProcessedObservable {


private final List<GenericDeltaReceiver> onDeltaReceived = new LinkedList<GenericDeltaReceiver>(); private final List<GenericDeltaReceiver> onDeltaReceived = new LinkedList<>();
private final List<ConfigurationProcessedObserver> onProcessed = new LinkedList<ConfigurationProcessedObserver>(); private final List<ConfigurationProcessedObserver> onProcessed = new LinkedList<>();
private Schema schema; private Schema schema;


public DefaultConfigurationProcessor() { public DefaultConfigurationProcessor() {


} }


@Override @Override
public synchronized void processConfigurationData(ByteBuffer buffer, boolean fullResync) throws IOException { public synchronized void processConfigurationData(ByteBuffer buffer, boolean fullResync)
throws IOException {
if (buffer != null) { if (buffer != null) {
if (schema == null) { if (schema == null) {
throw new ConfigurationRuntimeException("Can't process configuration update. Schema is null"); throw new ConfigurationRuntimeException(
"Can't process configuration update. Schema is null");
} }
GenericAvroConverter<GenericArray<GenericRecord>> converter = new GenericAvroConverter<GenericArray<GenericRecord>>(schema); GenericAvroConverter<GenericArray<GenericRecord>> converter =
new GenericAvroConverter<>(schema);
GenericArray<GenericRecord> deltaArray = converter.decodeBinary(buffer.array()); GenericArray<GenericRecord> deltaArray = converter.decodeBinary(buffer.array());


for (GenericRecord delta : deltaArray) { for (GenericRecord delta : deltaArray) {
GenericRecord record = (GenericRecord) delta.get("delta"); GenericRecord record = (GenericRecord) delta.get("delta");
int index = delta.getSchema().getField("delta").schema().getTypes().indexOf(record.getSchema()); int index = delta.getSchema().getField("delta").schema().getTypes().indexOf(
record.getSchema());
for (GenericDeltaReceiver subscriber : onDeltaReceived) { for (GenericDeltaReceiver subscriber : onDeltaReceived) {
subscriber.onDeltaReceived(index, record, fullResync); subscriber.onDeltaReceived(index, record, fullResync);
} }
Expand Down
Expand Up @@ -36,15 +36,17 @@
public abstract class AbstractConfigurationManager implements ConfigurationManager { public abstract class AbstractConfigurationManager implements ConfigurationManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractConfigurationManager.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractConfigurationManager.class);
protected final ConfigurationDeserializer deserializer; protected final ConfigurationDeserializer deserializer;
private final Set<ConfigurationListener> listeners = Collections.newSetFromMap(new ConcurrentHashMap<ConfigurationListener, Boolean>()); private final Set<ConfigurationListener> listeners = Collections.newSetFromMap(
new ConcurrentHashMap<ConfigurationListener, Boolean>());
private final KaaClientProperties properties; private final KaaClientProperties properties;
private final ExecutorContext executorContext; private final ExecutorContext executorContext;
private volatile byte[] configurationData; private volatile byte[] configurationData;
private ConfigurationStorage storage; private ConfigurationStorage storage;
private ConfigurationHashContainer container = new HashContainer(); private ConfigurationHashContainer container = new HashContainer();
private KaaClientState state; private KaaClientState state;


public AbstractConfigurationManager(KaaClientProperties properties, KaaClientState state, ExecutorContext executorContext) { public AbstractConfigurationManager(KaaClientProperties properties, KaaClientState state,
ExecutorContext executorContext) {
super(); super();
this.properties = properties; this.properties = properties;
this.state = state; this.state = state;
Expand All @@ -56,9 +58,9 @@ private static byte[] toByteArray(ByteBuffer buffer) {
if (buffer == null) { if (buffer == null) {
return null; return null;
} }
byte[] b = new byte[buffer.remaining()]; byte[] bytes = new byte[buffer.remaining()];
buffer.get(b); buffer.get(bytes);
return b; return bytes;
} }


@Override @Override
Expand Down Expand Up @@ -90,7 +92,8 @@ public ConfigurationProcessor getConfigurationProcessor() {
return new ConfigurationProcessor() { return new ConfigurationProcessor() {


@Override @Override
public void processConfigurationData(ByteBuffer buffer, boolean fullResync) throws IOException { public void processConfigurationData(ByteBuffer buffer, boolean fullResync)
throws IOException {
if (fullResync) { if (fullResync) {
configurationData = toByteArray(buffer); configurationData = toByteArray(buffer);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
Expand Down Expand Up @@ -132,15 +135,15 @@ private byte[] loadConfigurationData() {
LOG.info("Clearing old configuration data from storage {}", storage); LOG.info("Clearing old configuration data from storage {}", storage);
try { try {
storage.clearConfiguration(); storage.clearConfiguration();
} catch (IOException e) { } catch (IOException ex) {
LOG.error("Failed to clear configuration from storage", e); LOG.error("Failed to clear configuration from storage", ex);
} }
} else { } else {
LOG.debug("Loading configuration data from storage {}", storage); LOG.debug("Loading configuration data from storage {}", storage);
try { try {
configurationData = toByteArray(storage.loadConfiguration()); configurationData = toByteArray(storage.loadConfiguration());
} catch (IOException e) { } catch (IOException ex) {
LOG.error("Failed to load configuration from storage", e); LOG.error("Failed to load configuration from storage", ex);
} }
} }
} }
Expand Down
Expand Up @@ -27,7 +27,6 @@


/** /**
* This class deserialize binary data to configuration object. * This class deserialize binary data to configuration object.
*
* This implementation is auto-generated. Please modify corresponding template * This implementation is auto-generated. Please modify corresponding template
* file. * file.
* *
Expand All @@ -36,14 +35,16 @@
@Generated("ConfigurationDeserializer.java.template") @Generated("ConfigurationDeserializer.java.template")
class ConfigurationDeserializer { class ConfigurationDeserializer {


private final AvroByteArrayConverter<Configuration> converter = new AvroByteArrayConverter<Configuration>(Configuration.class); private final AvroByteArrayConverter<Configuration> converter =
new AvroByteArrayConverter<Configuration>(Configuration.class);
private final ExecutorContext executorContext; private final ExecutorContext executorContext;


public ConfigurationDeserializer(ExecutorContext executorContext) { public ConfigurationDeserializer(ExecutorContext executorContext) {
this.executorContext = executorContext; this.executorContext = executorContext;
} }


void notify(Collection<ConfigurationListener> listeners, byte[] configurationData) throws IOException { void notify(Collection<ConfigurationListener> listeners, byte[] configurationData)
throws IOException {
final Configuration configuration = fromByteArray(configurationData); final Configuration configuration = fromByteArray(configurationData);
for (final ConfigurationListener listener : listeners) { for (final ConfigurationListener listener : listeners) {
executorContext.getCallbackExecutor().submit(new Runnable() { executorContext.getCallbackExecutor().submit(new Runnable() {
Expand Down
Expand Up @@ -36,7 +36,7 @@
public interface ConfigurationManager extends GenericConfigurationManager { public interface ConfigurationManager extends GenericConfigurationManager {


/** /**
* Always returns latest configuration * Always returns latest configuration.
* *
* @return configuration * @return configuration
*/ */
Expand Down
Expand Up @@ -29,24 +29,28 @@
import javax.annotation.Generated; import javax.annotation.Generated;


@Generated("ResyncConfigurationManager.java.template") @Generated("ResyncConfigurationManager.java.template")
public class ResyncConfigurationManager extends AbstractConfigurationManager implements ConfigurationManager { public class ResyncConfigurationManager extends AbstractConfigurationManager
implements ConfigurationManager {


private static final Logger LOG = LoggerFactory.getLogger(ResyncConfigurationManager.class); private static final Logger LOG = LoggerFactory.getLogger(ResyncConfigurationManager.class);


public ResyncConfigurationManager(KaaClientProperties properties, KaaClientState state, ExecutorContext executorContext) { public ResyncConfigurationManager(KaaClientProperties properties, KaaClientState state,
ExecutorContext executorContext) {
super(properties, state, executorContext); super(properties, state, executorContext);
} }


@Override @Override
public Configuration getConfiguration() { public Configuration getConfiguration() {
try { try {
return deserializer.fromByteArray(getConfigurationData()); return deserializer.fromByteArray(getConfigurationData());
} catch (IOException e) { } catch (IOException ex) {
LOG.error("Failed to decode configuration data {}, exception catched: {}", Arrays.toString(getConfigurationData()), e); LOG.error("Failed to decode configuration data {}, exception catched: {}",
Arrays.toString(getConfigurationData()), ex);
try { try {
return deserializer.fromByteArray(getDefaultConfigurationData()); return deserializer.fromByteArray(getDefaultConfigurationData());
} catch (IOException e1) { } catch (IOException e1) {
LOG.error("Failed to decode default configuration data {}, exception catched: {}", Arrays.toString(getConfigurationData()), e1); LOG.error("Failed to decode default configuration data {}, exception catched: {}",
Arrays.toString(getConfigurationData()), e1);
return null; return null;
} }
} }
Expand Down
Expand Up @@ -92,7 +92,7 @@ public interface ConfigurationManager {
void unsubscribeFromConfigurationUpdates(ConfigurationReceiver receiver); void unsubscribeFromConfigurationUpdates(ConfigurationReceiver receiver);


/** /**
* Retrieves full configuration * Retrieves full configuration.
* *
* @return common object with full configuration * @return common object with full configuration
* @see CommonRecord * @see CommonRecord
Expand Down
Expand Up @@ -19,7 +19,7 @@
import org.kaaproject.kaa.client.common.CommonRecord; import org.kaaproject.kaa.client.common.CommonRecord;


/** /**
* Interface for subscriber to receive full configuration<br> * Interface for subscriber to receive full configuration<br>.
* <br> * <br>
* Instance of this interface implementation can be subscribed for updates in * Instance of this interface implementation can be subscribed for updates in
* {@link ConfigurationManager} * {@link ConfigurationManager}
Expand All @@ -30,7 +30,7 @@
public interface ConfigurationReceiver { public interface ConfigurationReceiver {


/** /**
* This callback will be called on any configuration update * This callback will be called on any configuration update.
* *
* @param configuration full configuration in common objects * @param configuration full configuration in common objects
* @see CommonRecord * @see CommonRecord
Expand Down
Expand Up @@ -40,11 +40,12 @@
import java.util.UUID; import java.util.UUID;


/** /**
* Default @{link ConfigurationManager} implementation * Default @{link ConfigurationManager} implementation.
* *
* @author Yaroslav Zeygerman * @author Yaroslav Zeygerman
*/ */
public class DefaultConfigurationManager implements GenericDeltaReceiver, ConfigurationManager, ConfigurationProcessedObserver { public class DefaultConfigurationManager implements GenericDeltaReceiver, ConfigurationManager,
ConfigurationProcessedObserver {
private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigurationManager.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultConfigurationManager.class);
private static final String UUID = "__uuid"; private static final String UUID = "__uuid";


Expand All @@ -69,7 +70,8 @@ private CommonRecord createCommonRecord(GenericRecord avroRecord) {
} }
} }


private void processRecordField(CommonRecord record, GenericRecord deltaRecord, String fieldName) { private void processRecordField(
CommonRecord record, GenericRecord deltaRecord, String fieldName) {
CommonRecord nextRecord = null; CommonRecord nextRecord = null;
CommonValue nextValue = record.getField(fieldName); CommonValue nextValue = record.getField(fieldName);
if (nextValue != null if (nextValue != null
Expand Down Expand Up @@ -101,7 +103,8 @@ private void processArrayField(CommonRecord record, GenericArray array, String f
currentArray = arrayValue.getArray().getList(); currentArray = arrayValue.getArray().getList();
} else { } else {
currentArray = new LinkedList<CommonValue>(); currentArray = new LinkedList<CommonValue>();
record.setField(fieldName, commonFactory.createCommonValue(commonFactory.createCommonArray(array.getSchema(), currentArray))); record.setField(fieldName, commonFactory.createCommonValue(
commonFactory.createCommonArray(array.getSchema(), currentArray)));
} }
if (!array.isEmpty()) { if (!array.isEmpty()) {
Object rawItem = array.get(0); Object rawItem = array.get(0);
Expand Down Expand Up @@ -131,7 +134,8 @@ private void processArrayField(CommonRecord record, GenericArray array, String f
} }
} else { } else {
for (GenericFixed item : fixedItems) { for (GenericFixed item : fixedItems) {
currentArray.add(commonFactory.createCommonValue(commonFactory.createCommonFixed(item.getSchema(), item.bytes()))); currentArray.add(commonFactory.createCommonValue(
commonFactory.createCommonFixed(item.getSchema(), item.bytes())));
} }
} }
} else { } else {
Expand All @@ -148,12 +152,14 @@ private void processEnumField(CommonRecord record, GenericEnumSymbol symbol, Str
if (AvroGenericUtils.isReset(symbol)) { if (AvroGenericUtils.isReset(symbol)) {
record.getField(fieldName).getArray().getList().clear(); record.getField(fieldName).getArray().getList().clear();
} else if (!AvroGenericUtils.isUnchanged(symbol)) { } else if (!AvroGenericUtils.isUnchanged(symbol)) {
record.setField(fieldName, commonFactory.createCommonValue(commonFactory.createCommonEnum(enumSchema, symbol.toString()))); record.setField(fieldName, commonFactory.createCommonValue(
commonFactory.createCommonEnum(enumSchema, symbol.toString())));
} }
} }


private void processFixedField(CommonRecord record, GenericFixed fixed, String fieldName) { private void processFixedField(CommonRecord record, GenericFixed fixed, String fieldName) {
record.setField(fieldName, commonFactory.createCommonValue(commonFactory.createCommonFixed(fixed.getSchema(), fixed.bytes()))); record.setField(fieldName, commonFactory.createCommonValue(
commonFactory.createCommonFixed(fixed.getSchema(), fixed.bytes())));
} }


private void updateRecord(CommonRecord record, GenericRecord delta) { private void updateRecord(CommonRecord record, GenericRecord delta) {
Expand Down

0 comments on commit 210430b

Please sign in to comment.