Skip to content

Commit

Permalink
Issue #2029 1. Only single backup may be started simultaneously. 2. E…
Browse files Browse the repository at this point in the history
…rrors in calculation of LSN were fixed. 3. Minor improvement of backup speed.
  • Loading branch information
laa committed Oct 6, 2015
1 parent ff57e7f commit ddb51a3
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 47 deletions.
4 changes: 3 additions & 1 deletion core/src/main/java/com/orientechnologies/common/exception/OErrorCategory.java 100644 → 100755
Expand Up @@ -7,7 +7,9 @@ public enum OErrorCategory {

SQL_GENERIC(1),

SQL_PARSING(2);
SQL_PARSING(2),

STORAGE(3);

protected final int code;

Expand Down
@@ -1,18 +1,20 @@
package com.orientechnologies.common.exception;

import com.orientechnologies.common.util.OApi;
import com.orientechnologies.orient.core.exception.OBackupInProgressException;
import com.orientechnologies.orient.core.exception.OQueryParsingException;

import java.lang.reflect.InvocationTargetException;

/**
* @author Luigi Dell'Aquila
*/
@OApi(maturity= OApi.MATURITY.NEW)
@OApi(maturity = OApi.MATURITY.NEW)
public enum OErrorCode {

// eg.
QUERY_PARSE_ERROR(OErrorCategory.SQL_PARSING, 1, "query parse error", OQueryParsingException.class);
QUERY_PARSE_ERROR(OErrorCategory.SQL_PARSING, 1, "query parse error", OQueryParsingException.class), BACKUP_IN_PROGRESS(
OErrorCategory.STORAGE, 2, "You trying to start backup, but it is already in progress", OBackupInProgressException.class);

protected final OErrorCategory category;
protected final int code;
Expand Down
@@ -0,0 +1,18 @@
package com.orientechnologies.orient.core.exception;

import com.orientechnologies.common.exception.OErrorCode;
import com.orientechnologies.common.exception.OHighLevelException;

/**
* @author Andrey Lomakin <lomakin.andrey@gmail.com>.
* @since 10/5/2015
*/
public class OBackupInProgressException extends OCoreException implements OHighLevelException {
public OBackupInProgressException(OBackupInProgressException exception) {
super(exception);
}

public OBackupInProgressException(String message, String componentName, OErrorCode errorCode) {
super(message, componentName, errorCode);
}
}
Expand Up @@ -23,6 +23,7 @@
import com.orientechnologies.common.concur.lock.OLockManager;
import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException;
import com.orientechnologies.common.directmemory.ODirectMemoryPointer;
import com.orientechnologies.common.exception.OErrorCode;
import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.io.OIOUtils;
import com.orientechnologies.common.log.OLogManager;
Expand Down Expand Up @@ -52,13 +53,7 @@
import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManager;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManagerShared;
import com.orientechnologies.orient.core.exception.OCommandExecutionException;
import com.orientechnologies.orient.core.exception.OConcurrentModificationException;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.exception.OFastConcurrentModificationException;
import com.orientechnologies.orient.core.exception.OLowDiskSpaceException;
import com.orientechnologies.orient.core.exception.ORecordNotFoundException;
import com.orientechnologies.orient.core.exception.OStorageException;
import com.orientechnologies.orient.core.exception.*;
import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.index.OIndexCursor;
Expand Down Expand Up @@ -168,6 +163,7 @@ public abstract class OAbstractPaginatedStorage extends OStorageAbstract impleme
private volatile boolean checkpointRequest = false;

private final int id;
private final AtomicBoolean backupInProgress = new AtomicBoolean(false);

private Map<String, OIndexEngine> indexEngineNameMap = new HashMap<String, OIndexEngine>();
private List<OIndexEngine> indexEngines = new ArrayList<OIndexEngine>();
Expand Down Expand Up @@ -2468,33 +2464,40 @@ public void incrementalBackup(final File backupDirectory) {
final File ibuFile = new File(backupDirectory, fileName);

rndIBUFile = new RandomAccessFile(ibuFile, "rw");
final FileChannel ibuChannel = rndIBUFile.getChannel();
try {
final FileChannel ibuChannel = rndIBUFile.getChannel();

ibuChannel.position(3 * OLongSerializer.LONG_SIZE + OByteSerializer.BYTE_SIZE);
ibuChannel.position(3 * OLongSerializer.LONG_SIZE + OByteSerializer.BYTE_SIZE);

final OLogSequenceNumber maxLsn = incrementalBackup(Channels.newOutputStream(ibuChannel), lastLsn);
final ByteBuffer dataBuffer = ByteBuffer.allocate(3 * OLongSerializer.LONG_SIZE + OByteSerializer.BYTE_SIZE);
final OLogSequenceNumber maxLsn = incrementalBackup(Channels.newOutputStream(ibuChannel), lastLsn);
final ByteBuffer dataBuffer = ByteBuffer.allocate(3 * OLongSerializer.LONG_SIZE + OByteSerializer.BYTE_SIZE);

dataBuffer.putLong(nextIndex);
dataBuffer.putLong(maxLsn.getSegment());
dataBuffer.putLong(maxLsn.getPosition());
dataBuffer.putLong(nextIndex);
dataBuffer.putLong(maxLsn.getSegment());
dataBuffer.putLong(maxLsn.getPosition());

if (lastLsn == null)
dataBuffer.put((byte) 1);
else
dataBuffer.put((byte) 0);
if (lastLsn == null)
dataBuffer.put((byte) 1);
else
dataBuffer.put((byte) 0);

dataBuffer.rewind();
dataBuffer.rewind();

ibuChannel.position(0);
ibuChannel.write(dataBuffer);
ibuChannel.position(0);
ibuChannel.write(dataBuffer);
} catch (RuntimeException e) {
rndIBUFile.close();

if (!ibuFile.delete()) {
OLogManager.instance().error(this, ibuFile.getAbsolutePath() + " is closed but can not be deleted");
}
}
} catch (IOException e) {
throw OException.wrapException(new OStorageException("Error during incremental backup"), e);
} finally {
try {
if (rndIBUFile != null)
rndIBUFile.close();

} catch (IOException e) {
throw OException.wrapException(new OStorageException("Error during incremental backup"), e);
}
Expand Down Expand Up @@ -2583,10 +2586,16 @@ private long extractIndexFromIBUFile(final File backupDirectory, final String fi
}

public OLogSequenceNumber incrementalBackup(final OutputStream stream, final OLogSequenceNumber fromLsn) throws IOException {
final OLogSequenceNumber lastLsn;
OLogSequenceNumber lastLsn;

checkOpeness();

if (!backupInProgress.compareAndSet(false, true)) {
throw new OBackupInProgressException(
"You are trying to start incremental backup but it is in progress now, please wait till it will be finished", getName(),
OErrorCode.BACKUP_IN_PROGRESS);
}

stateLock.acquireReadLock();
try {
checkOpeness();
Expand Down Expand Up @@ -2627,7 +2636,11 @@ public OLogSequenceNumber incrementalBackup(final OutputStream stream, final OLo
zipOutputStream.write(btConf);
zipOutputStream.closeEntry();

finalizeIncrementalBackup(zipOutputStream, startSegment);
final OLogSequenceNumber lastWALLsn = copyWALToIncrementalBackup(zipOutputStream, startSegment);

if (lastWALLsn != null && (lastLsn == null || lastWALLsn.compareTo(lastLsn) > 0)) {
lastLsn = lastWALLsn;
}
} finally {
writeAheadLog.preventCutTill(null);
}
Expand All @@ -2643,12 +2656,15 @@ public OLogSequenceNumber incrementalBackup(final OutputStream stream, final OLo
}
} finally {
stateLock.releaseReadLock();

backupInProgress.set(false);
}

return lastLsn;
}

protected abstract void finalizeIncrementalBackup(ZipOutputStream zipOutputStream, long startSegment) throws IOException;
protected abstract OLogSequenceNumber copyWALToIncrementalBackup(ZipOutputStream zipOutputStream, long startSegment)
throws IOException;

protected abstract boolean isWritesAllowedDuringBackup();

Expand Down Expand Up @@ -2823,7 +2839,7 @@ public void restoreFromIncrementalBackup(final InputStream inputStream, final bo
}

if (!writeCache.fileIdsAreEqual(expectedFileId, fileId))
throw new OStorageException("Can not restore database from backup because expected and actaul file ids are not the same");
throw new OStorageException("Can not restore database from backup because expected and actual file ids are not the same");

while (zipInputStream.available() > 0) {
final byte[] data = new byte[pageSize + OLongSerializer.LONG_SIZE];
Expand Down Expand Up @@ -2900,18 +2916,24 @@ public void restoreFromIncrementalBackup(final InputStream inputStream, final bo
readCache.deleteFile(fileId, writeCache);
}

if (maxLsn != null && writeAheadLog != null) {
writeAheadLog.moveLsnAfter(maxLsn);
}

final OWriteAheadLog restoreLog = createWalFromIBUFiles(walTempDir);
OLogSequenceNumber restoreLsn = null;

if (restoreLog != null) {
final OLogSequenceNumber beginLsn = restoreLog.begin();
restoreFrom(beginLsn, restoreLog);
restoreLsn = restoreFrom(beginLsn, restoreLog);

restoreLog.delete();
}

if (maxLsn != null && writeAheadLog != null) {
if (restoreLsn != null && restoreLsn.compareTo(maxLsn) > 0) {
maxLsn = restoreLsn;
}

writeAheadLog.moveLsnAfter(maxLsn);
}

if (walTempDir != null) {
if (!walTempDir.delete()) {
OLogManager.instance().error(this, "Can not remove temporary backup directory " + walTempDir.getAbsolutePath());
Expand Down Expand Up @@ -3161,7 +3183,7 @@ private void restoreIfNeeded() throws Exception {
if (isDirty()) {
OLogManager.instance().warn(this, "Storage " + name + " was not closed properly. Will try to restore from write ahead log.");
try {
wereDataRestoredAfterOpen = restoreFromWAL();
wereDataRestoredAfterOpen = restoreFromWAL() != null;
} catch (Exception e) {
OLogManager.instance().error(this, "Exception during storage data restore", e);
throw e;
Expand Down Expand Up @@ -3804,15 +3826,15 @@ private void checkClusterSegmentIndexRange(final int iClusterId) {
throw new IllegalArgumentException("Cluster segment #" + iClusterId + " does not exist in database '" + name + "'");
}

private boolean restoreFromWAL() throws IOException {
private OLogSequenceNumber restoreFromWAL() throws IOException {
if (writeAheadLog == null) {
OLogManager.instance().error(this, "Restore is not possible because write ahead logging is switched off.");
return true;
return null;
}

if (writeAheadLog.begin() == null) {
OLogManager.instance().error(this, "Restore is not possible because write ahead log is empty.");
return false;
return null;
}

OLogManager.instance().info(this, "Looking for last checkpoint...");
Expand Down Expand Up @@ -3927,7 +3949,7 @@ private boolean checkFuzzyCheckPointIsComplete(OLogSequenceNumber lastCheckPoint
return false;
}

private boolean restoreFromCheckPoint(OAbstractCheckPointStartRecord checkPointRecord) throws IOException {
private OLogSequenceNumber restoreFromCheckPoint(OAbstractCheckPointStartRecord checkPointRecord) throws IOException {
if (checkPointRecord instanceof OFuzzyCheckpointStartRecord) {
return restoreFromFuzzyCheckPoint((OFuzzyCheckpointStartRecord) checkPointRecord);
}
Expand All @@ -3939,15 +3961,15 @@ private boolean restoreFromCheckPoint(OAbstractCheckPointStartRecord checkPointR
throw new OStorageException("Unknown checkpoint record type " + checkPointRecord.getClass().getName());
}

private boolean restoreFromFullCheckPoint(OFullCheckpointStartRecord checkPointRecord) throws IOException {
private OLogSequenceNumber restoreFromFullCheckPoint(OFullCheckpointStartRecord checkPointRecord) throws IOException {
OLogManager.instance().info(this, "Data restore procedure from full checkpoint is started. Restore is performed from LSN %s",
checkPointRecord.getLsn());

final OLogSequenceNumber lsn = writeAheadLog.next(checkPointRecord.getLsn());
return restoreFrom(lsn, writeAheadLog);
}

private boolean restoreFromFuzzyCheckPoint(OFuzzyCheckpointStartRecord checkPointRecord) throws IOException {
private OLogSequenceNumber restoreFromFuzzyCheckPoint(OFuzzyCheckpointStartRecord checkPointRecord) throws IOException {
OLogManager.instance().info(this, "Data restore procedure from FUZZY checkpoint is started.");
OLogSequenceNumber flushedLsn = checkPointRecord.getFlushedLsn();

Expand All @@ -3957,15 +3979,16 @@ private boolean restoreFromFuzzyCheckPoint(OFuzzyCheckpointStartRecord checkPoin
return restoreFrom(flushedLsn, writeAheadLog);
}

private boolean restoreFromBegging() throws IOException {
private OLogSequenceNumber restoreFromBegging() throws IOException {
OLogManager.instance().info(this, "Data restore procedure is started.");
OLogSequenceNumber lsn = writeAheadLog.begin();

return restoreFrom(lsn, writeAheadLog);
}

private boolean restoreFrom(OLogSequenceNumber lsn, OWriteAheadLog writeAheadLog) throws IOException {
final OModifiableBoolean atLeastOnePageUpdate = new OModifiableBoolean(false);
private OLogSequenceNumber restoreFrom(OLogSequenceNumber lsn, OWriteAheadLog writeAheadLog) throws IOException {
OLogSequenceNumber logSequenceNumber = null;
OModifiableBoolean atLeastOnePageUpdate = new OModifiableBoolean();

long recordsProcessed = 0;

Expand All @@ -3974,6 +3997,8 @@ private boolean restoreFrom(OLogSequenceNumber lsn, OWriteAheadLog writeAheadLog

try {
while (lsn != null) {
logSequenceNumber = lsn;

OWALRecord walRecord = writeAheadLog.read(lsn);

if (walRecord instanceof OAtomicUnitEndRecord) {
Expand Down Expand Up @@ -4035,7 +4060,10 @@ private boolean restoreFrom(OLogSequenceNumber lsn, OWriteAheadLog writeAheadLog
backUpWAL(e);
}

return atLeastOnePageUpdate.getValue();
if (atLeastOnePageUpdate.getValue())
return logSequenceNumber;

return null;
}

private void backUpWAL(Exception e) {
Expand Down
Expand Up @@ -43,6 +43,7 @@
import com.orientechnologies.orient.core.storage.impl.local.OStorageConfigurationSegment;
import com.orientechnologies.orient.core.storage.impl.local.OStorageVariableParser;
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.ODiskWriteAheadLog;
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber;
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OWriteAheadLog;

import java.io.*;
Expand Down Expand Up @@ -193,14 +194,15 @@ public void restore(InputStream in, Map<String, Object> options, final Callable<
}

@Override
protected void finalizeIncrementalBackup(ZipOutputStream zipOutputStream, long startSegment) throws IOException {
protected OLogSequenceNumber copyWALToIncrementalBackup(ZipOutputStream zipOutputStream, long startSegment) throws IOException {
File[] nonActiveSegments = writeAheadLog.nonActiveSegments(startSegment);

int n = 0;
int i = 0;
boolean newSegment = false;

long freezeId = -1;
OLogSequenceNumber lastLSN = null;

try {
while (true) {
Expand Down Expand Up @@ -252,6 +254,8 @@ protected void finalizeIncrementalBackup(ZipOutputStream zipOutputStream, long s
"Incremental backup in progress");
}

lastLSN = writeAheadLog.end();

writeAheadLog.newSegment();
newSegment = true;

Expand All @@ -266,6 +270,7 @@ protected void finalizeIncrementalBackup(ZipOutputStream zipOutputStream, long s
getAtomicOperationsManager().releaseAtomicOperations(freezeId);
}

return lastLSN;
}

@Override
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage;
import com.orientechnologies.orient.core.storage.impl.local.paginated.OPaginatedCluster;
import com.orientechnologies.orient.core.storage.impl.local.paginated.OStorageMemoryConfiguration;
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber;
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OMemoryWriteAheadLog;
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OWriteAheadLog;

Expand Down Expand Up @@ -98,7 +99,8 @@ public void restore(InputStream in, Map<String, Object> options, Callable<Object
}

@Override
protected void finalizeIncrementalBackup(ZipOutputStream zipOutputStream, long startSegment) throws IOException {
protected OLogSequenceNumber copyWALToIncrementalBackup(ZipOutputStream zipOutputStream, long startSegment) throws IOException {
return null;
}

@Override
Expand Down

0 comments on commit ddb51a3

Please sign in to comment.