Skip to content

Commit

Permalink
add test case for coordinated transaction begin, with relative refact…
Browse files Browse the repository at this point in the history
…or for rid allocations
  • Loading branch information
tglman committed Aug 31, 2018
1 parent 29949cc commit 3313779
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 50 deletions.
@@ -0,0 +1,23 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator.transaction;

import com.orientechnologies.orient.server.distributed.impl.coordinator.OClusterPositionAllocator;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class OMockAllocator implements OClusterPositionAllocator {
//Just Test not really need to be concurrent.
private Map<Integer, AtomicLong> allocator = new HashMap<>();

@Override
public long allocate(int clusterId) {
AtomicLong counter = allocator.get(clusterId);
if (counter == null) {
counter = new AtomicLong(0);
allocator.put(clusterId, counter);
}
return counter.get();
}
}
Expand Up @@ -38,11 +38,8 @@ public ONodeResponse execute(ODistributedMember nodeFrom, OLogId opId, ODistribu
OTransactionOptimisticDistributed tx = new OTransactionOptimisticDistributed(session, operations);
ONodeResponse response;
try {
//TODO:Refactor this method to match the new api
((ODatabaseDocumentDistributed) session).txFirstPhase(operationId, tx);
//TODO:get the allocated ids and send to the coordinator.
Success metadata = new Success(new ArrayList<>());
response = new OTransactionFirstPhaseResult(Type.SUCCESS, metadata);
response = new OTransactionFirstPhaseResult(Type.SUCCESS, null);

} catch (OConcurrentModificationException ex) {
ConcurrentModification metadata = new ConcurrentModification((ORecordId) ex.getRid().getIdentity(),
Expand All @@ -55,7 +52,8 @@ public ONodeResponse execute(ODistributedMember nodeFrom, OLogId opId, ODistribu
} catch (RuntimeException ex) {
//TODO: get action with some exception handler to offline the node or activate a recover operation
response = new OTransactionFirstPhaseResult(Type.EXCEPTION, null);
} return response;
}
return response;
}

private List<ORecordOperation> convert(ODatabaseDocumentInternal database, List<ORecordOperationRequest> operations) {
Expand Down
Expand Up @@ -4,21 +4,17 @@
import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.server.distributed.impl.coordinator.*;
import com.orientechnologies.orient.server.distributed.impl.coordinator.transaction.OTransactionFirstPhaseResult.ConcurrentModification;
import com.orientechnologies.orient.server.distributed.impl.coordinator.transaction.OTransactionFirstPhaseResult.Success;
import com.orientechnologies.orient.server.distributed.impl.coordinator.transaction.OTransactionFirstPhaseResult.UniqueKeyViolation;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

public class OTransactionFirstPhaseResponseHandler implements OResponseHandler {

private final OSessionOperationId operationId;
private final OTransactionSubmit request;
private final ODistributedMember requester;
private int responseCount = 0;
private final Map<ODistributedMember, Success> success = new HashMap<>();
private final Set<ODistributedMember> success = new HashSet<>();
private final Map<ORID, List<ODistributedMember>> cme = new HashMap<>();
private final Map<String, List<ODistributedMember>> unique = new HashMap<>();
private final List<ODistributedMember> exceptions = new ArrayList<>();
Expand All @@ -41,7 +37,7 @@ public boolean receive(ODistributedCoordinator coordinator, ORequestContext cont
OTransactionFirstPhaseResult result = (OTransactionFirstPhaseResult) response;
switch (result.getType()) {
case SUCCESS:
success.put(member, (Success) result.getResultMetadata());
success.add(member);
break;
case CONCURRENT_MODIFICATION_EXCEPTION: {
ConcurrentModification concurrentModification = (ConcurrentModification) result.getResultMetadata();
Expand Down Expand Up @@ -70,8 +66,7 @@ public boolean receive(ODistributedCoordinator coordinator, ORequestContext cont
int quorum = context.getQuorum();
if (responseCount >= quorum && !secondPhaseSent) {
if (success.size() >= quorum) {
Success ids = success.values().iterator().next();
sendSecondPhaseSuccess(coordinator, ids.getAllocatedIds());
sendSecondPhaseSuccess(coordinator);
}

for (Map.Entry<ORID, List<ODistributedMember>> entry : cme.entrySet()) {
Expand All @@ -93,20 +88,23 @@ public boolean receive(ODistributedCoordinator coordinator, ORequestContext cont
private void sendSecondPhaseError(ODistributedCoordinator coordinator) {
OTransactionSecondPhaseResponseHandler responseHandler = new OTransactionSecondPhaseResponseHandler(true, request, requester,
null);
coordinator.sendOperation(null, new OTransactionSecondPhaseOperation(operationId, false, new ArrayList<>()), responseHandler);
coordinator.sendOperation(null, new OTransactionSecondPhaseOperation(operationId, false), responseHandler);
if (guards != null) {
for (OLockGuard guard : guards) {
guard.release();
}
}
coordinator.reply(requester, new OTransactionResponse());
if (!replySent) {
coordinator.reply(requester, new OTransactionResponse());
replySent = true;
}
secondPhaseSent = true;
}

private void sendSecondPhaseSuccess(ODistributedCoordinator coordinator, List<ORecordId> allocatedIds) {
private void sendSecondPhaseSuccess(ODistributedCoordinator coordinator) {
OTransactionSecondPhaseResponseHandler responseHandler = new OTransactionSecondPhaseResponseHandler(false, request, requester,
guards);
coordinator.sendOperation(null, new OTransactionSecondPhaseOperation(operationId, true, allocatedIds), responseHandler);
coordinator.sendOperation(null, new OTransactionSecondPhaseOperation(operationId, true), responseHandler);
secondPhaseSent = true;
}

Expand Down
Expand Up @@ -19,18 +19,6 @@ public enum Type {
private Type type;
private Object resultMetadata;

public static class Success {
private List<ORecordId> allocatedIds;

public Success(List<ORecordId> allocatedIds) {
this.allocatedIds = allocatedIds;
}

public List<ORecordId> getAllocatedIds() {
return allocatedIds;
}
}

public static class ConcurrentModification {
private final ORecordId recordId;
private final int updateVersion;
Expand Down
Expand Up @@ -11,12 +11,10 @@
public class OTransactionSecondPhaseOperation implements ONodeRequest {
private OSessionOperationId operationId;
private boolean success;
private List<ORecordId> allocatedIds;

public OTransactionSecondPhaseOperation(OSessionOperationId operationId, boolean success, List<ORecordId> allocatedIds) {
public OTransactionSecondPhaseOperation(OSessionOperationId operationId, boolean success) {
this.operationId = operationId;
this.success = success;
this.allocatedIds = allocatedIds;
}

@Override
Expand All @@ -26,23 +24,18 @@ public ONodeResponse execute(ODistributedMember nodeFrom, OLogId opId, ODistribu
return new OTransactionSecondPhaseResponse(true);
}

public List<ORecordId> getAllocatedIds() {
return allocatedIds;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
OTransactionSecondPhaseOperation that = (OTransactionSecondPhaseOperation) o;
return success == that.success && Objects.equals(allocatedIds, that.allocatedIds);
return success == that.success;
}

@Override
public int hashCode() {

return Objects.hash(success, allocatedIds);
return Objects.hash(success);
}
}
Expand Up @@ -3,7 +3,6 @@
import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.server.distributed.impl.coordinator.*;
import com.orientechnologies.orient.server.distributed.impl.coordinator.transaction.OTransactionFirstPhaseResult.ConcurrentModification;
import com.orientechnologies.orient.server.distributed.impl.coordinator.transaction.OTransactionFirstPhaseResult.Success;
import com.orientechnologies.orient.server.distributed.impl.coordinator.transaction.OTransactionFirstPhaseResult.Type;
import com.orientechnologies.orient.server.distributed.impl.coordinator.transaction.OTransactionFirstPhaseResult.UniqueKeyViolation;
import org.junit.Before;
Expand All @@ -15,9 +14,7 @@
import java.util.ArrayList;
import java.util.List;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.times;

public class FirstPhaseResponseHandlerTest {
Expand All @@ -44,12 +41,12 @@ public void testFirstPhaseQuorumSuccess() {
OLogId id = new OLogId(1);
ORequestContext context = new ORequestContext(null, null, null, members, handler, id);

handler.receive(coordinator, context, member1, new OTransactionFirstPhaseResult(Type.SUCCESS, new Success(new ArrayList<>())));
handler.receive(coordinator, context, member2, new OTransactionFirstPhaseResult(Type.SUCCESS, new Success(new ArrayList<>())));
handler.receive(coordinator, context, member3, new OTransactionFirstPhaseResult(Type.SUCCESS, new Success(new ArrayList<>())));
handler.receive(coordinator, context, member1, new OTransactionFirstPhaseResult(Type.SUCCESS, null));
handler.receive(coordinator, context, member2, new OTransactionFirstPhaseResult(Type.SUCCESS, null));
handler.receive(coordinator, context, member3, new OTransactionFirstPhaseResult(Type.SUCCESS, null));

Mockito.verify(coordinator, times(1))
.sendOperation(any(OSubmitRequest.class), eq(new OTransactionSecondPhaseOperation(operationId, true, new ArrayList<>())),
.sendOperation(any(OSubmitRequest.class), eq(new OTransactionSecondPhaseOperation(operationId, true)),
any(OTransactionSecondPhaseResponseHandler.class));
Mockito.verify(coordinator, times(0)).reply(same(member1), any(OTransactionResponse.class));
}
Expand All @@ -75,7 +72,7 @@ public void testFirstPhaseQuorumCME() {
new ConcurrentModification(new ORecordId(10, 10), 0, 1)));

Mockito.verify(coordinator, times(1))
.sendOperation(any(OSubmitRequest.class), eq(new OTransactionSecondPhaseOperation(operationId, false, new ArrayList<>())),
.sendOperation(any(OSubmitRequest.class), eq(new OTransactionSecondPhaseOperation(operationId, false)),
any(OTransactionSecondPhaseResponseHandler.class));

Mockito.verify(coordinator, times(1)).reply(same(member1), any(OTransactionResponse.class));
Expand All @@ -102,7 +99,7 @@ public void testFirstPhaseQuorumUnique() {
new UniqueKeyViolation("Key", new ORecordId(10, 10), new ORecordId(10, 11), "Class.property")));

Mockito.verify(coordinator, times(1))
.sendOperation(any(OSubmitRequest.class), eq(new OTransactionSecondPhaseOperation(operationId, false, new ArrayList<>())),
.sendOperation(any(OSubmitRequest.class), eq(new OTransactionSecondPhaseOperation(operationId, false)),
any(OTransactionSecondPhaseResponseHandler.class));

Mockito.verify(coordinator, times(1)).reply(same(member1), any(OTransactionResponse.class));
Expand Down
@@ -0,0 +1,65 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator.transaction;

import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.server.distributed.impl.coordinator.*;
import com.orientechnologies.orient.server.distributed.impl.coordinator.mocktx.CoordinatorTxTest;
import com.orientechnologies.orient.server.distributed.impl.coordinator.mocktx.OSubmitTx;
import org.junit.Test;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertTrue;

public class OSubmitTransactionBeginTest {

@Test
public void testBegin() throws InterruptedException {
ODistributedCoordinator coordinator = new ODistributedCoordinator(Executors.newSingleThreadExecutor(), new MockOperationLog(),
new ODistributedLockManagerImpl(0), new OMockAllocator());

MockChannel cOne = new MockChannel();
ODistributedMember mOne = new ODistributedMember("one", cOne);
coordinator.join(mOne);

MockChannel cTwo = new MockChannel();
ODistributedMember mTwo = new ODistributedMember("two", cTwo);
coordinator.join(mTwo);

MockChannel cThree = new MockChannel();
ODistributedMember mThree = new ODistributedMember("three", cThree);
coordinator.join(mThree);

ArrayList<ORecordOperation> recordOps = new ArrayList<>();
ORecordOperation op = new ORecordOperation(new ORecordId(10, 10), ORecordOperation.CREATED);
op.setRecord(new ODocument("aaaa"));
recordOps.add(op);
coordinator.submit(mOne, new OTransactionSubmit(new OSessionOperationId(), recordOps, new ArrayList<>()));
assertTrue(cOne.sentRequest.await(1, TimeUnit.SECONDS));
assertTrue(cTwo.sentRequest.await(1, TimeUnit.SECONDS));
assertTrue(cThree.sentRequest.await(1, TimeUnit.SECONDS));
}

private class MockChannel implements ODistributedChannel {
private CountDownLatch sentRequest = new CountDownLatch(1);

@Override
public void sendRequest(OLogId id, ONodeRequest nodeRequest) {
sentRequest.countDown();
}

@Override
public void sendResponse(OLogId id, ONodeResponse nodeResponse) {

}

@Override
public void reply(OSubmitResponse response) {

}
}
}

0 comments on commit 3313779

Please sign in to comment.