Skip to content

Commit

Permalink
Parallel type checking in calculate dense node stage
Browse files Browse the repository at this point in the history
so that it doesn't become a bottleneck
  • Loading branch information
tinwelint committed Aug 28, 2016
1 parent 199e4a7 commit db3af12
Showing 1 changed file with 38 additions and 16 deletions.
Expand Up @@ -22,12 +22,10 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
Expand All @@ -42,13 +40,13 @@
*/ */
public class RelationshipTypeCheckerStep extends ProcessorStep<Batch<InputRelationship,RelationshipRecord>> public class RelationshipTypeCheckerStep extends ProcessorStep<Batch<InputRelationship,RelationshipRecord>>
{ {
private static final Comparator<Map.Entry<Object,AtomicLong>> SORT_BY_COUNT_DESC = private static final Comparator<Map.Entry<Object,MutableLong>> SORT_BY_COUNT_DESC =
(e1,e2) -> Long.compare( e2.getValue().get(), e1.getValue().get() ); (e1,e2) -> Long.compare( e2.getValue().value, e1.getValue().value );
private static final Comparator<Map.Entry<Object,AtomicLong>> SORT_BY_ID_DESC = private static final Comparator<Map.Entry<Object,MutableLong>> SORT_BY_ID_DESC =
(e1,e2) -> Integer.compare( (Integer)e2.getKey(), (Integer)e1.getKey() ); (e1,e2) -> Integer.compare( (Integer)e2.getKey(), (Integer)e1.getKey() );
private final ConcurrentMap<Object,AtomicLong> allTypes = new ConcurrentHashMap<>(); private final Map<Thread,Map<Object,MutableLong>> typeCheckers = new ConcurrentHashMap<>();
private final BatchingRelationshipTypeTokenRepository typeTokenRepository; private final BatchingRelationshipTypeTokenRepository typeTokenRepository;
private Map.Entry<Object,AtomicLong>[] sortedTypes; private Map.Entry<Object,MutableLong>[] sortedTypes;


public RelationshipTypeCheckerStep( StageControl control, Configuration config, public RelationshipTypeCheckerStep( StageControl control, Configuration config,
BatchingRelationshipTypeTokenRepository typeTokenRepository ) BatchingRelationshipTypeTokenRepository typeTokenRepository )
Expand All @@ -60,16 +58,21 @@ public RelationshipTypeCheckerStep( StageControl control, Configuration config,
@Override @Override
protected void process( Batch<InputRelationship,RelationshipRecord> batch, BatchSender sender ) throws Throwable protected void process( Batch<InputRelationship,RelationshipRecord> batch, BatchSender sender ) throws Throwable
{ {
Map<Object,MutableLong> types = typeCheckers.get( Thread.currentThread() );
if ( types == null )
{
typeCheckers.put( Thread.currentThread(), types = new HashMap<>() );
}

for ( InputRelationship relationship : batch.input ) for ( InputRelationship relationship : batch.input )
{ {
Object type = relationship.typeAsObject(); Object type = relationship.typeAsObject();
AtomicLong count = allTypes.get( type ); MutableLong count = types.get( type );
if ( count == null ) if ( count == null )
{ {
AtomicLong existing = allTypes.putIfAbsent( type, count = new AtomicLong() ); types.put( type, count = new MutableLong() );
count = existing != null ? existing : count;
} }
count.incrementAndGet(); count.value++;
} }
sender.send( batch ); sender.send( batch );
} }
Expand All @@ -78,10 +81,24 @@ protected void process( Batch<InputRelationship,RelationshipRecord> batch, Batch
@Override @Override
protected void done() protected void done()
{ {
sortedTypes = allTypes.entrySet().toArray( new Map.Entry[allTypes.size()] ); Map<Object,MutableLong> mergedTypes = new HashMap<>();
for ( Map<Object,MutableLong> localTypes : typeCheckers.values() )
{
for ( Map.Entry<Object,MutableLong> localType : localTypes.entrySet() )
{
MutableLong count = mergedTypes.get( localType.getKey() );
if ( count == null )
{
mergedTypes.put( localType.getKey(), count = new MutableLong() );
}
count.value += localType.getValue().value;
}
}

sortedTypes = mergedTypes.entrySet().toArray( new Map.Entry[mergedTypes.size()] );
if ( sortedTypes.length > 0 ) if ( sortedTypes.length > 0 )
{ {
Comparator<Map.Entry<Object,AtomicLong>> comparator = sortedTypes[0].getKey() instanceof Integer ? Comparator<Map.Entry<Object,MutableLong>> comparator = sortedTypes[0].getKey() instanceof Integer ?
SORT_BY_ID_DESC : SORT_BY_COUNT_DESC; SORT_BY_ID_DESC : SORT_BY_COUNT_DESC;
Arrays.sort( sortedTypes, comparator ); Arrays.sort( sortedTypes, comparator );
} }
Expand Down Expand Up @@ -116,14 +133,19 @@ protected void done()
public Object[] getRelationshipTypes( long belowOrEqualToThreshold ) public Object[] getRelationshipTypes( long belowOrEqualToThreshold )
{ {
List<Object> result = new ArrayList<>(); List<Object> result = new ArrayList<>();
for ( Map.Entry<Object,AtomicLong> candidate : sortedTypes ) for ( Map.Entry<Object,MutableLong> candidate : sortedTypes )
{ {
if ( candidate.getValue().get() <= belowOrEqualToThreshold ) if ( candidate.getValue().value <= belowOrEqualToThreshold )
{ {
result.add( candidate.getKey() ); result.add( candidate.getKey() );
} }
} }


return result.toArray(); return result.toArray();
} }

private static class MutableLong
{
private long value;
}
} }

0 comments on commit db3af12

Please sign in to comment.