Skip to content

Commit

Permalink
Merge branch 'feature/KAA-1279' of github.com:Acarus/kaa into working
Browse files Browse the repository at this point in the history
  • Loading branch information
Acarus committed Sep 29, 2016
2 parents ead7305 + 7ad3702 commit 7e1f1c9
Show file tree
Hide file tree
Showing 51 changed files with 272 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private AvroUtils() {
*
* @return list of generated bytes.
*/
public static byte[] generateUUIDBytes() {
public static byte[] generateUuidBytes() {
UUID uuid = UUID.randomUUID();

ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[UUID_SIZE]);
Expand All @@ -57,7 +57,7 @@ public static byte[] generateUUIDBytes() {

public static GenericData.Fixed generateUuidObject() {
Schema avroSchema = Schema.createFixed(UUID_TYPE, null, KAA_NAMESPACE, UUID_SIZE);
return new GenericData.Fixed(avroSchema, generateUUIDBytes());
return new GenericData.Fixed(avroSchema, generateUuidBytes());
}

public static Schema getSchemaByType(Schema schema, Schema.Type type) {
Expand Down Expand Up @@ -122,7 +122,8 @@ private static JsonNode injectUuidsFromJsonNodes(JsonNode json, Schema schema) {
.filter(f -> !f.name().equals(UUID_FIELD))
.forEach(f -> injectUuidsFromJsonNodes(json.get(f.name()), f.schema()));

boolean addressable = schema.getFields().stream().filter(f -> f.name().equals(UUID_FIELD)).findFirst().isPresent();
boolean addressable = schema.getFields().stream().filter(f -> f.name().equals(
UUID_FIELD)).findFirst().isPresent();
if (addressable) {
((ObjectNode) json).put(UUID_FIELD, (Integer) null);
}
Expand All @@ -132,7 +133,7 @@ private static JsonNode injectUuidsFromJsonNodes(JsonNode json, Schema schema) {
.forEach(s -> injectUuidsFromJsonNodes(json.get(s.getName()), s));
break;
case ARRAY:
json.getElements().forEachRemaining((e) -> injectUuids(e, schema.getElementType()));
json.getElements().forEachRemaining((elem) -> injectUuids(elem, schema.getElementType()));
break;
default:
return json;
Expand All @@ -151,8 +152,9 @@ public static void removeUuids(JsonNode json) {
}

for (JsonNode node : json) {
if (node.isContainerNode())
if (node.isContainerNode()) {
removeUuids(node);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.io.IOException;

/**
* Simple container for resync delta encoded using base schema
* Simple container for resync delta encoded using base schema.
*
* @author Andrew Shvayka
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ public class DefaultDeltaCalculationAlgorithm implements DeltaCalculationAlgorit
*/
private AvroBinaryDelta resultDelta;

/* FieldAttribute */

/**
* Instantiates a new default delta calculator.
*
Expand All @@ -99,8 +97,6 @@ public DefaultDeltaCalculationAlgorithm(Schema deltaSchema, Schema baseSchema) {
this.baseSchema = baseSchema;
}

/* RecordTuple */

/**
* Gets the full name.
*
Expand Down Expand Up @@ -174,7 +170,8 @@ private static Schema getSchemaByFullName(Schema arraySchema, String fullName) {
List<Schema> itemTypes = arraySchema.getElementType().getTypes();
return getSchemaByFullName(itemTypes, fullName);
} else {
return arraySchema.getElementType().getFullName().equals(fullName) ? arraySchema.getElementType() : null;
return arraySchema.getElementType().getFullName().equals(
fullName) ? arraySchema.getElementType() : null;
}
}

Expand All @@ -185,8 +182,10 @@ private static Schema getSchemaByFullName(Schema arraySchema, String fullName) {
* @param field the field
* @throws DeltaCalculatorException the delta calculator exception
*/
private static void putUnchanged(GenericRecord delta, String field) throws DeltaCalculatorException {
Schema unchangedSchema = getSchemaByFullName(delta, field, KAA_NAMESPACE + "." + UNCHANGED + "T");
private static void putUnchanged(GenericRecord delta, String field)
throws DeltaCalculatorException {
Schema unchangedSchema = getSchemaByFullName(
delta, field, KAA_NAMESPACE + "." + UNCHANGED + "T");
if (unchangedSchema != null) {
GenericEnumSymbol unchanged = new GenericData.EnumSymbol(unchangedSchema, UNCHANGED);
delta.put(field, unchanged);
Expand All @@ -210,8 +209,9 @@ private static void putReset(GenericRecord delta, String field) throws DeltaCalc
GenericEnumSymbol reset = new GenericData.EnumSymbol(resetSchema, RESET);
delta.put(field, reset);
} else {
throw new DeltaCalculatorException(new StringBuilder().append("Failed to find schema for \"reset\" type ")
.append(" in ").append(delta.getSchema().getFullName()).append(" field ").append(field).toString());
throw new DeltaCalculatorException(new StringBuilder().append(
"Failed to find schema for \"reset\" type ").append(" in ").append(
delta.getSchema().getFullName()).append(" field ").append(field).toString());
}
}

Expand All @@ -223,7 +223,8 @@ private static void putReset(GenericRecord delta, String field) throws DeltaCalc
* @param record the record
* @return the generic record
*/
private static GenericRecord createSubDelta(GenericRecord delta, String field, GenericRecord record) {
private static GenericRecord createSubDelta(
GenericRecord delta, String field, GenericRecord record) {
Schema recordType = getSchemaByFullName(delta, field, getFullName(record));
return recordType == null ? null : new GenericData.Record(recordType);
}
Expand All @@ -238,7 +239,9 @@ private static GenericRecord createSubDelta(GenericRecord delta, String field, G
* @throws DeltaCalculatorException the delta calculator exception
*/
private static void fillDeltaArrayFields(GenericRecord delta, Set<String> resetFields,
Map<String, List<byte[]>> uuidFields, Queue<FieldAttribute> fieldQueue) throws DeltaCalculatorException {
Map<String, List<byte[]>> uuidFields,
Queue<FieldAttribute> fieldQueue)
throws DeltaCalculatorException {
List<Schema.Field> fields = delta.getSchema().getFields();

if (fieldQueue.isEmpty()) {
Expand Down Expand Up @@ -299,7 +302,8 @@ private Schema getDeltaSchemaByFullName(String fullName) {
* @param array the array
* @throws DeltaCalculatorException the delta calculator exception
*/
private void addComplexItemToArray(GenericContainer container, GenericArray array) throws DeltaCalculatorException {
private void addComplexItemToArray(GenericContainer container, GenericArray array)
throws DeltaCalculatorException {
Schema itemSchema = getSchemaByFullName(array.getSchema(), getFullName(container));
if (itemSchema.getType() == Type.RECORD) {
GenericRecord subDelta = new GenericData.Record(itemSchema);
Expand All @@ -320,20 +324,23 @@ private void addComplexItemToArray(GenericContainer container, GenericArray arra
* @param fieldQueue the field queue
* @throws DeltaCalculatorException the delta calculator exception
*/
private void processComplexField(GenericRecord delta, String field, GenericContainer newRecordValue,
GenericContainer oldRecordValue, Queue<FieldAttribute> fieldQueue) throws DeltaCalculatorException {
private void processComplexField(
GenericRecord delta, String field, GenericContainer newRecordValue,
GenericContainer oldRecordValue, Queue<FieldAttribute> fieldQueue)
throws DeltaCalculatorException {
boolean fieldChanged = false;
if (newRecordValue.getSchema().getType() == Type.RECORD) {
GenericRecord subDelta = createSubDelta(delta, field, (GenericRecord) newRecordValue);
if (subDelta != null) {
boolean hasChanges = false;
if (oldRecordValue != null && oldRecordValue.getSchema().getFullName().equals(newRecordValue.getSchema().getFullName())) {
if (oldRecordValue != null && oldRecordValue.getSchema().getFullName().equals(
newRecordValue.getSchema().getFullName())) {
FieldAttribute fieldPair = new FieldAttribute(getSchemaByFullName(delta, field,
getFullName(newRecordValue)), field);
Queue<FieldAttribute> newFieldQueue = new LinkedList<FieldAttribute>(fieldQueue);
newFieldQueue.offer(fieldPair);
hasChanges = fillDelta(subDelta, (GenericRecord) oldRecordValue, (GenericRecord) newRecordValue,
newFieldQueue);
hasChanges = fillDelta(subDelta, (GenericRecord) oldRecordValue,
(GenericRecord) newRecordValue, newFieldQueue);
} else {
fillDeltaWithoutMerge(subDelta, (GenericRecord) newRecordValue);
hasChanges = true;
Expand All @@ -343,7 +350,8 @@ private void processComplexField(GenericRecord delta, String field, GenericConta
fieldChanged = true;
}
} else {
throw new DeltaCalculatorException(new StringBuilder().append("Failed to find subdelta schema \"")
throw new DeltaCalculatorException(
new StringBuilder().append("Failed to find subdelta schema \"")
.append(getFullName(newRecordValue)).append("\"").toString());
}
} else if (oldRecordValue == null || field.equals(UUID_FIELD)
Expand All @@ -363,7 +371,8 @@ private void processComplexField(GenericRecord delta, String field, GenericConta
* @param root the root
* @throws DeltaCalculatorException the delta calculator exception
*/
private void fillDeltaWithoutMerge(GenericRecord delta, GenericRecord root) throws DeltaCalculatorException {
private void fillDeltaWithoutMerge(GenericRecord delta, GenericRecord root)
throws DeltaCalculatorException {
Schema rootSchema = root.getSchema();
for (Field field : rootSchema.getFields()) {
Object value = root.get(field.name());
Expand Down Expand Up @@ -425,7 +434,8 @@ private boolean fillDelta(GenericRecord delta, GenericRecord oldRoot, GenericRec
if (!newArrayItems.isEmpty()) {
if (newArrayItems.get(0) instanceof GenericRecord) {
// Item is a complex type
if (oldArrayItems != null && !oldArrayItems.isEmpty() && oldArrayItems.get(0) instanceof GenericRecord) {
if (oldArrayItems != null && !oldArrayItems.isEmpty()
&& oldArrayItems.get(0) instanceof GenericRecord) {
for (Object oldItem : oldArrayItems) {
GenericRecord oldItemRecord = (GenericRecord) oldItem;
Schema oldItemSchema = oldItemRecord.getSchema();
Expand Down Expand Up @@ -513,7 +523,8 @@ private boolean fillDelta(GenericRecord delta, GenericRecord oldRoot, GenericRec
}
}
} else if (oldArrayItems == null) {
delta.put(newField.name(), new GenericData.Array(0, getArraySchema(delta, newField.name())));
delta.put(newField.name(),
new GenericData.Array(0, getArraySchema(delta, newField.name())));
hasChanges = true;
} else if (!oldArrayItems.isEmpty()) {
resetFields.add(newField.name());
Expand All @@ -533,7 +544,8 @@ private boolean fillDelta(GenericRecord delta, GenericRecord oldRoot, GenericRec
}
} else if (newValue instanceof GenericContainer) {
GenericContainer newRecordValue = (GenericContainer) newValue;
GenericContainer oldRecordValue = (oldValue instanceof GenericContainer) ? (GenericContainer) oldValue : null;
GenericContainer oldRecordValue =
(oldValue instanceof GenericContainer) ? (GenericContainer) oldValue : null;
processComplexField(delta, newField.name(), newRecordValue, oldRecordValue, fieldQueue);
} else if ((newValue == null && oldValue != null)
|| (newValue != null && !newValue.equals(oldValue))) {
Expand Down Expand Up @@ -598,7 +610,8 @@ private void processDifferences(GenericRecord oldRoot, GenericRecord newRoot)

hasDifferences = !(oldArray.size() == newArray.size());
if (!hasDifferences) {
if (!newArray.isEmpty() && newArray.get(0) instanceof GenericRecord && oldArray.get(0) instanceof GenericRecord) {
if (!newArray.isEmpty() && newArray.get(0) instanceof GenericRecord
&& oldArray.get(0) instanceof GenericRecord) {
GenericRecord uuidCheckRecord = (GenericRecord) newArray.get(0);
Schema uuidCheckSchema = uuidCheckRecord.getSchema();
if (uuidCheckSchema.getField(UUID_FIELD) != null) {
Expand Down Expand Up @@ -647,7 +660,8 @@ private void processDifferences(GenericRecord oldRoot, GenericRecord newRoot)
if (hasDifferences && !processedRecords.contains(lastUuidRecords)) {
Schema deltaSubSchema = getDeltaSchemaByFullName(lastUuidRecordName);
if (deltaSubSchema == null) {
throw new DeltaCalculatorException(new StringBuilder().append("Failed to find schema for \"")
throw new DeltaCalculatorException(
new StringBuilder().append("Failed to find schema for \"")
.append(lastUuidRecordName).append("\"").toString());
}
GenericRecord delta = new GenericData.Record(deltaSubSchema);
Expand Down Expand Up @@ -675,12 +689,14 @@ public RawBinaryDelta calculate(BaseData endpointConfiguration, BaseData newConf
}

@Override
public RawBinaryDelta calculate(BaseData newConfigurationBody) throws IOException, DeltaCalculatorException {
public RawBinaryDelta calculate(BaseData newConfigurationBody)
throws IOException, DeltaCalculatorException {
GenericRecord newRoot = getRootNode(newConfigurationBody, baseSchema);
return calculate(newRoot);
}

public RawBinaryDelta calculate(GenericRecord oldConfig, GenericRecord newConfig) throws DeltaCalculatorException {
public RawBinaryDelta calculate(GenericRecord oldConfig, GenericRecord newConfig)
throws DeltaCalculatorException {
resultDelta = new AvroBinaryDelta(deltaSchema);
processedRecords = new HashSet<>();
processDifferences(oldConfig, newConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
public class DefaultDeltaCalculatorFactory implements DeltaCalculatorFactory {

/* (non-Javadoc)
* @see org.kaaproject.kaa.server.operations.service.delta.DeltaCalculatorFactory#createDeltaCalculator(java.lang.String)
* @see org.kaaproject.kaa.server.operations.service.delta.DeltaCalculatorFactory
* #createDeltaCalculator(java.lang.String)
*/
@Override
public DeltaCalculationAlgorithm createDeltaCalculator(ProtocolSchema protocolSchemaBody, BaseSchema baseDataSchema) {
public DeltaCalculationAlgorithm createDeltaCalculator(
ProtocolSchema protocolSchemaBody, BaseSchema baseDataSchema) {
Schema protocolSchema = new Schema.Parser().parse(protocolSchemaBody.getRawSchema());
Schema baseSchema = new Schema.Parser().parse(baseDataSchema.getRawSchema());
return new DefaultDeltaCalculationAlgorithm(protocolSchema, baseSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface DeltaCalculationAlgorithm {
/**
* Calculates delta merging new and old configurations.
*
* @param oldConfiguration old configuration data (binary)
* @param endpointConfiguration old configuration data (binary)
* @param newConfigurationBody the new configuration body (binary)
* @return the raw binary delta
* @throws IOException Signals that an I/O exception has occurred.
Expand All @@ -47,5 +47,6 @@ RawBinaryDelta calculate(BaseData endpointConfiguration, BaseData newConfigurati
* @throws IOException Signals that an I/O exception has occurred.
* @throws DeltaCalculatorException the delta calculator exception
*/
RawBinaryDelta calculate(BaseData newConfigurationBody) throws IOException, DeltaCalculatorException;
RawBinaryDelta calculate(BaseData newConfigurationBody)
throws IOException, DeltaCalculatorException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public interface DeltaCalculatorFactory {
* @param baseDataSchema base the schema body
* @return the delta calculator
*/
DeltaCalculationAlgorithm createDeltaCalculator(ProtocolSchema protocolSchemaBody, BaseSchema baseDataSchema);
DeltaCalculationAlgorithm createDeltaCalculator(
ProtocolSchema protocolSchemaBody, BaseSchema baseDataSchema);

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public ConfigurationGenerationException(String message) {

/**
* Constructs a new configuration processing exception.
* <p>
* Note that the detail message associated with {@code cause} is <i>not</i>
* automatically incorporated in this exception's detail message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,6 @@ public interface DefaultRecordGenerationAlgorithm<T extends KaaData> {
* @return the configuration by name
* @throws ConfigurationGenerationException the configuration processing exception
*/
GenericRecord getConfigurationByName(String name, String namespace) throws ConfigurationGenerationException;
GenericRecord getConfigurationByName(String name, String namespace)
throws ConfigurationGenerationException;
}

0 comments on commit 7e1f1c9

Please sign in to comment.