From aee8b4aa50c8d6e26d85cc0086a56ddd66e76b8e Mon Sep 17 00:00:00 2001 From: lvca Date: Mon, 29 Feb 2016 20:19:35 +0100 Subject: [PATCH] First draft of management of distributed msgs by using binary protocol instead of HZ --- .../orient/client/remote/OServerAdmin.java | 57 +-- .../orient/client/remote/OStorageRemote.java | 48 +-- .../binary/OChannelBinaryProtocol.java | 188 +++++----- .../server/hazelcast/ODistributedWorker.java | 343 ------------------ .../OHazelcastDistributedDatabase.java | 193 ++++------ .../OHazelcastDistributedMessageService.java | 72 +--- .../OHazelcastDistributedRequest.java | 27 +- .../OHazelcastDistributedResponse.java | 9 +- .../server/hazelcast/OHazelcastPlugin.java | 81 ++++- .../OAsynchDistributedOperation.java | 20 +- .../ODistributedMessageService.java | 2 + .../distributed/ODistributedRequest.java | 44 +-- .../distributed/ODistributedResponse.java | 4 +- .../ODistributedResponseManager.java | 17 +- .../ODistributedServerManager.java | 17 +- .../distributed/ODistributedStorage.java | 2 +- .../distributed/ORemoteServerController.java | 103 ++++++ .../distributed/task/OAbstractRemoteTask.java | 19 +- .../task/OAbstractReplicatedTask.java | 4 +- .../distributed/task/OCompletedTxTask.java | 28 +- .../task/OCopyDatabaseChunkTask.java | 25 +- .../distributed/task/OCreateRecordTask.java | 13 +- .../distributed/task/ODeleteRecordTask.java | 33 +- .../server/distributed/task/OFixTxTask.java | 43 ++- .../task/OReadRecordIfNotLatestTask.java | 7 + .../distributed/task/OReadRecordTask.java | 18 +- .../server/distributed/task/ORemoteTask.java | 62 ++++ .../distributed/task/ORemoteTaskFactory.java | 81 +++++ .../distributed/task/ORestartNodeTask.java | 17 +- .../task/OResurrectRecordTask.java | 15 +- .../distributed/task/OSQLCommandTask.java | 6 + .../server/distributed/task/OScriptTask.java | 17 +- .../distributed/task/OSyncClusterTask.java | 18 +- .../task/OSyncDatabaseDeltaTask.java | 17 +- .../distributed/task/OSyncDatabaseTask.java | 32 +- .../server/distributed/task/OTxTask.java | 37 +- .../distributed/task/OUpdateRecordTask.java | 12 +- .../binary/ONetworkProtocolBinary.java | 180 ++++++--- 38 files changed, 960 insertions(+), 951 deletions(-) delete mode 100755 distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/ODistributedWorker.java create mode 100644 server/src/main/java/com/orientechnologies/orient/server/distributed/ORemoteServerController.java create mode 100644 server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORemoteTask.java create mode 100644 server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORemoteTaskFactory.java diff --git a/client/src/main/java/com/orientechnologies/orient/client/remote/OServerAdmin.java b/client/src/main/java/com/orientechnologies/orient/client/remote/OServerAdmin.java index 1782fb399c1..97d40170348 100755 --- a/client/src/main/java/com/orientechnologies/orient/client/remote/OServerAdmin.java +++ b/client/src/main/java/com/orientechnologies/orient/client/remote/OServerAdmin.java @@ -41,9 +41,9 @@ * Remote administration class of OrientDB Server instances. */ public class OServerAdmin { - private OStorageRemote storage; - private int sessionId = -1; - private byte[] sessionToken = null; + protected OStorageRemote storage; + private int sessionId = -1; + private byte[] sessionToken = null; /** * Creates the object passing a remote URL to connect. sessionToken @@ -86,7 +86,7 @@ public synchronized OServerAdmin connect(final String iUserName, final String iU @Override public Void execute(OChannelBinaryAsynchClient network) throws IOException { try { - storage.beginRequest(network,OChannelBinaryProtocol.REQUEST_CONNECT); + storage.beginRequest(network, OChannelBinaryProtocol.REQUEST_CONNECT); storage.sendClientInfo(network); @@ -110,7 +110,7 @@ public Void execute(OChannelBinaryAsynchClient network) throws IOException { return null; } - },"Cannot connect to the remote server/database '" + storage.getURL() + "'"); + }, "Cannot connect to the remote server/database '" + storage.getURL() + "'"); return this; } @@ -210,7 +210,8 @@ public synchronized OServerAdmin createDatabase(final String iDatabaseType, Stri * @return The instance itself. Useful to execute method in chain * @throws IOException */ - public synchronized OServerAdmin createDatabase(final String iDatabaseName, final String iDatabaseType, final String iStorageMode) throws IOException { + public synchronized OServerAdmin createDatabase(final String iDatabaseName, final String iDatabaseType, final String iStorageMode) + throws IOException { if (iDatabaseName == null || iDatabaseName.length() <= 0) { final String message = "Cannot create unnamed remote storage. Check your syntax"; @@ -396,7 +397,7 @@ public synchronized OServerAdmin freezeDatabase(final String storageType) throws @Override public Void execute(OChannelBinaryAsynchClient network) throws IOException { try { - storage.beginRequest(network,OChannelBinaryProtocol.REQUEST_DB_FREEZE); + storage.beginRequest(network, OChannelBinaryProtocol.REQUEST_DB_FREEZE); network.writeString(storage.getName()); network.writeString(storageType); } finally { @@ -406,7 +407,7 @@ public Void execute(OChannelBinaryAsynchClient network) throws IOException { storage.getResponse(network); return null; } - },"Cannot freeze the remote storage: " + storage.getName()); + }, "Cannot freeze the remote storage: " + storage.getName()); return this; } @@ -499,7 +500,6 @@ public Void execute(final OChannelBinaryAsynchClient network) throws IOException storage.endRequest(network); } - storage.getResponse(network); return null; } @@ -532,14 +532,15 @@ public ODocument clusterStatus() { * @return The instance itself. Useful to execute method in chain * @throws IOException */ - public synchronized OServerAdmin copyDatabase(final String databaseName, final String iDatabaseUserName, final String iDatabaseUserPassword, final String iRemoteName, final String iRemoteEngine) throws IOException { + public synchronized OServerAdmin copyDatabase(final String databaseName, final String iDatabaseUserName, + final String iDatabaseUserPassword, final String iRemoteName, final String iRemoteEngine) throws IOException { networkAdminOperation(new OStorageRemoteOperation() { @Override public Void execute(final OChannelBinaryAsynchClient network) throws IOException { try { - storage.beginRequest(network,OChannelBinaryProtocol.REQUEST_DB_COPY); + storage.beginRequest(network, OChannelBinaryProtocol.REQUEST_DB_COPY); network.writeString(databaseName); network.writeString(iDatabaseUserName); network.writeString(iDatabaseUserPassword); @@ -564,7 +565,7 @@ public synchronized Map getGlobalConfigurations() throws IOExcep @Override public Map execute(OChannelBinaryAsynchClient network) throws IOException { final Map config = new HashMap(); - storage.beginRequest(network,OChannelBinaryProtocol.REQUEST_CONFIG_LIST); + storage.beginRequest(network, OChannelBinaryProtocol.REQUEST_CONFIG_LIST); storage.endRequest(network); try { @@ -586,7 +587,7 @@ public synchronized String getGlobalConfiguration(final OGlobalConfiguration con return networkAdminOperation(new OStorageRemoteOperation() { @Override public String execute(OChannelBinaryAsynchClient network) throws IOException { - storage.beginRequest(network,OChannelBinaryProtocol.REQUEST_CONFIG_GET); + storage.beginRequest(network, OChannelBinaryProtocol.REQUEST_CONFIG_GET); network.writeString(config.getKey()); network.endRequest(); @@ -597,10 +598,11 @@ public String execute(OChannelBinaryAsynchClient network) throws IOException { storage.endResponse(network); } } - },"Cannot retrieve the configuration value: " + config.getKey()); + }, "Cannot retrieve the configuration value: " + config.getKey()); } - public synchronized OServerAdmin setGlobalConfiguration(final OGlobalConfiguration config, final Object iValue) throws IOException { + public synchronized OServerAdmin setGlobalConfiguration(final OGlobalConfiguration config, final Object iValue) + throws IOException { networkAdminOperation(new OStorageRemoteOperation() { @Override @@ -637,8 +639,8 @@ public boolean isConnected() { } protected ODocument sendRequest(final byte iRequest, final ODocument iPayLoad, final String iActivity) { - //Using here networkOperation because the original retry logic was lik networkOperation - storage.setSessionId(getURL(),sessionId,sessionToken); + // Using here networkOperation because the original retry logic was lik networkOperation + storage.setSessionId(getURL(), sessionId, sessionToken); return storage.networkOperation(new OStorageRemoteOperation() { @Override public ODocument execute(OChannelBinaryAsynchClient network) throws IOException { @@ -676,17 +678,16 @@ private boolean handleDBFreeze() { protected T networkAdminOperation(final OStorageRemoteOperation operation, final String errorMessage) { - OChannelBinaryAsynchClient network=null; - try { - storage.setSessionId(getURL(),sessionId,sessionToken); - //TODO:replace this api with one that get connection for only the specified url. - network = storage.getAvailableNetwork(getURL()); - return operation.execute(network); - } catch (Exception e) { - storage.close(true, false); - throw OException.wrapException(new OStorageException(errorMessage), e); - } + OChannelBinaryAsynchClient network = null; + try { + storage.setSessionId(getURL(), sessionId, sessionToken); + // TODO:replace this api with one that get connection for only the specified url. + network = storage.getAvailableNetwork(getURL()); + return operation.execute(network); + } catch (Exception e) { + storage.close(true, false); + throw OException.wrapException(new OStorageException(errorMessage), e); + } } - } diff --git a/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemote.java b/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemote.java index 7c5df80d15b..9d7c472ea08 100755 --- a/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemote.java +++ b/client/src/main/java/com/orientechnologies/orient/client/remote/OStorageRemote.java @@ -19,22 +19,6 @@ */ package com.orientechnologies.orient.client.remote; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; - -import javax.naming.NamingException; -import javax.naming.directory.Attribute; -import javax.naming.directory.Attributes; -import javax.naming.directory.DirContext; -import javax.naming.directory.InitialDirContext; - import com.orientechnologies.common.concur.OOfflineNodeException; import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException; import com.orientechnologies.common.exception.OException; @@ -59,13 +43,7 @@ import com.orientechnologies.orient.core.db.record.ORecordOperation; import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OBonsaiCollectionPointer; import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManager; -import com.orientechnologies.orient.core.exception.OCommandExecutionException; -import com.orientechnologies.orient.core.exception.OConfigurationException; -import com.orientechnologies.orient.core.exception.ODatabaseException; -import com.orientechnologies.orient.core.exception.ORecordNotFoundException; -import com.orientechnologies.orient.core.exception.OSecurityException; -import com.orientechnologies.orient.core.exception.OStorageException; -import com.orientechnologies.orient.core.exception.OTransactionException; +import com.orientechnologies.orient.core.exception.*; import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.metadata.security.OTokenException; @@ -78,20 +56,24 @@ import com.orientechnologies.orient.core.serialization.serializer.stream.OStreamSerializerAnyStreamable; import com.orientechnologies.orient.core.sql.query.OLiveQuery; import com.orientechnologies.orient.core.sql.query.OLiveResultListener; -import com.orientechnologies.orient.core.storage.OCluster; -import com.orientechnologies.orient.core.storage.OPhysicalPosition; -import com.orientechnologies.orient.core.storage.ORawBuffer; -import com.orientechnologies.orient.core.storage.ORecordCallback; -import com.orientechnologies.orient.core.storage.ORecordMetadata; -import com.orientechnologies.orient.core.storage.OStorageAbstract; -import com.orientechnologies.orient.core.storage.OStorageOperationResult; -import com.orientechnologies.orient.core.storage.OStorageProxy; +import com.orientechnologies.orient.core.storage.*; import com.orientechnologies.orient.core.storage.impl.local.paginated.ORecordSerializationContext; import com.orientechnologies.orient.core.tx.OTransaction; import com.orientechnologies.orient.core.tx.OTransactionAbstract; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol; +import javax.naming.NamingException; +import javax.naming.directory.Attribute; +import javax.naming.directory.Attributes; +import javax.naming.directory.DirContext; +import javax.naming.directory.InitialDirContext; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.*; +import java.util.concurrent.*; + /** * This object is bound to each remote ODatabase instances. */ @@ -2080,7 +2062,7 @@ protected OChannelBinaryAsynchClient beginRequest(final byte iCommand) throws IO return beginRequest(getAvailableNetwork(getNextAvailableServerURL(false)), iCommand); } - protected OChannelBinaryAsynchClient beginRequest(final OChannelBinaryAsynchClient network, final byte iCommand) + public OChannelBinaryAsynchClient beginRequest(final OChannelBinaryAsynchClient network, final byte iCommand) throws IOException { final OStorageRemoteThreadLocal instance = OStorageRemoteThreadLocal.INSTANCE; network.beginRequest(iCommand,instance.get(),tokens.get(network.getServerURL())); @@ -2189,7 +2171,7 @@ protected OChannelBinaryAsynchClient getAvailableNetwork(final String iCurrentUR /** * Starts listening the response. */ - protected void beginResponse(final OChannelBinaryAsynchClient iNetwork) throws IOException { + public void beginResponse(final OChannelBinaryAsynchClient iNetwork) throws IOException { byte[] newToken = iNetwork.beginResponse(getSessionId(), true); if (newToken != null && newToken.length > 0) { setSessionId(getServerURL(), getSessionId(), newToken); diff --git a/core/src/main/java/com/orientechnologies/orient/enterprise/channel/binary/OChannelBinaryProtocol.java b/core/src/main/java/com/orientechnologies/orient/enterprise/channel/binary/OChannelBinaryProtocol.java index eddf18b7176..12e4b8a1924 100755 --- a/core/src/main/java/com/orientechnologies/orient/enterprise/channel/binary/OChannelBinaryProtocol.java +++ b/core/src/main/java/com/orientechnologies/orient/enterprise/channel/binary/OChannelBinaryProtocol.java @@ -19,14 +19,14 @@ */ package com.orientechnologies.orient.enterprise.channel.binary; -import java.io.IOException; - import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.db.record.OIdentifiable; import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.record.ORecord; import com.orientechnologies.orient.core.record.ORecordInternal; +import java.io.IOException; + /** * The range of the requests is 1-79. * @@ -35,109 +35,113 @@ */ public class OChannelBinaryProtocol { // OUTGOING - public static final byte REQUEST_SHUTDOWN = 1; - public static final byte REQUEST_CONNECT = 2; - - public static final byte REQUEST_DB_OPEN = 3; - public static final byte REQUEST_DB_CREATE = 4; - public static final byte REQUEST_DB_CLOSE = 5; - public static final byte REQUEST_DB_EXIST = 6; - public static final byte REQUEST_DB_DROP = 7; - public static final byte REQUEST_DB_SIZE = 8; - public static final byte REQUEST_DB_COUNTRECORDS = 9; - public static final byte REQUEST_DB_REOPEN = 17; - - public static final byte REQUEST_DATACLUSTER_ADD = 10; - public static final byte REQUEST_DATACLUSTER_DROP = 11; - public static final byte REQUEST_DATACLUSTER_COUNT = 12; - public static final byte REQUEST_DATACLUSTER_DATARANGE = 13; - public static final byte REQUEST_DATACLUSTER_COPY = 14; - public static final byte REQUEST_DATACLUSTER_LH_CLUSTER_IS_USED = 16; // since 1.2.0 - - public static final byte REQUEST_DATASEGMENT_ADD = 20; - public static final byte REQUEST_DATASEGMENT_DROP = 21; - - public static final byte REQUEST_INCREMENTAL_BACKUP = 27; // since 2.2 - public static final byte REQUEST_INCREMENTAL_RESTORE = 28; // since 2.2 - - public static final byte REQUEST_RECORD_METADATA = 29; // since 1.4.0 - public static final byte REQUEST_RECORD_LOAD = 30; - public static final byte REQUEST_RECORD_CREATE = 31; - public static final byte REQUEST_RECORD_UPDATE = 32; - public static final byte REQUEST_RECORD_DELETE = 33; - public static final byte REQUEST_RECORD_COPY = 34; - public static final byte REQUEST_POSITIONS_HIGHER = 36; // since 1.3.0 - public static final byte REQUEST_POSITIONS_LOWER = 37; // since 1.3.0 - public static final byte REQUEST_RECORD_CLEAN_OUT = 38; // since 1.3.0 - public static final byte REQUEST_POSITIONS_FLOOR = 39; // since 1.3.0 - - public static final byte REQUEST_COUNT = 40; // DEPRECATED: USE - // REQUEST_DATACLUSTER_COUNT - public static final byte REQUEST_COMMAND = 41; - public static final byte REQUEST_POSITIONS_CEILING = 42; // since 1.3.0 - public static final byte REQUEST_RECORD_HIDE = 43; // since 1.7 - public static final byte REQUEST_RECORD_LOAD_IF_VERSION_NOT_LATEST = 44; // since 2.1 - - public static final byte REQUEST_TX_COMMIT = 60; - - public static final byte REQUEST_CONFIG_GET = 70; - public static final byte REQUEST_CONFIG_SET = 71; - public static final byte REQUEST_CONFIG_LIST = 72; - public static final byte REQUEST_DB_RELOAD = 73; // SINCE 1.0rc4 - public static final byte REQUEST_DB_LIST = 74; // SINCE 1.0rc6 - public static final byte REQUEST_SERVER_INFO = 75; // SINCE 2.2.0 - - public static final byte REQUEST_PUSH_DISTRIB_CONFIG = 80; - public static final byte REQUEST_PUSH_LIVE_QUERY = 81; // SINCE 2.1 + public static final byte REQUEST_SHUTDOWN = 1; + public static final byte REQUEST_CONNECT = 2; + + public static final byte REQUEST_DB_OPEN = 3; + public static final byte REQUEST_DB_CREATE = 4; + public static final byte REQUEST_DB_CLOSE = 5; + public static final byte REQUEST_DB_EXIST = 6; + public static final byte REQUEST_DB_DROP = 7; + public static final byte REQUEST_DB_SIZE = 8; + public static final byte REQUEST_DB_COUNTRECORDS = 9; + public static final byte REQUEST_DB_REOPEN = 17; + + public static final byte REQUEST_DATACLUSTER_ADD = 10; + public static final byte REQUEST_DATACLUSTER_DROP = 11; + public static final byte REQUEST_DATACLUSTER_COUNT = 12; + public static final byte REQUEST_DATACLUSTER_DATARANGE = 13; + public static final byte REQUEST_DATACLUSTER_COPY = 14; + public static final byte REQUEST_DATACLUSTER_LH_CLUSTER_IS_USED = 16; // since 1.2.0 + + public static final byte REQUEST_DATASEGMENT_ADD = 20; + public static final byte REQUEST_DATASEGMENT_DROP = 21; + + public static final byte REQUEST_INCREMENTAL_BACKUP = 27; // since 2.2 + public static final byte REQUEST_INCREMENTAL_RESTORE = 28; // since 2.2 + + public static final byte REQUEST_RECORD_METADATA = 29; // since 1.4.0 + public static final byte REQUEST_RECORD_LOAD = 30; + public static final byte REQUEST_RECORD_CREATE = 31; + public static final byte REQUEST_RECORD_UPDATE = 32; + public static final byte REQUEST_RECORD_DELETE = 33; + public static final byte REQUEST_RECORD_COPY = 34; + public static final byte REQUEST_POSITIONS_HIGHER = 36; // since 1.3.0 + public static final byte REQUEST_POSITIONS_LOWER = 37; // since 1.3.0 + public static final byte REQUEST_RECORD_CLEAN_OUT = 38; // since 1.3.0 + public static final byte REQUEST_POSITIONS_FLOOR = 39; // since 1.3.0 + + public static final byte REQUEST_COUNT = 40; // DEPRECATED: USE + // REQUEST_DATACLUSTER_COUNT + public static final byte REQUEST_COMMAND = 41; + public static final byte REQUEST_POSITIONS_CEILING = 42; // since 1.3.0 + public static final byte REQUEST_RECORD_HIDE = 43; // since 1.7 + public static final byte REQUEST_RECORD_LOAD_IF_VERSION_NOT_LATEST = 44; // since 2.1 + + public static final byte REQUEST_TX_COMMIT = 60; + + public static final byte REQUEST_CONFIG_GET = 70; + public static final byte REQUEST_CONFIG_SET = 71; + public static final byte REQUEST_CONFIG_LIST = 72; + public static final byte REQUEST_DB_RELOAD = 73; // SINCE 1.0rc4 + public static final byte REQUEST_DB_LIST = 74; // SINCE 1.0rc6 + public static final byte REQUEST_SERVER_INFO = 75; // SINCE 2.2.0 + + public static final byte REQUEST_PUSH_DISTRIB_CONFIG = 80; + public static final byte REQUEST_PUSH_LIVE_QUERY = 81; // SINCE 2.1 // DISTRIBUTED - public static final byte REQUEST_DB_COPY = 90; // SINCE 1.0rc8 - public static final byte REQUEST_REPLICATION = 91; // SINCE 1.0 - public static final byte REQUEST_CLUSTER = 92; // SINCE 1.0 - public static final byte REQUEST_DB_TRANSFER = 93; // SINCE 1.0.2 + public static final byte REQUEST_DB_COPY = 90; // SINCE 1.0rc8 + public static final byte REQUEST_REPLICATION = 91; // SINCE 1.0 + public static final byte REQUEST_CLUSTER = 92; // SINCE 1.0 + public static final byte REQUEST_DB_TRANSFER = 93; // SINCE 1.0.2 // Lock + sync - public static final byte REQUEST_DB_FREEZE = 94; // SINCE 1.1.0 - public static final byte REQUEST_DB_RELEASE = 95; // SINCE 1.1.0 + public static final byte REQUEST_DB_FREEZE = 94; // SINCE 1.1.0 + public static final byte REQUEST_DB_RELEASE = 95; // SINCE 1.1.0 - public static final byte REQUEST_DATACLUSTER_FREEZE = 96; - public static final byte REQUEST_DATACLUSTER_RELEASE = 97; + public static final byte REQUEST_DATACLUSTER_FREEZE = 96; + public static final byte REQUEST_DATACLUSTER_RELEASE = 97; // REMOTE SB-TREE COLLECTIONS - public static final byte REQUEST_CREATE_SBTREE_BONSAI = 110; - public static final byte REQUEST_SBTREE_BONSAI_GET = 111; - public static final byte REQUEST_SBTREE_BONSAI_FIRST_KEY = 112; - public static final byte REQUEST_SBTREE_BONSAI_GET_ENTRIES_MAJOR = 113; - public static final byte REQUEST_RIDBAG_GET_SIZE = 114; + public static final byte REQUEST_CREATE_SBTREE_BONSAI = 110; + public static final byte REQUEST_SBTREE_BONSAI_GET = 111; + public static final byte REQUEST_SBTREE_BONSAI_FIRST_KEY = 112; + public static final byte REQUEST_SBTREE_BONSAI_GET_ENTRIES_MAJOR = 113; + public static final byte REQUEST_RIDBAG_GET_SIZE = 114; + + // TASK + public static final byte DISTRIBUTED_REQUEST = 120; + public static final byte DISTRIBUTED_RESPONSE = 121; // INCOMING - public static final byte RESPONSE_STATUS_OK = 0; - public static final byte RESPONSE_STATUS_ERROR = 1; - public static final byte PUSH_DATA = 3; + public static final byte RESPONSE_STATUS_OK = 0; + public static final byte RESPONSE_STATUS_ERROR = 1; + public static final byte PUSH_DATA = 3; // CONSTANTS - public static final short RECORD_NULL = -2; - public static final short RECORD_RID = -3; + public static final short RECORD_NULL = -2; + public static final short RECORD_RID = -3; // FOR MORE INFO: https://github.com/orientechnologies/orientdb/wiki/Network-Binary-Protocol#wiki-Compatibility - public static final int PROTOCOL_VERSION_21 = 21; - - public static final int PROTOCOL_VERSION_24 = 24; - public static final int PROTOCOL_VERSION_25 = 25; - public static final int PROTOCOL_VERSION_26 = 26; - public static final int PROTOCOL_VERSION_27 = 27; - public static final int PROTOCOL_VERSION_28 = 28; // SENT AS SHORT AS FIRST PACKET AFTER - // SOCKET CONNECTION - public static final int PROTOCOL_VERSION_29 = 29; // ADDED PUSH SUPPORT FOR LIVE QUERY - public static final int PROTOCOL_VERSION_30 = 30; // NEW COMMAND TO READ RECORD ONLY IF - // VERSION IS NOT LATEST WAS ADD - public static final int PROTOCOL_VERSION_31 = 31; // CHANGED STORAGE CFG TO ADD - // ENCRYPTION - public static final int PROTOCOL_VERSION_32 = 32; // STREAMABLE RESULT SET - - public static final int PROTOCOL_VERSION_33 = 33; // INCREMENTAL BACKUP/RESTORE - - public static final int CURRENT_PROTOCOL_VERSION = PROTOCOL_VERSION_33; + public static final int PROTOCOL_VERSION_21 = 21; + + public static final int PROTOCOL_VERSION_24 = 24; + public static final int PROTOCOL_VERSION_25 = 25; + public static final int PROTOCOL_VERSION_26 = 26; + public static final int PROTOCOL_VERSION_27 = 27; + public static final int PROTOCOL_VERSION_28 = 28; // SENT AS SHORT AS FIRST PACKET AFTER + // SOCKET CONNECTION + public static final int PROTOCOL_VERSION_29 = 29; // ADDED PUSH SUPPORT FOR LIVE QUERY + public static final int PROTOCOL_VERSION_30 = 30; // NEW COMMAND TO READ RECORD ONLY IF + // VERSION IS NOT LATEST WAS ADD + public static final int PROTOCOL_VERSION_31 = 31; // CHANGED STORAGE CFG TO ADD + // ENCRYPTION + public static final int PROTOCOL_VERSION_32 = 32; // STREAMABLE RESULT SET + + public static final int PROTOCOL_VERSION_33 = 33; // INCREMENTAL BACKUP/RESTORE + + public static final int CURRENT_PROTOCOL_VERSION = PROTOCOL_VERSION_33; public static OIdentifiable readIdentifiable(final OChannelBinary network) throws IOException { final int classId = network.readShort(); diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/ODistributedWorker.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/ODistributedWorker.java deleted file mode 100755 index d2997fb7134..00000000000 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/ODistributedWorker.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * - * * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com) - * * - * * Licensed under the Apache License, Version 2.0 (the "License"); - * * you may not use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * - * * For more information: http://www.orientechnologies.com - * - */ -package com.orientechnologies.orient.server.hazelcast; - -import com.hazelcast.core.HazelcastInstanceNotActiveException; -import com.hazelcast.core.IQueue; -import com.hazelcast.spi.exception.DistributedObjectDestroyedException; -import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException; -import com.orientechnologies.common.exception.OException; -import com.orientechnologies.common.log.OLogManager; -import com.orientechnologies.orient.core.config.OGlobalConfiguration; -import com.orientechnologies.orient.core.db.OScenarioThreadLocal; -import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; -import com.orientechnologies.orient.core.metadata.security.OSecurityUser; -import com.orientechnologies.orient.core.metadata.security.OUser; -import com.orientechnologies.orient.server.distributed.ODiscardedResponse; -import com.orientechnologies.orient.server.distributed.ODistributedException; -import com.orientechnologies.orient.server.distributed.ODistributedRequest; -import com.orientechnologies.orient.server.distributed.ODistributedServerLog; -import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; -import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask; - -import java.io.Serializable; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Hazelcast implementation of distributed peer. There is one instance per database. Each node creates own instance to talk with - * each others. - * - * @author Luca Garulli (l.garulli--at--orientechnologies.com) - * - */ -public class ODistributedWorker extends Thread { - - private final static int LOCAL_QUEUE_MAXSIZE = 1000; - protected final OHazelcastDistributedDatabase distributed; - protected final OHazelcastPlugin manager; - protected final OHazelcastDistributedMessageService msgService; - protected final String databaseName; - protected final IQueue requestQueue; - protected Queue localQueue = new ArrayBlockingQueue( - LOCAL_QUEUE_MAXSIZE); - protected volatile ODatabaseDocumentTx database; - protected volatile OUser lastUser; - protected volatile boolean running = true; - - public ODistributedWorker(final OHazelcastDistributedDatabase iDistributed, final IQueue iRequestQueue, - final String iDatabaseName, final int i) { - setName("OrientDB DistributedWorker node=" + iDistributed.getLocalNodeName() + " db=" + iDatabaseName + " id=" + i); - distributed = iDistributed; - requestQueue = iRequestQueue; - databaseName = iDatabaseName; - manager = distributed.manager; - msgService = distributed.msgService; - } - - @Override - public void run() { - for (long processedMessages = 0; running; processedMessages++) { - String senderNode = null; - ODistributedRequest message = null; - try { - message = readRequest(); - - if (message != null) { - message.getId(); - senderNode = message.getSenderNodeName(); - onMessage(message); - } - - } catch (InterruptedException e) { - // EXIT CURRENT THREAD - Thread.interrupted(); - break; - } catch (DistributedObjectDestroyedException e) { - Thread.interrupted(); - break; - } catch (HazelcastInstanceNotActiveException e) { - Thread.interrupted(); - break; - } catch (Throwable e) { - if (e.getCause() instanceof InterruptedException) - Thread.interrupted(); - else - ODistributedServerLog.error(this, manager.getLocalNodeName(), senderNode, DIRECTION.IN, - "error on executing distributed request %d: %s", e, message != null ? message.getId() : -1, - message != null ? message.getTask() : "-"); - } - } - - ODistributedServerLog.debug(this, manager.getLocalNodeName(), null, DIRECTION.NONE, "end of reading requests for database %s", - databaseName); - } - - public void initDatabaseInstance() { - if (database == null) { - // OPEN IT - database = (ODatabaseDocumentTx) manager.getServerInstance().openDatabase(databaseName, "internal", "internal", null, true); - - // AVOID RELOADING DB INFORMATION BECAUSE OF DEADLOCKS - // database.reload(); - - } else if (database.isClosed()) { - // DATABASE CLOSED, REOPEN IT - manager.getServerInstance().openDatabase(database, "internal", "internal", null, true); - - // AVOID RELOADING DB INFORMATION BECAUSE OF DEADLOCKS - // database.reload(); - } - } - - public void shutdown() { - final int pendingMsgs = localQueue.size(); - - if (pendingMsgs > 0) - ODistributedServerLog.warn(this, getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE, - "Received shutdown signal, waiting for distributed worker queue is empty (pending msgs=%d)...", pendingMsgs); - - try { - running = false; - interrupt(); - - if (pendingMsgs > 0) - join(); - - ODistributedServerLog.warn(this, getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE, - "Shutdown distributed worker completed"); - - localQueue.clear(); - - if (database != null) { - database.activateOnCurrentThread(); - database.close(); - } - - } catch (Exception e) { - ODistributedServerLog.warn(this, getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE, - "Error on shutting down distributed worker", e); - - } - } - - public ODatabaseDocumentTx getDatabase() { - return database; - } - - protected ODistributedRequest readRequest() throws InterruptedException { - // GET FROM DISTRIBUTED QUEUE. IF EMPTY WAIT FOR A MESSAGE - ODistributedRequest req = nextMessage(); - - while (distributed.waitForMessageId.get() > -1) { - if (req != null) { - if (req.getId() >= distributed.waitForMessageId.get()) { - // ARRIVED, RESET IT - ODistributedServerLog.debug(this, manager.getLocalNodeName(), req.getSenderNodeName(), DIRECTION.IN, - "reached waited request %d on request=%s sourceNode=%s", distributed.waitForMessageId.get(), req, - req.getSenderNodeName()); - - distributed.waitForMessageId.set(-1); - break; - } else { - // SKIP IT - ODistributedServerLog.debug(this, manager.getLocalNodeName(), req.getSenderNodeName(), DIRECTION.IN, - "discarded request %d because waiting for %d request=%s sourceNode=%s", req.getId(), - distributed.waitForMessageId.get(), req, req.getSenderNodeName()); - - sendResponseBack(req, req.getTask(), new ODiscardedResponse()); - - // READ THE NEXT ONE - req = nextMessage(); - } - } - } - - if (ODistributedServerLog.isDebugEnabled()) - ODistributedServerLog.debug(this, manager.getLocalNodeName(), req.getSenderNodeName(), DIRECTION.IN, - "processing request=%s sourceNode=%s", req, req.getSenderNodeName()); - - return req; - } - - protected ODistributedRequest nextMessage() throws InterruptedException { - while (localQueue.isEmpty()) { - // WAIT FOR THE FIRST MESSAGE - localQueue.offer((ODistributedRequest) requestQueue.take()); - - // READ MULTIPLE MSGS IN ONE SHOT BY USING LOCAL QUEUE TO IMPROVE PERFORMANCE - requestQueue.drainTo(localQueue, LOCAL_QUEUE_MAXSIZE - 1); - } - - return localQueue.poll(); - } - - /** - * Execute the remote call on the local node and send back the result - */ - protected void onMessage(final ODistributedRequest iRequest) { - OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED); - - try { - final OAbstractRemoteTask task = iRequest.getTask(); - - if (ODistributedServerLog.isDebugEnabled()) - ODistributedServerLog.debug(this, manager.getLocalNodeName(), iRequest.getSenderNodeName(), DIRECTION.OUT, - "received request: %s", iRequest); - - // EXECUTE IT LOCALLY - Serializable responsePayload; - OSecurityUser origin = null; - try { - // EXECUTE THE TASK - for (int retry = 1;; ++retry) { - if (task.isRequiredOpenDatabase()) - initDatabaseInstance(); - - database.activateOnCurrentThread(); - - task.setNodeSource(iRequest.getSenderNodeName()); - - // keep original user in database, check the username passed in request and set new user in DB, after document saved, - // reset - // to original user - if (database != null) { - origin = database.getUser(); - try { - if (lastUser == null || !(lastUser.getIdentity()).equals(iRequest.getUserRID())) - lastUser = database.getMetadata().getSecurity().getUser(iRequest.getUserRID()); - database.setUser(lastUser);// set to new user - } catch (Throwable ex) { - OLogManager.instance().error(this, "Failed on user switching " + ex.getMessage()); - } - } - - responsePayload = manager.executeOnLocalNode(iRequest, database); - - if (responsePayload instanceof OModificationOperationProhibitedException) { - // RETRY - try { - ODistributedServerLog.info(this, manager.getLocalNodeName(), iRequest.getSenderNodeName(), DIRECTION.OUT, - "Database is frozen, waiting and retrying. Request %s (retry=%d)", iRequest, retry); - - Thread.sleep(1000); - } catch (InterruptedException e) { - } - } else { - // OPERATION EXECUTED (OK OR ERROR), NO RETRY NEEDED - if (retry > 1) - ODistributedServerLog.info(this, manager.getLocalNodeName(), iRequest.getSenderNodeName(), DIRECTION.OUT, - "Request %s succeed after retry=%d", iRequest, retry); - - break; - } - - } - - } finally { - if (database != null && !database.isClosed()) { - database.activateOnCurrentThread(); - if (!database.isClosed()) { - database.rollback(); - database.getLocalCache().clear(); - database.setUser(origin); - } - } - } - - if (running) - sendResponseBack(iRequest, task, responsePayload); - - } finally { - OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.DEFAULT); - } - } - - /** - * Composes the undo queue name based on node name. - */ - protected String getPendingRequestMapName() { - final StringBuilder buffer = new StringBuilder(128); - buffer.append(distributed.NODE_QUEUE_PREFIX); - buffer.append(manager.getLocalNodeName()); - buffer.append(distributed.NODE_QUEUE_PENDING_POSTFIX); - return buffer.toString(); - } - - protected String getLocalNodeName() { - return manager.getLocalNodeName(); - } - - /** - * Checks if last pending operation must be re-executed or not. In some circustamces the exception - * OHotAlignmentNotPossibleException is raised because it's not possible to recover the database state. - * - * @throws OHotAlignmentNotPossibleException - */ - protected void hotAlignmentError(final ODistributedRequest iLastPendingRequest, final String iMessage, final Object... iParams) - throws OHotAlignmentNotPossibleException { - final String msg = String.format(iMessage, iParams); - - ODistributedServerLog.warn(this, getLocalNodeName(), iLastPendingRequest.getSenderNodeName(), DIRECTION.IN, "- " + msg); - throw new OHotAlignmentNotPossibleException(msg); - } - - private void sendResponseBack(final ODistributedRequest iRequest, final OAbstractRemoteTask task, Serializable responsePayload) { - ODistributedServerLog.debug(this, manager.getLocalNodeName(), iRequest.getSenderNodeName(), DIRECTION.OUT, - "sending back response '%s' to request %d (%s)", responsePayload, iRequest.getId(), task); - - final OHazelcastDistributedResponse response = new OHazelcastDistributedResponse(iRequest.getId(), manager.getLocalNodeName(), - iRequest.getSenderNodeName(), responsePayload); - - try { - // GET THE SENDER'S RESPONSE QUEUE - final IQueue queue = msgService - .getQueue(OHazelcastDistributedMessageService.getResponseQueueName(iRequest.getSenderNodeName())); - - if (!queue.offer(response, OGlobalConfiguration.DISTRIBUTED_QUEUE_TIMEOUT.getValueAsLong(), TimeUnit.MILLISECONDS)) - throw new ODistributedException("Timeout on dispatching response to the thread queue " + iRequest.getSenderNodeName()); - - } catch (Exception e) { - throw OException.wrapException( - new ODistributedException("Cannot dispatch response to the thread queue " + iRequest.getSenderNodeName()), e); - } - } -} diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedDatabase.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedDatabase.java index 692778de632..0189518ea31 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedDatabase.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedDatabase.java @@ -24,38 +24,17 @@ import com.orientechnologies.common.util.OPair; import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; -import com.orientechnologies.orient.core.config.OGlobalConfiguration; import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.record.ORecord; -import com.orientechnologies.orient.server.distributed.ODistributedConfiguration; -import com.orientechnologies.orient.server.distributed.ODistributedDatabase; -import com.orientechnologies.orient.server.distributed.ODistributedException; -import com.orientechnologies.orient.server.distributed.ODistributedRequest; -import com.orientechnologies.orient.server.distributed.ODistributedResponse; -import com.orientechnologies.orient.server.distributed.ODistributedResponseManager; -import com.orientechnologies.orient.server.distributed.ODistributedServerLog; +import com.orientechnologies.orient.server.distributed.*; import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; -import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import com.orientechnologies.orient.server.distributed.ODistributedSyncConfiguration; -import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask; -import com.orientechnologies.orient.server.distributed.task.OCreateRecordTask; -import com.orientechnologies.orient.server.distributed.task.ODeleteRecordTask; -import com.orientechnologies.orient.server.distributed.task.OFixTxTask; -import com.orientechnologies.orient.server.distributed.task.OResurrectRecordTask; -import com.orientechnologies.orient.server.distributed.task.OSQLCommandTask; -import com.orientechnologies.orient.server.distributed.task.OTxTask; -import com.orientechnologies.orient.server.distributed.task.OUpdateRecordTask; +import com.orientechnologies.orient.server.distributed.task.*; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -80,7 +59,6 @@ public class OHazelcastDistributedDatabase implements ODistributedDatabase { protected final int numWorkers = 8; protected volatile boolean restoringMessages = false; protected AtomicBoolean status = new AtomicBoolean(false); - protected List workers = new ArrayList(); protected AtomicLong waitForMessageId = new AtomicLong(-1); protected ConcurrentHashMap lockManager = new ConcurrentHashMap(); protected ConcurrentHashMap queueSizes = new ConcurrentHashMap(); @@ -142,9 +120,8 @@ public ODistributedResponse send2Nodes(final ODistributedRequest iRequest, final expectedSynchronousResponses, quorum, waitLocalNode, iRequest.getTask().getSynchronousTimeout(expectedSynchronousResponses), iRequest.getTask().getTotalTimeout(queueSize), groupByResponse); - final long timeout = OGlobalConfiguration.DISTRIBUTED_QUEUE_TIMEOUT.getValueAsLong(); - - final int queueMaxSize = OGlobalConfiguration.DISTRIBUTED_QUEUE_MAXSIZE.getValueAsInteger(); +// final long timeout = OGlobalConfiguration.DISTRIBUTED_QUEUE_TIMEOUT.getValueAsLong(); +// final int queueMaxSize = OGlobalConfiguration.DISTRIBUTED_QUEUE_MAXSIZE.getValueAsInteger(); try { requestLock.lock(); @@ -163,70 +140,72 @@ public ODistributedResponse send2Nodes(final ODistributedRequest iRequest, final for (OPair entry : reqQueues) { final String node = entry.getKey(); - final IQueue queue = entry.getValue(); - - if (queue != null) { - int nodeQueueSize = queue.size(); - - if (queueMaxSize > 0 && nodeQueueSize > queueMaxSize) { - - Integer nodeQueuePrevSize = queueSizes.get(node); - if (nodeQueuePrevSize == null) { - nodeQueuePrevSize = 0; - } - Integer nodeQueueWarnings = queueWarningCounter.get(node); - if (nodeQueueWarnings == null) { - nodeQueueWarnings = 0; - } - - final ODistributedServerManager.DB_STATUS nodeStatus = manager.getDatabaseStatus(node, databaseName); - if (nodeStatus == ODistributedServerManager.DB_STATUS.SYNCHRONIZING - || nodeStatus == ODistributedServerManager.DB_STATUS.BACKUP) { - - // BACKUP, SYNCHRONIZING: SEND THE MESSAGE AS WELL - queue.offer(iRequest, timeout, TimeUnit.MILLISECONDS); - - queueWarningCounter.remove(node); - - } else if (nodeQueueSize < nodeQueuePrevSize || nodeQueueWarnings < 10) { - - // THE QUEUE IS NOT INCREASING IN SIZE OR IS UNDER THE WARNING THRESHOLD: SEND THE MESSAGE AS WELL - queue.offer(iRequest, timeout, TimeUnit.MILLISECONDS); - - if (System.currentTimeMillis() - manager.getLastClusterChangeOn() > 10000) { - queueWarningCounter.put(node, nodeQueueWarnings + 1); - ODistributedServerLog.debug(this, getLocalNodeName(), node, DIRECTION.OUT, - "queue '%s' has too many messages (%d), checking if the node is in stall (warnings=%d)", queue.getName(), - nodeQueueSize, nodeQueueWarnings); - } else { - queueWarningCounter.remove(node); - ODistributedServerLog.debug(this, getLocalNodeName(), node, DIRECTION.OUT, - "queue '%s' has too many messages (%d), but the cluster shape is changed recently (%d secs)", queue.getName(), - nodeQueueSize, ((System.currentTimeMillis() - manager.getLastClusterChangeOn()) / 1000)); - } - - } else { - // NODE SEEMS IN STALL FOR UNKNOWN REASON - ODistributedServerLog.warn(this, getLocalNodeName(), node, DIRECTION.OUT, - "queue '%s' has too many messages (%d), treating the node as in stall: trying to restart it...", - queue.getName(), nodeQueueSize); - - // CLEAR THE QUEUE TO AVOID AN OOM IN THE CLUSTER - queue.clear(); - queueWarningCounter.remove(node); - nodeQueueSize = 0; - - manager.disconnectNode(entry.getKey()); - } - } else { - // SEND THE MESSAGE - queue.offer(iRequest, timeout, TimeUnit.MILLISECONDS); - queueWarningCounter.remove(node); - } - - // SAVE LAST QUEUE SIZE VALUE - queueSizes.put(node, nodeQueueSize); - } + + final ORemoteServerController remoteServer = manager.getRemoteServer(node); + remoteServer.executeRequest(iRequest); + + // if (queue != null) { + // int nodeQueueSize = queue.size(); + // + // if (queueMaxSize > 0 && nodeQueueSize > queueMaxSize) { + // + // Integer nodeQueuePrevSize = queueSizes.get(node); + // if (nodeQueuePrevSize == null) { + // nodeQueuePrevSize = 0; + // } + // Integer nodeQueueWarnings = queueWarningCounter.get(node); + // if (nodeQueueWarnings == null) { + // nodeQueueWarnings = 0; + // } + // + // final ODistributedServerManager.DB_STATUS nodeStatus = manager.getDatabaseStatus(node, databaseName); + // if (nodeStatus == ODistributedServerManager.DB_STATUS.SYNCHRONIZING + // || nodeStatus == ODistributedServerManager.DB_STATUS.BACKUP) { + // + // // BACKUP, SYNCHRONIZING: SEND THE MESSAGE AS WELL + // queue.offer(iRequest, timeout, TimeUnit.MILLISECONDS); + // + // queueWarningCounter.remove(node); + // + // } else if (nodeQueueSize < nodeQueuePrevSize || nodeQueueWarnings < 10) { + // + // // THE QUEUE IS NOT INCREASING IN SIZE OR IS UNDER THE WARNING THRESHOLD: SEND THE MESSAGE AS WELL + // queue.offer(iRequest, timeout, TimeUnit.MILLISECONDS); + // + // if (System.currentTimeMillis() - manager.getLastClusterChangeOn() > 10000) { + // queueWarningCounter.put(node, nodeQueueWarnings + 1); + // ODistributedServerLog.debug(this, getLocalNodeName(), node, DIRECTION.OUT, + // "queue '%s' has too many messages (%d), checking if the node is in stall (warnings=%d)", queue.getName(), + // nodeQueueSize, nodeQueueWarnings); + // } else { + // queueWarningCounter.remove(node); + // ODistributedServerLog.debug(this, getLocalNodeName(), node, DIRECTION.OUT, + // "queue '%s' has too many messages (%d), but the cluster shape is changed recently (%d secs)", queue.getName(), + // nodeQueueSize, ((System.currentTimeMillis() - manager.getLastClusterChangeOn()) / 1000)); + // } + // + // } else { + // // NODE SEEMS IN STALL FOR UNKNOWN REASON + // ODistributedServerLog.warn(this, getLocalNodeName(), node, DIRECTION.OUT, + // "queue '%s' has too many messages (%d), treating the node as in stall: trying to restart it...", + // queue.getName(), nodeQueueSize); + // + // // CLEAR THE QUEUE TO AVOID AN OOM IN THE CLUSTER + // queue.clear(); + // queueWarningCounter.remove(node); + // nodeQueueSize = 0; + // + // manager.disconnectNode(entry.getKey()); + // } + // } else { + // // SEND THE MESSAGE + // queue.offer(iRequest, timeout, TimeUnit.MILLISECONDS); + // queueWarningCounter.remove(node); + // } + // + // // SAVE LAST QUEUE SIZE VALUE + // queueSizes.put(node, nodeQueueSize); + // } } } finally { @@ -282,10 +261,6 @@ public OHazelcastDistributedDatabase configureDatabase(final Callable iCal final String queueName = OHazelcastDistributedMessageService.getRequestQueueName(getLocalNodeName(), databaseName); final IQueue requestQueue = msgService.getQueue(queueName); - final ODistributedWorker listenerThread = unqueuePendingMessages(queueName, requestQueue, clearReqQueue); - - workers.add(listenerThread); - if (iCallback != null) try { iCallback.call(); @@ -382,24 +357,6 @@ public OHazelcastDistributedDatabase setWaitForMessage(final long iMessageId) { } public void shutdown() { - for (int i = 0; i < workers.size(); ++i) - workers.get(i).shutdown(); - } - - protected ODistributedWorker unqueuePendingMessages(final String queueName, final IQueue requestQueue, - final boolean clearReqQueue) { - if (ODistributedServerLog.isDebugEnabled()) - ODistributedServerLog.debug(this, getLocalNodeName(), null, DIRECTION.NONE, "listening for incoming requests on queue: %s", - queueName); - - msgService.checkForPendingMessages(requestQueue, queueName, clearReqQueue); - - final ODistributedWorker listenerThread = new ODistributedWorker(this, requestQueue, databaseName, 0); - listenerThread.initDatabaseInstance(); - - listenerThread.start(); - - return listenerThread; } protected void checkForServerOnline(ODistributedRequest iRequest) throws ODistributedException { @@ -498,7 +455,7 @@ protected ODistributedResponse waitForResponse(final ODistributedRequest iReques } protected OPair[] getRequestQueues(final String iDatabaseName, final Collection nodes, - final OAbstractRemoteTask iTask) { + final ORemoteTask iTask) { final OPair[] queues = new OPair[nodes.size()]; int i = 0; @@ -615,7 +572,7 @@ protected void removeNodeInConfiguration(final String iNode, final boolean iForc } } - protected boolean checkIfOperationHasBeenExecuted(final ODistributedRequest lastPendingRequest, final OAbstractRemoteTask task) { + protected boolean checkIfOperationHasBeenExecuted(final ODistributedRequest lastPendingRequest, final ORemoteTask task) { boolean executeLastPendingRequest = false; // ASK FOR RECORD @@ -644,7 +601,7 @@ protected boolean checkIfOperationHasBeenExecuted(final ODistributedRequest last hotAlignmentError(lastPendingRequest, "Not able to resurrect deleted record '%s'", ((OResurrectRecordTask) task).getRid()); } else if (task instanceof OTxTask) { // CHECK EACH TX ITEM IF HAS BEEN COMMITTED - for (OAbstractRemoteTask t : ((OTxTask) task).getTasks()) { + for (ORemoteTask t : ((OTxTask) task).getTasks()) { executeLastPendingRequest = checkIfOperationHasBeenExecuted(lastPendingRequest, t); if (executeLastPendingRequest) // REPEAT THE ENTIRE TX @@ -652,7 +609,7 @@ protected boolean checkIfOperationHasBeenExecuted(final ODistributedRequest last } } else if (task instanceof OFixTxTask) { // CHECK EACH FIX-TX ITEM IF HAS BEEN COMMITTED - for (OAbstractRemoteTask t : ((OFixTxTask) task).getTasks()) { + for (ORemoteTask t : ((OFixTxTask) task).getTasks()) { executeLastPendingRequest = checkIfOperationHasBeenExecuted(lastPendingRequest, t); if (executeLastPendingRequest) // REPEAT THE ENTIRE TX diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedMessageService.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedMessageService.java index 8feb9e9b56b..3c21485505a 100644 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedMessageService.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedMessageService.java @@ -22,12 +22,9 @@ import com.hazelcast.collection.impl.queue.QueueService; import com.hazelcast.config.QueueConfig; import com.hazelcast.core.DistributedObject; -import com.hazelcast.core.HazelcastException; -import com.hazelcast.core.HazelcastInstanceNotActiveException; import com.hazelcast.core.IAtomicLong; import com.hazelcast.core.IQueue; import com.hazelcast.monitor.LocalQueueStats; -import com.hazelcast.spi.exception.DistributedObjectDestroyedException; import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.config.OGlobalConfiguration; import com.orientechnologies.orient.core.record.impl.ODocument; @@ -36,15 +33,9 @@ import com.orientechnologies.orient.server.distributed.ODistributedResponseManager; import com.orientechnologies.orient.server.distributed.ODistributedServerLog; import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; -import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Set; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; /** @@ -96,65 +87,6 @@ public void run() { purgePendingMessages(); } }; - - // CREATE THREAD LISTENER AGAINST orientdb.node..response, ONE PER NODE, THEN DISPATCH THE MESSAGE INTERNALLY USING THE - // THREAD ID - responseThread = new Thread(new Runnable() { - @Override - public void run() { - Thread.currentThread().setName("OrientDB Node Response " + queueName); - - while (running) { - String senderNode = null; - ODistributedResponse message = null; - try { - message = (ODistributedResponse) nodeResponseQueue.take(); - - if (message != null) { - senderNode = message.getSenderNodeName(); - - final long reqId = message.getRequestId(); - if (reqId < 0) { - // REQUEST - final OAbstractRemoteTask task = (OAbstractRemoteTask) message.getPayload(); - task.execute(manager.getServerInstance(), manager, null); - } else { - // RESPONSE - final long responseTime = dispatchResponseToThread(message); - - if (responseTime > -1) - collectMetric(responseTime); - } - } - - } catch (InterruptedException e) { - // EXIT CURRENT THREAD - Thread.interrupted(); - break; - } catch (DistributedObjectDestroyedException e) { - Thread.interrupted(); - break; - } catch (HazelcastInstanceNotActiveException e) { - Thread.interrupted(); - break; - } catch (HazelcastException e) { - if (e.getCause() instanceof InterruptedException) - Thread.interrupted(); - else - ODistributedServerLog.error(this, manager.getLocalNodeName(), senderNode, DIRECTION.IN, - "error on reading distributed response", e, message != null ? message.getPayload() : "-"); - } catch (Throwable e) { - ODistributedServerLog.error(this, manager.getLocalNodeName(), senderNode, DIRECTION.IN, - "error on reading distributed response", e, message != null ? message.getPayload() : "-"); - } - } - - ODistributedServerLog.debug(this, manager.getLocalNodeName(), null, DIRECTION.NONE, "end of reading responses"); - } - }); - - responseThread.setDaemon(true); - responseThread.start(); } /** @@ -324,7 +256,7 @@ public Set getDatabases() { * * @param response */ - protected long dispatchResponseToThread(final ODistributedResponse response) { + public long dispatchResponseToThread(final ODistributedResponse response) { final long chrono = Orient.instance().getProfiler().startChrono(); try { diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedRequest.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedRequest.java index 40c6b758b94..b5208c07e82 100644 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedRequest.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedRequest.java @@ -21,9 +21,8 @@ import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.server.distributed.ODistributedRequest; -import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask; +import com.orientechnologies.orient.server.distributed.task.ORemoteTask; -import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; @@ -34,14 +33,14 @@ * @author Luca Garulli (l.garulli--at--orientechnologies.com) * */ -public class OHazelcastDistributedRequest implements ODistributedRequest, Externalizable { - private long id; - private EXECUTION_MODE executionMode; - private String senderNodeName; - private String databaseName; - private long senderThreadId; - private OAbstractRemoteTask task; - private ORID userRID; // KEEP ALSO THE RID TO AVOID SECURITY PROBLEM ON DELETE & RECREATE USERS +public class OHazelcastDistributedRequest implements ODistributedRequest { + private long id; + private EXECUTION_MODE executionMode; + private String senderNodeName; + private String databaseName; + private long senderThreadId; + private ORemoteTask task; + private ORID userRID; // KEEP ALSO THE RID TO AVOID SECURITY PROBLEM ON DELETE & RECREATE USERS /** * Constructor used by serializer. @@ -49,7 +48,7 @@ public class OHazelcastDistributedRequest implements ODistributedRequest, Extern public OHazelcastDistributedRequest() { } - public OHazelcastDistributedRequest(final String senderNodeName, final String databaseName, final OAbstractRemoteTask payload, + public OHazelcastDistributedRequest(final String senderNodeName, final String databaseName, final ORemoteTask payload, EXECUTION_MODE iExecutionMode) { this.senderNodeName = senderNodeName; this.databaseName = databaseName; @@ -79,12 +78,12 @@ public OHazelcastDistributedRequest setDatabaseName(final String databaseName) { } @Override - public OAbstractRemoteTask getTask() { + public ORemoteTask getTask() { return task; } @Override - public OHazelcastDistributedRequest setTask(final OAbstractRemoteTask payload) { + public OHazelcastDistributedRequest setTask(final ORemoteTask payload) { this.task = payload; return this; } @@ -132,7 +131,7 @@ public void readExternal(final ObjectInput in) throws IOException, ClassNotFound senderNodeName = in.readUTF(); senderThreadId = in.readLong(); databaseName = in.readUTF(); - task = (OAbstractRemoteTask) in.readObject(); + task = (ORemoteTask) in.readObject(); userRID = (ORID) in.readObject(); } diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedResponse.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedResponse.java index a26785112ee..740a3de55c9 100644 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedResponse.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastDistributedResponse.java @@ -19,22 +19,21 @@ */ package com.orientechnologies.orient.server.hazelcast; -import com.orientechnologies.orient.server.distributed.ODistributedResponse; - -import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; import java.util.Arrays; +import com.orientechnologies.orient.server.distributed.ODistributedResponse; + /** * Hazelcast implementation of distributed peer. * * @author Luca Garulli (l.garulli--at--orientechnologies.com) * */ -public class OHazelcastDistributedResponse implements ODistributedResponse, Externalizable { +public class OHazelcastDistributedResponse implements ODistributedResponse { private long requestId; private String executorNodeName; private String senderNodeName; @@ -97,8 +96,6 @@ public void writeExternal(final ObjectOutput out) throws IOException { out.writeLong(requestId); out.writeUTF(executorNodeName); out.writeUTF(senderNodeName); -// if (payload != null && !(payload instanceof Boolean)) -// OLogManager.instance().info(this, "PAYLOAD: " + (payload != null ? (payload.getClass().getName() + "/" + payload) : null)); out.writeObject(payload); } diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java index 7f66376f239..38aa3dbd4d7 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.java @@ -41,6 +41,7 @@ import com.orientechnologies.orient.core.db.document.ODatabaseDocument; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; import com.orientechnologies.orient.core.exception.OConfigurationException; +import com.orientechnologies.orient.core.exception.ODatabaseException; import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.metadata.schema.OClass; import com.orientechnologies.orient.core.metadata.schema.OClassImpl; @@ -82,26 +83,27 @@ public class OHazelcastPlugin extends ODistributedAbstractPlugin implements MembershipListener, EntryListener, OCommandOutputListener { - public static final String CONFIG_DATABASE_PREFIX = "database."; + public static final String CONFIG_DATABASE_PREFIX = "database."; - protected static final String NODE_NAME_ENV = "ORIENTDB_NODE_NAME"; - protected static final String CONFIG_NODE_PREFIX = "node."; - protected static final String CONFIG_DBSTATUS_PREFIX = "dbstatus."; - protected static final int DEPLOY_DB_MAX_RETRIES = 10; - protected String nodeId; - protected String hazelcastConfigFile = "hazelcast.xml"; - protected Map activeNodes = new ConcurrentHashMap(); - protected OHazelcastDistributedMessageService messageService; - protected long timeOffset = 0; - protected Date startedOn = new Date(); + protected static final String NODE_NAME_ENV = "ORIENTDB_NODE_NAME"; + protected static final String CONFIG_NODE_PREFIX = "node."; + protected static final String CONFIG_DBSTATUS_PREFIX = "dbstatus."; + protected static final int DEPLOY_DB_MAX_RETRIES = 10; + protected String nodeId; + protected String hazelcastConfigFile = "hazelcast.xml"; + protected Map activeNodes = new ConcurrentHashMap(); + protected OHazelcastDistributedMessageService messageService; + protected long timeOffset = 0; + protected Date startedOn = new Date(); - protected volatile NODE_STATUS status = NODE_STATUS.OFFLINE; + protected volatile NODE_STATUS status = NODE_STATUS.OFFLINE; - protected String membershipListenerRegistration; + protected String membershipListenerRegistration; - protected volatile HazelcastInstance hazelcastInstance; - protected long lastClusterChangeOn; - protected List listeners = new ArrayList(); + protected volatile HazelcastInstance hazelcastInstance; + protected long lastClusterChangeOn; + protected List listeners = new ArrayList(); + protected Map remoteServers = new ConcurrentHashMap(); public OHazelcastPlugin() { } @@ -368,7 +370,7 @@ public void setDatabaseStatus(final String iNode, final String iDatabaseName, fi @Override public Object sendRequest(final String iDatabaseName, final Collection iClusterNames, - final Collection iTargetNodes, final OAbstractRemoteTask iTask, final EXECUTION_MODE iExecutionMode) { + final Collection iTargetNodes, final ORemoteTask iTask, final EXECUTION_MODE iExecutionMode) { checkForClusterRebalance(iDatabaseName); @@ -400,6 +402,49 @@ public Object sendRequest(final String iDatabaseName, final Collection i return null; } + @Override + public ODistributedRequest createRequest() { + return new OHazelcastDistributedRequest(); + } + + @Override + public ODistributedResponse createResponse() { + return new OHazelcastDistributedResponse(); + } + + @Override + public ODistributedResponse createResponse(final long requestId, final String executorNodeName, final String senderNodeName, + final Serializable payload) { + return new OHazelcastDistributedResponse(requestId, executorNodeName, senderNodeName, payload); + } + + public ORemoteServerController getRemoteServer(final String nodeName) throws IOException { + ORemoteServerController remoteServer = remoteServers.get(nodeName); + if (remoteServer == null) { + final ODocument cfg = getNodeConfigurationById(nodeName); + + final Collection> listeners = (Collection>) cfg.field("listeners"); + if (listeners == null) + throw new ODatabaseException( + "Cannot connect to a remote node because bad distributed configuration: missing 'listeners' array field"); + + String url = null; + for (Map listener : listeners) { + if (((String) listener.get("protocol")).equals("ONetworkProtocolBinary")) { + url = (String) listener.get("listen"); + break; + } + } + + if (url == null) + throw new ODatabaseException("Cannot connect to a remote node because the url was not found"); + + remoteServer = new ORemoteServerController(url); + remoteServers.put(nodeName, remoteServer); + } + return remoteServer; + } + public Set getManagedDatabases() { return messageService != null ? messageService.getDatabases() : Collections.EMPTY_SET; } @@ -845,7 +890,7 @@ public Serializable executeOnLocalNode(final ODistributedRequest req, final ODat throw new ODistributedException("Distributed storage was not installed for database '" + database.getName() + "'. Implementation found: " + database.getStorage().getClass().getName()); - final OAbstractRemoteTask task = req.getTask(); + final ORemoteTask task = req.getTask(); try { if (database != null) diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/OAsynchDistributedOperation.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/OAsynchDistributedOperation.java index dcfc5671078..0dcdb99165a 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/OAsynchDistributedOperation.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/OAsynchDistributedOperation.java @@ -20,30 +20,30 @@ package com.orientechnologies.orient.server.distributed; import com.orientechnologies.common.util.OCallable; -import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask; +import com.orientechnologies.orient.server.distributed.task.ORemoteTask; import java.util.Collection; import java.util.Set; /** - * Asynchronous sistributed operation. + * Asynchronous distributed operation. * * @author Luca Garulli (l.garulli--at--orientechnologies.com) */ public class OAsynchDistributedOperation { - private final String databaseName; - private final Set clusterNames; - private final Collection nodes; - private final OAbstractRemoteTask task; - private final OCallable callback; + private final String databaseName; + private final Set clusterNames; + private final Collection nodes; + private final ORemoteTask task; + private final OCallable callback; public OAsynchDistributedOperation(final String iDatabaseName, final Set iClusterNames, final Collection iNodes, - final OAbstractRemoteTask iTask) { + final ORemoteTask iTask) { this(iDatabaseName, iClusterNames, iNodes, iTask, null); } public OAsynchDistributedOperation(final String iDatabaseName, final Set iClusterNames, final Collection iNodes, - final OAbstractRemoteTask iTask, final OCallable iCallback) { + final ORemoteTask iTask, final OCallable iCallback) { databaseName = iDatabaseName; clusterNames = iClusterNames; nodes = iNodes; @@ -59,7 +59,7 @@ public Collection getNodes() { return nodes; } - public OAbstractRemoteTask getTask() { + public ORemoteTask getTask() { return task; } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedMessageService.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedMessageService.java index 82a679b5ff2..5589efb1797 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedMessageService.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedMessageService.java @@ -38,4 +38,6 @@ public interface ODistributedMessageService { ODistributedDatabase unregisterDatabase(String iDatabaseName); List getManagedQueueNames(); + + long dispatchResponseToThread(final ODistributedResponse response); } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedRequest.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedRequest.java index d2be01ddbde..d0f37928872 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedRequest.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedRequest.java @@ -20,37 +20,39 @@ package com.orientechnologies.orient.server.distributed; import com.orientechnologies.orient.core.id.ORID; -import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask; +import com.orientechnologies.orient.server.distributed.task.ORemoteTask; + +import java.io.Externalizable; /** - * - * @author Luca Garulli (l.garulli--at--orientechnologies.com) - * - */ - public interface ODistributedRequest { - enum EXECUTION_MODE { - RESPONSE, NO_RESPONSE - } + * + * @author Luca Garulli (l.garulli--at--orientechnologies.com) + * + */ +public interface ODistributedRequest extends Externalizable { + enum EXECUTION_MODE { + RESPONSE, NO_RESPONSE + } - long getId(); + long getId(); - void setId(long iId); + void setId(long iId); - EXECUTION_MODE getExecutionMode(); + EXECUTION_MODE getExecutionMode(); - String getDatabaseName(); + String getDatabaseName(); - ODistributedRequest setDatabaseName(final String databaseName); + ODistributedRequest setDatabaseName(final String databaseName); - String getSenderNodeName(); + String getSenderNodeName(); - ODistributedRequest setSenderNodeName(String localNodeName); + ODistributedRequest setSenderNodeName(String localNodeName); - OAbstractRemoteTask getTask(); + ORemoteTask getTask(); - ODistributedRequest setTask(final OAbstractRemoteTask payload); + ODistributedRequest setTask(final ORemoteTask payload); - ORID getUserRID(); + ORID getUserRID(); - void setUserRID(ORID iUserRID); - } + void setUserRID(ORID iUserRID); +} diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedResponse.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedResponse.java index 88d0f701dcf..cc7891651d8 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedResponse.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedResponse.java @@ -19,12 +19,14 @@ */ package com.orientechnologies.orient.server.distributed; +import java.io.Externalizable; + /** * * @author Luca Garulli (l.garulli--at--orientechnologies.com) * */ -public interface ODistributedResponse { +public interface ODistributedResponse extends Externalizable { String getExecutorNodeName(); diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedResponseManager.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedResponseManager.java index 3e6009cbbb6..a1db96fb5c2 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedResponseManager.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedResponseManager.java @@ -26,10 +26,7 @@ import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.record.ORecordInternal; import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; -import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask; -import com.orientechnologies.orient.server.distributed.task.OAbstractReplicatedTask; -import com.orientechnologies.orient.server.distributed.task.OCreateRecordTask; -import com.orientechnologies.orient.server.distributed.task.ODeleteRecordTask; +import com.orientechnologies.orient.server.distributed.task.*; import java.util.ArrayList; import java.util.Collection; @@ -581,9 +578,9 @@ protected void undoRequest() { if (!realignRecordClusters()) { for (ODistributedResponse r : getReceivedResponses()) { - final OAbstractRemoteTask task = request.getTask(); + final ORemoteTask task = request.getTask(); if (task instanceof OAbstractReplicatedTask) { - final OAbstractRemoteTask undoTask = ((OAbstractReplicatedTask) task).getUndoTask(request, r.getPayload()); + final ORemoteTask undoTask = ((OAbstractReplicatedTask) task).getUndoTask(request, r.getPayload()); if (undoTask != null) { ODistributedServerLog.warn(this, dManager.getLocalNodeName(), null, DIRECTION.NONE, @@ -604,7 +601,7 @@ private boolean realignRecordClusters() { long maxClusterPos = Long.MIN_VALUE; for (ODistributedResponse r : getReceivedResponses()) { - final OAbstractRemoteTask task = request.getTask(); + final ORemoteTask task = request.getTask(); if (task instanceof OCreateRecordTask) { final Object badResponse = r.getPayload(); if (badResponse instanceof Throwable) @@ -638,7 +635,7 @@ private boolean realignRecordClusters() { // FOUND HOLE(S) for (ODistributedResponse r : getReceivedResponses()) { - final OAbstractRemoteTask task = request.getTask(); + final ORemoteTask task = request.getTask(); final OPlaceholder origPh = (OPlaceholder) r.getPayload(); @@ -681,11 +678,11 @@ protected void fixNodesInConflict(final List bestResponses if (responseGroup != bestResponsesGroup) { // CONFLICT GROUP: FIX THEM ONE BY ONE for (ODistributedResponse r : responseGroup) { - final List fixTasks = ((OAbstractReplicatedTask) request.getTask()).getFixTask(request, + final List fixTasks = ((OAbstractReplicatedTask) request.getTask()).getFixTask(request, request.getTask(), r.getPayload(), goodResponse.getPayload(), r.getExecutorNodeName(), dManager); if (fixTasks != null) { - for (OAbstractRemoteTask fixTask : fixTasks) { + for (ORemoteTask fixTask : fixTasks) { ODistributedServerLog.warn(this, dManager.getLocalNodeName(), r.getExecutorNodeName(), DIRECTION.OUT, "sending fix message (%s) for response (%s) on request (%s) to be: %s", fixTask, r, request, goodResponse); diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedServerManager.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedServerManager.java index ef2ffeec56e..1e6e7fa53e1 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedServerManager.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedServerManager.java @@ -22,8 +22,10 @@ import com.orientechnologies.orient.core.db.ODatabaseInternal; import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.server.distributed.ODistributedRequest.EXECUTION_MODE; -import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask; +import com.orientechnologies.orient.server.distributed.task.ORemoteTask; +import java.io.IOException; +import java.io.Serializable; import java.util.Collection; import java.util.Map; import java.util.concurrent.locks.Lock; @@ -46,10 +48,19 @@ enum DB_STATUS { boolean isEnabled(); + ODistributedRequest createRequest(); + + ODistributedResponse createResponse(); + + ODistributedResponse createResponse(final long requestId, final String executorNodeName, final String senderNodeName, + final Serializable payload); + ODistributedServerManager registerLifecycleListener(ODistributedLifecycleListener iListener); ODistributedServerManager unregisterLifecycleListener(ODistributedLifecycleListener iListener); + ORemoteServerController getRemoteServer(final String nodeName) throws IOException; + Map getConfigurationMap(); long getLastClusterChangeOn(); @@ -113,8 +124,8 @@ enum DB_STATUS { ODistributedConfiguration getDatabaseConfiguration(String iDatabaseName); - Object sendRequest(String iDatabaseName, Collection iClusterNames, Collection iTargetNodeNames, - OAbstractRemoteTask iTask, EXECUTION_MODE iExecutionMode); + Object sendRequest(String iDatabaseName, Collection iClusterNames, Collection iTargetNodeNames, ORemoteTask iTask, + EXECUTION_MODE iExecutionMode); ODocument getStats(); diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java index 85c5aeda884..5942c2aad62 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/ODistributedStorage.java @@ -1134,7 +1134,7 @@ public Object call(final Object iArgument) { return null; } else if (iArgument instanceof Exception) { try { - final OAbstractRemoteTask undo = txTask.getUndoTaskForLocalStorage(iArgument); + final ORemoteTask undo = txTask.getUndoTaskForLocalStorage(iArgument); if (undo != null) try { diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/ORemoteServerController.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/ORemoteServerController.java new file mode 100644 index 00000000000..8bcbc8d22ad --- /dev/null +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/ORemoteServerController.java @@ -0,0 +1,103 @@ +/* + * + * * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com) + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * * For more information: http://www.orientechnologies.com + * + */ +package com.orientechnologies.orient.server.distributed; + +import com.orientechnologies.orient.client.remote.OServerAdmin; +import com.orientechnologies.orient.client.remote.OStorageRemoteOperation; +import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient; +import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; + +/** + * Remote server controller. + * + * @author Luca Garulli + */ +public class ORemoteServerController extends OServerAdmin { + public ORemoteServerController(final String iURL) throws IOException { + super(iURL); + } + + public void executeRequest(final ODistributedRequest req) { + networkAdminOperation(new OStorageRemoteOperation() { + @Override + public Object execute(final OChannelBinaryAsynchClient network) throws IOException { + storage.beginRequest(network, OChannelBinaryProtocol.DISTRIBUTED_REQUEST); + + try { + final byte[] serializedRequest; + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + final ObjectOutputStream outStream = new ObjectOutputStream(out); + try { + req.writeExternal(outStream); + serializedRequest = out.toByteArray(); + + network.writeBytes(serializedRequest); + + } finally { + outStream.close(); + } + } finally { + out.close(); + } + + } finally { + storage.endRequest(network); + } + return null; + } + }, "Cannot send distributed request"); + } + + public void sendResponse(final ODistributedResponse response) { + networkAdminOperation(new OStorageRemoteOperation() { + @Override + public Object execute(final OChannelBinaryAsynchClient network) throws IOException { + storage.beginRequest(network, OChannelBinaryProtocol.DISTRIBUTED_RESPONSE); + try { + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + final ObjectOutputStream outStream = new ObjectOutputStream(out); + try { + response.writeExternal(outStream); + final byte[] serializedResponse = out.toByteArray(); + + network.writeBytes(serializedResponse); + + } finally { + outStream.close(); + } + } finally { + out.close(); + } + + } finally { + storage.endRequest(network); + } + return null; + } + }, "Cannot send response back to the sender node '" + response.getSenderNodeName() + "'"); + } +} diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OAbstractRemoteTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OAbstractRemoteTask.java index 5f54a12d7e4..e1ee4773b94 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OAbstractRemoteTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OAbstractRemoteTask.java @@ -25,47 +25,48 @@ import com.orientechnologies.orient.server.OServer; import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import java.io.Externalizable; - /** * Base class for Tasks to be executed remotely. * * @author Luca Garulli (l.garulli--at--orientechnologies.com) * */ -public abstract class OAbstractRemoteTask implements Externalizable { +public abstract class OAbstractRemoteTask implements ORemoteTask { private static final long serialVersionUID = 1L; protected transient String nodeSource; - public enum RESULT_STRATEGY { - ANY, UNION - } - /** * Constructor used from unmarshalling. */ public OAbstractRemoteTask() { } + @Override public abstract String getName(); + @Override public abstract OCommandDistributedReplicateRequest.QUORUM_TYPE getQuorumType(); + @Override public abstract Object execute(OServer iServer, ODistributedServerManager iManager, ODatabaseDocumentTx database) throws Exception; + @Override public long getDistributedTimeout() { return OGlobalConfiguration.DISTRIBUTED_CRUD_TASK_SYNCH_TIMEOUT.getValueAsLong(); } + @Override public long getSynchronousTimeout(final int iSynchNodes) { return getDistributedTimeout() * iSynchNodes; } + @Override public long getTotalTimeout(final int iTotalNodes) { return getDistributedTimeout() * iTotalNodes; } + @Override public RESULT_STRATEGY getResultStrategy() { return RESULT_STRATEGY.ANY; } @@ -75,18 +76,22 @@ public String toString() { return getName(); } + @Override public String getNodeSource() { return nodeSource; } + @Override public void setNodeSource(String nodeSource) { this.nodeSource = nodeSource; } + @Override public boolean isRequiredOpenDatabase() { return true; } + @Override public boolean isIdempotent() { return false; } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OAbstractReplicatedTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OAbstractReplicatedTask.java index 13b4c20a31c..192d9840505 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OAbstractReplicatedTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OAbstractReplicatedTask.java @@ -34,12 +34,12 @@ public abstract class OAbstractReplicatedTask extends OAbstractRemoteTask { private static final long serialVersionUID = 1L; - public List getFixTask(ODistributedRequest iRequest, OAbstractRemoteTask iOriginalTask, Object iBadResponse, + public List getFixTask(ODistributedRequest iRequest, ORemoteTask iOriginalTask, Object iBadResponse, Object iGoodResponse, String executorNodeName, ODistributedServerManager dManager) { return Collections.EMPTY_LIST; } - public OAbstractRemoteTask getUndoTask(ODistributedRequest iRequest, Object iBadResponse) { + public ORemoteTask getUndoTask(ODistributedRequest iRequest, Object iBadResponse) { return null; } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCompletedTxTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCompletedTxTask.java index 032cceb7f74..13c146941a7 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCompletedTxTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCompletedTxTask.java @@ -19,6 +19,13 @@ */ package com.orientechnologies.orient.server.distributed.task; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; import com.orientechnologies.orient.core.config.OGlobalConfiguration; import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal; @@ -31,13 +38,6 @@ import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - /** * Task to manage the end of distributed transaction when no fix is needed (OFixTxTask) and all the locks must be released. Locks * are necessary to prevent concurrent modification of records before the transaction is finished. @@ -47,6 +47,7 @@ */ public class OCompletedTxTask extends OAbstractReplicatedTask { private static final long serialVersionUID = 1L; + public static final int FACTORYID = 8; private final Set locks; public OCompletedTxTask() { @@ -80,14 +81,13 @@ public OCommandDistributedReplicateRequest.QUORUM_TYPE getQuorumType() { } @Override - public List getFixTask(final ODistributedRequest iRequest, OAbstractRemoteTask iOriginalTask, final Object iBadResponse, - final Object iGoodResponse, String executorNodeName, - ODistributedServerManager dManager) { + public List getFixTask(final ODistributedRequest iRequest, ORemoteTask iOriginalTask, final Object iBadResponse, + final Object iGoodResponse, String executorNodeName, ODistributedServerManager dManager) { return null; } @Override - public OAbstractRemoteTask getUndoTask(final ODistributedRequest iRequest, final Object iBadResponse) { + public ORemoteTask getUndoTask(final ODistributedRequest iRequest, final Object iBadResponse) { return null; } @@ -126,4 +126,10 @@ public String getName() { public String getPayload() { return null; } + + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCopyDatabaseChunkTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCopyDatabaseChunkTask.java index 802abe745fd..4fa930278fb 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCopyDatabaseChunkTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCopyDatabaseChunkTask.java @@ -19,6 +19,11 @@ */ package com.orientechnologies.orient.server.distributed.task; +import java.io.File; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + import com.orientechnologies.common.io.OFileUtils; import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; import com.orientechnologies.orient.core.config.OGlobalConfiguration; @@ -29,11 +34,6 @@ import com.orientechnologies.orient.server.distributed.ODistributedServerLog; import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import java.io.File; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - /** * Ask for a database chunk. * @@ -42,11 +42,12 @@ */ public class OCopyDatabaseChunkTask extends OAbstractReplicatedTask { private static final long serialVersionUID = 1L; + public static final int FACTORYID = 15; - private String fileName; - private int chunkNum; - private long offset; - private boolean compressed; + private String fileName; + private int chunkNum; + private long offset; + private boolean compressed; public OCopyDatabaseChunkTask() { } @@ -121,4 +122,10 @@ public void readExternal(final ObjectInput in) throws IOException, ClassNotFound public boolean isRequiredOpenDatabase() { return false; } + + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCreateRecordTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCreateRecordTask.java index da2c3c2f316..a845856bea7 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCreateRecordTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OCreateRecordTask.java @@ -50,11 +50,11 @@ * Distributed create record task used for synchronization. * * @author Luca Garulli (l.garulli--at--orientechnologies.com) - * */ public class OCreateRecordTask extends OAbstractRecordReplicatedTask { public static final String SUFFIX_QUEUE_NAME = ".insert"; private static final long serialVersionUID = 1L; + public static final int FACTORYID = 0; protected byte[] content; protected byte recordType; protected int clusterId = -1; @@ -145,15 +145,15 @@ public OCommandDistributedReplicateRequest.QUORUM_TYPE getQuorumType() { } @Override - public List getFixTask(final ODistributedRequest iRequest, OAbstractRemoteTask iOriginalTask, - final Object iBadResponse, final Object iGoodResponse, final String executorNode, final ODistributedServerManager dManager) { + public List getFixTask(final ODistributedRequest iRequest, ORemoteTask iOriginalTask, final Object iBadResponse, + final Object iGoodResponse, final String executorNode, final ODistributedServerManager dManager) { if (iBadResponse instanceof Throwable) return null; final OPlaceholder badResult = (OPlaceholder) iBadResponse; final OPlaceholder goodResult = (OPlaceholder) iGoodResponse; - final List result = new ArrayList(2); + final List result = new ArrayList(2); if (!badResult.equals(goodResult)) { // CREATE RECORD FAILED TO HAVE THE SAME RIDS. FORCE REALIGNING OF DATA CLUSTERS @@ -248,4 +248,9 @@ public void readExternal(final ObjectInput in) throws IOException, ClassNotFound public String getName() { return "record_create"; } + + @Override + public int getFactoryId() { + return FACTORYID; + } } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ODeleteRecordTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ODeleteRecordTask.java index 05c35b1da6e..5c541c29400 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ODeleteRecordTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ODeleteRecordTask.java @@ -26,12 +26,8 @@ import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.record.ORecord; import com.orientechnologies.orient.server.OServer; -import com.orientechnologies.orient.server.distributed.ODistributedDatabase; -import com.orientechnologies.orient.server.distributed.ODistributedRequest; -import com.orientechnologies.orient.server.distributed.ODistributedServerLog; +import com.orientechnologies.orient.server.distributed.*; import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; -import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import com.orientechnologies.orient.server.distributed.ODistributedStorage; import java.io.IOException; import java.io.ObjectInput; @@ -43,12 +39,12 @@ * Distributed delete record task used for synchronization. * * @author Luca Garulli (l.garulli--at--orientechnologies.com) - * */ public class ODeleteRecordTask extends OAbstractRecordReplicatedTask { - private static final long serialVersionUID = 1L; - private boolean delayed = false; - private transient boolean lockRecord = true; + private static final long serialVersionUID = 1L; + public static final int FACTORYID = 4; + private boolean delayed = false; + private transient boolean lockRecord = true; public ODeleteRecordTask() { } @@ -65,8 +61,9 @@ public ORecord getRecord() { @Override public Object execute(final OServer iServer, ODistributedServerManager iManager, final ODatabaseDocumentTx database) throws Exception { - ODistributedServerLog.debug(this, iManager.getLocalNodeName(), null, DIRECTION.IN, "delete record %s/%s v.%d", - database.getName(), rid.toString(), version); + ODistributedServerLog + .debug(this, iManager.getLocalNodeName(), null, DIRECTION.IN, "delete record %s/%s v.%d", database.getName(), + rid.toString(), version); // TRY LOCKING RECORD final ODistributedDatabase ddb = iManager.getMessageService().getDatabase(database.getName()); @@ -102,16 +99,16 @@ public OCommandDistributedReplicateRequest.QUORUM_TYPE getQuorumType() { } @Override - public List getFixTask(final ODistributedRequest iRequest, final OAbstractRemoteTask iOriginalTask, final Object iBadResponse, final Object iGoodResponse, - String executorNodeName, ODistributedServerManager dManager) { - final List fixTasks = new ArrayList(1); + public List getFixTask(final ODistributedRequest iRequest, final ORemoteTask iOriginalTask, + final Object iBadResponse, final Object iGoodResponse, String executorNodeName, ODistributedServerManager dManager) { + final List fixTasks = new ArrayList(1); fixTasks.add(new OResurrectRecordTask(rid, version)); return fixTasks; } @Override - public OAbstractRemoteTask getUndoTask(final ODistributedRequest iRequest, final Object iBadResponse) { + public ORemoteTask getUndoTask(final ODistributedRequest iRequest, final Object iBadResponse) { return new OResurrectRecordTask(rid, version); } @@ -150,4 +147,10 @@ public ODeleteRecordTask setDelayed(final boolean delayed) { public String toString() { return getName() + "(" + rid + " delayed=" + delayed + ")"; } + + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OFixTxTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OFixTxTask.java index cecd0c8fbd9..1f5c4fabc1d 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OFixTxTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OFixTxTask.java @@ -19,6 +19,15 @@ */ package com.orientechnologies.orient.server.distributed.task; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; + import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal; @@ -31,24 +40,16 @@ import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; - /** * Distributed create record task used for synchronization. * * @author Luca Garulli (l.garulli--at--orientechnologies.com) */ public class OFixTxTask extends OAbstractRemoteTask { - private static final long serialVersionUID = 1L; - private List tasks = new ArrayList(); - private Set locks; + private static final long serialVersionUID = 1L; + public static final int FACTORYID = 9; + private List tasks = new ArrayList(); + private Set locks; public OFixTxTask() { } @@ -57,15 +58,15 @@ public OFixTxTask(final Set iLocks) { locks = iLocks; } - public List getTasks() { + public List getTasks() { return tasks; } - public void add(final OAbstractRemoteTask iTask) { + public void add(final ORemoteTask iTask) { tasks.add(iTask); } - public void addAll(final Collection iTasks) { + public void addAll(final Collection iTasks) { tasks.addAll(iTasks); } @@ -80,7 +81,7 @@ public Object execute(final OServer iServer, final ODistributedServerManager iMa OScenarioThreadLocal.executeAsDistributed(new Callable() { @Override public Object call() throws Exception { - for (OAbstractRemoteTask task : tasks) { + for (ORemoteTask task : tasks) { if (task instanceof OAbstractRecordReplicatedTask) // AVOID LOCKING RECORDS AGAIN BECAUSE ARE ALREADY LOCKED ((OAbstractRecordReplicatedTask) task).setLockRecord(false); @@ -114,7 +115,7 @@ public OCommandDistributedReplicateRequest.QUORUM_TYPE getQuorumType() { public void writeExternal(final ObjectOutput out) throws IOException { // TASKS out.writeInt(tasks.size()); - for (OAbstractRemoteTask task : tasks) + for (ORemoteTask task : tasks) out.writeObject(task); // LOCKS out.writeInt(locks.size()); @@ -127,7 +128,7 @@ public void readExternal(final ObjectInput in) throws IOException, ClassNotFound // TASKS final int size = in.readInt(); for (int i = 0; i < size; ++i) - tasks.add((OAbstractRemoteTask) in.readObject()); + tasks.add((ORemoteTask) in.readObject()); // LOCKS final int lockSize = in.readInt(); for (int i = 0; i < lockSize; ++i) @@ -138,4 +139,10 @@ public void readExternal(final ObjectInput in) throws IOException, ClassNotFound public String getName() { return "fix_tx"; } + + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OReadRecordIfNotLatestTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OReadRecordIfNotLatestTask.java index 6cdf4997105..7b593e54ba4 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OReadRecordIfNotLatestTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OReadRecordIfNotLatestTask.java @@ -33,6 +33,7 @@ public class OReadRecordIfNotLatestTask extends OAbstractRemoteTask { private static final long serialVersionUID = 1L; + public static final int FACTORYID = 2; protected ORecordId rid; protected int recordVersion; @@ -81,4 +82,10 @@ public String getName() { public boolean isIdempotent() { return true; } + + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OReadRecordTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OReadRecordTask.java index 5ac19e94fe0..332455a84cf 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OReadRecordTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OReadRecordTask.java @@ -19,10 +19,6 @@ */ package com.orientechnologies.orient.server.distributed.task; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; import com.orientechnologies.orient.core.id.ORecordId; @@ -31,16 +27,20 @@ import com.orientechnologies.orient.server.OServer; import com.orientechnologies.orient.server.distributed.ODistributedServerManager; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + /** * Execute a read of a record from a distributed node. * * @author Luca Garulli (l.garulli--at--orientechnologies.com) - * */ public class OReadRecordTask extends OAbstractRemoteTask { private static final long serialVersionUID = 1L; + public static final int FACTORYID = 1; - protected ORecordId rid; + protected ORecordId rid; public OReadRecordTask() { } @@ -87,4 +87,10 @@ public String toString() { public boolean isIdempotent() { return true; } + + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORemoteTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORemoteTask.java new file mode 100644 index 00000000000..4cded2af083 --- /dev/null +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORemoteTask.java @@ -0,0 +1,62 @@ +/* + * + * * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com) + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * * For more information: http://www.orientechnologies.com + * + */ +package com.orientechnologies.orient.server.distributed.task; + +import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; +import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; +import com.orientechnologies.orient.server.OServer; +import com.orientechnologies.orient.server.distributed.ODistributedServerManager; + +import java.io.Externalizable; + +/** + * Remote Task interface. + * + * @author Luca Garulli + */ +public interface ORemoteTask extends Externalizable { + enum RESULT_STRATEGY { + ANY, UNION + } + + String getName(); + + OCommandDistributedReplicateRequest.QUORUM_TYPE getQuorumType(); + + Object execute(OServer iServer, ODistributedServerManager iManager, ODatabaseDocumentTx database) throws Exception; + + long getDistributedTimeout(); + + long getSynchronousTimeout(final int iSynchNodes); + + long getTotalTimeout(final int iTotalNodes); + + OAbstractRemoteTask.RESULT_STRATEGY getResultStrategy(); + + String getNodeSource(); + + void setNodeSource(String nodeSource); + + boolean isRequiredOpenDatabase(); + + boolean isIdempotent(); + + int getFactoryId(); +} \ No newline at end of file diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORemoteTaskFactory.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORemoteTaskFactory.java new file mode 100644 index 00000000000..23b1d16bd63 --- /dev/null +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORemoteTaskFactory.java @@ -0,0 +1,81 @@ +/* + * + * * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com) + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * * For more information: http://www.orientechnologies.com + * + */ +package com.orientechnologies.orient.server.distributed.task; + +/** + * Factory of remote tasks. + * + * @author Luca Garulli (l.garulli--at--orientechnologies.com) + */ +public class ORemoteTaskFactory { + public static ORemoteTask createTask(final int code) { + switch (code) { + case OCreateRecordTask.FACTORYID: // 0 + return new OCreateRecordTask(); + + case OReadRecordTask.FACTORYID: // 1 + return new OReadRecordTask(); + + case OReadRecordIfNotLatestTask.FACTORYID: // 2 + return new OReadRecordIfNotLatestTask(); + + case OUpdateRecordTask.FACTORYID: // 3 + return new OUpdateRecordTask(); + + case ODeleteRecordTask.FACTORYID: // 4 + return new ODeleteRecordTask(); + + case OSQLCommandTask.FACTORYID: // 5 + return new OSQLCommandTask(); + + case OScriptTask.FACTORYID: // 6 + return new OScriptTask(); + + case OTxTask.FACTORYID: // 7 + return new OTxTask(); + + case OCompletedTxTask.FACTORYID: // 8 + return new OCompletedTxTask(); + + case OFixTxTask.FACTORYID: // 9 + return new OFixTxTask(); + + case ORestartNodeTask.FACTORYID: // 10 + return new ORestartNodeTask(); + + case OResurrectRecordTask.FACTORYID: // 11 + return new OResurrectRecordTask(); + + case OSyncClusterTask.FACTORYID: // 12 + return new OSyncClusterTask(); + + case OSyncDatabaseDeltaTask.FACTORYID: // 13 + return new OSyncDatabaseDeltaTask(); + + case OSyncDatabaseTask.FACTORYID: // 14 + return new OSyncDatabaseTask(); + + case OCopyDatabaseChunkTask.FACTORYID: // 15 + return new OCopyDatabaseChunkTask(); + } + + throw new IllegalArgumentException("Task with code " + code + " is not supported"); + } +} diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORestartNodeTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORestartNodeTask.java index 85da1107a0b..d76a5e19f14 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORestartNodeTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/ORestartNodeTask.java @@ -19,6 +19,11 @@ */ package com.orientechnologies.orient.server.distributed.task; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.TimerTask; + import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; @@ -26,11 +31,6 @@ import com.orientechnologies.orient.server.distributed.ODistributedServerLog; import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.TimerTask; - /** * Distributed task to restart a node. * @@ -39,6 +39,7 @@ */ public class ORestartNodeTask extends OAbstractRemoteTask { private static final long serialVersionUID = 1L; + public static final int FACTORYID = 10; public ORestartNodeTask() { } @@ -82,4 +83,10 @@ public void writeExternal(ObjectOutput out) throws IOException { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { } + + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OResurrectRecordTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OResurrectRecordTask.java index 921674c5314..4c81f2fae8f 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OResurrectRecordTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OResurrectRecordTask.java @@ -19,6 +19,10 @@ */ package com.orientechnologies.orient.server.distributed.task; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx; import com.orientechnologies.orient.core.id.ORecordId; @@ -28,10 +32,6 @@ import com.orientechnologies.orient.server.distributed.ODistributedServerManager; import com.orientechnologies.orient.server.distributed.ODistributedStorage; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - /** * Distributed task to fix delete record in conflict on synchronization. * @@ -40,6 +40,7 @@ */ public class OResurrectRecordTask extends OAbstractRemoteTask { private static final long serialVersionUID = 1L; + public static final int FACTORYID = 11; private ORecordId rid; private int version; @@ -96,4 +97,10 @@ public void readExternal(final ObjectInput in) throws IOException, ClassNotFound public String getName() { return "fix_record_delete"; } + + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSQLCommandTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSQLCommandTask.java index 00aae1e91a7..38fb808b2a0 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSQLCommandTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSQLCommandTask.java @@ -53,6 +53,7 @@ */ public class OSQLCommandTask extends OAbstractCommandTask { private static final long serialVersionUID = 1L; + public static final int FACTORYID = 5; protected String text; protected Map params; @@ -171,4 +172,9 @@ public String toString() { public String getPayload() { return text; } + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OScriptTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OScriptTask.java index fb43c7d8454..a26032dba3f 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OScriptTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OScriptTask.java @@ -19,6 +19,11 @@ */ package com.orientechnologies.orient.server.distributed.task; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; + import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; import com.orientechnologies.orient.core.command.OCommandRequest; import com.orientechnologies.orient.core.command.OCommandRequestText; @@ -30,11 +35,6 @@ import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Map; - /** * Executes a script on distributed servers. * @@ -43,6 +43,7 @@ */ public class OScriptTask extends OAbstractCommandTask { private static final long serialVersionUID = 1L; + public static final int FACTORYID = 6; protected String text; protected Map params; @@ -118,4 +119,10 @@ public String toString() { public String getPayload() { return text; } + + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncClusterTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncClusterTask.java index 4e6816859e0..169e1e273aa 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncClusterTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncClusterTask.java @@ -19,6 +19,10 @@ */ package com.orientechnologies.orient.server.distributed.task; +import java.io.*; +import java.util.UUID; +import java.util.concurrent.locks.Lock; + import com.orientechnologies.common.io.OFileUtils; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.orient.core.Orient; @@ -36,14 +40,6 @@ import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.UUID; -import java.util.concurrent.locks.Lock; - /** * Ask for deployment of single cluster from a remote node. * @@ -53,6 +49,7 @@ public class OSyncClusterTask extends OAbstractReplicatedTask { public final static int CHUNK_MAX_SIZE = 4194304; // 4MB public static final String DEPLOYCLUSTER = "deploycluster."; + public static final int FACTORYID = 12; public enum MODE { FULL_REPLACE, MERGE @@ -235,4 +232,9 @@ public boolean isRequiredOpenDatabase() { return true; } + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncDatabaseDeltaTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncDatabaseDeltaTask.java index f684878c16d..edab87bf094 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncDatabaseDeltaTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncDatabaseDeltaTask.java @@ -19,6 +19,11 @@ */ package com.orientechnologies.orient.server.distributed.task; +import java.io.*; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.io.OFileUtils; import com.orientechnologies.common.log.OLogManager; @@ -36,11 +41,6 @@ import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import java.io.*; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - /** * Ask for synchronization of delta of chanegs on database from a remote node. * @@ -49,6 +49,7 @@ public class OSyncDatabaseDeltaTask extends OAbstractReplicatedTask { public final static int CHUNK_MAX_SIZE = 4194304; // 4MB public static final String DEPLOYDB = "deploydb."; + public static final int FACTORYID = 13; protected OLogSequenceNumber startLSN; protected long random; @@ -229,6 +230,7 @@ public void readExternal(final ObjectInput in) throws IOException, ClassNotFound public boolean isRequiredOpenDatabase() { return true; } + // // public static void dumpClusters(OAbstractPaginatedStorage storage) { // OLogManager.instance().flush(); @@ -241,4 +243,9 @@ public boolean isRequiredOpenDatabase() { // OLogManager.instance().info(storage, "***************************************"); // OLogManager.instance().flush(); // } + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncDatabaseTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncDatabaseTask.java index d17500defda..07c076e215b 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncDatabaseTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OSyncDatabaseTask.java @@ -19,6 +19,13 @@ */ package com.orientechnologies.orient.server.distributed.task; +import java.io.*; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; + import com.orientechnologies.common.io.OFileUtils; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.orient.core.Orient; @@ -29,23 +36,8 @@ import com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage; import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber; import com.orientechnologies.orient.server.OServer; -import com.orientechnologies.orient.server.distributed.ODistributedDatabaseChunk; -import com.orientechnologies.orient.server.distributed.ODistributedException; -import com.orientechnologies.orient.server.distributed.ODistributedServerLog; +import com.orientechnologies.orient.server.distributed.*; import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; -import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import com.orientechnologies.orient.server.distributed.ODistributedStorage; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; /** * Ask for synchronization of database from a remote node. @@ -55,8 +47,9 @@ public class OSyncDatabaseTask extends OAbstractReplicatedTask implements OCommandOutputListener { public final static int CHUNK_MAX_SIZE = 4194304; // 4MB public static final String DEPLOYDB = "deploydb."; + public static final int FACTORYID = 14; - protected long random; + protected long random; public OSyncDatabaseTask() { random = UUID.randomUUID().getLeastSignificantBits(); @@ -258,4 +251,9 @@ public boolean isRequiredOpenDatabase() { return true; } + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OTxTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OTxTask.java index cea67245ab8..787a1e9f49f 100644 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OTxTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OTxTask.java @@ -19,6 +19,14 @@ */ package com.orientechnologies.orient.server.distributed.task; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + import com.orientechnologies.common.concur.ONeedRetryException; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest; @@ -43,14 +51,6 @@ import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION; import com.orientechnologies.orient.server.distributed.ODistributedServerManager; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; - /** * Distributed transaction task. * @@ -59,6 +59,7 @@ */ public class OTxTask extends OAbstractReplicatedTask { private static final long serialVersionUID = 1L; + public static final int FACTORYID = 7; private List tasks = new ArrayList(); private transient OTxTaskResult result; @@ -197,8 +198,8 @@ public OCommandDistributedReplicateRequest.QUORUM_TYPE getQuorumType() { } @Override - public List getFixTask(final ODistributedRequest iRequest, OAbstractRemoteTask iOriginalTask, - final Object iBadResponse, final Object iGoodResponse, String executorNodeName, ODistributedServerManager dManager) { + public List getFixTask(final ODistributedRequest iRequest, ORemoteTask iOriginalTask, final Object iBadResponse, + final Object iGoodResponse, String executorNodeName, ODistributedServerManager dManager) { if (!(iBadResponse instanceof OTxTaskResult)) { // TODO: MANAGE ERROR ON LOCAL NODE ODistributedServerLog.debug(this, getNodeSource(), null, DIRECTION.NONE, @@ -213,7 +214,7 @@ public List getFixTask(final ODistributedRequest iRequest, return Collections.EMPTY_LIST; } - final List fixTasks = new ArrayList(); + final List fixTasks = new ArrayList(); final OFixTxTask fixTask = new OFixTxTask(((OTxTaskResult) iBadResponse).locks); fixTasks.add(fixTask); @@ -224,7 +225,7 @@ public List getFixTask(final ODistributedRequest iRequest, final Object badResult = ((OTxTaskResult) iBadResponse).results.get(i); final Object goodResult = ((OTxTaskResult) iGoodResponse).results.get(i); - final List tasks = t.getFixTask(iRequest, t, badResult, goodResult, executorNodeName, dManager); + final List tasks = t.getFixTask(iRequest, t, badResult, goodResult, executorNodeName, dManager); if (tasks != null) fixTask.addAll(tasks); } @@ -233,7 +234,7 @@ public List getFixTask(final ODistributedRequest iRequest, } @Override - public OAbstractRemoteTask getUndoTask(final ODistributedRequest iRequest, final Object iBadResponse) { + public ORemoteTask getUndoTask(final ODistributedRequest iRequest, final Object iBadResponse) { if (result == null) // NO RESULT: NO UNDO NEEDED return null; @@ -241,13 +242,13 @@ public OAbstractRemoteTask getUndoTask(final ODistributedRequest iRequest, final return getUndoTaskForLocalStorage(iBadResponse); } - public OAbstractRemoteTask getUndoTaskForLocalStorage(final Object iBadResponse) { + public ORemoteTask getUndoTaskForLocalStorage(final Object iBadResponse) { final OFixTxTask fixTask = new OFixTxTask(result != null ? result.locks : new HashSet()); for (int i = 0; i < tasks.size(); ++i) { final OAbstractRecordReplicatedTask t = tasks.get(i); - final OAbstractRemoteTask undoTask; + final ORemoteTask undoTask; if (iBadResponse instanceof List) undoTask = t.getUndoTask(null, ((List) iBadResponse).get(i)); else @@ -306,4 +307,10 @@ public String getPayload() { public List getTasks() { return tasks; } + + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OUpdateRecordTask.java b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OUpdateRecordTask.java index acaa8307035..5410d8740d8 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OUpdateRecordTask.java +++ b/server/src/main/java/com/orientechnologies/orient/server/distributed/task/OUpdateRecordTask.java @@ -49,6 +49,7 @@ */ public class OUpdateRecordTask extends OAbstractRecordReplicatedTask { private static final long serialVersionUID = 1L; + public static final int FACTORYID = 3; protected byte[] previousContent; protected int previousVersion; @@ -146,17 +147,17 @@ public OCommandDistributedReplicateRequest.QUORUM_TYPE getQuorumType() { } @Override - public List getFixTask(final ODistributedRequest iRequest, OAbstractRemoteTask iOriginalTask, final Object iBadResponse, final Object iGoodResponse, + public List getFixTask(final ODistributedRequest iRequest, ORemoteTask iOriginalTask, final Object iBadResponse, final Object iGoodResponse, String executorNodeName, ODistributedServerManager dManager) { final int versionCopy = ORecordVersionHelper.setRollbackMode(previousVersion); - final List fixTasks = new ArrayList(1); + final List fixTasks = new ArrayList(1); fixTasks.add(new OUpdateRecordTask(rid, null, -1, ((OUpdateRecordTask) iOriginalTask).content, versionCopy, recordType)); return fixTasks; } @Override - public OAbstractRemoteTask getUndoTask(final ODistributedRequest iRequest, final Object iBadResponse) { + public ORemoteTask getUndoTask(final ODistributedRequest iRequest, final Object iBadResponse) { final int versionCopy = ORecordVersionHelper.setRollbackMode(previousVersion); return new OUpdateRecordTask(rid, null, -1, previousContent, versionCopy, recordType); } @@ -210,4 +211,9 @@ public String toString() { public byte[] getContent() { return content; } + @Override + public int getFactoryId() { + return FACTORYID; + } + } diff --git a/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java b/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java index 5ba305a3b9e..4e3bd4ffa36 100755 --- a/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java +++ b/server/src/main/java/com/orientechnologies/orient/server/network/protocol/binary/ONetworkProtocolBinary.java @@ -19,20 +19,10 @@ */ package com.orientechnologies.orient.server.network.protocol.binary; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.net.Socket; -import java.net.SocketException; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.UUID; - import com.orientechnologies.common.collection.OMultiValue; import com.orientechnologies.common.concur.lock.OLockException; +import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException; +import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.exception.OSystemException; import com.orientechnologies.common.io.OIOException; import com.orientechnologies.common.log.OLogManager; @@ -57,11 +47,7 @@ import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OBonsaiCollectionPointer; import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManager; import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeRidBag; -import com.orientechnologies.orient.core.exception.OConfigurationException; -import com.orientechnologies.orient.core.exception.ODatabaseException; -import com.orientechnologies.orient.core.exception.OSecurityAccessException; -import com.orientechnologies.orient.core.exception.OStorageException; -import com.orientechnologies.orient.core.exception.OTransactionAbortedException; +import com.orientechnologies.orient.core.exception.*; import com.orientechnologies.orient.core.fetch.OFetchContext; import com.orientechnologies.orient.core.fetch.OFetchHelper; import com.orientechnologies.orient.core.fetch.OFetchListener; @@ -89,12 +75,7 @@ import com.orientechnologies.orient.core.sql.query.OResultSet; import com.orientechnologies.orient.core.sql.query.OSQLAsynchQuery; import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery; -import com.orientechnologies.orient.core.storage.OCluster; -import com.orientechnologies.orient.core.storage.OPhysicalPosition; -import com.orientechnologies.orient.core.storage.ORecordDuplicatedException; -import com.orientechnologies.orient.core.storage.ORecordMetadata; -import com.orientechnologies.orient.core.storage.OStorage; -import com.orientechnologies.orient.core.storage.OStorageProxy; +import com.orientechnologies.orient.core.storage.*; import com.orientechnologies.orient.core.type.ODocumentWrapper; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryServer; @@ -103,13 +84,19 @@ import com.orientechnologies.orient.server.OServer; import com.orientechnologies.orient.server.OServerInfo; import com.orientechnologies.orient.server.ShutdownHelper; -import com.orientechnologies.orient.server.distributed.ODistributedServerManager; +import com.orientechnologies.orient.server.distributed.*; +import com.orientechnologies.orient.server.distributed.task.ORemoteTask; import com.orientechnologies.orient.server.network.OServerNetworkListener; -import com.orientechnologies.orient.server.network.protocol.ONetworkProtocolData; import com.orientechnologies.orient.server.plugin.OServerPlugin; import com.orientechnologies.orient.server.plugin.OServerPluginHelper; import com.orientechnologies.orient.server.tx.OTransactionOptimisticProxy; +import java.io.*; +import java.net.Socket; +import java.net.SocketException; +import java.util.*; +import java.util.Map.Entry; + public class ONetworkProtocolBinary extends OBinaryNetworkProtocolAbstract { protected OClientConnection connection; @@ -125,7 +112,7 @@ public ONetworkProtocolBinary(final String iThreadName) { public void config(final OServerNetworkListener iListener, final OServer iServer, final Socket iSocket, final OContextConfiguration iConfig) throws IOException { // CREATE THE CLIENT CONNECTION -// connection = iServer.getClientConnectionManager().connect(this); + // connection = iServer.getClientConnectionManager().connect(this); super.config(iListener, iServer, iSocket, iConfig); @@ -160,7 +147,6 @@ public void shutdown() { protected void onBeforeRequest() throws IOException { waitNodeIsOnline(); - solveSession(); if (connection != null) { @@ -179,17 +165,17 @@ protected void onBeforeRequest() throws IOException { private void solveSession() throws IOException { connection = server.getClientConnectionManager().getConnection(clientTxId, this); - boolean noToken =false; - if(connection == null && clientTxId < 0 && requestType != OChannelBinaryProtocol.REQUEST_DB_REOPEN){ - //OPEN OF OLD STYLE SESSION. + boolean noToken = false; + if (connection == null && clientTxId < 0 && requestType != OChannelBinaryProtocol.REQUEST_DB_REOPEN) { + // OPEN OF OLD STYLE SESSION. noToken = true; } if (requestType == OChannelBinaryProtocol.REQUEST_CONNECT || requestType == OChannelBinaryProtocol.REQUEST_DB_OPEN - || requestType == OChannelBinaryProtocol.REQUEST_SHUTDOWN){ - //OPERATIONS THAT DON'T USE TOKEN + || requestType == OChannelBinaryProtocol.REQUEST_SHUTDOWN) { + // OPERATIONS THAT DON'T USE TOKEN noToken = true; } - if(connection != null && !Boolean.TRUE.equals(connection.getTokenBased()) ){ + if (connection != null && !Boolean.TRUE.equals(connection.getTokenBased())) { // CONNECTION WITHOUT TOKEN/OLD MODE noToken = true; } @@ -200,18 +186,18 @@ private void solveSession() throws IOException { connection.getData().sessionId = clientTxId; } if (connection != null) { - //This should not be needed + // This should not be needed connection.setTokenBytes(null); connection.acquire(); } } else { byte[] bytes = channel.readBytes(); - if(connection == null && bytes != null && bytes.length >0){ - //THIS IS THE CASE OF A TOKEN OPERATION WITHOUT HANDSHAKE ON THIS CONNECTION. + if (connection == null && bytes != null && bytes.length > 0) { + // THIS IS THE CASE OF A TOKEN OPERATION WITHOUT HANDSHAKE ON THIS CONNECTION. connection = server.getClientConnectionManager().connect(this); } - if(connection == null) { + if (connection == null) { throw new OTokenSecurityException("missing session and token"); } if (requestType != OChannelBinaryProtocol.REQUEST_DB_REOPEN) { @@ -434,6 +420,13 @@ protected boolean executeRequest() throws IOException { incrementalRestore(); break; + case OChannelBinaryProtocol.DISTRIBUTED_REQUEST: + executeDistributedRequest(); + break; + + case OChannelBinaryProtocol.DISTRIBUTED_RESPONSE: + executeDistributedResponse(); + default: setDataCommandInfo("Command not supported"); return false; @@ -577,8 +570,8 @@ protected void checkServerAccess(final String iResource) { throw new OSecurityAccessException("Server user not authenticated"); if (!server.isAllowed(connection.getServerUser().name, iResource)) - throw new OSecurityAccessException("User '" + connection.getServerUser().name + "' cannot access to the resource [" + iResource - + "]. Use another server user or change permission in the file config/orientdb-server-config.xml"); + throw new OSecurityAccessException("User '" + connection.getServerUser().name + "' cannot access to the resource [" + + iResource + "]. Use another server user or change permission in the file config/orientdb-server-config.xml"); } else { if (!connection.getData().serverUser) throw new OSecurityAccessException("Server user not authenticated"); @@ -838,14 +831,98 @@ private void incrementalRestore() throws IOException { } } + private void executeDistributedRequest() throws IOException { + setDataCommandInfo("Distributed request"); + + final byte[] serializedReq = channel.readBytes(); + + final ODistributedServerManager manager = server.getDistributedManager(); + final ODistributedRequest req = manager.createRequest(); + + for (int retry = 1;; ++retry) { + final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedReq)); + try { + req.readExternal(in); + } catch (ClassNotFoundException e) { + throw new IOException("Error on unmarshalling of remote task", e); + } + + final Object responsePayload; + + final ORemoteTask task = req.getTask(); + + try { + responsePayload = task.execute(server, server.getDistributedManager(), connection.getDatabase()); + } catch (Exception e) { + throw new IOException("Error on executing remote task", e); + } + + if (responsePayload instanceof OModificationOperationProhibitedException) { + // RETRY + try { + ODistributedServerLog.info(this, manager.getLocalNodeName(), req.getSenderNodeName(), ODistributedServerLog.DIRECTION.OUT, + "Database is frozen, waiting and retrying. Request %s (retry=%d)", req, retry); + + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } else { + // OPERATION EXECUTED (OK OR ERROR), NO RETRY NEEDED + if (retry > 1) + ODistributedServerLog.info(this, manager.getLocalNodeName(), req.getSenderNodeName(), ODistributedServerLog.DIRECTION.OUT, + "Request %s succeed after retry=%d", req, retry); + + ODistributedServerLog.debug(this, manager.getLocalNodeName(), req.getSenderNodeName(), ODistributedServerLog.DIRECTION.OUT, + "sending back response '%s' to request %d (%s)", responsePayload, req.getId(), task); + + final ODistributedResponse response = manager.createResponse(req.getId(), manager.getLocalNodeName(), + req.getSenderNodeName(), (Serializable) responsePayload); + + try { + // GET THE SENDER'S RESPONSE QUEUE + final ORemoteServerController remoteSenderServer = manager.getRemoteServer(req.getSenderNodeName()); + + remoteSenderServer.sendResponse(response); + + } catch (Exception e) { + throw OException + .wrapException(new ODistributedException("Cannot send response to the sender node " + req.getSenderNodeName()), e); + } + + break; + } + } + } + + private void executeDistributedResponse() throws IOException { + setDataCommandInfo("Distributed response"); + + final byte[] serializedResponse = channel.readBytes(); + + final ODistributedServerManager manager = server.getDistributedManager(); + final ODistributedResponse response = manager.createResponse(); + + final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedResponse)); + try { + response.readExternal(in); + } catch (ClassNotFoundException e) { + throw new IOException("Error on unmarshalling of remote task", e); + } + + final Object payload = response.getPayload(); + + manager.getMessageService().dispatchResponseToThread(response); + } + protected void sendError(final int iClientTxId, final Throwable t) throws IOException { channel.acquireWriteLock(); try { channel.writeByte(OChannelBinaryProtocol.RESPONSE_STATUS_ERROR); channel.writeInt(iClientTxId); - if ((connection != null && connection.getTokenBased() != null) && (connection != null && Boolean.TRUE.equals( - connection.getTokenBased())) && requestType != OChannelBinaryProtocol.REQUEST_CONNECT + if ((connection != null && connection.getTokenBased() != null) + && (connection != null && Boolean.TRUE.equals(connection.getTokenBased())) + && requestType != OChannelBinaryProtocol.REQUEST_CONNECT && (requestType != OChannelBinaryProtocol.REQUEST_DB_OPEN && requestType != OChannelBinaryProtocol.REQUEST_SHUTDOWN || (connection != null && connection.getData() != null && connection.getData().protocolVersion <= OChannelBinaryProtocol.PROTOCOL_VERSION_32)) @@ -1675,13 +1752,15 @@ protected void readRecord() throws IOException { sendOk(clientTxId); channel.writeByte((byte) 1); if (connection.getData().protocolVersion <= OChannelBinaryProtocol.PROTOCOL_VERSION_27) { - channel.writeBytes(connection.getDatabase().getStorage().getConfiguration().toStream(connection.getData().protocolVersion)); + channel + .writeBytes(connection.getDatabase().getStorage().getConfiguration().toStream(connection.getData().protocolVersion)); channel.writeVersion(0); channel.writeByte(ORecordBytes.RECORD_TYPE); } else { channel.writeByte(ORecordBytes.RECORD_TYPE); channel.writeVersion(0); - channel.writeBytes(connection.getDatabase().getStorage().getConfiguration().toStream(connection.getData().protocolVersion)); + channel + .writeBytes(connection.getDatabase().getStorage().getConfiguration().toStream(connection.getData().protocolVersion)); } channel.writeByte((byte) 0); // NO MORE RECORDS } finally { @@ -1769,13 +1848,15 @@ protected void readRecordIfVersionIsNotLatest() throws IOException { sendOk(clientTxId); channel.writeByte((byte) 1); if (connection.getData().protocolVersion <= OChannelBinaryProtocol.PROTOCOL_VERSION_27) { - channel.writeBytes(connection.getDatabase().getStorage().getConfiguration().toStream(connection.getData().protocolVersion)); + channel + .writeBytes(connection.getDatabase().getStorage().getConfiguration().toStream(connection.getData().protocolVersion)); channel.writeVersion(0); channel.writeByte(ORecordBytes.RECORD_TYPE); } else { channel.writeByte(ORecordBytes.RECORD_TYPE); channel.writeVersion(0); - channel.writeBytes(connection.getDatabase().getStorage().getConfiguration().toStream(connection.getData().protocolVersion)); + channel + .writeBytes(connection.getDatabase().getStorage().getConfiguration().toStream(connection.getData().protocolVersion)); } channel.writeByte((byte) 0); // NO MORE RECORDS } finally { @@ -1881,8 +1962,8 @@ protected void readConnectionData() throws IOException { protected void sendOk(final int iClientTxId) throws IOException { channel.writeByte(OChannelBinaryProtocol.RESPONSE_STATUS_OK); channel.writeInt(iClientTxId); - if (connection != null && Boolean.TRUE.equals(connection.getTokenBased()) && connection.getToken() != null && requestType != OChannelBinaryProtocol.REQUEST_CONNECT - && requestType != OChannelBinaryProtocol.REQUEST_DB_OPEN) { + if (connection != null && Boolean.TRUE.equals(connection.getTokenBased()) && connection.getToken() != null + && requestType != OChannelBinaryProtocol.REQUEST_CONNECT && requestType != OChannelBinaryProtocol.REQUEST_DB_OPEN) { // TODO: Check if the token is expiring and if it is send a new token byte[] renewedToken = server.getTokenHandler().renewIfNeeded(connection.getToken()); channel.writeBytes(renewedToken); @@ -2015,7 +2096,7 @@ private void serializeExceptionObject(Throwable original) throws IOException { /** * Due to protocol thread is daemon, shutdown should be executed in separate thread to guarantee its complete execution. - * + *

* This method never returns normally. */ private void runShutdownInNonDaemonThread() { @@ -2187,7 +2268,8 @@ private void createSBTreeBonsai() throws IOException { int clusterId = channel.readInt(); - OBonsaiCollectionPointer collectionPointer = connection.getDatabase().getSbTreeCollectionManager().createSBTree(clusterId, null); + OBonsaiCollectionPointer collectionPointer = connection.getDatabase().getSbTreeCollectionManager().createSBTree(clusterId, + null); beginResponse(); try {