Skip to content

Commit

Permalink
fixed send of index changes after clear in remote transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Jun 5, 2017
1 parent 0080d76 commit 46bbcab
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 58 deletions.
Expand Up @@ -239,37 +239,35 @@ static void writeTransactionIndexChanges(OChannelDataOutput network, ORecordSeri
for (IndexChange indexChange : changes) {
network.writeString(indexChange.getName());
network.writeBoolean(indexChange.getKeyChanges().cleared);
if (!indexChange.getKeyChanges().cleared) {

int size = indexChange.getKeyChanges().changesPerKey.size();
if (indexChange.getKeyChanges().nullKeyChanges != null) {
size += 1;
int size = indexChange.getKeyChanges().changesPerKey.size();
if (indexChange.getKeyChanges().nullKeyChanges != null) {
size += 1;
}
network.writeInt(size);
if (indexChange.getKeyChanges().nullKeyChanges != null) {
network.writeByte((byte) -1);
network.writeInt(indexChange.getKeyChanges().nullKeyChanges.entries.size());
for (OTransactionIndexChangesPerKey.OTransactionIndexEntry perKeyChange : indexChange
.getKeyChanges().nullKeyChanges.entries) {
network.writeInt(perKeyChange.operation.ordinal());
network.writeRID(perKeyChange.value.getIdentity());
}
network.writeInt(size);
if (indexChange.getKeyChanges().nullKeyChanges != null) {
network.writeByte((byte) -1);
network.writeInt(indexChange.getKeyChanges().nullKeyChanges.entries.size());
for (OTransactionIndexChangesPerKey.OTransactionIndexEntry perKeyChange : indexChange
.getKeyChanges().nullKeyChanges.entries) {
network.writeInt(perKeyChange.operation.ordinal());
}
for (OTransactionIndexChangesPerKey change : indexChange.getKeyChanges().changesPerKey.values()) {
OType type = OType.getTypeByValue(change.key);
byte[] value = serializer.serializeValue(change.key, type);
network.writeByte((byte) type.getId());
network.writeBytes(value);
network.writeInt(change.entries.size());
for (OTransactionIndexChangesPerKey.OTransactionIndexEntry perKeyChange : change.entries) {
OTransactionIndexChanges.OPERATION op = perKeyChange.operation;
if (op == OTransactionIndexChanges.OPERATION.REMOVE && perKeyChange.value == null)
op = OTransactionIndexChanges.OPERATION.CLEAR;

network.writeInt(op.ordinal());
if (op != OTransactionIndexChanges.OPERATION.CLEAR)
network.writeRID(perKeyChange.value.getIdentity());
}
}
for (OTransactionIndexChangesPerKey change : indexChange.getKeyChanges().changesPerKey.values()) {
OType type = OType.getTypeByValue(change.key);
byte[] value = serializer.serializeValue(change.key, type);
network.writeByte((byte) type.getId());
network.writeBytes(value);
network.writeInt(change.entries.size());
for (OTransactionIndexChangesPerKey.OTransactionIndexEntry perKeyChange : change.entries) {
OTransactionIndexChanges.OPERATION op = perKeyChange.operation;
if (op == OTransactionIndexChanges.OPERATION.REMOVE && perKeyChange.value == null)
op = OTransactionIndexChanges.OPERATION.CLEAR;

network.writeInt(op.ordinal());
if (op != OTransactionIndexChanges.OPERATION.CLEAR)
network.writeRID(perKeyChange.value.getIdentity());
}
}
}
}
Expand All @@ -284,40 +282,38 @@ static List<IndexChange> readTransactionIndexChanges(OChannelDataInput channel,
boolean cleared = channel.readBoolean();
OTransactionIndexChanges entry = new OTransactionIndexChanges();
entry.cleared = cleared;
if (!cleared) {
int changeCount = channel.readInt();
NavigableMap<Object, OTransactionIndexChangesPerKey> entries = new TreeMap<>();
while (changeCount-- > 0) {
byte bt = channel.readByte();
Object key;
if (bt == -1) {
key = null;
} else {
OType type = OType.getById(bt);
key = serializer.deserializeValue(channel.readBytes(), type);
}
OTransactionIndexChangesPerKey changesPerKey = new OTransactionIndexChangesPerKey(key);
int keyChangeCount = channel.readInt();
while (keyChangeCount-- > 0) {
int op = channel.readInt();
OTransactionIndexChanges.OPERATION oper = OTransactionIndexChanges.OPERATION.values()[op];
ORecordId id;
if (oper == OTransactionIndexChanges.OPERATION.CLEAR) {
oper = OTransactionIndexChanges.OPERATION.REMOVE;
id = null;
} else {
id = channel.readRID();
}
changesPerKey.add(id, oper);
}
if (key == null) {
entry.nullKeyChanges = changesPerKey;
int changeCount = channel.readInt();
NavigableMap<Object, OTransactionIndexChangesPerKey> entries = new TreeMap<>();
while (changeCount-- > 0) {
byte bt = channel.readByte();
Object key;
if (bt == -1) {
key = null;
} else {
OType type = OType.getById(bt);
key = serializer.deserializeValue(channel.readBytes(), type);
}
OTransactionIndexChangesPerKey changesPerKey = new OTransactionIndexChangesPerKey(key);
int keyChangeCount = channel.readInt();
while (keyChangeCount-- > 0) {
int op = channel.readInt();
OTransactionIndexChanges.OPERATION oper = OTransactionIndexChanges.OPERATION.values()[op];
ORecordId id;
if (oper == OTransactionIndexChanges.OPERATION.CLEAR) {
oper = OTransactionIndexChanges.OPERATION.REMOVE;
id = null;
} else {
entries.put(changesPerKey.key, changesPerKey);
id = channel.readRID();
}
changesPerKey.add(id, oper);
}
if (key == null) {
entry.nullKeyChanges = changesPerKey;
} else {
entries.put(changesPerKey.key, changesPerKey);
}
entry.changesPerKey = entries;
}
entry.changesPerKey = entries;
changes.add(new IndexChange(indexName, entry));
}
return changes;
Expand Down
Expand Up @@ -160,6 +160,44 @@ public void testTransactionFetchResponseWriteRead() throws IOException {
assertEquals(entryChange.entries.get(0).operation, OPERATION.PUT);
assertEquals(entryChange.entries.get(1).value, new ORecordId(2, 2));
assertEquals(entryChange.entries.get(1).operation, OPERATION.REMOVE);
}

@Test
public void testTransactionClearIndexFetchResponseWriteRead() throws IOException {

List<ORecordOperation> operations = new ArrayList<>();
Map<String, OTransactionIndexChanges> changes = new HashMap<>();
OTransactionIndexChanges change = new OTransactionIndexChanges();
change.cleared = true;
change.changesPerKey = new TreeMap<>(ODefaultComparator.INSTANCE);
OTransactionIndexChangesPerKey keyChange = new OTransactionIndexChangesPerKey("key");
keyChange.add(new ORecordId(1, 2), OPERATION.PUT);
keyChange.add(new ORecordId(2, 2), OPERATION.REMOVE);
change.changesPerKey.put(keyChange.key, keyChange);
changes.put("some", change);

MockChannel channel = new MockChannel();
OFetchTransactionResponse response = new OFetchTransactionResponse(10, operations, changes);
response.write(channel, 0, ORecordSerializerNetworkV37.INSTANCE);

channel.close();

OFetchTransactionResponse readResponse = new OFetchTransactionResponse(10, operations, changes);
readResponse.read(channel, null);

assertEquals(readResponse.getTxId(), 10);
assertEquals(readResponse.getIndexChanges().size(), 1);
assertEquals(readResponse.getIndexChanges().get(0).getName(), "some");
OTransactionIndexChanges val = readResponse.getIndexChanges().get(0).getKeyChanges();
assertEquals(val.cleared, true);
assertEquals(val.changesPerKey.size(), 1);
OTransactionIndexChangesPerKey entryChange = val.changesPerKey.firstEntry().getValue();
assertEquals(entryChange.key, "key");
assertEquals(entryChange.entries.size(), 2);
assertEquals(entryChange.entries.get(0).value, new ORecordId(1, 2));
assertEquals(entryChange.entries.get(0).operation, OPERATION.PUT);
assertEquals(entryChange.entries.get(1).value, new ORecordId(2, 2));
assertEquals(entryChange.entries.get(1).operation, OPERATION.REMOVE);

}

Expand Down

0 comments on commit 46bbcab

Please sign in to comment.