Skip to content

Commit

Permalink
minor changes of the structure of two phase operation
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Jul 16, 2018
1 parent 7fda18e commit 9244282
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 39 deletions.
Expand Up @@ -33,7 +33,7 @@ public OLogId log(ONodeRequest request) {


public ORequestContext sendOperation(OSubmitRequest submitRequest, ONodeRequest nodeRequest, OResponseHandler handler) { public ORequestContext sendOperation(OSubmitRequest submitRequest, ONodeRequest nodeRequest, OResponseHandler handler) {
OLogId id = log(nodeRequest); OLogId id = log(nodeRequest);
ORequestContext context = new ORequestContext(this, submitRequest, nodeRequest, members.size() / 2 + 1, handler); ORequestContext context = new ORequestContext(this, submitRequest, nodeRequest, members.values(), handler);
contexts.put(id, context); contexts.put(id, context);
for (ODistributedMember member : members.values()) { for (ODistributedMember member : members.values()) {
member.sendRequest(id, nodeRequest); member.sendRequest(id, nodeRequest);
Expand Down
@@ -1,6 +1,7 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator; package com.orientechnologies.orient.server.distributed.impl.coordinator;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;


Expand All @@ -10,27 +11,27 @@ public enum Status {
STARTED, QUORUM_OK, QUORUM_KO STARTED, QUORUM_OK, QUORUM_KO
} }


private OSubmitRequest submitRequest; private OSubmitRequest submitRequest;
private ONodeRequest nodeRequest; private ONodeRequest nodeRequest;
private List<ONodeResponse> responses = Collections.synchronizedList(new ArrayList<>()); private Collection<ODistributedMember> involvedMembers;
private ODistributedCoordinator coordinator; private List<ONodeResponse> responses = Collections.synchronizedList(new ArrayList<>());
private int quorum; private ODistributedCoordinator coordinator;
private OResponseHandler handler; private int quorum;
private Status status; private OResponseHandler handler;


public ORequestContext(ODistributedCoordinator coordinator, OSubmitRequest submitRequest, ONodeRequest nodeRequest, int quorum, public ORequestContext(ODistributedCoordinator coordinator, OSubmitRequest submitRequest, ONodeRequest nodeRequest,
OResponseHandler handler) { Collection<ODistributedMember> involvedMembers, OResponseHandler handler) {
this.coordinator = coordinator; this.coordinator = coordinator;
this.submitRequest = submitRequest; this.submitRequest = submitRequest;
this.nodeRequest = nodeRequest; this.nodeRequest = nodeRequest;
this.quorum = quorum; this.involvedMembers = involvedMembers;
this.handler = handler; this.handler = handler;
this.status = Status.STARTED; this.quorum = (involvedMembers.size() / 2) + 1;
} }


public void receive(ONodeResponse response) { public void receive(ONodeResponse response) {
responses.add(response); responses.add(response);
status = handler.receive(coordinator, this, response, status); handler.receive(coordinator, this, response);
} }


public List<ONodeResponse> getResponses() { public List<ONodeResponse> getResponses() {
Expand Down
@@ -1,6 +1,5 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator; package com.orientechnologies.orient.server.distributed.impl.coordinator;


public interface OResponseHandler { public interface OResponseHandler {
ORequestContext.Status receive(ODistributedCoordinator coordinator, ORequestContext context, ONodeResponse response, void receive(ODistributedCoordinator coordinator, ORequestContext context, ONodeResponse response);
ORequestContext.Status status);
} }
Expand Up @@ -25,11 +25,10 @@ public void simpleOperationTest() throws InterruptedException {
@Override @Override
public void begin(ODistributedMember member, ODistributedCoordinator coordinator) { public void begin(ODistributedMember member, ODistributedCoordinator coordinator) {
MockNodeRequest nodeRequest = new MockNodeRequest(); MockNodeRequest nodeRequest = new MockNodeRequest();
coordinator.sendOperation(this, nodeRequest, (coordinator1, context, response, status) -> { coordinator.sendOperation(this, nodeRequest, (coordinator1, context, response) -> {
if (context.getResponses().size() == 1) { if (context.getResponses().size() == 1) {
responseReceived.countDown(); responseReceived.countDown();
} }
return status;
}); });
} }


Expand All @@ -55,22 +54,20 @@ public void testTwoPhase() throws InterruptedException {
@Override @Override
public void begin(ODistributedMember member, ODistributedCoordinator coordinator) { public void begin(ODistributedMember member, ODistributedCoordinator coordinator) {
MockNodeRequest nodeRequest = new MockNodeRequest(); MockNodeRequest nodeRequest = new MockNodeRequest();
coordinator.sendOperation(this, nodeRequest, (coordinator1, context, response, status) -> { coordinator.sendOperation(this, nodeRequest, (coordinator1, context, response) -> {
if (context.getResponses().size() == 1) { if (context.getResponses().size() == 1) {
coordinator1.sendOperation(this, new ONodeRequest() { coordinator1.sendOperation(this, new ONodeRequest() {
@Override @Override
public ONodeResponse execute(ODistributedMember nodeFrom, OLogId opId, ODistributedExecutor executor) { public ONodeResponse execute(ODistributedMember nodeFrom, OLogId opId, ODistributedExecutor executor) {
return null; return null;
} }
}, (coordinator2, context1, response1, status1) -> { }, (coordinator2, context1, response1) -> {
if (context.getResponses().size() == 1) { if (context.getResponses().size() == 1) {
member.reply(new OSubmitResponse() { member.reply(new OSubmitResponse() {
}); });
} }
return status1;
}); });
} }
return status;
}); });
} }
}); });
Expand Down
@@ -0,0 +1,24 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator.mocktx;

import com.orientechnologies.orient.server.distributed.impl.coordinator.*;

class FirstPhaseHandler implements OResponseHandler {
private OSubmitTx submitTx;
private final ODistributedMember member;
private boolean done;

public FirstPhaseHandler(OSubmitTx submitTx, ODistributedMember member) {
this.submitTx = submitTx;
this.member = member;
}

@Override
public void receive(ODistributedCoordinator coordinator1, ORequestContext context, ONodeResponse response) {
if (context.getResponses().size() >= context.getQuorum() && !done) {
done = true;
submitTx.firstPhase = true;
coordinator1.sendOperation(submitTx, new OPhase2Tx(), new SecondPhaseResponseHandler(submitTx, member));
}
}

}
Expand Up @@ -3,27 +3,13 @@
import com.orientechnologies.orient.server.distributed.impl.coordinator.*; import com.orientechnologies.orient.server.distributed.impl.coordinator.*;


public class OSubmitTx implements OSubmitRequest { public class OSubmitTx implements OSubmitRequest {
boolean firstPhase = false;
boolean secondPhase = false; public boolean firstPhase;
public boolean secondPhase;


@Override @Override
public void begin(ODistributedMember member, ODistributedCoordinator coordinator) { public void begin(ODistributedMember member, ODistributedCoordinator coordinator) {
coordinator.sendOperation(this, new OPhase1Tx(), (coordinator1, context, response, status) -> { coordinator.sendOperation(this, new OPhase1Tx(), new FirstPhaseHandler(this, member));
if (context.getResponses().size() >= context.getQuorum() && status == ORequestContext.Status.STARTED) {
status = ORequestContext.Status.QUORUM_OK;
firstPhase = true;
coordinator1.sendOperation(this, new OPhase2Tx(), (coordinator2, context1, response1, status1) -> {
if (context1.getResponses().size() >= context1.getQuorum() && status1 == ORequestContext.Status.STARTED) {
status1 = ORequestContext.Status.QUORUM_OK;
secondPhase = true;
member.reply(new OSubmitResponse() {
});
}
return status1;
});
}
return status;
});
} }


} }
@@ -0,0 +1,24 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator.mocktx;

import com.orientechnologies.orient.server.distributed.impl.coordinator.*;

public class SecondPhaseResponseHandler implements OResponseHandler {
private final OSubmitTx submitTx;
private final ODistributedMember member;
boolean done = false;

public SecondPhaseResponseHandler(OSubmitTx submitTx, ODistributedMember member) {
this.member = member;
this.submitTx = submitTx;
}

@Override
public void receive(ODistributedCoordinator coordinator, ORequestContext context1, ONodeResponse response) {
if (context1.getResponses().size() >= context1.getQuorum() && !done) {
done = true;
submitTx.secondPhase = true;
member.reply(new OSubmitResponse() {
});
}
}
}

0 comments on commit 9244282

Please sign in to comment.