Skip to content

Commit

Permalink
sqeuences on distributed sceleton finished
Browse files Browse the repository at this point in the history
  • Loading branch information
markodjurovic committed Oct 1, 2018
1 parent 6bc21fc commit 76b7c14
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 86 deletions.
Expand Up @@ -21,6 +21,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
*
Expand Down Expand Up @@ -123,44 +124,53 @@ private Boolean deserializeBoolean(DataInput in) throws IOException{
}

public void serialize(DataOutput out) throws IOException{
out.writeInt(action.getActionType());
byte[] sequenceNameBytes = action.getSequenceName().getBytes("UTF8");
out.writeInt(sequenceNameBytes.length);
out.write(sequenceNameBytes);
out.writeByte(action.getSequenceType().getVal());

OSequence.CreateParams params = action.getParameters();
serializeLong(params.start, out);
serializeInt(params.increment, out);
serializeInt(params.cacheSize, out);
serializeLong(params.limitValue, out);
serializeOrderType(params.orderType, out);
serializeBoolean(params.recyclable, out);
serializeBoolean(params.turnLimitOff, out);
if (action != null){
out.writeInt(action.getActionType());
byte[] sequenceNameBytes = action.getSequenceName().getBytes(StandardCharsets.UTF_8.name());
out.writeInt(sequenceNameBytes.length);
out.write(sequenceNameBytes);
out.writeByte(action.getSequenceType().getVal());

OSequence.CreateParams params = action.getParameters();
serializeLong(params.start, out);
serializeInt(params.increment, out);
serializeInt(params.cacheSize, out);
serializeLong(params.limitValue, out);
serializeOrderType(params.orderType, out);
serializeBoolean(params.recyclable, out);
serializeBoolean(params.turnLimitOff, out);
}
else{
out.writeInt(-1);
}
}

public void deserialize(DataInput in) throws IOException{
int actionType = in.readInt();
int nameLength = in.readInt();
byte[] nameBytes = new byte[nameLength];
in.readFully(nameBytes);
byte sequenceTypeByte = in.readByte();
OSequence.SEQUENCE_TYPE sequenceType = OSequence.SEQUENCE_TYPE.fromVal(sequenceTypeByte);
if (sequenceType == null){
//TODO throw some other exception;
throw new IOException("Inavlid sequnce type value: " + sequenceTypeByte);
}
String sequenceName = new String(nameBytes, "UTF8");
OSequence.CreateParams params = new OSequence.CreateParams();
params.resetNull();
params.start = deserializeLong(in);
params.increment = deserializeInt(in);
params.cacheSize = deserializeInt(in);
params.limitValue = deserializeLong(in);
params.orderType = deserializeOrderType(in);
params.recyclable = deserializeBoolean(in);
params.turnLimitOff = deserializeBoolean(in);
action = new OSequenceAction(actionType, sequenceName, params, sequenceType);
if (actionType > 0){
int nameLength = in.readInt();
byte[] nameBytes = new byte[nameLength];
in.readFully(nameBytes);
byte sequenceTypeByte = in.readByte();
OSequence.SEQUENCE_TYPE sequenceType = OSequence.SEQUENCE_TYPE.fromVal(sequenceTypeByte);
if (sequenceType == null){
throw new IOException("Inavlid sequnce type value: " + sequenceTypeByte);
}
String sequenceName = new String(nameBytes, StandardCharsets.UTF_8.name());
OSequence.CreateParams params = new OSequence.CreateParams();
params.resetNull();
params.start = deserializeLong(in);
params.increment = deserializeInt(in);
params.cacheSize = deserializeInt(in);
params.limitValue = deserializeLong(in);
params.orderType = deserializeOrderType(in);
params.recyclable = deserializeBoolean(in);
params.turnLimitOff = deserializeBoolean(in);
action = new OSequenceAction(actionType, sequenceName, params, sequenceType);
}
else {
action = null;
}
}

public OSequenceAction getAction() {
Expand Down
Expand Up @@ -9,17 +9,19 @@ public class OCoordinateMessagesFactory {
public static final int TRANSACTION_FIRST_PHASE_RESPONSE = 1;
public static final int TRANSACTION_SECOND_PHASE_REQUEST = 2;
public static final int TRANSACTION_SECOND_PHASE_RESPONSE = 2;
public static final int SEQUENCE_ACTIONS_SUBMIT_REQUEST = 3;
public static final int SEQUENCE_ACTIONS_SUBMIT_RESPONSE = 3;
public static final int SEQUENCE_ACTION_REQUEST = 4;
public static final int SEQUENCE_ACTION_RESPONSE = 4;
public static final int SEQUENCE_ACTION_COORDINATOR_SUBMIT = 3;
public static final int SEQUENCE_ACTION_COORDINATOR_RESPONSE = 3;
public static final int SEQUENCE_ACTION_NODE_REQUEST = 4;
public static final int SEQUENCE_ACTION_NODE_RESPONSE = 4;

public ONodeResponse createOperationResponse(int responseType) {
switch (responseType) {
case TRANSACTION_FIRST_PHASE_RESPONSE:
return new OTransactionFirstPhaseResult();
case TRANSACTION_SECOND_PHASE_RESPONSE:
return new OTransactionSecondPhaseResponse();
case SEQUENCE_ACTION_NODE_RESPONSE:
return new OSequenceActionNodeResponse();
}
return null;
}
Expand All @@ -30,7 +32,8 @@ public ONodeRequest createOperationRequest(int requestType) {
return new OTransactionFirstPhaseOperation();
case TRANSACTION_SECOND_PHASE_REQUEST:
return new OTransactionSecondPhaseOperation();

case SEQUENCE_ACTION_NODE_REQUEST:
return new OSequenceActionNodeRequest();
}
return null;
}
Expand All @@ -39,6 +42,8 @@ public OSubmitRequest createSubmitRequest(int requestType) {
switch (requestType) {
case TRANSACTION_SUBMIT_REQUEST:
return new OTransactionSubmit();
case SEQUENCE_ACTION_COORDINATOR_SUBMIT:
return new OSequenceActionCoordinatorSubmit();
}
return null;
}
Expand All @@ -47,6 +52,8 @@ public OSubmitResponse createSubmitResponse(int responseType) {
switch (responseType) {
case TRANSACTION_SUBMIT_RESPONSE:
return new OTransactionResponse();
case SEQUENCE_ACTION_COORDINATOR_RESPONSE:
return new OSequenceActionCoordinatorResponse();
}
return null;
}
Expand Down
Expand Up @@ -15,30 +15,45 @@
*/
package com.orientechnologies.orient.server.distributed.impl.coordinator.transaction;

import com.orientechnologies.orient.server.distributed.impl.coordinator.ONodeResponse;
import com.orientechnologies.orient.server.distributed.impl.coordinator.OCoordinateMessagesFactory;
import com.orientechnologies.orient.server.distributed.impl.coordinator.OSubmitResponse;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
*
* @author marko
* @author mdjurovi
*/
public class OSequenceActionsNodeResponse implements ONodeResponse{

public class OSequenceActionCoordinatorResponse implements OSubmitResponse{

private int failedOn = 0;
private int limitReachedOn = 0;

public OSequenceActionCoordinatorResponse(){

}

public OSequenceActionCoordinatorResponse(int failedOnNo, int limitReachedOnNo){
failedOn = failedOnNo;
limitReachedOn = limitReachedOnNo;
}

@Override
public void serialize(DataOutput output) throws IOException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
output.writeInt(failedOn);
output.writeInt(limitReachedOn);
}

@Override
public void deserialize(DataInput input) throws IOException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
failedOn = input.readInt();
limitReachedOn = input.readInt();
}

@Override
public int getResponseType() {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
return OCoordinateMessagesFactory.SEQUENCE_ACTION_COORDINATOR_RESPONSE;
}

}
Expand Up @@ -24,59 +24,55 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
*
* @author marko
*/
public class OSequenceActionsSubmit implements OSubmitRequest{
public class OSequenceActionCoordinatorSubmit implements OSubmitRequest{

private List<OSequenceActionRequest> actions = null;
private OSequenceActionRequest action = null;

public OSequenceActionsSubmit(){
public OSequenceActionCoordinatorSubmit(){

}

public OSequenceActionsSubmit(List<OSequenceAction> actions){
this.actions = new ArrayList<>();
for (OSequenceAction action : actions){
OSequenceActionRequest actionrequest = new OSequenceActionRequest(action);
this.actions.add(actionrequest);
}
public OSequenceActionCoordinatorSubmit(OSequenceAction action){
this.action = new OSequenceActionRequest(action);
}

@Override
public void begin(ODistributedMember member, OSessionOperationId operationId, ODistributedCoordinator coordinator) {
OSequenceActionsNodeRequest nodeRequest = new OSequenceActionsNodeRequest();
OSequenceActionsNodeResponseHandler nodeResponseHandler = new OSequenceActionsNodeResponseHandler();
OSequenceActionNodeRequest nodeRequest = new OSequenceActionNodeRequest();
OSequenceActionNodeResponseHandler nodeResponseHandler = new OSequenceActionNodeResponseHandler(operationId);

coordinator.sendOperation(this, nodeRequest, nodeResponseHandler);
}

@Override
public void serialize(DataOutput output) throws IOException {
output.writeInt(actions.size());
for (OSequenceActionRequest actionsRequest : actions){
actionsRequest.serialize(output);
if (action != null){
output.writeByte(1);
action.serialize(output);
}
else{
output.writeByte(0);
}
}

@Override
public void deserialize(DataInput input) throws IOException {
this.actions = new ArrayList<>();
int size = input.readInt();
for (int i = 0; i < size; i++){
OSequenceActionRequest actionrequest = new OSequenceActionRequest();
actionrequest.deserialize(input);
actions.add(actionrequest);
byte flag = input.readByte();
action = null;
if (flag > 0){
action = new OSequenceActionRequest();
action.deserialize(input);
}
}

@Override
public int getRequestType() {
return OCoordinateMessagesFactory.SEQUENCE_ACTIONS_SUBMIT_REQUEST;
return OCoordinateMessagesFactory.SEQUENCE_ACTION_COORDINATOR_SUBMIT;
}

}
Expand Up @@ -20,6 +20,7 @@
import com.orientechnologies.orient.core.metadata.sequence.OSequence;
import com.orientechnologies.orient.core.metadata.sequence.OSequenceAction;
import com.orientechnologies.orient.core.metadata.sequence.OSequenceLibrary;
import com.orientechnologies.orient.core.metadata.sequence.OSequenceLimitReachedException;
import com.orientechnologies.orient.server.distributed.impl.ODatabaseDocumentDistributed;
import com.orientechnologies.orient.server.distributed.impl.coordinator.OCoordinateMessagesFactory;
import com.orientechnologies.orient.server.distributed.impl.coordinator.ODistributedExecutor;
Expand All @@ -30,27 +31,26 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;

/**
*
* @author marko
*/
public class OSequenceActionsNodeRequest implements ONodeRequest{
public class OSequenceActionNodeRequest implements ONodeRequest{

List<OSequenceActionRequest> actions;
OSequenceActionRequest actionRequest;

public OSequenceActionsNodeRequest(){
public OSequenceActionNodeRequest(){

}

public OSequenceActionsNodeRequest(List<OSequenceActionRequest> actions){
this.actions = actions;
public OSequenceActionNodeRequest(OSequenceActionRequest action){
actionRequest = action;
}

@Override
public ONodeResponse execute(ODistributedMember nodeFrom, OLogId opId, ODistributedExecutor executor, ODatabaseDocumentInternal session) {
for (OSequenceActionRequest actionRequest : actions){
public ONodeResponse execute(ODistributedMember nodeFrom, OLogId opId, ODistributedExecutor executor, ODatabaseDocumentInternal session) {
try{
OSequenceAction action = actionRequest.getAction();
ODatabaseDocumentDistributed db = (ODatabaseDocumentDistributed) session;
OSequenceLibrary sequences = db.getMetadata().getSequenceLibrary();
Expand All @@ -60,14 +60,14 @@ public ONodeResponse execute(ODistributedMember nodeFrom, OLogId opId, ODistribu
if (actionType != OSequenceAction.CREATE){
targetSequence = sequences.getSequence(sequenceName);
if (targetSequence == null){
//TODO throw some exception
throw new RuntimeException("Sequence with name: " + sequenceName + " doesn't exists");
}
}
switch (actionType){
case OSequenceAction.CREATE:
OSequence sequence = sequences.createSequence(sequenceName, action.getSequenceType(), action.getParameters());
if (sequence == null){
//TODO throw some exception
throw new RuntimeException("Faled to create sequence: " + sequenceName);
}
break;
case OSequenceAction.REMOVE:
Expand All @@ -85,23 +85,43 @@ public ONodeResponse execute(ODistributedMember nodeFrom, OLogId opId, ODistribu
case OSequenceAction.UPDATE:
targetSequence.updateParams(action.getParameters());
break;
}
}
return new OSequenceActionNodeResponse(OSequenceActionNodeResponse.Type.SUCCESS, null);
}
catch (OSequenceLimitReachedException e){
return new OSequenceActionNodeResponse(OSequenceActionNodeResponse.Type.LIMIT_REACHED, null);
}
catch (RuntimeException exc){
return new OSequenceActionNodeResponse(OSequenceActionNodeResponse.Type.ERROR, exc.getMessage());
}
}

@Override
public void serialize(DataOutput output) throws IOException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
if (actionRequest != null){
output.writeByte(1);
actionRequest.serialize(output);
}
else{
output.write(0);
}
}

@Override
public void deserialize(DataInput input) throws IOException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
byte flag = input.readByte();
if (flag > 0){
actionRequest = new OSequenceActionRequest();
actionRequest.deserialize(input);
}
else{
actionRequest = null;
}
}

@Override
public int getRequestType() {
return OCoordinateMessagesFactory.SEQUENCE_ACTION_REQUEST;
return OCoordinateMessagesFactory.SEQUENCE_ACTION_NODE_REQUEST;
}

}

0 comments on commit 76b7c14

Please sign in to comment.