Skip to content

Commit

Permalink
started integration of new coordinator in the network layer
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Sep 6, 2018
1 parent 1151f8d commit 0802568
Show file tree
Hide file tree
Showing 23 changed files with 594 additions and 8 deletions.
Expand Up @@ -102,6 +102,10 @@ public class OChannelBinaryProtocol {
public static final byte SUBSCRIBE_PUSH = 100;
public static final byte UNSUBSCRIBE_PUSH = 101;
public static final byte EXPERIMENTAL = 102;
public static final byte DISTRIBUTED_SUBMIT_REQUEST = 103;
public static final byte DISTRIBUTED_SUBMIT_RESPONSE = 104;
public static final byte DISTRIBUTED_OPERATION_REQUEST = 105;
public static final byte DISTRIBUTED_OPERATION_RESPONSE = 106;

// REMOTE SB-TREE COLLECTIONS
public static final byte REQUEST_CREATE_SBTREE_BONSAI = 110;
Expand Down
@@ -1,9 +1,33 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator;

public interface ODistributedChannel {
/**
* Send submit request, that should go only to coordinator.
*
* @param request the request
*/
void submit(OSubmitRequest request);

/**
* Send submit response, this is sent from the coordinator to the node that sent a submit request.
*
* @param response
*/
void reply(OSubmitResponse response);

/**
* Send an operation to the node this is used by the coordinator to send operations of the distributed flow.
*
* @param id
* @param nodeRequest
*/
void sendRequest(OLogId id, ONodeRequest nodeRequest);

/**
* Send the response back to the coordinator, this is used by nodes to send a reply to a node request.
*
* @param id
* @param nodeResponse
*/
void sendResponse(OLogId id, ONodeResponse nodeResponse);

void reply(OSubmitResponse response);
}
@@ -1,9 +1,19 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator;

import java.io.*;

public class OLogId {
private long id;

public OLogId(long id) {
this.id = id;
}

public static void serialize(OLogId id, DataOutput output) throws IOException {
output.writeLong(id.id);
}

public static OLogId deserialize(DataInput input) throws IOException {
return new OLogId(input.readLong());
}
}
@@ -0,0 +1,6 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator;

public interface OSubmitContext {

void receive(OSubmitResponse response);
}
@@ -1,4 +1,9 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator;

import java.io.*;

public interface OSubmitResponse {
void serialize(DataOutput output) throws IOException;

void deserialize(DataInput input) throws IOException;
}
@@ -0,0 +1,12 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator.network;

public interface OCoordinatedExecutor {

void executeOperationRequest(OOperationRequest request);

void executeOperationResponse(OOperationResponse response);

void executeSubmitResponse(ONetworkSubmitResponse response);

void executeSubmitRequest(ONetworkSubmitRequest request);
}
@@ -0,0 +1,39 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator.network;

import com.orientechnologies.orient.server.distributed.ORemoteServerController;
import com.orientechnologies.orient.server.distributed.impl.coordinator.*;
import com.orientechnologies.orient.server.distributed.impl.coordinator.network.ONetworkSubmitRequest;
import com.orientechnologies.orient.server.distributed.impl.coordinator.network.ONetworkSubmitResponse;
import com.orientechnologies.orient.server.distributed.impl.coordinator.network.OOperationRequest;
import com.orientechnologies.orient.server.distributed.impl.coordinator.network.OOperationResponse;

public class ODistributedChannelBinaryProtocol implements ODistributedChannel {

private ORemoteServerController controller;

public ODistributedChannelBinaryProtocol(ORemoteServerController controller) {
this.controller = controller;
}

@Override
public void sendRequest(OLogId id, ONodeRequest nodeRequest) {
controller.sendBinaryRequest(new OOperationRequest(id, nodeRequest));
}

@Override
public void sendResponse(OLogId id, ONodeResponse nodeResponse) {
controller.sendBinaryRequest(new OOperationResponse(id, nodeResponse));

}

@Override
public void submit(OSubmitRequest request) {
controller.sendBinaryRequest(new ONetworkSubmitRequest(request));
}

@Override
public void reply(OSubmitResponse response) {
controller.sendBinaryRequest(new ONetworkSubmitResponse(response));
}

}
@@ -0,0 +1,6 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator.network;

public interface ODistributedExecutable {

void executeDistributed(OCoordinatedExecutor executor);
}
@@ -0,0 +1,73 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator.network;

import com.orientechnologies.orient.client.binary.OBinaryRequestExecutor;
import com.orientechnologies.orient.client.remote.OBinaryRequest;
import com.orientechnologies.orient.client.remote.OBinaryResponse;
import com.orientechnologies.orient.client.remote.OStorageRemoteSession;
import com.orientechnologies.orient.core.serialization.serializer.record.ORecordSerializer;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataInput;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataOutput;
import com.orientechnologies.orient.server.distributed.impl.coordinator.OSubmitRequest;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol.DISTRIBUTED_SUBMIT_REQUEST;

public class ONetworkSubmitRequest implements OBinaryRequest, ODistributedExecutable {
private OSubmitRequest request;

public ONetworkSubmitRequest(OSubmitRequest request) {
this.request = request;
}

public ONetworkSubmitRequest() {

}

@Override
public void write(OChannelDataOutput network, OStorageRemoteSession session) throws IOException {
request.serialize(new DataOutputStream(network.getDataOutput()));
}

@Override
public void read(OChannelDataInput channel, int protocolVersion, ORecordSerializer serializer) throws IOException {
request = null;// TODO: create instance from a ID.
request.deserialize(new DataInputStream(channel.getDataInput()));
}

@Override
public byte getCommand() {
return DISTRIBUTED_SUBMIT_REQUEST;
}

@Override
public OBinaryResponse createResponse() {
return null;
}

@Override
public OBinaryResponse execute(OBinaryRequestExecutor executor) {
return null;
}

@Override
public String getDescription() {
return "Execution request to the coordinator";
}

@Override
public boolean requireDatabaseSession() {
return false;
}

@Override
public void executeDistributed(OCoordinatedExecutor executor) {
executor.executeSubmitRequest(this);
}

public OSubmitRequest getRequest() {
return request;
}
}
@@ -0,0 +1,71 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator.network;

import com.orientechnologies.orient.client.binary.OBinaryRequestExecutor;
import com.orientechnologies.orient.client.remote.OBinaryRequest;
import com.orientechnologies.orient.client.remote.OBinaryResponse;
import com.orientechnologies.orient.client.remote.OStorageRemoteSession;
import com.orientechnologies.orient.core.serialization.serializer.record.ORecordSerializer;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataInput;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataOutput;
import com.orientechnologies.orient.server.distributed.impl.coordinator.OSubmitResponse;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol.DISTRIBUTED_SUBMIT_RESPONSE;

public class ONetworkSubmitResponse implements OBinaryRequest,ODistributedExecutable {
private OSubmitResponse response;

public ONetworkSubmitResponse() {
}

public ONetworkSubmitResponse(OSubmitResponse response) {
this.response = response;
}

@Override
public void write(OChannelDataOutput network, OStorageRemoteSession session) throws IOException {
response.serialize(new DataOutputStream(network.getDataOutput()));
}

@Override
public void read(OChannelDataInput channel, int protocolVersion, ORecordSerializer serializer) throws IOException {
response = null;//TODO: create instance from factory.
response.deserialize(new DataInputStream(channel.getDataInput()));
}

@Override
public byte getCommand() {
return DISTRIBUTED_SUBMIT_RESPONSE;
}

@Override
public OBinaryResponse createResponse() {
return null;
}

@Override
public OBinaryResponse execute(OBinaryRequestExecutor executor) {
return null;
}

@Override
public String getDescription() {
return "execution response from coordinator";
}

public boolean requireDatabaseSession() {
return false;
}

@Override
public void executeDistributed(OCoordinatedExecutor executor) {
executor.executeSubmitResponse(this);
}

public OSubmitResponse getResponse() {
return response;
}
}
@@ -0,0 +1,85 @@
package com.orientechnologies.orient.server.distributed.impl.coordinator.network;

import com.orientechnologies.orient.client.binary.OBinaryRequestExecutor;
import com.orientechnologies.orient.client.remote.OBinaryRequest;
import com.orientechnologies.orient.client.remote.OBinaryResponse;
import com.orientechnologies.orient.client.remote.OStorageRemoteSession;
import com.orientechnologies.orient.core.serialization.serializer.record.ORecordSerializer;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataInput;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataOutput;
import com.orientechnologies.orient.server.distributed.impl.coordinator.OLogId;
import com.orientechnologies.orient.server.distributed.impl.coordinator.ONodeRequest;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol.DISTRIBUTED_OPERATION_REQUEST;

public class OOperationRequest implements OBinaryRequest, ODistributedExecutable {
private OLogId id;
private ONodeRequest request;

public OOperationRequest(OLogId id, ONodeRequest request) {
this.id = id;
this.request = request;
}

public OOperationRequest() {

}

@Override
public void write(OChannelDataOutput network, OStorageRemoteSession session) throws IOException {
DataOutputStream output = new DataOutputStream(network.getDataOutput());
OLogId.serialize(id, output);
//TODO: write request kind/id.
request.serialize(output);
}

@Override
public void read(OChannelDataInput channel, int protocolVersion, ORecordSerializer serializer) throws IOException {
DataInputStream input = new DataInputStream(channel.getDataInput());
id = OLogId.deserialize(input);
request = null;// TODO: read the type id and create the instance.
request.deserialize(input);
}

@Override
public byte getCommand() {
return DISTRIBUTED_OPERATION_REQUEST;
}

@Override
public OBinaryResponse createResponse() {
return null;
}

@Override
public OBinaryResponse execute(OBinaryRequestExecutor executor) {
return null;
}

@Override
public String getDescription() {
return "Distributed Operation Request/Response";
}

@Override
public boolean requireDatabaseSession() {
return false;
}

@Override
public void executeDistributed(OCoordinatedExecutor executor) {
executor.executeOperationRequest(this);
}

public OLogId getId() {
return id;
}

public ONodeRequest getRequest() {
return request;
}
}

0 comments on commit 0802568

Please sign in to comment.