Skip to content

Commit

Permalink
Supported HA on shading (Issue #4411)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Jun 21, 2015
1 parent 10ae3cf commit 46800ad
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 157 deletions.
Expand Up @@ -110,98 +110,112 @@ public Object execute(final Map<Object, Object> iArgs) {
if (dManager == null || !dManager.isEnabled())
throw new OCommandExecutionException("OrientDB is not started in distributed mode");

final String nodeName = dManager.getLocalNodeName();
final String databaseName = database.getName();

final ODistributedConfiguration cfg = dManager.getDatabaseConfiguration(databaseName);
try {
switch (mode) {
case FULL_REPLACE:
return replaceCluster(dManager, database, serverInstance, databaseName, clusterName);

switch (mode) {
case FULL_REPLACE:
final String dbPath = serverInstance.getDatabaseDirectory() + databaseName;
// case MERGE:
// int merged = 0;
// return String.format("Merged %d records", merged);
}
} catch (Exception e) {
throw new OCommandExecutionException("Cannot execute synchronization of cluster", e);
}

final List<String> nodesWhereClusterIsCfg = cfg.getServers(clusterName, null);
nodesWhereClusterIsCfg.remove(nodeName);
return "Mode not supported";
}

if (nodesWhereClusterIsCfg.isEmpty())
throw new OCommandExecutionException("Cannot synchronize cluster '" + clusterName
+ "' because is not configured on any running nodes");
public static Object replaceCluster(final OHazelcastPlugin dManager, final ODatabaseDocumentInternal database,
final OServer serverInstance, final String databaseName, final String clusterName) throws IOException {

final OSyncClusterTask task = new OSyncClusterTask(clusterName);
final Map<String, Object> results = (Map<String, Object>) dManager.sendRequest(databaseName, null, nodesWhereClusterIsCfg,
task, ODistributedRequest.EXECUTION_MODE.RESPONSE);
final OAbstractPaginatedStorage stg = (OAbstractPaginatedStorage) database.getStorage().getUnderlying();
final OPaginatedCluster cluster = (OPaginatedCluster) stg.getClusterByName(clusterName);
stg.getWriteCache().close(cluster.getFileId(), false);

final OAbstractPaginatedStorage stg = (OAbstractPaginatedStorage) database.getStorage().getUnderlying();
final OPaginatedCluster cluster = (OPaginatedCluster) stg.getClusterByName(clusterName);
final String fileName = cluster.getFileName();
return replaceCluster(dManager, serverInstance, databaseName, clusterName);
}

FileOutputStream out = null;
try {
final File tempFile = new File(Orient.getTempPath() + "/backup_" + database.getName() + "_" + clusterName + "_toInstall.zip");
if (tempFile.exists())
tempFile.delete();
else
tempFile.getParentFile().mkdirs();
tempFile.createNewFile();

long fileSize = 0;
out = new FileOutputStream(tempFile, false);
for (Map.Entry<String, Object> r : results.entrySet()) {
final Object value = r.getValue();

if (value instanceof Boolean) {
continue;
} else if (value instanceof Throwable) {
ODistributedServerLog.error(this, nodeName, r.getKey(), ODistributedServerLog.DIRECTION.IN,
"error on installing cluster %s in %s", (Exception) value, databaseName, dbPath);
} else if (value instanceof ODistributedDatabaseChunk) {
ODistributedDatabaseChunk chunk = (ODistributedDatabaseChunk) value;

stg.getWriteCache().close(cluster.getFileId(), false);

fileSize = writeDatabaseChunk(nodeName, 1, chunk, out);
for (int chunkNum = 2; !chunk.last; chunkNum++) {
final Object result = dManager.sendRequest(databaseName, null, Collections.singleton(r.getKey()),
new OCopyDatabaseChunkTask(chunk.filePath, chunkNum, chunk.offset + chunk.buffer.length),
ODistributedRequest.EXECUTION_MODE.RESPONSE);

if (result instanceof Boolean)
continue;
else if (result instanceof Exception) {
ODistributedServerLog.error(this, nodeName, r.getKey(), ODistributedServerLog.DIRECTION.IN,
"error on installing database %s in %s (chunk #%d)", (Exception) result, databaseName, dbPath, chunkNum);
} else if (result instanceof ODistributedDatabaseChunk) {
chunk = (ODistributedDatabaseChunk) result;

fileSize += writeDatabaseChunk(nodeName, chunkNum, chunk, out);
}
public static Object replaceCluster(final OHazelcastPlugin dManager, final OServer serverInstance, final String databaseName,
final String clusterName) {
final ODistributedConfiguration cfg = dManager.getDatabaseConfiguration(databaseName);
final String dbPath = serverInstance.getDatabaseDirectory() + databaseName;

final String nodeName = dManager.getLocalNodeName();

final List<String> nodesWhereClusterIsCfg = cfg.getServers(clusterName, null);
nodesWhereClusterIsCfg.remove(nodeName);

if (nodesWhereClusterIsCfg.isEmpty())
throw new OCommandExecutionException("Cannot synchronize cluster '" + clusterName
+ "' because is not configured on any running nodes");

final OSyncClusterTask task = new OSyncClusterTask(clusterName);
final Map<String, Object> results = (Map<String, Object>) dManager.sendRequest(databaseName, null, nodesWhereClusterIsCfg,
task, ODistributedRequest.EXECUTION_MODE.RESPONSE);

File tempFile = null;
FileOutputStream out = null;
try {
tempFile = new File(Orient.getTempPath() + "/backup_" + databaseName + "_" + clusterName + "_toInstall.zip");
if (tempFile.exists())
tempFile.delete();
else
tempFile.getParentFile().mkdirs();
tempFile.createNewFile();

long fileSize = 0;
out = new FileOutputStream(tempFile, false);
for (Map.Entry<String, Object> r : results.entrySet()) {
final Object value = r.getValue();

if (value instanceof Boolean) {
continue;
} else if (value instanceof Throwable) {
ODistributedServerLog.error(null, nodeName, r.getKey(), ODistributedServerLog.DIRECTION.IN,
"error on installing cluster %s in %s", (Exception) value, databaseName, dbPath);
} else if (value instanceof ODistributedDatabaseChunk) {
ODistributedDatabaseChunk chunk = (ODistributedDatabaseChunk) value;

fileSize = writeDatabaseChunk(nodeName, 1, chunk, out);
for (int chunkNum = 2; !chunk.last; chunkNum++) {
final Object result = dManager.sendRequest(databaseName, null, Collections.singleton(r.getKey()),
new OCopyDatabaseChunkTask(chunk.filePath, chunkNum, chunk.offset + chunk.buffer.length),
ODistributedRequest.EXECUTION_MODE.RESPONSE);

if (result instanceof Boolean)
continue;
else if (result instanceof Exception) {
ODistributedServerLog.error(null, nodeName, r.getKey(), ODistributedServerLog.DIRECTION.IN,
"error on installing database %s in %s (chunk #%d)", (Exception) result, databaseName, dbPath, chunkNum);
} else if (result instanceof ODistributedDatabaseChunk) {
chunk = (ODistributedDatabaseChunk) result;

fileSize += writeDatabaseChunk(nodeName, chunkNum, chunk, out);
}
}
}
}

OZIPCompressionUtil.uncompressDirectory(new FileInputStream(tempFile), dbPath, null);
OZIPCompressionUtil.uncompressDirectory(new FileInputStream(tempFile), dbPath, null);

return String.format("Cluster correctly replaced, transferred %d bytes", fileSize);
return String.format("Cluster correctly replaced, transferred %d bytes", fileSize);

} catch (Exception e) {
ODistributedServerLog.error(this, nodeName, null, ODistributedServerLog.DIRECTION.NONE,
"error on transferring database '%s' to '%s'", e, databaseName, fileName);
throw new ODistributedException("Error on transferring database", e);
} finally {
try {
if (out != null) {
out.flush();
out.close();
}
} catch (IOException e) {
} catch (Exception e) {
ODistributedServerLog.error(null, nodeName, null, ODistributedServerLog.DIRECTION.NONE,
"error on transferring database '%s' to '%s'", e, databaseName, tempFile);
throw new ODistributedException("Error on transferring database", e);
} finally {
try {
if (out != null) {
out.flush();
out.close();
}
} catch (IOException e) {
}

case MERGE:
int merged = 0;
return String.format("Merged %d records", merged);
}

return "Mode not supported";
}

@Override
Expand All @@ -224,10 +238,10 @@ public String getSyntax() {
return "SYNC CLUSTER <name> [-full_replace|-merge]";
}

protected long writeDatabaseChunk(final String iNodeName, final int iChunkId, final ODistributedDatabaseChunk chunk,
protected static long writeDatabaseChunk(final String iNodeName, final int iChunkId, final ODistributedDatabaseChunk chunk,
final FileOutputStream out) throws IOException {

ODistributedServerLog.warn(this, iNodeName, null, ODistributedServerLog.DIRECTION.NONE,
ODistributedServerLog.warn(null, iNodeName, null, ODistributedServerLog.DIRECTION.NONE,
"- writing chunk #%d offset=%d size=%s", iChunkId, chunk.offset, OFileUtils.getSizeAsString(chunk.buffer.length));
out.write(chunk.buffer);

Expand Down

0 comments on commit 46800ad

Please sign in to comment.