Skip to content

Commit

Permalink
Fixing distributed test cases (again) + improved HZ startup time
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Aug 4, 2015
1 parent 8e122d9 commit 4102440
Show file tree
Hide file tree
Showing 28 changed files with 144 additions and 98 deletions.
Expand Up @@ -861,7 +861,7 @@ public boolean cleanOutRecord(ORecordId recordId, ORecordVersion recordVersion,
}

@Override
public void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable,
public List<String> backup(OutputStream out, Map<String, Object> options, Callable<Object> callable,
final OCommandOutputListener iListener, int compressionLevel, int bufferSize) throws IOException {
throw new UnsupportedOperationException(
"backup is not supported against remote storage. Open the database with plocal or use Enterprise Edition");
Expand Down Expand Up @@ -1905,11 +1905,16 @@ protected String openRemoteDatabase() throws IOException {
} catch (Exception e) {
if (network != null) {
// REMOVE THE NETWORK CONNECTION IF ANY
engine.getConnectionManager().remove(network);
try {
engine.getConnectionManager().remove(network);
} catch (Exception ex) {
// IGNORE ANY EXCEPTION
OLogManager.instance().debug(this, "Can not remove connection or database url=" + currentURL, e);
}
network = null;
}

OLogManager.instance().error(this, "Can not open database with url " + currentURL, e);
OLogManager.instance().error(this, "Can not open database url=" + currentURL, e);
}
} while (engine.getConnectionManager().getAvailableConnections(currentURL) > 0);

Expand Down
Expand Up @@ -23,12 +23,12 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

import com.orientechnologies.common.concur.resource.OSharedResourceAdaptiveExternal;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OCommandOutputListener;
import com.orientechnologies.orient.core.command.OCommandRequestText;
Expand Down Expand Up @@ -218,8 +218,7 @@ public Set<String> getClusterNames() {
}

@Override
public void backup(OutputStream out, Map<String, Object> options, final Callable<Object> callable,
final OCommandOutputListener iListener, int compressionLevel, int bufferSize) throws IOException {
public List<String> backup(OutputStream out, Map<String, Object> options, final Callable<Object> callable, final OCommandOutputListener iListener, int compressionLevel, int bufferSize) throws IOException {
throw new UnsupportedOperationException("backup");
}

Expand Down
Expand Up @@ -32,7 +32,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
Expand All @@ -43,14 +45,18 @@
* @author Luca Garulli (l.garulli--at--orientechnologies.com)
*/
public class OZIPCompressionUtil {
public static int compressDirectory(final String sourceFolderName, final OutputStream output, final String[] iSkipFileExtensions,
final OCommandOutputListener iOutput, int compressionLevel) throws IOException {
public static List<String> compressDirectory(final String sourceFolderName, final OutputStream output,
final String[] iSkipFileExtensions, final OCommandOutputListener iOutput, int compressionLevel) throws IOException {

final List<String> compressedFiles = new ArrayList<String>();

final ZipOutputStream zos = new ZipOutputStream(output);
zos.setComment("OrientDB Backup executed on " + new Date());
try {
zos.setLevel(compressionLevel);
return addFolder(zos, sourceFolderName, sourceFolderName, iSkipFileExtensions, iOutput);
addFolder(zos, sourceFolderName, sourceFolderName, iSkipFileExtensions, iOutput, compressedFiles);

return compressedFiles;
} finally {
zos.close();
}
Expand Down Expand Up @@ -110,16 +116,15 @@ private static String getDirectoryPart(final String name) {
return s == -1 ? null : name.substring(0, s);
}

private static int addFolder(ZipOutputStream zos, String path, String baseFolderName, final String[] iSkipFileExtensions,
final OCommandOutputListener iOutput) throws IOException {
int total = 0;
private static void addFolder(ZipOutputStream zos, String path, String baseFolderName, final String[] iSkipFileExtensions,
final OCommandOutputListener iOutput, final List<String> iCompressedFiles) throws IOException {

File f = new File(path);
if (f.exists()) {
if (f.isDirectory()) {
File f2[] = f.listFiles();
for (int i = 0; i < f2.length; i++) {
total += addFolder(zos, f2[i].getAbsolutePath(), baseFolderName, iSkipFileExtensions, iOutput);
addFolder(zos, f2[i].getAbsolutePath(), baseFolderName, iSkipFileExtensions, iOutput, iCompressedFiles);
}
} else {
// add file
Expand All @@ -129,16 +134,16 @@ private static int addFolder(ZipOutputStream zos, String path, String baseFolder
if (iSkipFileExtensions != null)
for (String skip : iSkipFileExtensions)
if (entryName.endsWith(skip))
return 0;
return;

addFile(zos, path, entryName, iOutput);
iCompressedFiles.add(path);

total++;
addFile(zos, path, entryName, iOutput);
}

} else {
throw new IllegalArgumentException("Directory " + path + " not found");
}
return total;
}

public static void compressFile(final String folderName, final String entryName, final OutputStream output,
Expand Down
Expand Up @@ -37,6 +37,7 @@
import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -114,9 +115,9 @@ public OContextConfiguration getConfiguration() {
* @throws IOException
*/
@Override
public void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable,
public List<String> backup(OutputStream out, Map<String, Object> options, Callable<Object> callable,
final OCommandOutputListener iListener, int compressionLevel, int bufferSize) throws IOException {
underlying.backup(out, options, callable, iListener, compressionLevel, bufferSize);
return underlying.backup(out, options, callable, iListener, compressionLevel, bufferSize);
}

/**
Expand Down
Expand Up @@ -279,7 +279,7 @@ public <DB extends ODatabase> DB open(final String iUserName, final String iUser
throw e;
} catch (Exception e) {
close();
throw new ODatabaseException("Cannot open database", e);
throw new ODatabaseException("Cannot open database url=" + getURL(), e);
}
return (DB) this;
}
Expand Down Expand Up @@ -2698,19 +2698,19 @@ public void replaceStorage(OStorage iNewStorage) {
}

@Override
public <V> V callInLock(Callable<V> iCallable, boolean iExclusiveLock) {
public <V> V callInLock(final Callable<V> iCallable, final boolean iExclusiveLock) {
return storage.callInLock(iCallable, iExclusiveLock);
}

@Override
public void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable, OCommandOutputListener iListener,
int compressionLevel, int bufferSize) throws IOException {
storage.backup(out, options, callable, iListener, compressionLevel, bufferSize);
public List<String> backup(final OutputStream out, final Map<String, Object> options, final Callable<Object> callable,
final OCommandOutputListener iListener, final int compressionLevel, final int bufferSize) throws IOException {
return storage.backup(out, options, callable, iListener, compressionLevel, bufferSize);
}

@Override
public void restore(InputStream in, Map<String, Object> options, Callable<Object> callable, OCommandOutputListener iListener)
throws IOException {
public void restore(final InputStream in, final Map<String, Object> options, final Callable<Object> callable,
final OCommandOutputListener iListener) throws IOException {
if (storage == null)
storage = Orient.instance().loadStorage(url);

Expand Down
Expand Up @@ -33,9 +33,9 @@
import com.orientechnologies.orient.core.exception.OStorageException;
import com.orientechnologies.orient.core.index.engine.OHashTableIndexEngine;
import com.orientechnologies.orient.core.index.engine.OSBTreeIndexEngine;
import com.orientechnologies.orient.core.metadata.OMetadataDefault;
import com.orientechnologies.orient.core.storage.cache.OReadCache;
import com.orientechnologies.orient.core.storage.cache.local.OWOWCache;
import com.orientechnologies.orient.core.metadata.OMetadataDefault;
import com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage;
import com.orientechnologies.orient.core.storage.impl.local.OFreezableStorage;
import com.orientechnologies.orient.core.storage.impl.local.OStorageConfigurationSegment;
Expand All @@ -48,6 +48,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
Expand All @@ -60,24 +61,24 @@
* @since 28.03.13
*/
public class OLocalPaginatedStorage extends OAbstractPaginatedStorage implements OFreezableStorage, OBackupable {
private static String[] ALL_FILE_EXTENSIONS = { ".ocf", ".pls", ".pcl", ".oda", ".odh", ".otx", ".ocs", ".oef", ".oem", ".oet",
ODiskWriteAheadLog.WAL_SEGMENT_EXTENSION, ODiskWriteAheadLog.MASTER_RECORD_EXTENSION,
private static String[] ALL_FILE_EXTENSIONS = { ".ocf", ".pls", ".pcl", ".oda", ".odh", ".otx", ".ocs", ".oef",
".oem", ".oet", ODiskWriteAheadLog.WAL_SEGMENT_EXTENSION, ODiskWriteAheadLog.MASTER_RECORD_EXTENSION,
OHashTableIndexEngine.BUCKET_FILE_EXTENSION, OHashTableIndexEngine.METADATA_FILE_EXTENSION,
OHashTableIndexEngine.TREE_FILE_EXTENSION, OHashTableIndexEngine.NULL_BUCKET_FILE_EXTENSION,
OClusterPositionMap.DEF_EXTENSION, OSBTreeIndexEngine.DATA_FILE_EXTENSION, OWOWCache.NAME_ID_MAP_EXTENSION,
OIndexRIDContainer.INDEX_FILE_EXTENSION, OSBTreeCollectionManagerShared.DEFAULT_EXTENSION,
OSBTreeIndexEngine.NULL_BUCKET_FILE_EXTENSION };
OSBTreeIndexEngine.NULL_BUCKET_FILE_EXTENSION };

private static final int ONE_KB = 1024;
private static final int ONE_KB = 1024;

private final int DELETE_MAX_RETRIES;
private final int DELETE_WAIT_TIME;
private final int DELETE_MAX_RETRIES;
private final int DELETE_WAIT_TIME;

private final OStorageVariableParser variableParser;
private final OPaginatedStorageDirtyFlag dirtyFlag;

private final String storagePath;
private ExecutorService checkpointExecutor;
private final String storagePath;
private ExecutorService checkpointExecutor;

public OLocalPaginatedStorage(final String name, final String filePath, final String mode, final int id, OReadCache readCache)
throws IOException {
Expand Down Expand Up @@ -142,7 +143,7 @@ public String getType() {
}

@Override
public void backup(OutputStream out, Map<String, Object> options, final Callable<Object> callable,
public List<String> backup(OutputStream out, Map<String, Object> options, final Callable<Object> callable,
final OCommandOutputListener iOutput, final int compressionLevel, final int bufferSize) throws IOException {
freeze(false);
try {
Expand All @@ -155,8 +156,8 @@ public void backup(OutputStream out, Map<String, Object> options, final Callable

final OutputStream bo = bufferSize > 0 ? new BufferedOutputStream(out, bufferSize) : out;
try {
OZIPCompressionUtil.compressDirectory(new File(getStoragePath()).getAbsolutePath(), bo, new String[] { ".wal" }, iOutput,
compressionLevel);
return OZIPCompressionUtil.compressDirectory(new File(getStoragePath()).getAbsolutePath(), bo, new String[] { ".wal" },
iOutput, compressionLevel);
} finally {
if (bufferSize > 0) {
bo.flush();
Expand Down Expand Up @@ -264,9 +265,12 @@ protected void postDeleteSteps() {
} else
return;

OLogManager.instance().debug(this,
"Cannot delete database files because they are still locked by the OrientDB process: waiting %d ms and retrying %d/%d...",
DELETE_WAIT_TIME, i, DELETE_MAX_RETRIES);
OLogManager
.instance()
.debug(
this,
"Cannot delete database files because they are still locked by the OrientDB process: waiting %d ms and retrying %d/%d...",
DELETE_WAIT_TIME, i, DELETE_MAX_RETRIES);
}

throw new OStorageException("Cannot delete database '" + name + "' located in: " + dbDir + ". Database files seem locked");
Expand Down Expand Up @@ -295,8 +299,8 @@ protected void initWalAndDiskCache() throws IOException {
writeAheadLog = null;

long diskCacheSize = OGlobalConfiguration.DISK_CACHE_SIZE.getValueAsLong() * 1024 * 1024;
long writeCacheSize = (long) Math
.floor((((double) OGlobalConfiguration.DISK_WRITE_CACHE_PART.getValueAsInteger()) / 100.0) * diskCacheSize);
long writeCacheSize = (long) Math.floor((((double) OGlobalConfiguration.DISK_WRITE_CACHE_PART.getValueAsInteger()) / 100.0)
* diskCacheSize);

final OWOWCache wowCache = new OWOWCache(false, OGlobalConfiguration.DISK_CACHE_PAGE_SIZE.getValueAsInteger() * ONE_KB,
OGlobalConfiguration.DISK_WRITE_CACHE_PAGE_TTL.getValueAsLong() * 1000, writeAheadLog,
Expand Down
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

Expand Down Expand Up @@ -98,8 +99,7 @@ public void makeFullCheckpoint() throws IOException {
}

@Override
public void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable, OCommandOutputListener iListener,
int compressionLevel, int bufferSize) throws IOException {
public List<String> backup(OutputStream out, Map<String, Object> options, Callable<Object> callable, OCommandOutputListener iListener, int compressionLevel, int bufferSize) throws IOException {
throw new UnsupportedOperationException();
}

Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

Expand Down Expand Up @@ -54,8 +55,7 @@ public interface OBackupable {
* @throws IOException
* @see ODatabaseExport
*/
void backup(OutputStream out, Map<String, Object> options, Callable<Object> callable, OCommandOutputListener iListener,
int compressionLevel, int bufferSize) throws IOException;
List<String> backup(OutputStream out, Map<String, Object> options, Callable<Object> callable, OCommandOutputListener iListener, int compressionLevel, int bufferSize) throws IOException;

/**
* Executes a restore of a database backup. During the restore the database will be frozen in read-only mode.
Expand Down
Expand Up @@ -72,7 +72,8 @@ public Void call() throws Exception {
checkRecord(database, i);
checkIndex(database, (String) person.field("name"), person.getIdentity());

Thread.sleep(delayWriter);
if (delayWriter > 0)
Thread.sleep(delayWriter);

} catch (InterruptedException e) {
System.out.println("Writer received interrupt (db=" + database.getURL());
Expand Down
Expand Up @@ -74,7 +74,8 @@ public Void call() throws Exception {
throw e;
}

Thread.sleep(delayWriter);
if (delayWriter > 0)
Thread.sleep(delayWriter);

} catch (InterruptedException e) {
System.out.println("Writer received interrupt (db=" + databaseUrl);
Expand Down
Expand Up @@ -90,7 +90,8 @@ public Void call() throws Exception {
if ((i + 1) % 100 == 0)
System.out.println("\nWriter " + database.getURL() + " managed " + (i + 1) + "/" + count + " records so far");

Thread.sleep(delayWriter);
if (delayWriter > 0)
Thread.sleep(delayWriter);

// OK
break;
Expand Down Expand Up @@ -132,7 +133,7 @@ public Void call() throws Exception {
private ODocument createRecord(ODatabaseDocumentTx database, int i) {
final String uniqueId = serverId + "-" + threadId + "-" + i;

// System.out.println("Creating person " + uniqueId);
// System.out.println("Creating person " + uniqueId);

ODocument person = new ODocument("Person").fields("id", UUID.randomUUID().toString(), "name", "Billy" + uniqueId, "surname",
"Mayes" + uniqueId, "birthday", new Date(), "children", uniqueId);
Expand Down Expand Up @@ -190,7 +191,9 @@ public Void call() throws Exception {
while (runningWriters.getCount() > 0) {
try {
printStats(databaseUrl);
Thread.sleep(delayReader);

if (delayReader > 0)
Thread.sleep(delayReader);

} catch (Exception e) {
break;
Expand Down

0 comments on commit 4102440

Please sign in to comment.