Skip to content

Commit

Permalink
Fixed issue with creation of distributed classes + fixed sharding Tes…
Browse files Browse the repository at this point in the history
…t Case
  • Loading branch information
lvca committed Feb 6, 2015
1 parent 9ce6c2e commit a9807f6
Show file tree
Hide file tree
Showing 18 changed files with 352 additions and 275 deletions.
Expand Up @@ -88,4 +88,6 @@ public interface OCommandExecutor {
* @return
*/
public int getSecurityOperationType();

boolean involveSchema();
}
Expand Up @@ -55,19 +55,6 @@ public OCommandExecutorAbstract init(final OCommandRequestText iRequest) {
return this;
}

protected String upperCase(String text) {
StringBuilder result = new StringBuilder(text.length());
for (char c : text.toCharArray()) {
String upper = ("" + c).toUpperCase(Locale.ENGLISH);
if (upper.length() > 1) {
result.append(c);
} else {
result.append(upper);
}
}
return result.toString();
}

@Override
public String toString() {
return getClass().getSimpleName() + " [text=" + parserText + "]";
Expand Down Expand Up @@ -119,4 +106,21 @@ public Set<String> getInvolvedClusters() {
public int getSecurityOperationType() {
return ORole.PERMISSION_READ;
}

public boolean involveSchema() {
return false;
}

protected String upperCase(String text) {
StringBuilder result = new StringBuilder(text.length());
for (char c : text.toCharArray()) {
String upper = ("" + c).toUpperCase(Locale.ENGLISH);
if (upper.length() > 1) {
result.append(c);
} else {
result.append(upper);
}
}
return result.toString();
}
}
Expand Up @@ -20,13 +20,6 @@

package com.orientechnologies.orient.core.db.document;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Callable;

import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.listener.OListenerManger;
import com.orientechnologies.common.log.OLogManager;
Expand Down Expand Up @@ -114,6 +107,13 @@
import com.orientechnologies.orient.core.version.ORecordVersion;
import com.orientechnologies.orient.core.version.OVersionFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Callable;

@SuppressWarnings("unchecked")
public class ODatabaseDocumentTx extends OListenerManger<ODatabaseListener> implements ODatabaseDocumentInternal {

Expand Down
Expand Up @@ -118,4 +118,9 @@ public Object execute(final Map<Object, Object> iArgs) {
public String getSyntax() {
return "ALTER CLASS <class> <attribute-name> <attribute-value>";
}

@Override
public boolean involveSchema() {
return true;
}
}
Expand Up @@ -19,8 +19,6 @@
*/
package com.orientechnologies.orient.core.sql;

import java.util.Map;

import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest;
import com.orientechnologies.orient.core.command.OCommandRequest;
import com.orientechnologies.orient.core.command.OCommandRequestText;
Expand All @@ -29,6 +27,8 @@
import com.orientechnologies.orient.core.exception.OCommandExecutionException;
import com.orientechnologies.orient.core.metadata.schema.OClass;

import java.util.Map;

/**
* SQL CREATE CLASS command: Creates a new property in the target class.
*
Expand Down Expand Up @@ -150,4 +150,9 @@ public Object execute(final Map<Object, Object> iArgs) {
public String getSyntax() {
return "CREATE CLASS <class> [EXTENDS <super-class>] [CLUSTER <clusterId>*] [ABSTRACT]";
}

@Override
public boolean involveSchema(){
return true;
}
}
Expand Up @@ -132,4 +132,9 @@ public Object execute(final Map<Object, Object> iArgs) {
public String getSyntax() {
return "DROP CLASS <class> [UNSAFE]";
}

@Override
public boolean involveSchema() {
return true;
}
}
Expand Up @@ -19,26 +19,6 @@
*/
package com.orientechnologies.orient.server.hazelcast;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.config.QueueConfig;
import com.hazelcast.core.*;
Expand Down Expand Up @@ -81,12 +61,33 @@
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog.DIRECTION;
import com.orientechnologies.orient.server.distributed.ODistributedStorage;
import com.orientechnologies.orient.server.distributed.OLocalClusterStrategy;
import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask;
import com.orientechnologies.orient.server.distributed.task.OCopyDatabaseChunkTask;
import com.orientechnologies.orient.server.distributed.task.OCreateRecordTask;
import com.orientechnologies.orient.server.distributed.task.ODeployDatabaseTask;
import com.orientechnologies.orient.server.network.OServerNetworkListener;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

/**
* Hazelcast implementation for clustering.
*
Expand Down Expand Up @@ -441,7 +442,7 @@ public void onCreateClass(final ODatabaseInternal iDatabase, final OClass iClass
if (cfg == null)
return;

installClustersPerClass(iDatabase, cfg, iClass);
installClustersOfClass(iDatabase, cfg, iClass);
}

@Override
Expand Down Expand Up @@ -946,6 +947,100 @@ public Void call() throws Exception {

}

/**
* Guarantees, foreach class, that has own master cluster.
*/
@Override
public void propagateSchemaChanges(final ODatabaseInternal iDatabase) {
final ODistributedConfiguration cfg = getDatabaseConfiguration(iDatabase.getName());
if (cfg == null)
return;

for (OClass c : iDatabase.getMetadata().getSchema().getClasses()) {
if (!(c.getClusterSelection() instanceof OLocalClusterStrategy))
// INSTALL ONLY ON NON-ENHANCED CLASSES
installClustersOfClass(iDatabase, cfg, c);
}
}

/**
* Guarantees that each class has own master cluster.
*/
public synchronized void installClustersOfClass(final ODatabaseInternal iDatabase, final ODistributedConfiguration cfg,
final OClass iClass) {

if (!(iClass.getClusterSelection() instanceof OLocalClusterStrategy))
// INJECT LOCAL CLUSTER STRATEGY
((OClassImpl) iClass).setClusterSelectionInternal(new OLocalClusterStrategy(this, iDatabase.getName(), iClass));

if (iClass.isAbstract())
return;

final int[] clusterIds = iClass.getClusterIds();
final List<String> clusterNames = new ArrayList<String>(clusterIds.length);
for (int clusterId : clusterIds)
clusterNames.add(iDatabase.getClusterNameById(clusterId));

boolean distributedCfgDirty = false;

// CHECK IF EACH NODE HAS IS MASTER OF ONE CLUSTER
final Set<String> servers = cfg.getServers(null);
for (String server : servers) {
final String bestCluster = cfg.getLocalCluster(clusterNames, server);
if (bestCluster == null) {
// TRY TO FIND A CLUSTER PREVIOUSLY ASSIGNED TO THE LOCAL NODE
final String newClusterName = (iClass.getName() + "_" + server).toLowerCase();

final Set<String> cfgClusterNames = new HashSet<String>();
for (String cl : cfg.getClusterNames())
cfgClusterNames.add(cl);

if (cfgClusterNames.contains(newClusterName)) {
// FOUND A CLUSTER PREVIOUSLY ASSIGNED TO THE LOCAL ONE: CHANGE ASSIGNMENT TO LOCAL NODE AGAIN
ODistributedServerLog.info(this, nodeName, null, DIRECTION.NONE,
"class %s, change mastership of cluster '%s' (id=%d) to node '%s'", iClass, newClusterName,
iDatabase.getClusterIdByName(newClusterName), server);
cfg.setMasterServer(newClusterName, server);
distributedCfgDirty = true;
} else {

// CREATE A NEW CLUSTER WHERE CURRENT NODE IS THE MASTER
ODistributedServerLog.info(this, nodeName, null, DIRECTION.NONE, "class %s, creation of new cluster '%s' (id=%d)",
iClass, newClusterName, iDatabase.getClusterIdByName(newClusterName));

final OScenarioThreadLocal.RUN_MODE currentDistributedMode = OScenarioThreadLocal.INSTANCE.get();
if (currentDistributedMode != OScenarioThreadLocal.RUN_MODE.DEFAULT)
OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.DEFAULT);

try {
iClass.addCluster(newClusterName);
} catch (OCommandSQLParsingException e) {
if (!e.getMessage().endsWith("already exists"))
throw e;
} catch (Exception e) {
ODistributedServerLog.error(this, nodeName, null, DIRECTION.NONE, "error on creating cluster '%s' in class '%s'",
newClusterName, iClass);
throw new ODistributedException("Error on creating cluster '" + newClusterName + "' in class '" + iClass + "'");
} finally {

if (currentDistributedMode != OScenarioThreadLocal.RUN_MODE.DEFAULT)
// RESTORE PREVIOUS MODE
OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED);
}

ODistributedServerLog.info(this, nodeName, null, DIRECTION.NONE,
"class '%s', set mastership of cluster '%s' (id=%d) to '%s'", iClass, newClusterName,
iDatabase.getClusterIdByName(newClusterName), server);
cfg.setMasterServer(newClusterName, server);
distributedCfgDirty = true;
}
}
}

if (distributedCfgDirty)
updateCachedDatabaseConfiguration(iDatabase.getName(), cfg.serialize(), true, true);
}

protected boolean installDbClustersForLocalNode(final ODatabaseInternal iDatabase, final ODistributedConfiguration cfg) {
if (iDatabase.isClosed())
getServerInstance().openDatabase(iDatabase);
Expand Down Expand Up @@ -1166,7 +1261,7 @@ protected ODocument loadDatabaseConfiguration(final String iDatabaseName, final
/**
* Pauses the request if the distributed cluster need to be rebalanced because change of shape (add/remove nodes) or a node that
* is much slower than the average.
*
*
* @param iDatabaseName
*/
protected void checkForClusterRebalance(final String iDatabaseName) {
Expand Down Expand Up @@ -1260,75 +1355,4 @@ private synchronized boolean installLocalClusterPerClass(final ODatabaseInternal

return false;
}

private synchronized void installClustersPerClass(final ODatabaseInternal iDatabase, final ODistributedConfiguration cfg,
final OClass iClass) {
((OClassImpl) iClass).setClusterSelectionInternal(new OLocalClusterStrategy(this, iDatabase.getName(), iClass));
if (iClass.isAbstract())
return;

final int[] clusterIds = iClass.getClusterIds();
final List<String> clusterNames = new ArrayList<String>(clusterIds.length);
for (int clusterId : clusterIds)
clusterNames.add(iDatabase.getClusterNameById(clusterId));

boolean distributedCfgDirty = false;

// CHECK IF EACH NODE HAS IS MASTER OF ONE CLUSTER
final Set<String> servers = cfg.getServers(null);
for (String server : servers) {
String bestCluster = cfg.getLocalCluster(clusterNames, server);
if (bestCluster == null) {
// TRY TO FIND A CLUSTER PREVIOUSLY ASSIGNED TO THE LOCAL NODE
final String newClusterName = (iClass.getName() + "_" + server).toLowerCase();

final Set<String> cfgClusterNames = new HashSet<String>();
for (String cl : cfg.getClusterNames())
cfgClusterNames.add(cl);

if (cfgClusterNames.contains(newClusterName)) {
// FOUND A CLUSTER PREVIOUSLY ASSIGNED TO THE LOCAL ONE: CHANGE ASSIGNMENT TO LOCAL NODE AGAIN
ODistributedServerLog.info(this, nodeName, null, DIRECTION.NONE,
"class %s, change mastership of cluster '%s' (id=%d) to node '%s'", iClass, newClusterName,
iDatabase.getClusterIdByName(newClusterName), server);
cfg.setMasterServer(newClusterName, server);
distributedCfgDirty = true;
} else {

// CREATE A NEW CLUSTER WHERE CURRENT NODE IS THE MASTER
ODistributedServerLog.info(this, nodeName, null, DIRECTION.NONE, "class %s, creation of new cluster '%s' (id=%d)",
iClass, newClusterName, iDatabase.getClusterIdByName(newClusterName));

final OScenarioThreadLocal.RUN_MODE currentDistributedMode = OScenarioThreadLocal.INSTANCE.get();
if (currentDistributedMode != OScenarioThreadLocal.RUN_MODE.DEFAULT)
OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.DEFAULT);

try {
iClass.addCluster(newClusterName);
} catch (OCommandSQLParsingException e) {
if (!e.getMessage().endsWith("already exists"))
throw e;
} catch (Exception e) {
ODistributedServerLog.error(this, nodeName, null, DIRECTION.NONE, "error on creating cluster '%s' in class '%s'",
newClusterName, iClass);
throw new ODistributedException("Error on creating cluster '" + newClusterName + "' in class '" + iClass + "'");
} finally {

if (currentDistributedMode != OScenarioThreadLocal.RUN_MODE.DEFAULT)
// RESTORE PREVIOUS MODE
OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED);
}

ODistributedServerLog.info(this, nodeName, null, DIRECTION.NONE,
"class '%s', set mastership of cluster '%s' (id=%d) to '%s'", iClass, newClusterName,
iDatabase.getClusterIdByName(newClusterName), server);
cfg.setMasterServer(newClusterName, server);
distributedCfgDirty = true;
}
}
}

if (distributedCfgDirty)
updateCachedDatabaseConfiguration(iDatabase.getName(), cfg.serialize(), true, true);
}
}
Expand Up @@ -608,7 +608,7 @@ protected void fixNodesInConflict(final List<ODistributedResponse> bestResponses
ODistributedServerLog.warn(this, dManager.getLocalNodeName(), null, DIRECTION.NONE,
"fixing response (%s) for request (%s) in server %s to be: %s", r, request, r.getExecutorNodeName(), goodResponse);

final OAbstractRemoteTask fixTask = ((OAbstractReplicatedTask) request.getTask()).getFixTask(request, r.getPayload(),
final OAbstractRemoteTask fixTask = ((OAbstractReplicatedTask) request.getTask()).getFixTask(request, request.getTask(), r.getPayload(),
goodResponse.getPayload());

if (fixTask != null)
Expand Down

0 comments on commit a9807f6

Please sign in to comment.