Skip to content

Commit

Permalink
Cleanups around id buffering
Browse files Browse the repository at this point in the history
 * added tests here and there
 * completed and removed couple TODOs
 * remove unused classes and revert not needed changes
  • Loading branch information
lutovich authored and burqen committed Jun 30, 2016
1 parent 07167b9 commit 1a2361b
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 105 deletions.
Expand Up @@ -291,13 +291,8 @@ private void readFromStore( AbstractDynamicStore store, AbstractDynamicStore.Dyn
buffer = newBuffer;
}
buffer.put( data, 0, data.length );
Thread.sleep( 1 ); // TODO: Remove
}
}
catch ( InterruptedException e )
{
throw new RuntimeException( e );
}
}

private ByteBuffer newBiggerBuffer( int requiredCapacity )
Expand Down
Expand Up @@ -113,13 +113,6 @@ public CommunityEditionModule( PlatformModule platformModule )
registerRecovery( config.get( GraphDatabaseFacadeFactory.Configuration.editionName), life, dependencies );

publishEditionInfo( dependencies.resolveDependency( UsageData.class ) );

eligibleForIdReuse = createEligibleForIdReuseFilter();
}

protected IdReuseEligibility createEligibleForIdReuseFilter()
{
return IdReuseEligibility.ALWAYS;
}

protected ConstraintSemantics createSchemaRuleVerifier()
Expand Down
Expand Up @@ -660,7 +660,6 @@ public void transactionCommitted( long transactionId, long checksum, long commit
setRecord( Position.LAST_TRANSACTION_ID, transactionId );
setRecord( Position.LAST_TRANSACTION_CHECKSUM, checksum );
setRecord( Position.LAST_TRANSACTION_COMMIT_TIMESTAMP, commitTimestamp );
// TODO: setRecords(...) to avoid multiple new pageCursor
}
}
}
Expand Down
Expand Up @@ -19,13 +19,15 @@
*/
package org.neo4j.kernel.impl.storemigration;

import org.apache.commons.lang3.StringUtils;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -130,6 +132,7 @@
public class StoreMigrator implements StoreMigrationParticipant
{
private static final String UTF8 = Charsets.UTF_8.name();
private static final char TX_LOG_COUNTERS_SEPARATOR = 'A';

// Developers: There is a benchmark, storemigrate-benchmark, that generates large stores and benchmarks
// the upgrade process. Please utilize that when writing upgrade code to ensure the code is fast enough to
Expand Down Expand Up @@ -170,11 +173,9 @@ public void migrate( File storeDir, File migrationDir, SchemaIndexProvider schem
// Extract information about the last transaction from legacy neostore
File neoStore = new File( storeDir, MetaDataStore.DEFAULT_NAME );
long lastTxId = MetaDataStore.getRecord( pageCache, neoStore, Position.LAST_TRANSACTION_ID );
// Checksum and timestamp
// TODO: Do we ever use this information during migration?
TransactionId lastTxInfo = extractTransactionIdInformation( neoStore, storeDir, lastTxId );
LogPosition lastTxLogPosition = extractTransactionLogPosition( neoStore, storeDir, lastTxId );
// Write the tx information to file in migrationDir, because we need it later when moving files into storeDir
// Write tx info to file in migrationDir, because we need it later when moveMigratedFiles into storeDir
writeLastTxInformation( migrationDir, lastTxInfo );
writeLastTxLogPosition( migrationDir, lastTxLogPosition );

Expand Down Expand Up @@ -209,48 +210,56 @@ public void migrate( File storeDir, File migrationDir, SchemaIndexProvider schem
progressMonitor.finished();
}

private void writeLastTxInformation( File migrationDir, TransactionId txInfo ) throws IOException
void writeLastTxInformation( File migrationDir, TransactionId txInfo ) throws IOException
{
try ( Writer writer = fileSystem.openAsWriter( lastTxInformationFile( migrationDir), UTF8, false ) )
{
// TODO: Use same splitter as for writeLastTxPosition?
writer.write( txInfo.transactionId() + "A" + txInfo.checksum() + "A" + txInfo.commitTimestamp() );
}
writeTxLogCounters( fileSystem, lastTxInformationFile( migrationDir ),
txInfo.transactionId(), txInfo.checksum(), txInfo.commitTimestamp() );
}

private void writeLastTxLogPosition( File migrationDir, LogPosition lastTxLogPosition ) throws IOException
void writeLastTxLogPosition( File migrationDir, LogPosition lastTxLogPosition ) throws IOException
{
try ( Writer writer = fileSystem.openAsWriter( lastTxLogPositionFile( migrationDir ), UTF8, false ) )
{
writer.write( lastTxLogPosition.getLogVersion() + "A" + lastTxLogPosition.getByteOffset() );
}
writeTxLogCounters( fileSystem, lastTxLogPositionFile( migrationDir ),
lastTxLogPosition.getLogVersion(), lastTxLogPosition.getByteOffset() );
}

// TODO: WRITE TESTS
// accessible for tests
static TransactionId readLastTxInformation( FileSystemAbstraction fileSystem, File migrationDir ) throws IOException
TransactionId readLastTxInformation( File migrationDir ) throws IOException
{
long[] counters = readTxLogCounters( fileSystem, lastTxInformationFile( migrationDir ), 3 );
return new TransactionId( counters[0], counters[1], counters[2] );
}

LogPosition readLastTxLogPosition( File migrationDir ) throws IOException
{
try ( Reader reader = fileSystem.openAsReader( lastTxInformationFile( migrationDir ), UTF8 ) )
long[] counters = readTxLogCounters( fileSystem, lastTxLogPositionFile( migrationDir ), 2 );
return new LogPosition( counters[0], counters[1] );
}

private static void writeTxLogCounters( FileSystemAbstraction fs, File file, long... counters ) throws IOException
{
try ( Writer writer = fs.openAsWriter( file, UTF8, false ) )
{
char[] buffer = new char[4096]; // TODO: Why so large buffer?
int chars = reader.read( buffer );
String s = String.valueOf( buffer, 0, chars );
String[] split = s.split( "A" );
return new TransactionId( Long.parseLong( split[0] ), Long.parseLong( split[1] ),
Long.parseLong( split[2] ) );
writer.write( StringUtils.join( counters, TX_LOG_COUNTERS_SEPARATOR ) );
}
}

// accessible for tests
static LogPosition readLastTxLogPosition( FileSystemAbstraction fileSystem, File migrationDir ) throws IOException
private static long[] readTxLogCounters( FileSystemAbstraction fs, File file, int numberOfCounters )
throws IOException
{
try ( Reader reader = fileSystem.openAsReader( lastTxLogPositionFile( migrationDir ), UTF8 ) )
try ( BufferedReader reader = new BufferedReader( fs.openAsReader( file, UTF8 ) ) )
{
char[] buffer = new char[4096]; // TODO: Why so large buffer?
int chars = reader.read( buffer );
String s = String.valueOf( buffer, 0, chars );
String[] split = s.split( "A" );
return new LogPosition( Long.parseLong( split[0] ), Long.parseLong( split[1] ) );
String line = reader.readLine();
String[] split = StringUtils.split( line, TX_LOG_COUNTERS_SEPARATOR );
if ( split.length != numberOfCounters )
{
throw new IllegalArgumentException( "Unexpected number of tx counters '" + numberOfCounters +
"', file contains: '" + line + "'" );
}
long[] counters = new long[numberOfCounters];
for ( int i = 0; i < split.length; i++ )
{
counters[i] = Long.parseLong( split[i] );
}
return counters;
}
}

Expand Down Expand Up @@ -878,7 +887,7 @@ private void updateOrAddNeoStoreFieldsAsPartOfMigration( File migrationDir, File
// problematic as long as we don't migrate and translate old logs.

// TODO: Is this what we want to do with txInfo and do we not need UPGRADE_TRANSACTION_COMMIT_TIMESTAMP?
TransactionId lastTxInfo = readLastTxInformation( fileSystem, migrationDir );
TransactionId lastTxInfo = readLastTxInformation( migrationDir );
// Checksum
MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.LAST_TRANSACTION_CHECKSUM,
lastTxInfo.checksum() );
Expand All @@ -889,7 +898,7 @@ private void updateOrAddNeoStoreFieldsAsPartOfMigration( File migrationDir, File

// add LAST_CLOSED_TRANSACTION_LOG_VERSION and LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET to the migrated
// NeoStore
LogPosition logPosition = readLastTxLogPosition( fileSystem, migrationDir );
LogPosition logPosition = readLastTxLogPosition( migrationDir );
MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.LAST_CLOSED_TRANSACTION_LOG_VERSION, logPosition
.getLogVersion() );
MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET,
Expand Down
Expand Up @@ -117,7 +117,6 @@ public void migrateLogs( File storeDir, File migrationDir ) throws IOException
}
}

// TODO: TEST THIS
public TransactionId getTransactionInformation( File storeDir, long transactionId ) throws IOException
{
List<File> logFiles = Arrays.asList( fs.listFiles( storeDir, versionedLegacyLogFilesFilter ) );
Expand Down
Expand Up @@ -98,5 +98,4 @@ public long lastCommittedTransactionLogByteOffset()
return TransactionIdStore.BASE_TX_LOG_BYTE_OFFSET;
}
};

}
Expand Up @@ -52,8 +52,6 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.neo4j.kernel.impl.storemigration.StoreMigrator.readLastTxInformation;
import static org.neo4j.kernel.impl.storemigration.StoreMigrator.readLastTxLogPosition;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_LOG_BYTE_OFFSET;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_LOG_VERSION;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.UNKNOWN_TX_COMMIT_TIMESTAMP;
Expand Down Expand Up @@ -215,7 +213,7 @@ public void shouldComputeTheLastTxLogPositionCorrectly() throws Throwable
migrator.migrate( storeDirectory, migrationDir, schemaIndexProvider, versionToMigrateFrom );

// THEN it should compute the correct last tx log position
assertEquals( expectedLogPosition, readLastTxLogPosition( fs, migrationDir ) );
assertEquals( expectedLogPosition, migrator.readLastTxLogPosition( migrationDir ) );
}

@Test
Expand All @@ -241,7 +239,6 @@ public void shouldComputeTheLastTxInfoCorrectly() throws Exception
migrator.migrate( storeDirectory, migrationDir, schemaIndexProvider, versionToMigrateFrom );

// then
System.out.println( readLastTxInformation( fs, migrationDir ) );
assertTrue( txIdComparator.apply( readLastTxInformation( fs, migrationDir ) ) );
assertTrue( txIdComparator.apply( migrator.readLastTxInformation( migrationDir ) ) );
}
}
Expand Up @@ -21,19 +21,25 @@

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

import java.io.File;
import java.io.IOException;

import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.store.TransactionId;
import org.neo4j.kernel.impl.storemigration.legacylogs.LegacyLogs;
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.monitoring.SilentMigrationProgressMonitor;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException;
import org.neo4j.test.PageCacheRule;
import org.neo4j.test.RandomRule;
import org.neo4j.test.TargetDirectory;

import static org.junit.Assert.assertEquals;
Expand All @@ -50,11 +56,15 @@

public class StoreMigratorTest
{
private final FileSystemAbstraction fs = new DefaultFileSystemAbstraction();
private final TargetDirectory.TestDirectory directory = TargetDirectory.testDirForTest( getClass() );
private final PageCacheRule pageCacheRule = new PageCacheRule();
private final RandomRule random = new RandomRule();

@Rule
public final TargetDirectory.TestDirectory directory = TargetDirectory.testDirForTest( getClass() );
@Rule
public final PageCacheRule pageCacheRule = new PageCacheRule();
public final FileSystemAbstraction fs = new DefaultFileSystemAbstraction();
public final RuleChain ruleChain = RuleChain.outerRule( directory )
.around( pageCacheRule )
.around( random );

@Test
public void shouldExtractTransactionInformationFromMetaDataStore() throws Exception
Expand Down Expand Up @@ -159,4 +169,36 @@ public void shouldGenerateTransactionInformationAsLastOption() throws Exception
assertEquals( expected.commitTimestamp(), actual.commitTimestamp() );
// We do not expect checksum to be equal as it is randomly generated
}

@Test
public void writeAndReadLastTxInformation() throws IOException
{
StoreMigrator migrator = newStoreMigrator();
TransactionId writtenTxId = new TransactionId( random.nextLong(), random.nextLong(), random.nextLong() );

migrator.writeLastTxInformation( directory.graphDbDir(), writtenTxId );

TransactionId readTxId = migrator.readLastTxInformation( directory.graphDbDir() );

assertEquals( writtenTxId, readTxId );
}

@Test
public void writeAndReadLastTxLogPosition() throws IOException
{
StoreMigrator migrator = newStoreMigrator();
LogPosition writtenLogPosition = new LogPosition( random.nextLong(), random.nextLong() );

migrator.writeLastTxLogPosition( directory.graphDbDir(), writtenLogPosition );

LogPosition readLogPosition = migrator.readLastTxLogPosition( directory.graphDbDir() );

assertEquals( writtenLogPosition, readLogPosition );
}

private StoreMigrator newStoreMigrator()
{
return new StoreMigrator( new SilentMigrationProgressMonitor(), fs, pageCacheRule.getPageCache( fs ),
new Config(), NullLogService.getInstance() );
}
}

0 comments on commit 1a2361b

Please sign in to comment.