Skip to content

Commit

Permalink
More space-efficient data structures for detecting duplicates in IdMa…
Browse files Browse the repository at this point in the history
…pper

Big import data sets may have a large amount of collisions (accidental or
actual duplicates). Detecting duplicate input ids within the same group
was previously done using a combination of maps, although that could
quickly run out of heap memory.

This commit introduces another way of doing this detection. Basically it
works by copying the subset of collisions into a new cache (NumberArray so
an live off-heap) with its own tracker cache associated with it. This pair
of arrays will be sorted with ParallelSort just like the whole data set
was sorted just previously. Given the now sorted tracker cache over the
collisions and the kept input ids for these collisions, all potential
duplicates are next to each other and can be compared in isolation.

co-author: @alexaverbuch
  • Loading branch information
tinwelint committed Apr 16, 2015
1 parent 6329420 commit 92f8f18
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 106 deletions.
Expand Up @@ -91,7 +91,7 @@ public static IdMapper actual()
*/
public static IdMapper strings( NumberArrayFactory cacheFactory )
{
return new EncodingIdMapper( cacheFactory, new StringEncoder(), new Radix.String(), NO_MONITOR );
return new EncodingIdMapper( cacheFactory, new StringEncoder(), Radix.STRING, NO_MONITOR );
}

/**
Expand All @@ -103,6 +103,6 @@ public static IdMapper strings( NumberArrayFactory cacheFactory )
*/
public static IdMapper longs( NumberArrayFactory cacheFactory )
{
return new EncodingIdMapper( cacheFactory, new LongEncoder(), new Radix.Long(), NO_MONITOR );
return new EncodingIdMapper( cacheFactory, new LongEncoder(), Radix.LONG, NO_MONITOR );
}
}

Large diffs are not rendered by default.

Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.unsafe.impl.batchimport.cache.idmapping.string;

import org.neo4j.function.Factory;
import org.neo4j.register.Register.IntRegister;

import static java.lang.Math.pow;
Expand All @@ -31,6 +32,24 @@
*/
public abstract class Radix
{
public static final Factory<Radix> LONG = new Factory<Radix>()
{
@Override
public Radix newInstance()
{
return new Radix.Long();
}
};

public static final Factory<Radix> STRING = new Factory<Radix>()
{
@Override
public Radix newInstance()
{
return new Radix.String();
}
};

protected final int[] radixIndexCount = new int[(int) pow( 2, RadixCalculator.RADIX_BITS - 1 )];

public int registerRadixOf( long value )
Expand Down
@@ -0,0 +1,62 @@
/**
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.cache.idmapping.string;

import java.util.List;

import org.neo4j.csv.reader.SourceTraceability;

/**
* Encodes source id (effectively and id refering to a file name or similar,
* i.e {@link SourceTraceability#sourceDescription()}, group id and line number.
*/
class SourceInformation implements Cloneable
{
static final long LINE_NUMBER_MASK = 0xFFFF_FFFFFFFFL;

int sourceId;
long lineNumber;

SourceInformation decode( long sourceInformation )
{
sourceId = (int) ((sourceInformation & ~LINE_NUMBER_MASK) >>> 48); // >>> we don't want the sign to matter
lineNumber = (sourceInformation & LINE_NUMBER_MASK);
return this;
}

static long encodeSourceInformation( int sourceId, long lineNumber )
{
if ( (sourceId & 0xFFFF0000) != 0 )
{
throw new IllegalArgumentException( "Collisions in too many sources (currently at " + sourceId + ")" );
}
if ( (lineNumber & ~LINE_NUMBER_MASK) != 0 )
{
throw new IllegalArgumentException( "Collision in source with too many lines (" + lineNumber + ")" );
}

return ((long)sourceId << 48) | lineNumber;
}

public String describe( List<String> sourceDescriptions )
{
return sourceDescriptions.get( sourceId ) + ":" + lineNumber;
}
}
Expand Up @@ -88,5 +88,5 @@ public boolean equals( Object obj )
}
}

Group GLOBAL = new Adapter( 0, "global group" );
Group GLOBAL = new Adapter( 0, "global id space" );
}
Expand Up @@ -215,7 +215,7 @@ public void shouldEncodeSmallSetOfRandomData() throws Throwable
public void shouldRefuseCollisionsBasedOnSameInputId() throws Exception
{
// GIVEN
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, new StringEncoder(), new Radix.String(),
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, new StringEncoder(), Radix.STRING,
NO_MONITOR );
InputIterable<Object> ids = wrap( "source", Arrays.<Object>asList( "10", "9", "10" ) );
Group group = new Group.Adapter( GLOBAL.id(), "global" );
Expand Down Expand Up @@ -244,7 +244,7 @@ public void shouldRefuseCollisionsBasedOnSameInputId() throws Exception
public void shouldIncludeSourceLocationsOfCollisions() throws Exception
{
// GIVEN
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, new StringEncoder(), new Radix.String(),
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, new StringEncoder(), Radix.STRING,
NO_MONITOR );
final List<Object> idList = Arrays.<Object>asList( "10", "9", "10" );
InputIterable<Object> ids = wrap( "source", idList );
Expand Down Expand Up @@ -278,7 +278,7 @@ public void shouldCopeWithCollisionsBasedOnDifferentInputIds() throws Exception
Monitor monitor = mock( Monitor.class );
Encoder encoder = mock( Encoder.class );
when( encoder.encode( any() ) ).thenReturn( 12345L );
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, encoder, new Radix.String(), monitor );
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, encoder, Radix.STRING, monitor );
InputIterable<Object> ids = wrap( "source", Arrays.<Object>asList( "10", "9" ) );
try ( ResourceIterator<Object> iterator = ids.iterator() )
{
Expand Down Expand Up @@ -320,7 +320,7 @@ public void shouldCopeWithMixedActualAndAccidentalCollisions() throws Exception
when( encoder.encode( a2 ) ).thenReturn( 1L );
when( encoder.encode( e ) ).thenReturn( 2L );
when( encoder.encode( f ) ).thenReturn( 1L );
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, encoder, new Radix.String(), monitor );
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, encoder, Radix.STRING, monitor );
InputIterable<Object> ids = wrap( "source", Arrays.<Object>asList( "a", "b", "c", "a", "e", "f" ) );
Group.Adapter groupA = new Group.Adapter( 1, "A" );
Group.Adapter groupB = new Group.Adapter( 2, "B" );
Expand Down Expand Up @@ -358,7 +358,7 @@ public void shouldBeAbleToHaveDuplicateInputIdButInDifferentGroups() throws Exce
{
// GIVEN
Monitor monitor = mock( Monitor.class );
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, new StringEncoder(), new Radix.String(),
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, new StringEncoder(), Radix.STRING,
monitor );
InputIterable<Object> ids = wrap( "source", Arrays.<Object>asList( "10", "9", "10" ) );
Groups groups = new Groups();
Expand All @@ -385,7 +385,7 @@ public void shouldBeAbleToHaveDuplicateInputIdButInDifferentGroups() throws Exce
public void shouldOnlyFindInputIdsInSpecificGroup() throws Exception
{
// GIVEN
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, new StringEncoder(), new Radix.String(),
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, new StringEncoder(), Radix.STRING,
NO_MONITOR );
InputIterable<Object> ids = wrap( "source", Arrays.<Object>asList( "8", "9", "10" ) );
Groups groups = new Groups();
Expand Down Expand Up @@ -417,7 +417,7 @@ public void shouldOnlyFindInputIdsInSpecificGroup() throws Exception
public void shouldHandleManyGroups() throws Exception
{
// GIVEN
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, new LongEncoder(), new Radix.String(),
IdMapper mapper = new EncodingIdMapper( NumberArrayFactory.HEAP, new LongEncoder(), Radix.STRING,
NO_MONITOR );
int size = 100;

Expand Down Expand Up @@ -498,9 +498,9 @@ Encoder encoder()
}

@Override
Radix radix()
Factory<Radix> radix()
{
return new Radix.Long();
return Radix.LONG;
}

@Override
Expand All @@ -525,9 +525,9 @@ Encoder encoder()
}

@Override
Radix radix()
Factory<Radix> radix()
{
return new Radix.String();
return Radix.STRING;
}

@Override
Expand All @@ -554,9 +554,9 @@ Encoder encoder()
}

@Override
Radix radix()
Factory<Radix> radix()
{
return new Radix.String();
return Radix.STRING;
}

@Override
Expand Down Expand Up @@ -610,7 +610,7 @@ private char randomLetter( Random random )

abstract Encoder encoder();

abstract Radix radix();
abstract Factory<Radix> radix();

abstract Factory<Object> data( Random random );
}
Expand Down
@@ -0,0 +1,56 @@
/**
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.cache.idmapping.string;

import org.junit.Test;

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import static org.junit.Assert.assertEquals;

import static java.lang.Math.abs;
import static java.lang.String.format;

public class SourceInformationTest
{
@Test
public void shouldEncodeAndDecodeInformation() throws Exception
{
// GIVEN
SourceInformation codec = new SourceInformation();
Random random = ThreadLocalRandom.current();

// WHEN/THEN
for ( int i = 0; i < 100; i++ )
{
int sourceId = random.nextInt( 0xFFFF + 1 );
long lineNumber = abs( random.nextLong() ) & SourceInformation.LINE_NUMBER_MASK;

long encoded = SourceInformation.encodeSourceInformation( sourceId, lineNumber );
codec.decode( encoded );

String hint = format( "sourceId:%d, lineNumber:%d --> %d --> sourceId:%d, lineNumber:%d",
sourceId, lineNumber, encoded, codec.sourceId, codec.lineNumber );
assertEquals( hint, sourceId, codec.sourceId );
assertEquals( hint, lineNumber, codec.lineNumber );
}
}
}

0 comments on commit 92f8f18

Please sign in to comment.