Skip to content

Commit

Permalink
delta has two parts update and delete, they will treated in separate
Browse files Browse the repository at this point in the history
  • Loading branch information
markodjurovic committed Jun 14, 2018
1 parent b67eaec commit 9b1ce38
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 21 deletions.
Expand Up @@ -3171,8 +3171,8 @@ protected void unTrack(OIdentifiable id) {
super.unTrack(id); super.unTrack(id);
} }


private Object getDeltaValue(Object originalValue, Object currentValue, boolean changed){ private Object getUpdateDeltaValue(Object currentValue, boolean changed){
//TODO review this clause once again //TODO review this clause once again
if (changed || !(currentValue instanceof ODocument)){ if (changed || !(currentValue instanceof ODocument)){
return currentValue; return currentValue;
} }
Expand All @@ -3184,20 +3184,64 @@ private Object getDeltaValue(Object originalValue, Object currentValue, boolean
else{ else{
docVal = (ODocument)currentValue; docVal = (ODocument)currentValue;
} }
return docVal.getDeltaFromOriginal(); return docVal.getDeltaFromOriginalForUpdate();
}
}

private Object getDeleteDeltaValue(Object currentValue, boolean exist){
//TODO review this clause once again
if (!exist || !(currentValue instanceof ODocument)){
return currentValue;
}
else{
ODocument docVal;
if (currentValue instanceof ODocumentSerializable){
docVal = ((ODocumentSerializable)currentValue).toDocument();
}
else{
docVal = (ODocument)currentValue;
}
return docVal.getDeltaFromOriginalForDelete();
} }
} }


public ODocument getDeltaFromOriginal(){ private ODocument getDeltaFromOriginalForUpdate(){
ODocument retVal = new ODocument(); ODocument updated = new ODocument();
//get updated and new records
for (Map.Entry<String, ODocumentEntry> fieldVal : _fields.entrySet()){ for (Map.Entry<String, ODocumentEntry> fieldVal : _fields.entrySet()){
ODocumentEntry val = fieldVal.getValue(); ODocumentEntry val = fieldVal.getValue();
if (val.isChangedTree()){ if (val.isChangedTree()){
String fieldName = fieldVal.getKey(); String fieldName = fieldVal.getKey();
Object deltaValue = getDeltaValue(val.original, val.value, val.isChanged()); Object deltaValue = getUpdateDeltaValue(val.value, val.isChanged());
retVal.field(fieldName, deltaValue); updated.field(fieldName, deltaValue);
}
}
return updated;
}

private ODocument getDeltaFromOriginalForDelete(){
ODocument updated = new ODocument();
//get updated and new records
for (Map.Entry<String, ODocumentEntry> fieldVal : _fields.entrySet()){
ODocumentEntry val = fieldVal.getValue();
if (val.hasNonExistingTree()){
String fieldName = fieldVal.getKey();
Object deltaValue = getDeleteDeltaValue(val.value, val.exist());
updated.field(fieldName, deltaValue);
} }
} }
return retVal; return updated;
}

public ODocument getDeltaFromOriginal(){
ODocument ret = new ODocument();
ODocument updated = getDeltaFromOriginalForUpdate();
ret.field("u", updated);

//get deleted delta
ODocument deleted = getDeltaFromOriginalForDelete();
ret.field("d", deleted);

return ret;
} }
} }
Expand Up @@ -51,8 +51,9 @@ public boolean isChanged() {
} }


public boolean isChangedTree(){ public boolean isChangedTree(){
if (changed) if (changed && exist){
return true; return true;
}


if (value instanceof ODocument){ if (value instanceof ODocument){
ODocument doc = (ODocument)value; ODocument doc = (ODocument)value;
Expand All @@ -65,6 +66,23 @@ public boolean isChangedTree(){


return false; return false;
} }

public boolean hasNonExistingTree(){
if (!exist){
return true;
}

if (value instanceof ODocument){
ODocument doc = (ODocument)value;
for (Map.Entry<String, ODocumentEntry> field : doc._fields.entrySet()){
if (field.getValue().hasNonExistingTree()){
return true;
}
}
}

return false;
}


public void setChanged(final boolean changed) { public void setChanged(final boolean changed) {
this.changed = changed; this.changed = changed;
Expand Down
Expand Up @@ -866,8 +866,15 @@ public byte[] toStream(ORecord iSource, boolean iOnlyDelta) {
} else { } else {
final BytesContainer container = new BytesContainer(); final BytesContainer container = new BytesContainer();


ODocument doc = (ODocument) iSource;
// SERIALIZE RECORD // SERIALIZE RECORD
serialize((ODocument) iSource, container, false); if (!iOnlyDelta){
serialize(doc, container, false);
}
else{
ODocument deltaDoc = doc.getDeltaFromOriginal();
serialize(deltaDoc, container, false);
}


return container.fitBytes(); return container.fitBytes();
} }
Expand Down
Expand Up @@ -392,19 +392,31 @@ public void testGetDiffFromOriginalSimple(){
String constantFieldName = "constantField"; String constantFieldName = "constantField";
String originalValue = "orValue"; String originalValue = "orValue";
String testValue = "testValue"; String testValue = "testValue";
String removeField = "removeField";

doc.field(fieldName, originalValue); doc.field(fieldName, originalValue);
doc.field(constantFieldName, "someValue"); doc.field(constantFieldName, "someValue");
doc.field(removeField, "removeVal");


doc = db.save(doc); doc = db.save(doc);


// doc._fields.get(fieldName).original = originalValue; // doc._fields.get(fieldName).original = originalValue;
// doc._fields.get(constantFieldName).changed = false; // doc._fields.get(constantFieldName).changed = false;
doc.field(fieldName, testValue); doc.field(fieldName, testValue);
doc.removeField(removeField);
ODocument dc = doc.getDeltaFromOriginal(); ODocument dc = doc.getDeltaFromOriginal();
assertFalse(dc._fields.containsKey(constantFieldName));
assertTrue(dc._fields.containsKey(fieldName));
assertEquals(dc.field(fieldName), testValue);


ODocument updatePart = dc.field("u");
ODocument deletePart = dc.field("d");

assertFalse(updatePart._fields.containsKey(constantFieldName));
assertTrue(updatePart._fields.containsKey(fieldName));
assertEquals(updatePart.field(fieldName), testValue);

assertFalse(deletePart._fields.containsKey(constantFieldName));
assertTrue(deletePart._fields.containsKey(removeField));

doc = db.save(doc);
db.close(); db.close();
} }


Expand Down
Expand Up @@ -6,7 +6,6 @@
import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest;
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal; import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
import com.orientechnologies.orient.core.db.record.ORecordOperation; import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.exception.OConcurrentModificationException; import com.orientechnologies.orient.core.exception.OConcurrentModificationException;
import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.id.ORecordId;
Expand Down Expand Up @@ -68,7 +67,12 @@ public void genOps(List<ORecordOperation> ops) {
request.setId(txEntry.getRecord().getIdentity()); request.setId(txEntry.getRecord().getIdentity());
request.setRecordType(ORecordInternal.getRecordType(txEntry.getRecord())); request.setRecordType(ORecordInternal.getRecordType(txEntry.getRecord()));
switch (txEntry.type) { switch (txEntry.type) {
case ORecordOperation.CREATED: case ORecordOperation.CREATED:{
byte[] newRec = ORecordSerializerNetworkV37.INSTANCE.toStream(txEntry.getRecord(), false);
request.setRecord(newRec);
request.setContentChanged(ORecordInternal.isContentChanged(txEntry.getRecord()));
}
break;
case ORecordOperation.UPDATED: case ORecordOperation.UPDATED:
byte[] deltaRec = ORecordSerializerNetworkV37.INSTANCE.toStream(txEntry.getRecord(), true); byte[] deltaRec = ORecordSerializerNetworkV37.INSTANCE.toStream(txEntry.getRecord(), true);
byte[] newRec = ORecordSerializerNetworkV37.INSTANCE.toStream(txEntry.getRecord(), false); byte[] newRec = ORecordSerializerNetworkV37.INSTANCE.toStream(txEntry.getRecord(), false);
Expand Down Expand Up @@ -157,12 +161,16 @@ private void convert(ODatabaseDocumentInternal database) {


ORecord record = null; ORecord record = null;
switch (type) { switch (type) {
case ORecordOperation.CREATED: case ORecordOperation.CREATED:{
record = ORecordSerializerNetworkV37.INSTANCE.fromStream(req.getRecord(), null, null);
ORecordInternal.setRecordSerializer(record, database.getSerializer());
}
break;
case ORecordOperation.UPDATED: { case ORecordOperation.UPDATED: {
record = ORecordSerializerNetworkV37.INSTANCE.fromStream(req.getRecord(), null, null); record = ORecordSerializerNetworkV37.INSTANCE.fromStream(req.getRecord(), null, null);
ORecordInternal.setRecordSerializer(record, database.getSerializer()); ORecordInternal.setRecordSerializer(record, database.getSerializer());
} }
break; break;
case ORecordOperation.DELETED: case ORecordOperation.DELETED:
record = database.getRecord(req.getId()); record = database.getRecord(req.getId());
if (record == null) { if (record == null) {
Expand Down
Expand Up @@ -117,7 +117,7 @@ public void testExecutionConcurrentModificationDelete() throws Exception {
doc.field("first", "one"); doc.field("first", "one");
session.save(doc); session.save(doc);
ODocument old = doc.copy(); ODocument old = doc.copy();
doc.field("first", "two"); doc.field("first", "two");
session.save(doc); session.save(doc);
session.getLocalCache().clear(); session.getLocalCache().clear();


Expand Down

0 comments on commit 9b1ce38

Please sign in to comment.