Skip to content

Commit

Permalink
Not finished transactions are rolled back
Browse files Browse the repository at this point in the history
  • Loading branch information
laa committed Jan 19, 2018
1 parent 8383ef1 commit 08d5724
Show file tree
Hide file tree
Showing 13 changed files with 502 additions and 159 deletions.
Expand Up @@ -102,6 +102,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
Expand Down Expand Up @@ -215,7 +216,7 @@ public abstract class OAbstractPaginatedStorage extends OStorageAbstract
private final List<OIndexEngine> indexEngines = new ArrayList<>();
private boolean wereDataRestoredAfterOpen = false;

private volatile long fullCheckpointCount;
private final LongAdder fullCheckpointCount = new LongAdder();

private final AtomicLong recordCreated = new AtomicLong(0);
private final AtomicLong recordUpdated = new AtomicLong(0);
Expand Down Expand Up @@ -905,17 +906,16 @@ public void onException(Throwable e) {
* record, only if record is not deleted - 4 bytes</li> <li>Binary presentation of the record, only if record is not deleted -
* length of content is provided in above entity</li> </ol>
*
* @param lsn LSN from which we should find changed records
* @param stream Stream which will contain found records
* @param excludedClusterIds Array of cluster ids to exclude from the export
* @param lsn LSN from which we should find changed records
* @param stream Stream which will contain found records
*
* @return Last LSN processed during examination of changed records, or <code>null</code> if it was impossible to find changed
* records: write ahead log is absent, record with start LSN was not found in WAL, etc.
*
* @see OGlobalConfiguration#STORAGE_TRACK_CHANGED_RECORDS_IN_WAL
*/
public OLogSequenceNumber recordsChangedAfterLSN(final OLogSequenceNumber lsn, final OutputStream stream,
final Set<String> excludedClusterIds, final OCommandOutputListener outputListener) {
final OCommandOutputListener outputListener) {
try {
if (!getConfiguration().getContextConfiguration()
.getValueAsBoolean(OGlobalConfiguration.STORAGE_TRACK_CHANGED_RECORDS_IN_WAL))
Expand Down Expand Up @@ -997,10 +997,7 @@ public OLogSequenceNumber recordsChangedAfterLSN(final OLogSequenceNumber lsn, f
final ORecordOperationMetadata recordOperationMetadata = (ORecordOperationMetadata) atomicUnitEndRecord
.getAtomicOperationMetadata().get(ORecordOperationMetadata.RID_METADATA_KEY);
final Set<ORID> rids = recordOperationMetadata.getValue();
for (ORID rid : rids) {
//if(includeClusterNames.contains(getPhysicalClusterNameById(rid.getClusterId())))
sortedRids.add(rid);
}
sortedRids.addAll(rids);
}
}

Expand Down Expand Up @@ -1108,8 +1105,8 @@ public OLogSequenceNumber recordsChangedAfterLSN(final OLogSequenceNumber lsn, f
*
* @see OGlobalConfiguration#STORAGE_TRACK_CHANGED_RECORDS_IN_WAL
*/
public Set<ORecordId> recordsChangedRecently(final int maxEntries, final Set<String> includeClusterNames) {
final SortedSet<ORecordId> result = new TreeSet<ORecordId>();
public Set<ORecordId> recordsChangedRecently(final int maxEntries) {
final SortedSet<ORecordId> result = new TreeSet<>();

try {
if (!OGlobalConfiguration.STORAGE_TRACK_CHANGED_RECORDS_IN_WAL.getValueAsBoolean())
Expand Down Expand Up @@ -1158,7 +1155,7 @@ public Set<ORecordId> recordsChangedRecently(final int maxEntries, final Set<Str
OLogSequenceNumber currentLsn = startLsn;

// KEEP LAST MAX-ENTRIES TRANSACTIONS' LSN
final List<OLogSequenceNumber> lastTx = new LinkedList<OLogSequenceNumber>();
final List<OLogSequenceNumber> lastTx = new LinkedList<>();
while (currentLsn != null && endLsn.compareTo(currentLsn) >= 0) {
walRecord = writeAheadLog.read(currentLsn);

Expand All @@ -1182,7 +1179,6 @@ public Set<ORecordId> recordsChangedRecently(final int maxEntries, final Set<Str
.getAtomicOperationMetadata().get(ORecordOperationMetadata.RID_METADATA_KEY);
final Set<ORID> rids = recordOperationMetadata.getValue();
for (ORID rid : rids) {
//if (includeClusterNames.contains(getPhysicalClusterNameById(rid.getClusterId())))
result.add((ORecordId) rid);
}
}
Expand Down Expand Up @@ -3987,15 +3983,15 @@ protected void makeFullCheckpoint() {
throw OException.wrapException(new OStorageException("Error during checkpoint creation for storage " + name), ioe);
}

fullCheckpointCount++;
fullCheckpointCount.increment();
} finally {
if (statistic != null)
statistic.stopFullCheckpointTimer();
}
}

public long getFullCheckpointCount() {
return fullCheckpointCount;
return fullCheckpointCount.sum();
}

protected void preOpenSteps() throws IOException {
Expand Down Expand Up @@ -4981,11 +4977,14 @@ private OLogSequenceNumber restoreFromFullCheckPoint(OFullCheckpointStartRecord
}

private OLogSequenceNumber restoreFromFuzzyCheckPoint(OFuzzyCheckpointStartRecord checkPointRecord) throws IOException {
OLogManager.instance().info(this, "Data restore procedure from FUZZY checkpoint is started.");
OLogManager.instance().infoNoDb(this, "Data restore procedure from FUZZY checkpoint is started.");
OLogSequenceNumber flushedLsn = checkPointRecord.getFlushedLsn();

if (flushedLsn.compareTo(writeAheadLog.begin()) < 0)
if (flushedLsn.compareTo(writeAheadLog.begin()) < 0) {
OLogManager.instance().errorNoDb(this,
"Fuzzy checkpoint points to removed part of the log, " + "will try to restore data from the rest of the WAL", null);
flushedLsn = writeAheadLog.begin();
}

return restoreFrom(flushedLsn, writeAheadLog);
}
Expand Down Expand Up @@ -5035,36 +5034,54 @@ protected OLogSequenceNumber restoreFrom(OLogSequenceNumber lsn, OWriteAheadLog
} else if (walRecord instanceof OOperationUnitRecord) {
OOperationUnitRecord operationUnitRecord = (OOperationUnitRecord) walRecord;

// in case of data restore from fuzzy checkpoint part of operations may be already flushed to the disk
List<OWALRecord> operationList = operationUnits
.computeIfAbsent(operationUnitRecord.getOperationUnitId(), k -> new ArrayList<>());
List<OWALRecord> operationList = operationUnits.get(operationUnitRecord.getOperationUnitId());

if (operationList == null || operationList.isEmpty()) {
OLogManager.instance().errorNoDb(this, "'Start transaction' record is absent for atomic operation", null);

if (operationList == null) {
operationList = new ArrayList<>();
operationUnits.put(operationUnitRecord.getOperationUnitId(), operationList);
}
}

operationList.add(operationUnitRecord);
} else if (walRecord instanceof ONonTxOperationPerformedWALRecord) {
if (!wereNonTxOperationsPerformedInPreviousOpen) {
OLogManager.instance().warn(this, "Non tx operation was used during data modification we will need index rebuild.");
OLogManager.instance().warnNoDb(this, "Non tx operation was used during data modification we will need index rebuild.");
wereNonTxOperationsPerformedInPreviousOpen = true;
}
} else
OLogManager.instance().warn(this, "Record %s will be skipped during data restore", walRecord);
OLogManager.instance().warnNoDb(this, "Record %s will be skipped during data restore", walRecord);

recordsProcessed++;

final long currentTime = System.currentTimeMillis();
if (reportBatchSize > 0 && recordsProcessed % reportBatchSize == 0
|| currentTime - lastReportTime > WAL_RESTORE_REPORT_INTERVAL) {
OLogManager.instance().info(this, "%d operations were processed, current LSN is %s last LSN is %s", recordsProcessed, lsn,
writeAheadLog.end());
OLogManager.instance()
.infoNoDb(this, "%d operations were processed, current LSN is %s last LSN is %s", recordsProcessed, lsn,
writeAheadLog.end());
lastReportTime = currentTime;
}

lsn = writeAheadLog.next(lsn);
}

if (!operationUnits.isEmpty()) {
OLogManager.instance()
.infoNoDb(this, "There are %d unfinished atomic operations left, they will be rolled back", operationUnits.size());

for (List<OWALRecord> atomicOperation : operationUnits.values()) {
revertAtomicUnit(atomicOperation, atLeastOnePageUpdate);
}
}
} catch (OWALPageBrokenException e) {
OLogManager.instance()
.error(this, "Data restore was paused because broken WAL page was found. The rest of changes will be rolled back.", e);
.errorNoDb(this, "Data restore was paused because broken WAL page was found. The rest of changes will be rolled back.",
e);
} catch (RuntimeException e) {
OLogManager.instance().error(this,
OLogManager.instance().errorNoDb(this,
"Data restore was paused because of exception. The rest of changes will be rolled back and WAL files will be backed up."
+ " Please report issue about this exception to bug tracker and provide WAL files which are backed up in 'wal_backup' directory.",
e);
Expand Down Expand Up @@ -5149,6 +5166,70 @@ private void archiveEntry(ZipOutputStream archiveZipOutputStream, String walSegm
}
}

/**
 * Rolls back an atomic unit by walking its WAL records from the last one to the first one.
 * <ul>
 * <li>{@link OFileDeletedWALRecord} - only logged, file deletion is not rolled back.</li>
 * <li>{@link OFileCreatedWALRecord} - the created file is deleted if it still exists in the write cache.</li>
 * <li>{@link OUpdatePageRecord} - the page is loaded for write, the changes of the record are rolled back and the LSN of the
 * page is set to the previous LSN stored in the record. Pages which can not be loaded are skipped.</li>
 * <li>{@link OAtomicUnitStartRecord} and {@link OAtomicUnitEndRecord} - ignored.</li>
 * <li>Any other record type - reported as error and skipped (fails an assertion if assertions are enabled).</li>
 * </ul>
 *
 * @param atomicUnit           Records of the atomic unit in the order in which they were read from the WAL
 * @param atLeastOnePageUpdate Flag which is set to <code>true</code> once changes of at least one page were rolled back
 *
 * @throws OStorageException if a page update refers to a file which does not exist in the write cache
 * @throws IOException       declared by the method signature, presumably raised by the cache operations
 */
private void revertAtomicUnit(List<OWALRecord> atomicUnit, OModifiableBoolean atLeastOnePageUpdate) throws IOException {
  final ListIterator<OWALRecord> recordsIterator = atomicUnit.listIterator(atomicUnit.size());

  // records are reverted in the order opposite to the order of their creation
  while (recordsIterator.hasPrevious()) {
    final OWALRecord record = recordsIterator.previous();

    if (record instanceof OFileDeletedWALRecord) {
      OLogManager.instance().infoNoDb(this, "Deletion of file can not be rolled back");
    } else if (record instanceof OFileCreatedWALRecord) {
      final OFileCreatedWALRecord fileCreatedWALRecord = (OFileCreatedWALRecord) record;
      OLogManager.instance().infoNoDb(this, "File %s is going to be deleted", fileCreatedWALRecord.getFileName());

      if (writeCache.exists(fileCreatedWALRecord.getFileId())) {
        readCache.deleteFile(fileCreatedWALRecord.getFileId(), writeCache);
        OLogManager.instance().infoNoDb(this, "File %s was deleted", fileCreatedWALRecord.getFileName());
      } else {
        // file name is logged with %s, %d is not applicable to a non-numeric argument
        OLogManager.instance().infoNoDb(this, "File %s is absent and can not be deleted", fileCreatedWALRecord.getFileName());
      }
    } else if (record instanceof OUpdatePageRecord) {
      final OUpdatePageRecord updatePageRecord = (OUpdatePageRecord) record;

      long fileId = updatePageRecord.getFileId();
      if (!writeCache.exists(fileId)) {
        final String fileName = writeCache.restoreFileById(fileId);

        throw new OStorageException("File with id " + fileId + " and name " + fileName
            + " was deleted from storage, the rest of operations can not be restored");
      }

      final long pageIndex = updatePageRecord.getPageIndex();
      fileId = writeCache.externalFileId(writeCache.internalFileId(fileId));

      final OCacheEntry cacheEntry = readCache.loadForWrite(fileId, pageIndex, true, writeCache, 1, false);
      if (cacheEntry == null) {
        //page may not exist because it was not flushed that is OK, we just go forward
        continue;
      }

      try {
        final ODurablePage durablePage = new ODurablePage(cacheEntry);
        durablePage.rollbackChanges(updatePageRecord.getChanges());
        durablePage.setLsn(updatePageRecord.getPrevLsn());
      } finally {
        readCache.releaseFromWrite(cacheEntry, writeCache);
      }

      atLeastOnePageUpdate.setValue(true);
    } else if (!(record instanceof OAtomicUnitStartRecord) && !(record instanceof OAtomicUnitEndRecord)) {
      // boundaries of atomic unit do not carry any changes, all other unknown records are reported
      OLogManager.instance()
          .errorNoDb(this, "Invalid WAL record type was passed %s. Given record will be skipped.", null, record.getClass());

      assert false : "Invalid WAL record type was passed " + record.getClass().getName();
    }
  }
}

@SuppressWarnings("WeakerAccess")
protected void restoreAtomicUnit(List<OWALRecord> atomicUnit, OModifiableBoolean atLeastOnePageUpdate) throws IOException {
assert atomicUnit.get(atomicUnit.size() - 1) instanceof OAtomicUnitEndRecord;
Expand Down Expand Up @@ -5306,6 +5387,7 @@ private void checkLowDiskSpaceRequestsAndReadOnlyConditions() {
}
}

/**
 * Passes the given listener to the configuration of this storage.
 *
 * @param storageConfigurationUpdateListener Listener which is handed over to the storage configuration
 */
@SuppressWarnings("unused")
public void setStorageConfigurationUpdateListener(OStorageConfigurationUpdateListener storageConfigurationUpdateListener) {
  getConfiguration().setConfigurationUpdateListener(storageConfigurationUpdateListener);
}
Expand Down
Expand Up @@ -113,6 +113,8 @@ public OCacheEntry loadPage(long fileId, long pageIndex, boolean checkPinnedPage
} else {
OCacheEntry delegate = readCache.loadForRead(fileId, pageIndex, checkPinnedPages, writeCache, pageCount, true);
pageChangesContainer.delegate = delegate;
pageChangesContainer.originalLsn = ODurablePage
.getLogSequenceNumberFromPage(delegate.getCachePointer().getSharedBuffer());
return pageChangesContainer;
}
}
Expand Down Expand Up @@ -162,7 +164,7 @@ public void pinPage(OCacheEntry cacheEntry) {
pageChangesContainer.pinPage = true;
}

public OCacheEntry addPage(long fileId) throws IOException {
public OCacheEntry addPage(long fileId) {
fileId = checkFileIdCompatibility(fileId, storageId);

if (deletedFiles.contains(fileId))
Expand Down Expand Up @@ -198,7 +200,7 @@ public void releasePage(OCacheEntry cacheEntry) {
}
}

public long filledUpTo(long fileId) throws IOException {
public long filledUpTo(long fileId) {
fileId = checkFileIdCompatibility(fileId, storageId);

if (deletedFiles.contains(fileId))
Expand Down Expand Up @@ -357,8 +359,8 @@ else if (fileChanges.truncate)
final long pageIndex = filePageChangesEntry.getKey();
final OCacheEntryChanges filePageChanges = filePageChangesEntry.getValue();

filePageChanges.lsn = writeAheadLog
.log(new OUpdatePageRecord(pageIndex, fileId, operationUnitId, filePageChanges.changes));
filePageChanges.lsn = writeAheadLog.log(
new OUpdatePageRecord(pageIndex, fileId, operationUnitId, filePageChanges.changes, filePageChanges.originalLsn));
} else
filePageChangesIterator.remove();
}
Expand Down
Expand Up @@ -11,6 +11,11 @@
*/
public class OCacheEntryChanges implements OCacheEntry {

/**
* LSN of the page before it was modified. The LSN of a newly added page equals (0, 0).
*/
OLogSequenceNumber originalLsn = new OLogSequenceNumber(0, 0);

OCacheEntry delegate;
final OWALChanges changes = new OWALPageChangesPortion();
OLogSequenceNumber lsn = null;
Expand All @@ -20,8 +25,9 @@ public class OCacheEntryChanges implements OCacheEntry {
/**
 * Creates changes container which wraps the given cache entry.
 *
 * @param entry Cache entry which is used as delegate
 */
public OCacheEntryChanges(OCacheEntry entry) {
  this.delegate = entry;
}
public OCacheEntryChanges(){

@SuppressWarnings("WeakerAccess")
public OCacheEntryChanges() {
}

@Override
Expand Down
Expand Up @@ -98,6 +98,7 @@ public static OLogSequenceNumber getLogSequenceNumberFromPage(ByteBuffer buffer)
* @param offset Offset of data inside page
* @param length Length of data to be copied
*/
@SuppressWarnings("unused")
public static void getPageData(ByteBuffer buffer, byte[] data, int offset, int length) {
buffer.position(0);
buffer.get(data, offset, length);
Expand All @@ -111,6 +112,7 @@ public static void getPageData(ByteBuffer buffer, byte[] data, int offset, int l
* @param offset Offset inside of byte array from which LSN value will be read.
* @param data Byte array from which LSN value will be read.
*/
@SuppressWarnings("unused")
public static OLogSequenceNumber getLogSequenceNumber(int offset, byte[] data) {
final long segment = OLongSerializer.INSTANCE.deserializeNative(data, offset + WAL_SEGMENT_OFFSET);
final long position = OLongSerializer.INSTANCE.deserializeNative(data, offset + WAL_POSITION_OFFSET);
Expand Down Expand Up @@ -290,6 +292,17 @@ public void restoreChanges(OWALChanges changes) {
cacheEntry.markDirty();
}

/**
 * Applies the original values kept in the given changes to the exclusive buffer of the page and marks the cache entry as
 * dirty. The buffer is positioned at its beginning before the values are applied.
 *
 * @param changes Changes whose original values are written back to the page
 */
public void rollbackChanges(OWALChanges changes) {
  assert cacheEntry.getCachePointer().getSharedBuffer() == null || cacheEntry.isLockAcquiredByCurrentThread();

  final ByteBuffer pageBuffer = cacheEntry.getCachePointer().getExclusiveBuffer();
  pageBuffer.position(0);

  changes.applyOriginalValues(pageBuffer);
  cacheEntry.markDirty();
}

public void setLsn(OLogSequenceNumber lsn) {
assert cacheEntry.getCachePointer().getSharedBuffer() == null || cacheEntry.isLockAcquiredByCurrentThread();

Expand Down

0 comments on commit 08d5724

Please sign in to comment.