Skip to content

Commit

Permalink
first full implementation of push of storage configuration and schema
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Dec 29, 2017
1 parent e77072d commit 15d3464
Show file tree
Hide file tree
Showing 33 changed files with 951 additions and 336 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public interface OBinaryRequestExecutor {


OBinaryResponse executeSubscribe(OSubscribeRequest request); OBinaryResponse executeSubscribe(OSubscribeRequest request);


OBinaryResponse executeSubscribePushRequest(OSubscribeDistributedConfigurationRequest request); OBinaryResponse executeSubscribeDistributedConfiguration(OSubscribeDistributedConfigurationRequest request);


OBinaryResponse executeSubscribeLiveQuery(OSubscribeLiveQueryRequest request); OBinaryResponse executeSubscribeLiveQuery(OSubscribeLiveQueryRequest request);


Expand All @@ -126,4 +126,8 @@ public interface OBinaryRequestExecutor {
OBinaryResponse executeUnsubscribeLiveQuery(OUnsubscribeLiveQueryRequest request); OBinaryResponse executeUnsubscribeLiveQuery(OUnsubscribeLiveQueryRequest request);


OBinaryResponse executeDistributedConnect(ODistributedConnectRequest request); OBinaryResponse executeDistributedConnect(ODistributedConnectRequest request);

OBinaryResponse executeSubscribeStorageConfiguration(OSubscribeStorageConfigurationRequest request);

OBinaryResponse executeSubscribeSchemaConfiguration(OSubscribeSchemaRequest request);
} }
Original file line number Original file line Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.orientechnologies.orient.client.remote; package com.orientechnologies.orient.client.remote;


import com.orientechnologies.orient.client.remote.message.OBinaryPushRequest; import com.orientechnologies.orient.client.remote.message.*;
import com.orientechnologies.orient.client.remote.message.OBinaryPushResponse; import com.orientechnologies.orient.client.remote.message.push.OStorageConfigurationPayload;
import com.orientechnologies.orient.client.remote.message.OLiveQueryPushRequest;
import com.orientechnologies.orient.client.remote.message.OPushDistributedConfigurationRequest;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinary; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinary;


/** /**
Expand All @@ -17,11 +15,15 @@ public interface ORemotePushHandler {


OBinaryPushResponse executeUpdateDistributedConfig(OPushDistributedConfigurationRequest request); OBinaryPushResponse executeUpdateDistributedConfig(OPushDistributedConfigurationRequest request);


OBinaryPushResponse executeUpdateStorageConfig(OPushStorageConfigurationRequest request);

void executeLiveQueryPush(OLiveQueryPushRequest pushRequest); void executeLiveQueryPush(OLiveQueryPushRequest pushRequest);


void onPushReconnect(String host); void onPushReconnect(String host);


void onPushDisconnect(OChannelBinary network, Exception e); void onPushDisconnect(OChannelBinary network, Exception e);


void returnSocket(OChannelBinary network); void returnSocket(OChannelBinary network);

OBinaryPushResponse executeUpdateSchema(OPushSchemaRequest request);
} }
Original file line number Original file line Diff line number Diff line change
@@ -1,10 +1,8 @@
package com.orientechnologies.orient.client.remote; package com.orientechnologies.orient.client.remote;


import com.orientechnologies.orient.client.remote.message.OReloadResponse37; import com.orientechnologies.orient.client.remote.message.OReloadResponse37;
import com.orientechnologies.orient.core.config.OContextConfiguration; import com.orientechnologies.orient.client.remote.message.push.OStorageConfigurationPayload;
import com.orientechnologies.orient.core.config.OStorageClusterConfiguration; import com.orientechnologies.orient.core.config.*;
import com.orientechnologies.orient.core.config.OStorageConfiguration;
import com.orientechnologies.orient.core.config.OStorageEntryConfiguration;


import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
Expand Down Expand Up @@ -36,34 +34,34 @@ public class OStorageConfigurationRemote implements OStorageConfiguration {
private List<OStorageClusterConfiguration> clusters; private List<OStorageClusterConfiguration> clusters;
private String networkRecordSerializer; private String networkRecordSerializer;


public OStorageConfigurationRemote(String networkRecordSerializer, OReloadResponse37 response, public OStorageConfigurationRemote(String networkRecordSerializer, OStorageConfigurationPayload payload ,
OContextConfiguration contextConfiguration) { OContextConfiguration contextConfiguration) {
this.networkRecordSerializer = networkRecordSerializer; this.networkRecordSerializer = networkRecordSerializer;
this.contextConfiguration = contextConfiguration; this.contextConfiguration = contextConfiguration;
this.dateFormat = response.getDateFormat(); this.dateFormat = payload.getDateFormat();
this.dateTimeFormat = response.getDateTimeFormat(); this.dateTimeFormat = payload.getDateTimeFormat();
this.name = response.getName(); this.name = payload.getName();
this.version = response.getVersion(); this.version = payload.getVersion();
this.directory = response.getDirectory(); this.directory = payload.getDirectory();
this.properties = new HashMap<>(); this.properties = new HashMap<>();
for (OStorageEntryConfiguration conf : response.getProperties()) { for (OStorageEntryConfiguration conf : payload.getProperties()) {
this.properties.put(conf.name, conf); this.properties.put(conf.name, conf);
} }
this.schemaRecordId = response.getSchemaRecordId().toString(); this.schemaRecordId = payload.getSchemaRecordId().toString();
this.indexMgrRecordId = response.getIndexMgrRecordId().toString(); this.indexMgrRecordId = payload.getIndexMgrRecordId().toString();
this.clusterSelection = response.getClusterSelection(); this.clusterSelection = payload.getClusterSelection();
this.conflictStrategy = response.getConflictStrategy(); this.conflictStrategy = payload.getConflictStrategy();
this.validationEnabled = response.isValidationEnabled(); this.validationEnabled = payload.isValidationEnabled();
this.localeLanguage = response.getLocaleLanguage(); this.localeLanguage = payload.getLocaleLanguage();
this.minimumClusters = response.getMinimumClusters(); this.minimumClusters = payload.getMinimumClusters();
this.strictSql = response.isStrictSql(); this.strictSql = payload.isStrictSql();
this.charset = response.getCharset(); this.charset = payload.getCharset();
this.timeZone = response.getTimeZone(); this.timeZone = payload.getTimeZone();
this.localeCountry = response.getLocaleCountry(); this.localeCountry = payload.getLocaleCountry();
this.recordSerializer = response.getRecordSerializer(); this.recordSerializer = payload.getRecordSerializer();
this.recordSerializerVersion = response.getRecordSerializerVersion(); this.recordSerializerVersion = payload.getRecordSerializerVersion();
this.binaryFormatVersion = response.getBinaryFormatVersion(); this.binaryFormatVersion = payload.getBinaryFormatVersion();
this.clusters = response.getClusters(); this.clusters = payload.getClusters();
} }


@Override @Override
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ public OSBTreeCollectionManager getSBtreeCollectionManager() {
public void reload() { public void reload() {
OReloadResponse37 res = networkOperation(new OReloadRequest37(), "error loading storage configuration"); OReloadResponse37 res = networkOperation(new OReloadRequest37(), "error loading storage configuration");
final OStorageConfiguration storageConfiguration = new OStorageConfigurationRemote( final OStorageConfiguration storageConfiguration = new OStorageConfigurationRemote(
ORecordSerializerFactory.instance().getDefaultRecordSerializer().toString(), res, clientConfiguration); ORecordSerializerFactory.instance().getDefaultRecordSerializer().toString(), res.getPayload(), clientConfiguration);


updateStorageConfiguration(storageConfiguration); updateStorageConfiguration(storageConfiguration);
} }
Expand Down Expand Up @@ -1398,6 +1398,7 @@ private void initPush(OStorageRemoteSession session) {
pushThread.start(); pushThread.start();
subscribeStorageConfiguration(session); subscribeStorageConfiguration(session);
subscribeDistributedConfiguration(session); subscribeDistributedConfiguration(session);
subscribeSchema(session);


} }
} finally { } finally {
Expand All @@ -1411,7 +1412,11 @@ private void subscribeDistributedConfiguration(OStorageRemoteSession nodeSession
} }


private void subscribeStorageConfiguration(OStorageRemoteSession nodeSession) { private void subscribeStorageConfiguration(OStorageRemoteSession nodeSession) {
//TODO pushThread.subscribe(new OSubscribeStorageConfigurationRequest(), nodeSession);
}

private void subscribeSchema(OStorageRemoteSession nodeSession) {
pushThread.subscribe(new OSubscribeSchemaRequest(), nodeSession);
} }


protected void openRemoteDatabase(String currentURL) { protected void openRemoteDatabase(String currentURL) {
Expand Down Expand Up @@ -1883,9 +1888,11 @@ public OBinaryPushRequest createPush(byte type) {
return new OPushDistributedConfigurationRequest(); return new OPushDistributedConfigurationRequest();
case OChannelBinaryProtocol.REQUEST_PUSH_LIVE_QUERY: case OChannelBinaryProtocol.REQUEST_PUSH_LIVE_QUERY:
return new OLiveQueryPushRequest(); return new OLiveQueryPushRequest();
// case OChannelBinaryProtocol.REQUEST_PUSH_STORAGE_CONFIG: case OChannelBinaryProtocol.REQUEST_PUSH_STORAGE_CONFIG:
// return new OPushStorageConfigurationRequest();
// return new case OChannelBinaryProtocol.REQUEST_PUSH_SCHEMA:
return new OPushSchemaRequest();

} }
return null; return null;
} }
Expand All @@ -1896,6 +1903,21 @@ public OBinaryPushResponse executeUpdateDistributedConfig(OPushDistributedConfig
return null; return null;
} }


@Override
public OBinaryPushResponse executeUpdateStorageConfig(OPushStorageConfigurationRequest payload) {
final OStorageConfiguration storageConfiguration = new OStorageConfigurationRemote(
ORecordSerializerFactory.instance().getDefaultRecordSerializer().toString(), payload.getPayload(), clientConfiguration);

updateStorageConfiguration(storageConfiguration);
return null;
}

@Override
public OBinaryPushResponse executeUpdateSchema(OPushSchemaRequest request) {
ODatabaseDocumentRemote.updateSchema(this, request.getSchema());
return null;
}

public OLiveQueryMonitor liveQuery(ODatabaseDocumentRemote database, String query, OLiveQueryClientListener listener, public OLiveQueryMonitor liveQuery(ODatabaseDocumentRemote database, String query, OLiveQueryClientListener listener,
Object[] params) { Object[] params) {


Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.orientechnologies.orient.client.remote.message;

import com.orientechnologies.orient.client.remote.ORemotePushHandler;
import com.orientechnologies.orient.client.remote.message.push.OStorageConfigurationPayload;
import com.orientechnologies.orient.core.config.OStorageConfiguration;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerNetworkV37;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataInput;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataOutput;

import java.io.IOException;

import static com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol.REQUEST_PUSH_SCHEMA;
import static com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol.REQUEST_PUSH_STORAGE_CONFIG;

public class OPushSchemaRequest implements OBinaryPushRequest<OBinaryPushResponse> {

private ODocument schema;

public OPushSchemaRequest() {

}

public OPushSchemaRequest(ODocument schema) {
this.schema = schema;
}

@Override
public void write(OChannelDataOutput channel) throws IOException {
channel.writeBytes(ORecordSerializerNetworkV37.INSTANCE.toStream(schema, false));
}

@Override
public void read(OChannelDataInput network) throws IOException {
byte[] bytes = network.readBytes();
this.schema = (ODocument) ORecordSerializerNetworkV37.INSTANCE.fromStream(bytes, null, null);
}

@Override
public OBinaryPushResponse execute(ORemotePushHandler pushHandler) {
return pushHandler.executeUpdateSchema(this);
}

@Override
public OBinaryPushResponse createResponse() {
return null;
}

@Override
public byte getPushCommand() {
return REQUEST_PUSH_SCHEMA;
}

public ODocument getSchema() {
return schema;
}
}
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.orientechnologies.orient.client.remote.message;

import com.orientechnologies.orient.client.remote.ORemotePushHandler;
import com.orientechnologies.orient.client.remote.message.OBinaryPushRequest;
import com.orientechnologies.orient.client.remote.message.OBinaryPushResponse;
import com.orientechnologies.orient.client.remote.message.OPushDistributedConfigurationRequest;
import com.orientechnologies.orient.client.remote.message.push.OStorageConfigurationPayload;
import com.orientechnologies.orient.core.config.OStorageConfiguration;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataInput;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataOutput;

import java.io.IOException;

import static com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol.REQUEST_PUSH_STORAGE_CONFIG;

public class OPushStorageConfigurationRequest implements OBinaryPushRequest<OBinaryPushResponse> {

private OStorageConfigurationPayload payload;

public OPushStorageConfigurationRequest() {
payload = new OStorageConfigurationPayload();
}

public OPushStorageConfigurationRequest(OStorageConfiguration configuration) {
payload = new OStorageConfigurationPayload(configuration);
}

@Override
public void write(OChannelDataOutput channel) throws IOException {
payload.write(channel);
}

@Override
public void read(OChannelDataInput network) throws IOException {
payload.read(network);
}

@Override
public OBinaryPushResponse execute(ORemotePushHandler pushHandler) {
return pushHandler.executeUpdateStorageConfig(this);
}

@Override
public OBinaryPushResponse createResponse() {
return null;
}

@Override
public byte getPushCommand() {
return REQUEST_PUSH_STORAGE_CONFIG;
}

public OStorageConfigurationPayload getPayload() {
return payload;
}
}
Loading

0 comments on commit 15d3464

Please sign in to comment.