Skip to content

Commit

Permalink
Refactoring log file pruning to be more generic
Browse files Browse the repository at this point in the history
* We want to use this in core-edge for pruning the RAFT log but
  there we append entries rather than transactions. Appropriate
  classes are renamed accordingly.
  • Loading branch information
Mark Needham committed Feb 11, 2016
1 parent 4483902 commit 937c0de
Show file tree
Hide file tree
Showing 14 changed files with 280 additions and 255 deletions.
Expand Up @@ -604,7 +604,7 @@ public long getTimestampForVersion( long version ) throws IOException
}
};
final LogFileInformation logFileInformation =
new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore, logInformation );
new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore::getLastCommittedTransactionId, logInformation );

String pruningConf = config.get(
config.get( GraphDatabaseFacadeFactory.Configuration.ephemeral )
Expand Down
Expand Up @@ -24,25 +24,25 @@
public interface LogFileInformation
{
/**
* @return the reachable transaction that is farthest back of them all, in any existing version.
* @return the reachable entry that is farthest back of them all, in any existing version.
*/
long getFirstExistingTxId() throws IOException;
long getFirstExistingEntryId() throws IOException;

/**
* @param version the log version to get first committed tx for.
* @return the first committed transaction id for the log with {@code version}.
* @return the first committed entry id for the log with {@code version}.
* If that log doesn't exist -1 is returned.
*/
long getFirstCommittedTxId( long version ) throws IOException;
long getFirstEntryId( long version ) throws IOException;

/**
* @return the last committed transaction id for this Log
* @return the last committed entry id for this Log
*/
long getLastCommittedTxId();
long getLastEntryId();

/**
* @param version the log version to get first tx timestamp for.
* @return the timestamp for the start record for the first encountered transaction
* @param version the log version to get first entry timestamp for.
* @return the timestamp for the start record for the first encountered entry
* in the log {@code version}.
*/
long getFirstStartRecordTimestamp( long version ) throws IOException;
Expand Down
Expand Up @@ -28,30 +28,36 @@ public interface LogVersionToTimestamp
long getTimestampForVersion( long version ) throws IOException;
}


public interface LastEntryInLog
{
long getLastEntryId( );
}

private final PhysicalLogFiles logFiles;
private final LogHeaderCache logHeaderCache;
private final TransactionIdStore transactionIdStore;
private final LastEntryInLog lastEntryInLog;
private final LogVersionToTimestamp logVersionToTimestamp;

public PhysicalLogFileInformation( PhysicalLogFiles logFiles,
LogHeaderCache logHeaderCache,
TransactionIdStore transactionIdStore,
LastEntryInLog lastEntryInLog,
LogVersionToTimestamp logVersionToTimestamp )
{
this.logFiles = logFiles;
this.logHeaderCache = logHeaderCache;
this.transactionIdStore = transactionIdStore;
this.lastEntryInLog = lastEntryInLog;
this.logVersionToTimestamp = logVersionToTimestamp;
}

@Override
public long getFirstExistingTxId() throws IOException
public long getFirstExistingEntryId() throws IOException
{
long version = logFiles.getHighestLogVersion();
long candidateFirstTx = -1;
while ( logFiles.versionExists( version ) )
{
candidateFirstTx = getFirstCommittedTxId( version );
candidateFirstTx = getFirstEntryId( version );
version--;
}
version++; // the loop above goes back one version too far.
Expand All @@ -62,7 +68,7 @@ public long getFirstExistingTxId() throws IOException
}

@Override
public long getFirstCommittedTxId( long version ) throws IOException
public long getFirstEntryId( long version ) throws IOException
{
long logHeader = logHeaderCache.getLogHeader( version );
if ( logHeader != -1 )
Expand All @@ -81,9 +87,9 @@ public long getFirstCommittedTxId( long version ) throws IOException
}

@Override
public long getLastCommittedTxId()
public long getLastEntryId()
{
return transactionIdStore.getLastCommittedTransactionId();
return lastEntryInLog.getLastEntryId();
}

@Override
Expand Down
Expand Up @@ -25,11 +25,11 @@
import org.neo4j.kernel.impl.transaction.log.IllegalLogFormatException;
import org.neo4j.kernel.impl.transaction.log.LogFileInformation;

public final class TransactionCountThreshold implements Threshold
public final class EntryCountThreshold implements Threshold
{
private final long maxTransactionCount;

TransactionCountThreshold( long maxTransactionCount )
EntryCountThreshold( long maxTransactionCount )
{
this.maxTransactionCount = maxTransactionCount;
}
Expand All @@ -46,15 +46,15 @@ public boolean reached( File ignored, long version, LogFileInformation source )
try
{
// try to ask next version log file which is my last tx
long lastTx = source.getFirstCommittedTxId( version + 1 );
long lastTx = source.getFirstEntryId( version + 1 );
if ( lastTx == -1 )
{
throw new IllegalStateException(
"The next version should always exist, since this is called after rotation and the " +
"PruneStrategy never checks the current active log file" );
}

long highest = source.getLastCommittedTxId();
long highest = source.getLastEntryId();
return highest - lastTx >= maxTransactionCount;
}
catch ( IllegalLogFormatException e )
Expand Down
Expand Up @@ -27,14 +27,14 @@
import org.neo4j.kernel.impl.transaction.log.IllegalLogFormatException;
import org.neo4j.kernel.impl.transaction.log.LogFileInformation;

public final class TransactionTimespanThreshold implements Threshold
public final class EntryTimespanThreshold implements Threshold
{
private final long timeToKeepInMillis;
private final Clock clock;

private long lowerLimit;

TransactionTimespanThreshold( Clock clock, TimeUnit timeUnit, long timeToKeep )
EntryTimespanThreshold( Clock clock, TimeUnit timeUnit, long timeToKeep )
{
this.clock = clock;
this.timeToKeepInMillis = timeUnit.toMillis( timeToKeep );
Expand Down
Expand Up @@ -92,7 +92,7 @@ public static LogPruneStrategy fromConfigValue( FileSystemAbstraction fileSystem
case "true":
return NO_PRUNING;
case "false":
final TransactionCountThreshold thresholdToUse = new TransactionCountThreshold( 1 );
final EntryCountThreshold thresholdToUse = new EntryCountThreshold( 1 );
return new ThresholdBasedPruneStrategy( fileSystem, logFileInformation, files, thresholdToUse );
default:
throw new IllegalArgumentException( "Invalid log pruning configuration value '" + configValue +
Expand Down Expand Up @@ -121,17 +121,18 @@ static Threshold getThresholdByType( FileSystemAbstraction fileSystem, String ty
thresholdToUse = new FileSizeThreshold( fileSystem, thresholdValue );
break;
case "txs":
thresholdToUse = new TransactionCountThreshold( thresholdValue );
case "entries": // txs and entries are synonyms
thresholdToUse = new EntryCountThreshold( thresholdValue );
break;
case "hours":
thresholdToUse = new TransactionTimespanThreshold( Clock.SYSTEM_CLOCK, TimeUnit.HOURS, thresholdValue );
thresholdToUse = new EntryTimespanThreshold( Clock.SYSTEM_CLOCK, TimeUnit.HOURS, thresholdValue );
break;
case "days":
thresholdToUse = new TransactionTimespanThreshold( Clock.SYSTEM_CLOCK, TimeUnit.DAYS, thresholdValue );
thresholdToUse = new EntryTimespanThreshold( Clock.SYSTEM_CLOCK, TimeUnit.DAYS, thresholdValue );
break;
default:
throw new IllegalArgumentException( "Invalid log pruning configuration value '" + originalConfigValue +
"'. Invalid type '" + type + "', valid are files, size, txs, hours, days." );
"'. Invalid type '" + type + "', valid are files, size, txs, entries, hours, days." );
}
return thresholdToUse;
}
Expand Down
Expand Up @@ -53,6 +53,7 @@ public void rotateLogFile() throws IOException
};

/**
* Rotates the undelying log if it is required. Returns true if rotation happened, false otherwise
* @param logAppendEvent A trace event for the current log append operation.
*/
boolean rotateLogIfNeeded( LogAppendEvent logAppendEvent ) throws IOException;
Expand Down
Expand Up @@ -45,7 +45,7 @@ public class PhysicalLogFileInformationTest
@Test
public void shouldReadAndCacheFirstCommittedTransactionIdForAGivenVersionWhenNotCached() throws Exception
{
PhysicalLogFileInformation info = new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore,
PhysicalLogFileInformation info = new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore::getLastCommittedTransactionId,
logVersionToTimestamp );
long expected = 5;

Expand All @@ -56,29 +56,29 @@ public void shouldReadAndCacheFirstCommittedTransactionIdForAGivenVersionWhenNot
new LogHeader( (byte) -1/*ignored*/, -1L/*ignored*/, expected - 1L )
);

long firstCommittedTxId = info.getFirstCommittedTxId( version );
long firstCommittedTxId = info.getFirstEntryId( version );
assertEquals( expected, firstCommittedTxId );
verify( logHeaderCache, times( 1 ) ).putHeader( version, expected - 1 );
}

@Test
public void shouldReadFirstCommittedTransactionIdForAGivenVersionWhenCached() throws Exception
{
PhysicalLogFileInformation info = new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore,
PhysicalLogFileInformation info = new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore::getLastCommittedTransactionId,
logVersionToTimestamp );
long expected = 5;

long version = 10L;
when( logHeaderCache.getLogHeader( version ) ).thenReturn( expected - 1 );

long firstCommittedTxId = info.getFirstCommittedTxId( version );
long firstCommittedTxId = info.getFirstEntryId( version );
assertEquals( expected, firstCommittedTxId );
}

@Test
public void shouldReadAndCacheFirstCommittedTransactionIdWhenNotCached() throws Exception
{
PhysicalLogFileInformation info = new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore,
PhysicalLogFileInformation info = new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore::getLastCommittedTransactionId,
logVersionToTimestamp );
long expected = 5;

Expand All @@ -91,15 +91,15 @@ public void shouldReadAndCacheFirstCommittedTransactionIdWhenNotCached() throws
);
when( logFiles.hasAnyEntries( version ) ).thenReturn( true );

long firstCommittedTxId = info.getFirstExistingTxId();
long firstCommittedTxId = info.getFirstExistingEntryId();
assertEquals( expected, firstCommittedTxId );
verify( logHeaderCache, times( 1 ) ).putHeader( version, expected - 1 );
}

@Test
public void shouldReadFirstCommittedTransactionIdWhenCached() throws Exception
{
PhysicalLogFileInformation info = new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore,
PhysicalLogFileInformation info = new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore::getLastCommittedTransactionId,
logVersionToTimestamp );
long expected = 5;

Expand All @@ -109,21 +109,21 @@ public void shouldReadFirstCommittedTransactionIdWhenCached() throws Exception
when( logHeaderCache.getLogHeader( version ) ).thenReturn( expected -1 );
when( logFiles.hasAnyEntries( version ) ).thenReturn( true );

long firstCommittedTxId = info.getFirstExistingTxId();
long firstCommittedTxId = info.getFirstExistingEntryId();
assertEquals( expected, firstCommittedTxId );
}

@Test
public void shouldReturnNothingWhenThereAreNoTransactions() throws Exception
{
PhysicalLogFileInformation info = new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore,
PhysicalLogFileInformation info = new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore::getLastCommittedTransactionId,
logVersionToTimestamp );

long version = 10L;
when( logFiles.getHighestLogVersion() ).thenReturn( version );
when( logFiles.hasAnyEntries( version ) ).thenReturn( false );

long firstCommittedTxId = info.getFirstExistingTxId();
long firstCommittedTxId = info.getFirstExistingEntryId();
assertEquals( -1, firstCommittedTxId );
}
}
Expand Up @@ -102,7 +102,7 @@ private long countLogEntries()
filterNeostoreLogicalLog( fs, storeDir.getPath(), logicalLogCounter );

long txLogRecordCount = db.getDependencyResolver()
.resolveDependency( LogFileInformation.class ).getLastCommittedTxId();
.resolveDependency( LogFileInformation.class ).getLastEntryId();

return logicalLogCounter.getCount() + txLogRecordCount;
}
Expand Down

0 comments on commit 937c0de

Please sign in to comment.