Skip to content

Commit

Permalink
Distributed: improved management of synchronization
Browse files Browse the repository at this point in the history
Added support for the new ODiscardedResponse, so that a requester no longer waits for responses to requests that a node has discarded
  • Loading branch information
lvca committed Jan 15, 2015
1 parent cfefb4a commit 6f3e88b
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 113 deletions.
Expand Up @@ -37,6 +37,7 @@
import com.orientechnologies.orient.server.distributed.ODistributedException;
import com.orientechnologies.orient.server.distributed.ODistributedRequest;
import com.orientechnologies.orient.server.distributed.ODistributedResponse;
import com.orientechnologies.orient.server.distributed.ODiscardedResponse;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION;
import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask;
Expand Down Expand Up @@ -93,7 +94,7 @@ public void run() {
long lastMessageId = -1;

for (long processedMessages = 0; running; processedMessages++) {
if (restoringMessages && processedMessages >= queuedMsg ) {
if (restoringMessages && processedMessages >= queuedMsg) {
// END OF RESTORING MESSAGES, SET IT ONLINE
ODistributedServerLog.debug(this, getLocalNodeName(), null, DIRECTION.NONE,
"executed all pending tasks in queue (%d), set restoringMessages=false and database '%s' as online. Last req=%d",
Expand Down Expand Up @@ -232,6 +233,8 @@ protected ODistributedRequest readRequest() throws InterruptedException {
"discarded request %d because waiting for %d request=%s sourceNode=%s", req.getId(),
distributed.waitForMessageId.get(), req, req.getSenderNodeName());

sendResponseBack(req, req.getTask(), new ODiscardedResponse());

// READ THE NEXT ONE
req = nextMessage();
}
Expand Down
Expand Up @@ -19,6 +19,26 @@
*/
package com.orientechnologies.orient.server.hazelcast;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.config.QueueConfig;
import com.hazelcast.core.*;
Expand Down Expand Up @@ -67,26 +87,6 @@
import com.orientechnologies.orient.server.distributed.task.ODeployDatabaseTask;
import com.orientechnologies.orient.server.network.OServerNetworkListener;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

/**
* Hazelcast implementation for clustering.
*
Expand Down Expand Up @@ -220,9 +220,6 @@ public void shutdown() {
hazelcastInstance.getCluster().removeMembershipListener(membershipListenerRegistration);
}

// AVOID TO REMOVE THE CFG TO PREVENT OTHER NODES TO UN-REGISTER IT
// getConfigurationMap().remove(CONFIG_NODE_PREFIX + getLocalNodeId());

if (hazelcastInstance != null)
try {
hazelcastInstance.shutdown();
Expand Down Expand Up @@ -542,18 +539,22 @@ public void updateLastClusterChange() {
@Override
public void memberRemoved(final MembershipEvent iEvent) {
updateLastClusterChange();
ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE, "node removed id=%s name=%s", iEvent.getMember(),
getNodeName(iEvent.getMember()));

final Member member = iEvent.getMember();

final String nodeName = getNodeName(member);
if (nodeName != null) {
activeNodes.remove(nodeName);

// REMOVE NODE IN DB CFG
if (messageService != null)
messageService.handleUnreachableNode(nodeName);

ODistributedServerLog.warn(this, getLocalNodeName(), null, DIRECTION.NONE, "node removed id=%s name=%s", member, nodeName);

if (nodeName.startsWith("ext:"))
ODistributedServerLog.error(this, getLocalNodeName(), null, DIRECTION.NONE,
"removed node id=%s name=%s has not being recognized. Remove the node manually", member, nodeName);

}

OClientConnectionManager.instance().pushDistribCfg2Clients(getClusterConfiguration());
Expand Down Expand Up @@ -857,7 +858,7 @@ public boolean installDatabase(boolean iStartup, String databaseName, ODocument
ODistributedDatabaseChunk chunk = (ODistributedDatabaseChunk) value;

// DISCARD ALL THE MESSAGES BEFORE THE BACKUP
distrDatabase.setWaitForMessage(chunk.getLastOperationId()-1);
distrDatabase.setWaitForMessage(chunk.getLastOperationId());

final String fileName = Orient.getTempPath() + "install_" + databaseName + ".zip";

Expand Down
@@ -0,0 +1,31 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/

package com.orientechnologies.orient.server.distributed;

import java.io.Serializable;

/**
 * Marker object used to report back that a request has been discarded instead of being executed.
 * <p>
 * The class declares no fields, so instances carry no state: every instance is interchangeable with any other and there
 * is nothing that could be mutated after construction.
 */
public class ODiscardedResponse implements Serializable {
  /**
   * Explicit serialization version. Without it the id is computed from compiler-dependent class details, so two builds
   * of the same source may produce different ids and fail to deserialize each other's instances.
   */
  private static final long serialVersionUID = 1L;

  /**
   * Creates a new discarded-response marker.
   */
  public ODiscardedResponse() {
  }
}
@@ -1,42 +1,42 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
package com.orientechnologies.orient.server.distributed;

/**
*
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
*
*/
public interface ODistributedResponse {
*
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
*
*/
public interface ODistributedResponse {

String getExecutorNodeName();
String getExecutorNodeName();

String getSenderNodeName();
ODistributedResponse setExecutorNodeName(String iExecutor);

Object getPayload();
String getSenderNodeName();

long getRequestId();
Object getPayload();

ODistributedResponse setExecutorNodeName(String iExecutor);
ODistributedResponse setPayload(Object iPayload);

ODistributedResponse setPayload(Object iPayload);
long getRequestId();

boolean isExecutedOnLocalNode();
}
boolean isExecutedOnLocalNode();
}
Expand Up @@ -62,6 +62,7 @@ public class ODistributedResponseManager {
private final int quorum;
private final boolean waitForLocalNode;
private volatile int receivedResponses = 0;
private volatile int discardedResponses = 0;
private volatile boolean receivedCurrentNode;
private Object responseLock = new Object();

Expand Down Expand Up @@ -132,8 +133,10 @@ public boolean collectResponse(final ODistributedResponse response) {
"received response '%s' for request (%s) (receivedCurrentNode=%s receivedResponses=%d expectedSynchronousResponses=%d quorum=%d)",
response, request, receivedCurrentNode, receivedResponses, expectedSynchronousResponses, quorum);

// PUT THE RESPONSE IN THE RIGHT RESPONSE GROUP
if (groupResponsesByResult) {
if (response.getPayload() instanceof ODiscardedResponse)
discardedResponses++;
else if (groupResponsesByResult) {
// PUT THE RESPONSE IN THE RIGHT RESPONSE GROUP
// TODO: AVOID TO KEEP ALL THE RESULT FOR THE SAME RESP GROUP, BUT RATHER THE FIRST ONE + COUNTER
boolean foundBucket = false;
for (int i = 0; i < responseGroups.size(); ++i) {
Expand Down Expand Up @@ -288,20 +291,25 @@ public boolean waitForSynchronousResponses() throws InterruptedException {

if (missingActiveNodes == 0) {
// NO MORE ACTIVE NODES TO WAIT
ODistributedServerLog.debug(this, dManager.getLocalNodeName(), null, DIRECTION.NONE, "no more active nodes to wait for request (%s): anticipate timeout (saved %d ms)", request, currentTimeout);
ODistributedServerLog.debug(this, dManager.getLocalNodeName(), null, DIRECTION.NONE,
"no more active nodes to wait for request (%s): anticipate timeout (saved %d ms)", request, currentTimeout);
break;
}

final long lastClusterChange = dManager.getLastClusterChangeOn();
if (lastClusterChange > 0 && now - lastClusterChange < (synchTimeout + ADDITIONAL_TIMEOUT_CLUSTER_SHAPE)) {
// CHANGED CLUSTER SHAPE DURING WAIT: ENLARGE TIMEOUT
currentTimeout = synchTimeout;
ODistributedServerLog.debug(this, dManager.getLocalNodeName(), null, DIRECTION.NONE, "cluster shape changed during request (%s): enlarge timeout +%dms, wait again for %dms", request, synchTimeout, currentTimeout);
ODistributedServerLog.debug(this, dManager.getLocalNodeName(), null, DIRECTION.NONE,
"cluster shape changed during request (%s): enlarge timeout +%dms, wait again for %dms", request, synchTimeout,
currentTimeout);
continue;
} else if (synchronizingNodes > 0) {
// SOME NODE IS SYNCHRONIZING: WAIT FOR THEM
currentTimeout = synchTimeout;
ODistributedServerLog.debug(this, dManager.getLocalNodeName(), null, DIRECTION.NONE, "%d nodes are in synchronization mode during request (%s): enlarge timeout +%dms, wait again for %dms", synchronizingNodes, request, synchTimeout, currentTimeout);
ODistributedServerLog.debug(this, dManager.getLocalNodeName(), null, DIRECTION.NONE,
"%d nodes are in synchronization mode during request (%s): enlarge timeout +%dms, wait again for %dms",
synchronizingNodes, request, synchTimeout, currentTimeout);
}
}

Expand Down Expand Up @@ -467,10 +475,10 @@ protected boolean isMinimumQuorumReached(final boolean iCheckAvailableNodes) {
return receivedResponses >= quorum;

for (List<ODistributedResponse> group : responseGroups)
if (group.size() >= quorum)
if (group.size() + discardedResponses >= quorum)
return true;

if (getReceivedResponsesCount() < quorum && iCheckAvailableNodes) {
if (receivedResponses < quorum && iCheckAvailableNodes) {
final ODistributedConfiguration dbConfig = dManager.getDatabaseConfiguration(getDatabaseName());
if (!dbConfig.getFailureAvailableNodesLessQuorum("*")) {
// CHECK IF ANY NODE IS OFFLINE
Expand Down Expand Up @@ -516,7 +524,7 @@ protected void manageConflicts() {
final List<ODistributedResponse> bestResponsesGroup = responseGroups.get(bestResponsesGroupIndex);

final int maxCoherentResponses = bestResponsesGroup.size();
final int conflicts = getExpectedResponses() - maxCoherentResponses;
final int conflicts = getExpectedResponses() - ( maxCoherentResponses + discardedResponses );

if (isMinimumQuorumReached(true)) {
// QUORUM SATISFIED
Expand All @@ -528,12 +536,7 @@ protected void manageConflicts() {
if (checkNoWinnerCase(bestResponsesGroup))
return;

// NO FIFTY/FIFTY CASE: FIX THE CONFLICTED NODES BY OVERWRITING THE RECORD WITH THE WINNER'S RESULT
ODistributedServerLog.warn(this, dManager.getLocalNodeName(), null, DIRECTION.NONE,
"detected %d conflicts, but the quorum (%d) has been reached. Fixing remote records. Request (%s)", conflicts, quorum,
request);

fixNodesInConflict(bestResponsesGroup);
fixNodesInConflict(bestResponsesGroup, conflicts);

} else {
// QUORUM HASN'T BEEN REACHED
Expand Down Expand Up @@ -590,7 +593,12 @@ protected void undoRequest() {
}
}

protected void fixNodesInConflict(final List<ODistributedResponse> bestResponsesGroup) {
protected void fixNodesInConflict(final List<ODistributedResponse> bestResponsesGroup, final int conflicts) {
// NO FIFTY/FIFTY CASE: FIX THE CONFLICTED NODES BY OVERWRITING THE RECORD WITH THE WINNER'S RESULT
ODistributedServerLog.warn(this, dManager.getLocalNodeName(), null, DIRECTION.NONE,
"detected %d conflicts, but the quorum (%d) has been reached. Fixing remote records. Request (%s)", conflicts, quorum,
request);

final ODistributedResponse goodResponse = bestResponsesGroup.get(0);

for (List<ODistributedResponse> responseGroup : responseGroups) {
Expand Down
Expand Up @@ -19,10 +19,6 @@
*/
package com.orientechnologies.orient.server.distributed.task;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.db.record.OPlaceholder;
Expand All @@ -37,6 +33,10 @@
import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION;
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

/**
* Distributed create record task used for synchronization.
*
Expand Down Expand Up @@ -95,9 +95,7 @@ public ODeleteRecordTask getFixTask(final ODistributedRequest iRequest, final Ob
if (iBadResponse instanceof Throwable)
return null;

// TODO: NO ROLLBACK, PUT THE NODE AS OFFLINE
final OPlaceholder badResult = (OPlaceholder) iBadResponse;

return new ODeleteRecordTask(new ORecordId(badResult.getIdentity()), badResult.getRecordVersion()).setDelayed(false);
}

Expand Down

0 comments on commit 6f3e88b

Please sign in to comment.