Skip to content

Commit

Permalink
Added testing about allocating double record units in import
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Dec 12, 2017
1 parent 1c4c789 commit 9a369ed
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 28 deletions.
Expand Up @@ -59,12 +59,12 @@ public interface Monitor
private final PropertyStore propertyStore;
private final IoMonitor ioMonitor;
private final Monitor monitor;
private final PrepareIdSequence<RECORD> prepareIdSequence;
private final PrepareIdSequence prepareIdSequence;

EntityStoreUpdaterStep( StageControl control, Configuration config,
CommonAbstractStore<RECORD,? extends StoreHeader> entityStore,
PropertyStore propertyStore, IoMonitor ioMonitor,
Monitor monitor, PrepareIdSequence<RECORD> prepareIdSequence )
Monitor monitor, PrepareIdSequence prepareIdSequence )
{
super( control, "v", config, config.parallelRecordWrites() ? 0 : 1, ioMonitor );
this.entityStore = entityStore;
Expand Down
Expand Up @@ -45,6 +45,6 @@ public NodeFirstGroupStage( Configuration config, RecordStore<RelationshipGroupR
add( new BatchFeedStep( control(), config, allIn( groupStore, config ), groupStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, true, groupStore, null ) );
add( new NodeSetFirstGroupStep( control(), config, nodeStore, cache ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore, new StorePrepareIdSequence<>() ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore, new StorePrepareIdSequence() ) );
}
}
Expand Up @@ -84,6 +84,6 @@ public NodeStage( Configuration config, IoMonitor writeMonitor,
add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) );
add( new LabelScanStorePopulationStep( control(), config, labelScanStore ) );
add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore, writeMonitor,
storeUpdateMonitor, new StorePrepareIdSequence<>() ) );
storeUpdateMonitor, new StorePrepareIdSequence() ) );
}
}
Expand Up @@ -39,6 +39,6 @@ public RelationshipGroupStage( String topic, Configuration config,
{
super( NAME, topic, config, 0 );
add( new ReadGroupRecordsByCacheStep( control(), config, store, cache ) );
add( new UpdateRecordsStep<>( control(), config, store, new StorePrepareIdSequence<>() ) );
add( new UpdateRecordsStep<>( control(), config, store, new StorePrepareIdSequence() ) );
}
}
Expand Up @@ -51,6 +51,6 @@ public SparseNodeFirstRelationshipStage( Configuration config, NodeStore nodeSto
add( new ReadRecordsStep<>( control(), config, true, nodeStore, null ) );
add( new RecordProcessorStep<>( control(), "LINK", config,
new SparseNodeFirstRelationshipProcessor( cache ), false ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore, new StorePrepareIdSequence<>() ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore, new StorePrepareIdSequence() ) );
}
}
Expand Up @@ -44,11 +44,11 @@ public class UpdateRecordsStep<RECORD extends AbstractBaseRecord>
{
protected final RecordStore<RECORD> store;
private final int recordSize;
private final PrepareIdSequence<RECORD> prepareIdSequence;
private final PrepareIdSequence prepareIdSequence;
private long recordsUpdated;

public UpdateRecordsStep( StageControl control, Configuration config, RecordStore<RECORD> store,
PrepareIdSequence<RECORD> prepareIdSequence )
PrepareIdSequence prepareIdSequence )
{
super( control, "v", config, config.parallelRecordWrites() ? 0 : 1 );
this.store = store;
Expand Down
Expand Up @@ -48,6 +48,6 @@ public WriteGroupsStage( Configuration config, RelationshipGroupCache cache,
super( NAME, null, config, 0 );
add( new ReadGroupsFromCacheStep( control(), config, cache.iterator(), GROUP_ENTRY_SIZE ) );
add( new EncodeGroupsStep( control(), config, store ) );
add( new UpdateRecordsStep<>( control(), config, store, new StorePrepareIdSequence<>() ) );
add( new UpdateRecordsStep<>( control(), config, store, new StorePrepareIdSequence() ) );
}
}
Expand Up @@ -23,18 +23,17 @@
import java.util.function.LongFunction;

import org.neo4j.kernel.impl.store.CommonAbstractStore;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;

/**
* Exists to allow {@link IdSequence} with specific behaviour relevant to import to be injected into
* {@link CommonAbstractStore#prepareForCommit(AbstractBaseRecord, IdSequence)}.
*/
public interface PrepareIdSequence<RECORD extends AbstractBaseRecord> extends Function<RecordStore<RECORD>,LongFunction<IdSequence>>
public interface PrepareIdSequence extends Function<IdSequence,LongFunction<IdSequence>>
{
static <RECORD extends AbstractBaseRecord> PrepareIdSequence<RECORD> of( boolean doubleUnits )
static PrepareIdSequence of( boolean doubleUnits )
{
return doubleUnits ? new SecondaryUnitPrepareIdSequence<>() : new StorePrepareIdSequence<>();
return doubleUnits ? new SecondaryUnitPrepareIdSequence() : new StorePrepareIdSequence();
}
}
Expand Up @@ -21,19 +21,17 @@

import java.util.function.LongFunction;

import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.id.IdRange;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;

/**
* Assumes that records have been allocated such that there will be a free record, right after a given record,
* to place the secondary unit of that record.
*/
public class SecondaryUnitPrepareIdSequence<RECORD extends AbstractBaseRecord> implements PrepareIdSequence<RECORD>
public class SecondaryUnitPrepareIdSequence implements PrepareIdSequence
{
@Override
public LongFunction<IdSequence> apply( RecordStore<RECORD> store )
public LongFunction<IdSequence> apply( IdSequence idSequence )
{
return new NeighbourIdSequence();
}
Expand All @@ -56,7 +54,10 @@ public long nextId()
{
try
{
assert !returned;
if ( returned )
{
throw new IllegalStateException( "Already returned" );
}
return id + 1;
}
finally
Expand Down
Expand Up @@ -21,15 +21,13 @@

import java.util.function.LongFunction;

import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.id.IdSequence;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;

public class StorePrepareIdSequence<RECORD extends AbstractBaseRecord> implements PrepareIdSequence<RECORD>
public class StorePrepareIdSequence implements PrepareIdSequence
{
@Override
public LongFunction<IdSequence> apply( RecordStore<RECORD> store )
public LongFunction<IdSequence> apply( IdSequence idSequence )
{
return id -> store;
return id -> idSequence;
}
}
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import org.junit.Rule;
import org.junit.Test;

import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.StoreFactory;
import org.neo4j.kernel.impl.store.format.ForcedSecondaryUnitRecordFormats;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.test.rule.PageCacheAndDependenciesRule;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.SimpleStageControl;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.store.PrepareIdSequence;
import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import static org.neo4j.kernel.impl.store.StoreType.PROPERTY;
import static org.neo4j.kernel.impl.store.StoreType.PROPERTY_ARRAY;
import static org.neo4j.kernel.impl.store.StoreType.PROPERTY_STRING;
import static org.neo4j.kernel.impl.store.StoreType.RELATIONSHIP;
import static org.neo4j.kernel.impl.store.format.standard.Standard.LATEST_RECORD_FORMATS;
import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT;

public class EntityStoreUpdaterStepTest
{
@Rule
public final PageCacheAndDependenciesRule storage = new PageCacheAndDependenciesRule();

@Test
public void shouldAllocateDoubleRecordUnitsNextToRecord() throws Exception
{
// given
RecordFormats formats = new ForcedSecondaryUnitRecordFormats( LATEST_RECORD_FORMATS );
try ( NeoStores stores = new StoreFactory( storage.directory().absolutePath(), Config.defaults(),
new DefaultIdGeneratorFactory( storage.fileSystem() ), storage.pageCache(), storage.fileSystem(), formats,
NullLogProvider.getInstance() ).openNeoStores( true, RELATIONSHIP, PROPERTY, PROPERTY_ARRAY, PROPERTY_STRING ) )
{
StageControl control = new SimpleStageControl();
PrepareIdSequence idSequence = PrepareIdSequence.of( true );
int batchSize = 100;
stores.getRelationshipStore().setHighId( batchSize * 10 );
Batch<InputRelationship,RelationshipRecord> batch = batchOfRelationshipsWithPreallocatedSecondaryUnits( batchSize );
try ( EntityStoreUpdaterStep<RelationshipRecord,InputRelationship> step = new EntityStoreUpdaterStep<>( control,
DEFAULT, stores.getRelationshipStore(), stores.getPropertyStore(), mock( IoMonitor.class ),
mock( EntityStoreUpdaterStep.Monitor.class ), idSequence ) )
{
step.start( 0 );

// when
step.receive( 0, batch );
step.endOfUpstream();
while ( !step.isCompleted() )
{
Thread.sleep( 10 );
control.assertHealthy();
}
}

// then
for ( int i = 0; i < batchSize; i++ )
{
RelationshipRecord record = batch.records[i];
assertTrue( record.hasSecondaryUnitId() );
assertEquals( record.getId() + 1, record.getSecondaryUnitId() );
}
}
}

private Batch<InputRelationship,RelationshipRecord> batchOfRelationshipsWithPreallocatedSecondaryUnits( int batchSize )
{
Batch<InputRelationship,RelationshipRecord> batch = new Batch<>( new InputRelationship[batchSize] );
batch.records = new RelationshipRecord[batchSize];
batch.propertyRecords = new PropertyRecord[batchSize][];
for ( int i = 0; i < batchSize; i++ )
{
batch.records[i] = new RelationshipRecord( i * 2 );
batch.records[i].setInUse( true );
}
return batch;
}
}
Expand Up @@ -26,15 +26,18 @@
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.DeadEndStep;
import org.neo4j.unsafe.impl.batchimport.staging.SimpleStageControl;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingRelationshipTypeTokenRepository;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT;
import static org.neo4j.unsafe.impl.batchimport.input.Collector.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES;

public class RelationshipRecordPreparationStepTest
Expand Down Expand Up @@ -75,6 +78,49 @@ control, DEFAULT, mock( BatchingRelationshipTypeTokenRepository.class ), collect
}
}

@Test
public void shouldPreallocateDoubleRecordUnitsIfToldTo() throws Exception
{
// given
StageControl control = new SimpleStageControl();
try ( RelationshipRecordPreparationStep step = new RelationshipRecordPreparationStep(
control, DEFAULT, mock( BatchingRelationshipTypeTokenRepository.class ), EMPTY, new BatchingIdSequence(), true ) )
{
DeadEndStep end = new DeadEndStep( control );
end.start( 0 );
step.setDownstream( end );
step.start( 0 );

// when
Batch<InputRelationship,RelationshipRecord> batch = batch(
relationship( 1, 2 ),
relationship( 2, 3 ),
relationship( 3, 4 ),
relationship( 4, 5 ),
relationship( 5, 6 ) );
step.receive( 0, batch );
step.endOfUpstream();
while ( !step.isCompleted() )
{
// wait
control.assertHealthy();
Thread.sleep( 10 );
}

// then
long previousId = -1;
for ( RelationshipRecord record : batch.records )
{
long id = record.getId();
if ( previousId != -1 )
{
assertEquals( previousId + 2, id );
}
previousId = id;
}
}
}

private static Batch<InputRelationship,RelationshipRecord> batch( Data... relationships )
{
Batch<InputRelationship,RelationshipRecord> batch = new Batch<>( new InputRelationship[relationships.length] );
Expand Down
Expand Up @@ -51,7 +51,7 @@ public void ioThroughputStatDoesNotOverflow() throws Throwable

Configuration configuration = mock( Configuration.class );
StageControl stageControl = mock( StageControl.class );
UpdateRecordsStep<NodeRecord> step = new UpdateRecordsStep<>( stageControl, configuration, store, new StorePrepareIdSequence<>() );
UpdateRecordsStep<NodeRecord> step = new UpdateRecordsStep<>( stageControl, configuration, store, new StorePrepareIdSequence() );

NodeRecord record = new NodeRecord( 1 );
record.setInUse( true );
Expand All @@ -71,7 +71,7 @@ public void recordWithReservedIdIsSkipped() throws Throwable
RecordStore<NodeRecord> store = mock( NodeStore.class );
StageControl stageControl = mock( StageControl.class );
UpdateRecordsStep<NodeRecord> step = new UpdateRecordsStep<>( stageControl, Configuration.DEFAULT, store,
new StorePrepareIdSequence<>() );
new StorePrepareIdSequence() );

NodeRecord node1 = new NodeRecord( 1 );
node1.setInUse( true );
Expand All @@ -82,11 +82,11 @@ public void recordWithReservedIdIsSkipped() throws Throwable

step.process( batch, mock( BatchSender.class ) );

verify( store ).prepareForCommit( node1 );
verify( store ).prepareForCommit( node1, store );
verify( store ).updateRecord( node1 );
verify( store ).prepareForCommit( node2 );
verify( store ).prepareForCommit( node2, store );
verify( store ).updateRecord( node2 );
verify( store, never() ).prepareForCommit( nodeWithReservedId );
verify( store, never() ).prepareForCommit( nodeWithReservedId, store );
verify( store, never() ).updateRecord( nodeWithReservedId );
}
}

0 comments on commit 9a369ed

Please sign in to comment.