Skip to content

Commit

Permalink
Issue #29, automatic client rollback, writing discard.txt if server i…
Browse files Browse the repository at this point in the history
…s older.
  • Loading branch information
bluestreak01 committed Feb 13, 2015
1 parent 88fd54e commit 387de24
Show file tree
Hide file tree
Showing 43 changed files with 1,602 additions and 1,569 deletions.
675 changes: 349 additions & 326 deletions nfsdb-core/src/main/java/com/nfsdb/Journal.java

Large diffs are not rendered by default.

195 changes: 115 additions & 80 deletions nfsdb-core/src/main/java/com/nfsdb/JournalWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,34 @@
import com.nfsdb.concurrent.TimerCache;
import com.nfsdb.exceptions.JournalException;
import com.nfsdb.exceptions.JournalRuntimeException;
import com.nfsdb.export.FlexBufferSink;
import com.nfsdb.export.RecordSourcePrinter;
import com.nfsdb.factory.configuration.Constants;
import com.nfsdb.factory.configuration.JournalMetadata;
import com.nfsdb.iterators.ConcurrentIterator;
import com.nfsdb.iterators.MergingIterator;
import com.nfsdb.iterators.PeekingIterator;
import com.nfsdb.lang.cst.impl.jsrc.JournalSourceImpl;
import com.nfsdb.lang.cst.impl.psrc.JournalTailPartitionSource;
import com.nfsdb.lang.cst.impl.qry.Record;
import com.nfsdb.lang.cst.impl.qry.RecordSource;
import com.nfsdb.lang.cst.impl.rsrc.AllRowSource;
import com.nfsdb.locks.Lock;
import com.nfsdb.locks.LockManager;
import com.nfsdb.logging.Logger;
import com.nfsdb.tx.*;
import com.nfsdb.tx.Tx;
import com.nfsdb.tx.TxListener;
import com.nfsdb.tx.TxLog;
import com.nfsdb.utils.Dates;
import com.nfsdb.utils.Files;
import com.nfsdb.utils.Interval;
import com.nfsdb.utils.Rows;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
Expand All @@ -57,7 +69,6 @@ public class JournalWriter<T> extends Journal<T> {
private final JournalEntryWriterImpl journalEntryWriter;
private Lock writeLock;
private TxListener txListener;
private TxAsyncListener txAsyncListener;
private boolean txActive = false;
private int txPartitionIndex = -1;
private long appendTimestampLo = -1;
Expand Down Expand Up @@ -200,6 +211,16 @@ public JournalMode getMode() {
return JournalMode.APPEND;
}

@Override
public int hashCode() {
return getKey().hashCode();
}

@Override
public boolean equals(Object o) {
return this == o || !(o == null || getClass() != o.getClass()) && getKey().equals(((Journal) o).getKey());
}

@Override
void closePartitions() {
super.closePartitions();
Expand All @@ -215,7 +236,7 @@ protected void configure() throws JournalException {
throw new JournalException("Journal is already open for APPEND at %s", getLocation());
}
if (txLog.isEmpty()) {
commit(Tx.TX_NORMAL);
commit(Tx.TX_NORMAL, 0L, 0L);
}
txLog.head(tx);

Expand Down Expand Up @@ -243,34 +264,21 @@ protected void configure() throws JournalException {

}

@Override
public boolean equals(Object o) {
return this == o || !(o == null || getClass() != o.getClass()) && getKey().equals(((Journal) o).getKey());
}

@Override
public int hashCode() {
return getKey().hashCode();
}

public void commit() throws JournalException {
commit(false);
commit(false, -1L, -1L);
}

public TxFuture commitAsync() throws JournalException {
TxFuture future = null;
public void commit(boolean force, long txn, long txPin) throws JournalException {
if (txActive) {
commit(Tx.TX_NORMAL);
if (txAsyncListener != null) {
future = txAsyncListener.onCommitAsync();
}
commit(force ? Tx.TX_FORCE : Tx.TX_NORMAL, txn, txPin);
notifyTxListener();
expireOpenFiles();
txActive = false;
}
return future;
}

public void commitDurable() throws JournalException {
commit(true);
commit(true, -1L, -1L);
}

public void compact() throws JournalException {
Expand Down Expand Up @@ -564,8 +572,11 @@ public boolean accept(File f) {

for (int i = 0; i < files.length; i++) {

if (!txLog.isEmpty() && files[i].getName().equals(txLog.head(tx).lagName)) {
continue;
if (!txLog.isEmpty()) {
txLog.head(tx);
if (files[i].getName().equals(tx.lagName)) {
continue;
}
}

// get exclusive lock
Expand Down Expand Up @@ -600,51 +611,13 @@ public void removeIrregularPartition() {

public void rollback() throws JournalException {
if (txActive) {
rollback(txLog.headAddress());
rollback0(txLog.getCurrentTxAddress(), false);
txActive = false;
}
}

public void rollback(long address) throws JournalException {

txLog.get(address, tx);

if (tx.address == 0) {
throw new JournalException("Invalid transaction address");
}
// partitions need to be dealt with first to make sure new lag is assigned a correct partitionIndex
rollbackPartitions(tx);

Partition<T> lag = getIrregularPartition();
if (tx.lagName != null && tx.lagName.length() > 0 && (lag == null || !tx.lagName.equals(lag.getName()))) {
Partition<T> newLag = createTempPartition(tx.lagName);
setIrregularPartition(newLag);
newLag.applyTx(tx.lagSize, tx.lagIndexPointers);
} else if (lag != null && tx.lagName == null) {
removeIrregularPartitionInternal();
} else if (lag != null) {
lag.truncate(tx.lagSize);
}


if (tx.symbolTableSizes.length == 0) {
for (int i = 0, sz = getSymbolTableCount(); i < sz; i++) {
getSymbolTable(i).truncate();
}
} else {
for (int i = 0, sz = getSymbolTableCount(); i < sz; i++) {
getSymbolTable(i).truncate(tx.symbolTableSizes[i]);
}
}
appendTimestampLo = -1;
appendTimestampHi = -1;
appendPartition = null;
txLog.setTxAddress(tx.address);
txActive = false;
}

public void setTxAsyncListener(TxAsyncListener txAsyncListener) {
this.txAsyncListener = txAsyncListener;
public void rollback(long txn, long txPin) throws JournalException {
rollback0(txLog.findAddress(txn, txPin), true);
}

public void setTxListener(TxListener txListener) {
Expand All @@ -670,23 +643,16 @@ public void truncate() throws JournalException {
commitDurable();
}

private void commit(boolean force) throws JournalException {
if (txActive) {
commit(force ? Tx.TX_FORCE : Tx.TX_NORMAL);
notifyTxListener();
expireOpenFiles();
txActive = false;
}
}

private void commit(byte command) throws JournalException {
private void commit(byte command, long txn, long txPin) throws JournalException {
boolean force = command == Tx.TX_FORCE;
Partition<T> partition = lastNonEmptyNonLag();
Partition<T> lag = getIrregularPartition();

tx.command = command;
tx.prevTxAddress = txLog.getTxAddress();
tx.journalMaxRowID = partition == null ? 0 : Rows.toRowID(partition.getPartitionIndex(), partition.size());
tx.txn = txn;
tx.txPin = txPin;
tx.prevTxAddress = txLog.getCurrentTxAddress();
tx.journalMaxRowID = partition == null ? -1 : Rows.toRowID(partition.getPartitionIndex(), partition.size());
tx.lastPartitionTimestamp = partition == null || partition.getInterval() == null ? 0 : partition.getInterval().getLo();
tx.lagSize = lag == null ? 0 : lag.open().size();
tx.lagName = lag == null ? null : lag.getName();
Expand Down Expand Up @@ -725,7 +691,7 @@ private void commit(byte command) throws JournalException {
lag.getIndexPointers(tx.lagIndexPointers);
}

txLog.create(tx);
txLog.write(tx, txn != -1);
if (force) {
txLog.force();
}
Expand Down Expand Up @@ -762,6 +728,53 @@ private void replaceIrregularPartition(Partition<T> temp) {
purgeTempPartitions();
}

private void rollback0(long address, boolean writeDiscard) throws JournalException {

if (address == -1L) {
throw new JournalException("Wrong txn/txPin. Incompatible journals?");
}
txLog.read(address, tx);

if (tx.address == 0) {
throw new JournalException("Invalid transaction address");
}

if (writeDiscard) {
LOGGER.info("Journal %s is rolling back to transaction #%d, timestamp %s", metadata.getId(), tx.txn, Dates.toString(tx.timestamp));
writeDiscardFile(tx.journalMaxRowID);
}

// partitions need to be dealt with first to make sure new lag is assigned a correct partitionIndex
rollbackPartitions(tx);

Partition<T> lag = getIrregularPartition();
if (tx.lagName != null && tx.lagName.length() > 0 && (lag == null || !tx.lagName.equals(lag.getName()))) {
Partition<T> newLag = createTempPartition(tx.lagName);
setIrregularPartition(newLag);
newLag.applyTx(tx.lagSize, tx.lagIndexPointers);
} else if (lag != null && tx.lagName == null) {
removeIrregularPartitionInternal();
} else if (lag != null) {
lag.truncate(tx.lagSize);
}


if (tx.symbolTableSizes.length == 0) {
for (int i = 0, sz = getSymbolTableCount(); i < sz; i++) {
getSymbolTable(i).truncate();
}
} else {
for (int i = 0, sz = getSymbolTableCount(); i < sz; i++) {
getSymbolTable(i).truncate(tx.symbolTableSizes[i]);
}
}
appendTimestampLo = -1;
appendTimestampHi = -1;
appendPartition = null;
txLog.writeTxAddress(tx.address);
txActive = false;
}

private void rollbackPartitionDirs() throws JournalException {
File[] files = getLocation().listFiles(new FileFilter() {
public boolean accept(File f) {
Expand All @@ -778,12 +791,12 @@ public boolean accept(File f) {
}

private void rollbackPartitions(Tx tx) throws JournalException {
int partitionIndex = Rows.toPartitionIndex(tx.journalMaxRowID);
int partitionIndex = tx.journalMaxRowID == -1 ? 0 : Rows.toPartitionIndex(tx.journalMaxRowID);
for (Iterator<Partition<T>> it = partitions.iterator(); it.hasNext(); ) {
Partition<T> partition = it.next();
if (partition.getPartitionIndex() == partitionIndex) {
partition.open();
partition.truncate(Rows.toLocalRowID(tx.journalMaxRowID));
partition.truncate(tx.journalMaxRowID == -1 ? 0 : Rows.toLocalRowID(tx.journalMaxRowID));
} else if (partition.getPartitionIndex() > partitionIndex) {
it.remove();
partition.close();
Expand Down Expand Up @@ -845,4 +858,26 @@ void updateTsLo(long ts) {
appendTimestampLo = ts;
}
}

private void writeDiscardFile(long rowid) throws JournalException {

File f = new File(metadata.getLocation(), "discard.txt");
RecordSource<? extends Record> rs = new JournalSourceImpl(
new JournalTailPartitionSource(this, false, rowid)
, new AllRowSource()
);


try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
try (FileChannel channel = raf.getChannel()) {
try (FlexBufferSink sink = new FlexBufferSink(channel.position(channel.size()), 1024 * 1024)) {
RecordSourcePrinter printer = new RecordSourcePrinter(sink);
printer.print(rs);
LOGGER.info("Discarded records are appended to %s", f);
}
}
} catch (IOException e) {
throw new JournalException(e);
}
}
}
31 changes: 1 addition & 30 deletions nfsdb-core/src/main/java/com/nfsdb/collections/Lists.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,6 @@

package com.nfsdb.collections;

import java.util.Iterator;
import java.util.List;

public final class Lists {
Expand All @@ -29,32 +28,4 @@ public static void advance(List<?> list, int index) {
list.add(null);
}
}

public static <T> ImmutableIteratorWrapper<T> immutableIterator(Iterable<T> source) {
return new ImmutableIteratorWrapper<>(source);
}

public static class ImmutableIteratorWrapper<T> extends AbstractImmutableIterator<T> {
private final Iterable<T> source;
private Iterator<T> underlying;

public ImmutableIteratorWrapper(Iterable<T> source) {
this.source = source;
}

@Override
public boolean hasNext() {
return underlying.hasNext();
}

@Override
public T next() {
return underlying.next();
}

public ImmutableIteratorWrapper<T> reset() {
underlying = source.iterator();
return this;
}
}
}

0 comments on commit 387de24

Please sign in to comment.