Skip to content

Commit

Permalink
Decouples PhysicalLogFile from transaction specific concepts
Browse files Browse the repository at this point in the history
TransactionMetadataCache is split into transaction specific cache
 and log header cache, which follows better the way it is used.
PhysicalLogFile needs a LogHeaderCache and a last committed id
 provider, which removes all dependence on transaction concepts.

This is in preparation for using PhysicalLogFile to implement a new
 version of the RaftLog.
  • Loading branch information
digitalstain committed Feb 4, 2016
1 parent 499ca6c commit 25d67db
Show file tree
Hide file tree
Showing 25 changed files with 243 additions and 170 deletions.
Expand Up @@ -95,6 +95,7 @@
import org.neo4j.kernel.impl.transaction.log.LogFile;
import org.neo4j.kernel.impl.transaction.log.LogFileInformation;
import org.neo4j.kernel.impl.transaction.log.LogFileRecoverer;
import org.neo4j.kernel.impl.transaction.log.LogHeaderCache;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel;
Expand Down Expand Up @@ -571,13 +572,15 @@ private TransactionLogModule buildTransactionLogs(
SynchronizedArrayIdOrderingQueue legacyIndexTransactionOrdering,
TransactionIdStore transactionIdStore, LogVersionRepository logVersionRepository )
{
TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 1000, 100_000 );
TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 100_000 );
LogHeaderCache logHeaderCache = new LogHeaderCache( 1000 );
final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, PhysicalLogFile.DEFAULT_NAME,
fileSystemAbstraction );

final PhysicalLogFile logFile = life.add( new PhysicalLogFile( fileSystemAbstraction, logFiles,
config.get( GraphDatabaseSettings.logical_log_rotation_threshold ), transactionIdStore,
logVersionRepository, physicalLogMonitor, transactionMetadataCache ) );
config.get( GraphDatabaseSettings.logical_log_rotation_threshold ),
transactionIdStore::getLastCommittedTransactionId, logVersionRepository, physicalLogMonitor,
logHeaderCache ) );

final PhysicalLogFileInformation.LogVersionToTimestamp
logInformation = new PhysicalLogFileInformation.LogVersionToTimestamp()
Expand All @@ -601,7 +604,7 @@ public long getTimestampForVersion( long version ) throws IOException
}
};
final LogFileInformation logFileInformation =
new PhysicalLogFileInformation( logFiles, transactionMetadataCache, transactionIdStore, logInformation );
new PhysicalLogFileInformation( logFiles, logHeaderCache, transactionIdStore, logInformation );

String pruningConf = config.get(
config.get( GraphDatabaseFacadeFactory.Configuration.ephemeral )
Expand Down
Expand Up @@ -566,8 +566,7 @@ public void initialize( CountsAccessor.Updater updater )
@Override
public long initialVersion()
{
return ((MetaDataStore) getOrCreateStore( StoreType.META_DATA ))
.getLastCommittedTransactionId();
return ((MetaDataStore) getOrCreateStore( StoreType.META_DATA )).getLastCommittedTransactionId();
}
} );

Expand Down
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log;

import org.neo4j.helpers.collection.LruCache;

public class LogHeaderCache
{
private final LruCache<Long /*log version*/, Long /*last committed tx*/> logHeaderCache;

public LogHeaderCache( int headerCacheSize )
{
this.logHeaderCache = new LruCache<>( "Log header cache", headerCacheSize );
}

public void clear()
{
logHeaderCache.clear();
}

public void putHeader( long logVersion, long previousLogLastCommittedTx )
{
logHeaderCache.put( logVersion, previousLogLastCommittedTx );
}

public long getLogHeader( long logVersion )
{
Long value = logHeaderCache.get( logVersion );
return value == null ? -1 : value;
}
}
Expand Up @@ -23,6 +23,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Supplier;

import org.neo4j.helpers.Exceptions;
import org.neo4j.io.fs.FileSystemAbstraction;
Expand Down Expand Up @@ -59,9 +60,9 @@ public void opened( File logFile, long logVersion, long lastTransactionId, boole
public static final String REGEX_DEFAULT_VERSION_SUFFIX = "\\.";
private final long rotateAtSize;
private final FileSystemAbstraction fileSystem;
private final TransactionIdStore transactionIdStore;
private final Supplier<Long> lastCommittedId;
private final PhysicalLogFiles logFiles;
private final TransactionMetadataCache transactionMetadataCache;
private final LogHeaderCache logHeaderCache;
private final Monitor monitor;
private final ByteBuffer headerBuffer = ByteBuffer.allocate( LOG_HEADER_SIZE );
private PositionAwarePhysicalFlushableChannel writer;
Expand All @@ -71,16 +72,16 @@ public void opened( File logFile, long logVersion, long lastTransactionId, boole
private volatile PhysicalLogVersionedStoreChannel channel;

public PhysicalLogFile( FileSystemAbstraction fileSystem, PhysicalLogFiles logFiles, long rotateAtSize,
TransactionIdStore transactionIdStore,
LogVersionRepository logVersionRepository, Monitor monitor,
TransactionMetadataCache transactionMetadataCache )
Supplier<Long> lastCommittedId, LogVersionRepository logVersionRepository,
Monitor monitor, LogHeaderCache logHeaderCache
)
{
this.fileSystem = fileSystem;
this.rotateAtSize = rotateAtSize;
this.transactionIdStore = transactionIdStore;
this.lastCommittedId = lastCommittedId;
this.logVersionRepository = logVersionRepository;
this.monitor = monitor;
this.transactionMetadataCache = transactionMetadataCache;
this.logHeaderCache = logHeaderCache;
this.logFiles = logFiles;
this.readerLogVersionBridge = new ReaderLogVersionBridge( fileSystem, logFiles );
}
Expand Down Expand Up @@ -170,9 +171,9 @@ private PhysicalLogVersionedStoreChannel openLogChannelForVersion( long forVersi
if ( header == null )
{
// Either the header is not there in full or the file was new. Don't care
long lastTxId = transactionIdStore.getLastCommittedTransactionId();
long lastTxId = lastCommittedId.get();
writeLogHeader( headerBuffer, forVersion, lastTxId );
transactionMetadataCache.putHeader( forVersion, lastTxId );
logHeaderCache.putHeader( forVersion, lastTxId );
storeChannel.writeAll( headerBuffer );
monitor.opened( toOpen, forVersion, lastTxId, true );
}
Expand Down Expand Up @@ -252,15 +253,15 @@ public void accept( LogHeaderVisitor visitor ) throws IOException
{
// Start from the where we're currently at and go backwards in time (versions)
long logVersion = logFiles.getHighestLogVersion();
long highTransactionId = transactionIdStore.getLastCommittedTransactionId();
long highTransactionId = lastCommittedId.get();
while ( logFiles.versionExists( logVersion ) )
{
long previousLogLastTxId = transactionMetadataCache.getLogHeader( logVersion );
long previousLogLastTxId = logHeaderCache.getLogHeader( logVersion );
if ( previousLogLastTxId == -1 )
{
LogHeader header = readLogHeader( fileSystem, logFiles.getLogFileForVersion( logVersion ) );
assert logVersion == header.logVersion;
transactionMetadataCache.putHeader( header.logVersion, header.lastCommittedTxId );
logHeaderCache.putHeader( header.logVersion, header.lastCommittedTxId );
previousLogLastTxId = header.lastCommittedTxId;
}

Expand Down
Expand Up @@ -29,17 +29,17 @@ public interface LogVersionToTimestamp
}

private final PhysicalLogFiles logFiles;
private final TransactionMetadataCache transactionMetadataCache;
private final LogHeaderCache logHeaderCache;
private final TransactionIdStore transactionIdStore;
private final LogVersionToTimestamp logVersionToTimestamp;

public PhysicalLogFileInformation( PhysicalLogFiles logFiles,
TransactionMetadataCache transactionMetadataCache,
LogHeaderCache logHeaderCache,
TransactionIdStore transactionIdStore,
LogVersionToTimestamp logVersionToTimestamp )
{
this.logFiles = logFiles;
this.transactionMetadataCache = transactionMetadataCache;
this.logHeaderCache = logHeaderCache;
this.transactionIdStore = transactionIdStore;
this.logVersionToTimestamp = logVersionToTimestamp;
}
Expand All @@ -64,7 +64,7 @@ public long getFirstExistingTxId() throws IOException
@Override
public long getFirstCommittedTxId( long version ) throws IOException
{
long logHeader = transactionMetadataCache.getLogHeader( version );
long logHeader = logHeaderCache.getLogHeader( version );
if ( logHeader != -1 )
{ // It existed in cache
return logHeader + 1;
Expand All @@ -74,7 +74,7 @@ public long getFirstCommittedTxId( long version ) throws IOException
if ( logFiles.versionExists( version ) )
{
long previousVersionLastCommittedTx = logFiles.extractHeader( version ).lastCommittedTxId;
transactionMetadataCache.putHeader( version, previousVersionLastCommittedTx );
logHeaderCache.putHeader( version, previousVersionLastCommittedTx );
return previousVersionLastCommittedTx + 1;
}
return -1;
Expand Down
Expand Up @@ -46,11 +46,13 @@ public ReadOnlyTransactionStore( PageCache pageCache, FileSystemAbstraction fs,
throws IOException
{
PhysicalLogFiles logFiles = new PhysicalLogFiles( fromPath, fs );
TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 10, 100 );
TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 100 );
LogHeaderCache logHeaderCache = new LogHeaderCache( 10 );
final ReadOnlyTransactionIdStore transactionIdStore = new ReadOnlyTransactionIdStore( pageCache, fromPath );
PhysicalLogFile logFile = life.add( new PhysicalLogFile( fs, logFiles, 0,
transactionIdStore, new ReadOnlyLogVersionRepository( pageCache, fromPath ),
monitors.newMonitor( PhysicalLogFile.Monitor.class ), transactionMetadataCache ) );
transactionIdStore::getLastCommittedTransactionId,
new ReadOnlyLogVersionRepository( pageCache, fromPath ),
monitors.newMonitor( PhysicalLogFile.Monitor.class ), logHeaderCache ) );
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader =
new VersionAwareLogEntryReader<>( new RecordStorageCommandReaderFactory() );
physicalStore = new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader );
Expand Down
Expand Up @@ -24,31 +24,17 @@
public class TransactionMetadataCache
{
private final LruCache<Long /*tx id*/, TransactionMetadata> txStartPositionCache;
private final LruCache<Long /*log version*/, Long /*last committed tx*/> logHeaderCache;

public TransactionMetadataCache( int headerCacheSize, int transactionCacheSize )
public TransactionMetadataCache( int transactionCacheSize )
{
this.logHeaderCache = new LruCache<>( "Log header cache", headerCacheSize );
this.txStartPositionCache = new LruCache<>( "Tx start position cache", transactionCacheSize );
}

public void clear()
{
logHeaderCache.clear();
txStartPositionCache.clear();
}

public void putHeader( long logVersion, long previousLogLastCommittedTx )
{
logHeaderCache.put( logVersion, previousLogLastCommittedTx );
}

public long getLogHeader( long logVersion )
{
Long value = logHeaderCache.get( logVersion );
return value == null ? -1 : value;
}

public TransactionMetadata getTransactionMetadata( long txId )
{
return txStartPositionCache.get( txId );
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory;
import org.neo4j.kernel.impl.transaction.DeadSimpleLogVersionRepository;
import org.neo4j.kernel.impl.transaction.DeadSimpleTransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.LogHeaderCache;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogPositionMarker;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
Expand Down Expand Up @@ -177,8 +178,8 @@ public void recoveryRequired()
}
}, monitor ) );

life.add( new PhysicalLogFile( fs, logFiles, 50, transactionIdStore, logVersionRepository,
mock( PhysicalLogFile.Monitor.class ), new TransactionMetadataCache( 10, 100 ) ) );
life.add( new PhysicalLogFile( fs, logFiles, 50, transactionIdStore::getLastCommittedTransactionId,
logVersionRepository, mock( PhysicalLogFile.Monitor.class ), new LogHeaderCache( 10 ) ) );

life.start();

Expand Down Expand Up @@ -247,8 +248,8 @@ public void recoveryRequired()
}
}, monitor ));

life.add( new PhysicalLogFile( fs, logFiles, 50, transactionIdStore, logVersionRepository, mock( PhysicalLogFile.Monitor.class),
new TransactionMetadataCache( 10, 100 )) );
life.add( new PhysicalLogFile( fs, logFiles, 50, transactionIdStore::getLastCommittedTransactionId,
logVersionRepository, mock( PhysicalLogFile.Monitor.class), new LogHeaderCache( 10 ) ) );

life.start();

Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.api;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand All @@ -30,31 +29,13 @@
import java.util.Collection;

import org.neo4j.function.ThrowingConsumer;
import org.neo4j.helpers.FakeClock;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.api.KernelTransactionImplementation;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionHeaderInformation;
import org.neo4j.kernel.impl.api.TransactionHooks;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.api.store.StoreStatement;
import org.neo4j.kernel.impl.locking.NoOpClient;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.StoreReadLayer;
import org.neo4j.storageengine.api.TransactionApplicationMode;
import org.neo4j.storageengine.api.lock.ResourceLocker;
import org.neo4j.test.DoubleLatch;

Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.index.Index;
import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.test.DatabaseRule;
import org.neo4j.test.ImpermanentDatabaseRule;
Expand Down
Expand Up @@ -238,7 +238,7 @@ public void testCreateStore() throws Exception
{
nodeIds[i] = nextId( Node.class );
transaction.nodeDoCreate( nodeIds[i] );
nodeAddProperty( nodeIds[i], index( "nisse" ), new Integer( 10 - i ) );
nodeAddProperty( nodeIds[i], index( "nisse" ), 10 - i );
}
for ( int i = 0; i < 2; i++ )
{
Expand Down

0 comments on commit 25d67db

Please sign in to comment.