Skip to content

Commit

Permalink
Add an additional capability to HighLimit 3.1 for 3-byte relationship types.
Browse files Browse the repository at this point in the history
Update the way the batch importer retrieves the record format.
Update tests
  • Loading branch information
MishaDemianenko committed Sep 13, 2016
1 parent 4fa5cfd commit f9933b1
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 36 deletions.
Expand Up @@ -52,11 +52,11 @@
import org.neo4j.graphdb.RelationshipType; import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.helpers.progress.ProgressMonitorFactory; import org.neo4j.helpers.progress.ProgressMonitorFactory;
import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0;
import org.neo4j.logging.NullLogProvider; import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.Randoms; import org.neo4j.test.Randoms;
Expand Down Expand Up @@ -162,7 +162,7 @@ public void shouldImportCsvData() throws Exception
ExecutionMonitor processorAssigner = eagerRandomSaturation( config.maxNumberOfProcessors() ); ExecutionMonitor processorAssigner = eagerRandomSaturation( config.maxNumberOfProcessors() );
final BatchImporter inserter = new ParallelBatchImporter( directory.graphDbDir(), final BatchImporter inserter = new ParallelBatchImporter( directory.graphDbDir(),
new DefaultFileSystemAbstraction(), config, NullLogService.getInstance(), new DefaultFileSystemAbstraction(), config, NullLogService.getInstance(),
processorAssigner, EMPTY, new Config( MapUtil.stringMap( GraphDatabaseSettings.record_format.name(), getFormatName() ) ) ); processorAssigner, EMPTY, Config.empty(), getFormat() );


boolean successful = false; boolean successful = false;
IdGroupDistribution groups = new IdGroupDistribution( NODE_COUNT, 5, random.random() ); IdGroupDistribution groups = new IdGroupDistribution( NODE_COUNT, 5, random.random() );
Expand Down Expand Up @@ -234,9 +234,9 @@ private void assertConsistent( File storeDir ) throws ConsistencyCheckIncomplete
result.isSuccessful() ); result.isSuccessful() );
} }


protected String getFormatName() protected RecordFormats getFormat()
{ {
return StandardV3_0.NAME; return StandardV3_0.RECORD_FORMATS;
} }


public abstract static class InputIdGenerator public abstract static class InputIdGenerator
Expand Down
Expand Up @@ -37,6 +37,11 @@ public enum Capability
*/ */
DENSE_NODES( CapabilityType.FORMAT, CapabilityType.STORE ), DENSE_NODES( CapabilityType.FORMAT, CapabilityType.STORE ),


/**
* 3 bytes relationship type support
*/
RELATIONSHIP_TYPE_3BYTES( CapabilityType.FORMAT, CapabilityType.STORE ),

/** /**
* Store has version trailers in the end of cleanly shut down store * Store has version trailers in the end of cleanly shut down store
*/ */
Expand Down
Expand Up @@ -416,7 +416,7 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsoluteFile(), fileSystem, BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsoluteFile(), fileSystem,
importConfig, logService, importConfig, logService,
withDynamicProcessorAssignment( migrationBatchImporterMonitor( legacyStore, progressMonitor, withDynamicProcessorAssignment( migrationBatchImporterMonitor( legacyStore, progressMonitor,
importConfig ), importConfig ), additionalInitialIds, config ); importConfig ), importConfig ), additionalInitialIds, config, newFormat );
InputIterable<InputNode> nodes = InputIterable<InputNode> nodes =
legacyNodesAsInput( legacyStore, requiresPropertyMigration, nodeInputCursors ); legacyNodesAsInput( legacyStore, requiresPropertyMigration, nodeInputCursors );
InputIterable<InputRelationship> relationships = InputIterable<InputRelationship> relationships =
Expand Down
Expand Up @@ -58,7 +58,6 @@
import static java.lang.Math.max; import static java.lang.Math.max;
import static java.lang.String.format; import static java.lang.String.format;
import static java.lang.System.currentTimeMillis; import static java.lang.System.currentTimeMillis;

import static org.neo4j.helpers.Format.bytes; import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.helpers.collection.Iterators.asSet; import static org.neo4j.helpers.collection.Iterators.asSet;
import static org.neo4j.io.ByteUnit.mebiBytes; import static org.neo4j.io.ByteUnit.mebiBytes;
Expand Down Expand Up @@ -90,6 +89,7 @@ public class ParallelBatchImporter implements BatchImporter
private final ExecutionMonitor executionMonitor; private final ExecutionMonitor executionMonitor;
private final AdditionalInitialIds additionalInitialIds; private final AdditionalInitialIds additionalInitialIds;
private final Config dbConfig; private final Config dbConfig;
private final RecordFormats recordFormats;


/** /**
* Advanced usage of the parallel batch importer, for special and very specific cases. Please use * Advanced usage of the parallel batch importer, for special and very specific cases. Please use
Expand All @@ -98,13 +98,14 @@ public class ParallelBatchImporter implements BatchImporter
public ParallelBatchImporter( File storeDir, FileSystemAbstraction fileSystem, Configuration config, public ParallelBatchImporter( File storeDir, FileSystemAbstraction fileSystem, Configuration config,
LogService logService, ExecutionMonitor executionMonitor, LogService logService, ExecutionMonitor executionMonitor,
AdditionalInitialIds additionalInitialIds, AdditionalInitialIds additionalInitialIds,
Config dbConfig ) Config dbConfig, RecordFormats recordFormats )
{ {
this.storeDir = storeDir; this.storeDir = storeDir;
this.fileSystem = fileSystem; this.fileSystem = fileSystem;
this.config = config; this.config = config;
this.logService = logService; this.logService = logService;
this.dbConfig = dbConfig; this.dbConfig = dbConfig;
this.recordFormats = recordFormats;
this.log = logService.getInternalLogProvider().getLog( getClass() ); this.log = logService.getInternalLogProvider().getLog( getClass() );
this.executionMonitor = executionMonitor; this.executionMonitor = executionMonitor;
this.additionalInitialIds = additionalInitialIds; this.additionalInitialIds = additionalInitialIds;
Expand All @@ -119,7 +120,8 @@ public ParallelBatchImporter( File storeDir, Configuration config, LogService lo
ExecutionMonitor executionMonitor, Config dbConfig ) ExecutionMonitor executionMonitor, Config dbConfig )
{ {
this( storeDir, new DefaultFileSystemAbstraction(), config, logService, this( storeDir, new DefaultFileSystemAbstraction(), config, logService,
withDynamicProcessorAssignment( executionMonitor, config ), EMPTY, dbConfig ); withDynamicProcessorAssignment( executionMonitor, config ), EMPTY, dbConfig,
RecordFormatSelector.selectForConfig( dbConfig, NullLogProvider.getInstance() ));
} }


@Override @Override
Expand All @@ -135,7 +137,6 @@ public void doImport( Input input ) throws IOException
boolean hasBadEntries = false; boolean hasBadEntries = false;
File badFile = new File( storeDir, Configuration.BAD_FILE_NAME ); File badFile = new File( storeDir, Configuration.BAD_FILE_NAME );
CountingStoreUpdateMonitor storeUpdateMonitor = new CountingStoreUpdateMonitor(); CountingStoreUpdateMonitor storeUpdateMonitor = new CountingStoreUpdateMonitor();
RecordFormats recordFormats = RecordFormatSelector.selectForConfig( dbConfig, NullLogProvider.getInstance() );
try ( BatchingNeoStores neoStore = new BatchingNeoStores( fileSystem, storeDir, recordFormats, config, logService, try ( BatchingNeoStores neoStore = new BatchingNeoStores( fileSystem, storeDir, recordFormats, config, logService,
additionalInitialIds, dbConfig ); additionalInitialIds, dbConfig );
CountsAccessor.Updater countsUpdater = neoStore.getCountsStore().reset( CountsAccessor.Updater countsUpdater = neoStore.getCountsStore().reset(
Expand Down
Expand Up @@ -52,6 +52,8 @@
import org.neo4j.kernel.impl.api.index.BatchingMultipleIndexPopulator; import org.neo4j.kernel.impl.api.index.BatchingMultipleIndexPopulator;
import org.neo4j.kernel.impl.api.index.MultipleIndexPopulator; import org.neo4j.kernel.impl.api.index.MultipleIndexPopulator;
import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.logging.NullLogProvider; import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.Randoms; import org.neo4j.test.Randoms;
import org.neo4j.test.RepeatRule; import org.neo4j.test.RepeatRule;
Expand Down Expand Up @@ -298,8 +300,11 @@ private void changeRandomNode( GraphDatabaseService db, int nodeCount, Randoms r


private void createRandomData( int count ) throws IOException private void createRandomData( int count ) throws IOException
{ {
Config config = Config.empty();
RecordFormats recordFormats =
RecordFormatSelector.selectForConfig( config, NullLogProvider.getInstance() );
BatchImporter importer = new ParallelBatchImporter( directory.graphDbDir(), fs, BatchImporter importer = new ParallelBatchImporter( directory.graphDbDir(), fs,
DEFAULT, NullLogService.getInstance(), ExecutionMonitors.invisible(), EMPTY, Config.empty() ); DEFAULT, NullLogService.getInstance(), ExecutionMonitors.invisible(), EMPTY, config, recordFormats );
importer.doImport( new RandomDataInput( count ) ); importer.doImport( new RandomDataInput( count ) );
} }


Expand Down
Expand Up @@ -56,7 +56,8 @@ public class HighLimit extends BaseRecordFormats


public HighLimit() public HighLimit()
{ {
super( STORE_VERSION, 3, Capability.DENSE_NODES, Capability.SCHEMA, Capability.LUCENE_5 ); super( STORE_VERSION, 3, Capability.DENSE_NODES, Capability.RELATIONSHIP_TYPE_3BYTES, Capability.SCHEMA,
Capability.LUCENE_5 );
} }


@Override @Override
Expand Down
Expand Up @@ -19,17 +19,14 @@
*/ */
package org.neo4j.kernel.impl.store.format.highlimit; package org.neo4j.kernel.impl.store.format.highlimit;


import org.hamcrest.CoreMatchers;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.RuleChain;


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;


import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.index.SchemaIndexProvider;
Expand All @@ -41,57 +38,65 @@
import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor; import org.neo4j.kernel.impl.storemigration.monitoring.MigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator; import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator;
import org.neo4j.test.rule.PageCacheRule; import org.neo4j.test.rule.PageCacheRule;
import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.fs.DefaultFileSystemRule;


import static org.junit.Assert.assertThat; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.neo4j.kernel.impl.store.MetaDataStore.Position.STORE_VERSION; import static org.neo4j.kernel.impl.store.MetaDataStore.Position.STORE_VERSION;


public class HighLimitStoreMigrationTest public class HighLimitStoreMigrationTest
{ {
private final PageCacheRule pageCacheRule = new PageCacheRule();
private final TestDirectory testDirectory = TestDirectory.testDirectory();
private final DefaultFileSystemRule fileSystemRule = new DefaultFileSystemRule();
@Rule @Rule
public final PageCacheRule pageCacheRule = new PageCacheRule(); public RuleChain chain = RuleChain.outerRule( pageCacheRule )
private final FileSystemAbstraction fileSystem = new EphemeralFileSystemAbstraction(); .around( fileSystemRule )
.around( testDirectory );


@Test @Test
public void haveSameFormatCapabilitiesAsHighLimit3_0() public void haveDifferentFormatCapabilitiesAsHighLimit3_0()
{ {
assertTrue( HighLimit.RECORD_FORMATS.hasSameCapabilities( HighLimitV3_0_0.RECORD_FORMATS, CapabilityType.FORMAT ) ); assertFalse( HighLimit.RECORD_FORMATS.hasSameCapabilities( HighLimitV3_0_0.RECORD_FORMATS, CapabilityType.FORMAT ) );
} }


@Test @Test
public void doNotMigrateHighLimit3_0StoreFiles() throws IOException public void migrateHighLimit3_0StoreFiles() throws IOException
{ {
DefaultFileSystemAbstraction fileSystem = fileSystemRule.get();
PageCache pageCache = pageCacheRule.getPageCache( fileSystem ); PageCache pageCache = pageCacheRule.getPageCache( fileSystem );
SchemaIndexProvider schemaIndexProvider = mock( SchemaIndexProvider.class ); SchemaIndexProvider schemaIndexProvider = mock( SchemaIndexProvider.class );

StoreMigrator migrator = new StoreMigrator( fileSystem, pageCache, Config.empty(), NullLogService.getInstance(), StoreMigrator migrator = new StoreMigrator( fileSystem, pageCache, Config.empty(), NullLogService.getInstance(),
schemaIndexProvider ); schemaIndexProvider );


File storeDir = new File( "storeDir" ); File storeDir = new File( testDirectory.graphDbDir(), "storeDir" );
File migrationDir = new File( "migrationDir" ); File migrationDir = new File( testDirectory.graphDbDir(), "migrationDir" );
fileSystem.mkdir( migrationDir ); fileSystem.mkdir( migrationDir );
fileSystem.mkdir( storeDir );


prepareNeoStoreFile( storeDir, HighLimitV3_0_0.STORE_VERSION, pageCache ); prepareNeoStoreFile( fileSystem, storeDir, HighLimitV3_0_0.STORE_VERSION, pageCache );


MigrationProgressMonitor.Section progressMonitor = mock( MigrationProgressMonitor.Section.class ); MigrationProgressMonitor.Section progressMonitor = mock( MigrationProgressMonitor.Section.class );

migrator.migrate( storeDir, migrationDir, progressMonitor, HighLimitV3_0_0.STORE_VERSION, HighLimit.STORE_VERSION ); migrator.migrate( storeDir, migrationDir, progressMonitor, HighLimitV3_0_0.STORE_VERSION, HighLimit.STORE_VERSION );


File[] migrationFiles = fileSystem.listFiles( migrationDir ); int newStoreFilesCount = fileSystem.listFiles( migrationDir ).length;
Set<String> fileNames = Stream.of( migrationFiles ).map( File::getName ).collect( Collectors.toSet() ); assertEquals( "Store should be migrated and new store files should be created.", 17, newStoreFilesCount );
assertThat( "Only specified files should be created after migration attempt from 3.0 to 3.1 using high limit " +
"format. Since formats are compatible and migration is not required.", fileNames,
CoreMatchers.hasItems( "lastxinformation", "lastxlogposition" ) );
} }


private File prepareNeoStoreFile( File storeDir, String storeVersion, PageCache pageCache ) throws IOException private File prepareNeoStoreFile( FileSystemAbstraction fileSystem, File storeDir, String storeVersion,
PageCache pageCache ) throws IOException
{ {
File neoStoreFile = createNeoStoreFile( storeDir ); File neoStoreFile = createNeoStoreFile( fileSystem, storeDir );
long value = MetaDataStore.versionStringToLong( storeVersion ); long value = MetaDataStore.versionStringToLong( storeVersion );
MetaDataStore.setRecord( pageCache, neoStoreFile, STORE_VERSION, value ); MetaDataStore.setRecord( pageCache, neoStoreFile, STORE_VERSION, value );
return neoStoreFile; return neoStoreFile;
} }


private File createNeoStoreFile( File storeDir ) throws IOException private File createNeoStoreFile( FileSystemAbstraction fileSystem, File storeDir ) throws IOException
{ {
fileSystem.mkdir( storeDir ); fileSystem.mkdir( storeDir );
File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME ); File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME );
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/ */
package batchimport; package batchimport;


import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.format.highlimit.HighLimit; import org.neo4j.kernel.impl.store.format.highlimit.HighLimit;
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter; import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator;
Expand All @@ -36,8 +37,8 @@ public ParallelBatchImporterTest( InputIdGenerator inputIdGenerator, IdMapper id
} }


@Override @Override
public String getFormatName() public RecordFormats getFormat()
{ {
return HighLimit.NAME; return HighLimit.RECORD_FORMATS;
} }
} }

0 comments on commit f9933b1

Please sign in to comment.