Skip to content

Commit

Permalink
Rewrite of how data from Input gets written into records
Browse files Browse the repository at this point in the history
Instead of going through the stages w/ the queuing and passing objects
importing is done by fully executing the importing threads in parallel.
All data structures and access patterns have been made to allow for
concurrent access.

Garbage was a big concern previously, so this new importing doesn't
create objects for the entities, but instead allows visitors to observe
the different fields directly from an underlying buffer which has been
read from the input source.
  • Loading branch information
tinwelint committed Jan 5, 2018
1 parent 41cbf1b commit 4a0e402
Show file tree
Hide file tree
Showing 167 changed files with 6,464 additions and 9,109 deletions.
5 changes: 5 additions & 0 deletions community/common/src/test/java/org/neo4j/test/Randoms.java
Expand Up @@ -313,6 +313,11 @@ private char symbol()
}
}

/**
 * @return the value produced by {@code random.nextLong()}, with no bound applied.
 */
public long nextLong()
{
    final long value = random.nextLong();
    return value;
}

public long nextLong( long bound )
{
return abs( random.nextLong() ) % bound;
Expand Down
Expand Up @@ -110,7 +110,7 @@ public void evaluate() throws Throwable

private void enhanceFailureWithSeed( Throwable t )
{
Exceptions.withMessage( t, t.getMessage() + ": random seed used:" + seed );
Exceptions.withMessage( t, t.getMessage() + ": random seed used:" + seed + "L" );
}
};
}
Expand Down
Expand Up @@ -33,9 +33,14 @@ public class AutoReadingSource implements Source
private SectionedCharBuffer charBuffer;

public AutoReadingSource( CharReadable reader, int bufferSize )
{
this( reader, new SectionedCharBuffer( bufferSize ) );
}

public AutoReadingSource( CharReadable reader, SectionedCharBuffer charBuffer )
{
this.reader = reader;
this.charBuffer = new SectionedCharBuffer( bufferSize );
this.charBuffer = charBuffer;
}

@Override
Expand Down Expand Up @@ -80,11 +85,6 @@ public char[] data()
{
return charBuffer.array();
}

@Override
public void close()
{ // Nothing to close
}
};
}

Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.neo4j.csv.reader.Source.Chunk;

import static java.lang.String.format;

import static org.neo4j.csv.reader.Mark.END_OF_LINE_CHARACTER;

/**
Expand Down Expand Up @@ -279,15 +280,15 @@ public <EXTRACTOR extends Extractor<?>> EXTRACTOR extract( Mark mark, EXTRACTOR
@Override
public boolean tryExtract( Mark mark, Extractor<?> extractor )
{
long from = mark.startPosition();
long to = mark.position();
return extractor.extract( buffer, (int) from, (int) (to - from), mark.isQuoted() );
int from = mark.startPosition();
int to = mark.position();
return extractor.extract( buffer, from, to - from, mark.isQuoted() );
}

private int nextChar( int skippedChars ) throws IOException
{
int ch;
if ( fillBufferIfWeHaveExhaustedIt() )
if ( bufferPos < bufferEnd || fillBuffer() )
{
ch = buffer[bufferPos];
}
Expand All @@ -308,59 +309,54 @@ private int nextChar( int skippedChars ) throws IOException
/**
* @return {@code true} if something was read, otherwise {@code false} which means that we reached EOF.
*/
private boolean fillBufferIfWeHaveExhaustedIt() throws IOException
private boolean fillBuffer() throws IOException
{
if ( bufferPos >= bufferEnd )
{
boolean first = currentChunk == null;
boolean first = currentChunk == null;

if ( !first )
if ( !first )
{
if ( bufferPos - seekStartPos >= dataCapacity )
{
currentChunk.close();
if ( bufferPos - seekStartPos >= dataCapacity )
{
throw new IllegalStateException( "Tried to read a field larger than buffer size " +
dataLength + ". A common cause of this is that a field has an unterminated " +
"quote and so will try to seek until the next quote, which ever line it may be on." +
" This should not happen if multi-line fields are disabled, given that the fields contains " +
"no new-line characters. This field started at " + sourceDescription() + ":" + lineNumber() );
}
throw new IllegalStateException( "Tried to read a field larger than buffer size " +
dataLength + ". A common cause of this is that a field has an unterminated " +
"quote and so will try to seek until the next quote, which ever line it may be on." +
" This should not happen if multi-line fields are disabled, given that the fields contains " +
"no new-line characters. This field started at " + sourceDescription() + ":" + lineNumber() );
}
}

absoluteBufferStartPosition += dataLength;
absoluteBufferStartPosition += dataLength;

// Fill the buffer with new characters
Chunk nextChunk = source.nextChunk( first ? -1 : seekStartPos );
if ( nextChunk.backPosition() == nextChunk.startPosition() + nextChunk.length() )
{
return false;
}
buffer = nextChunk.data();
dataLength = nextChunk.length();
dataCapacity = nextChunk.maxFieldSize();
bufferPos = nextChunk.startPosition();
bufferStartPos = bufferPos;
bufferEnd = bufferPos + dataLength;
int shift = seekStartPos - nextChunk.backPosition();
seekStartPos = nextChunk.backPosition();
if ( first )
{
lineStartPos = seekStartPos;
}
else
{
lineStartPos -= shift;
}
String sourceDescriptionAfterRead = nextChunk.sourceDescription();
if ( !sourceDescriptionAfterRead.equals( sourceDescription ) )
{ // We moved over to a new source, reset line number
lineNumber = 0;
sourceDescription = sourceDescriptionAfterRead;
}
currentChunk = nextChunk;
return dataLength > 0;
// Fill the buffer with new characters
Chunk nextChunk = source.nextChunk( first ? -1 : seekStartPos );
if ( nextChunk == Source.EMPTY_CHUNK )
{
return false;
}
return true;

buffer = nextChunk.data();
dataLength = nextChunk.length();
dataCapacity = nextChunk.maxFieldSize();
bufferPos = nextChunk.startPosition();
bufferEnd = bufferPos + dataLength;
int shift = seekStartPos - nextChunk.backPosition();
seekStartPos = nextChunk.backPosition();
if ( first )
{
lineStartPos = seekStartPos;
}
else
{
lineStartPos -= shift;
}
String sourceDescriptionAfterRead = nextChunk.sourceDescription();
if ( !sourceDescriptionAfterRead.equals( sourceDescription ) )
{ // We moved over to a new source, reset line number
lineNumber = 0;
sourceDescription = sourceDescriptionAfterRead;
}
currentChunk = nextChunk;
return dataLength > 0;
}

@Override
Expand All @@ -381,7 +377,6 @@ public String sourceDescription()
return sourceDescription;
}

@Override
public long lineNumber()
{
return lineNumber;
Expand All @@ -393,4 +388,9 @@ public String toString()
return format( "%s[source:%s, position:%d, line:%d]", getClass().getSimpleName(),
sourceDescription(), position(), lineNumber() );
}

/**
 * Checks whether a character is one of the two end-of-line characters.
 *
 * @param c character to check.
 * @return {@code true} if {@code c} equals {@code EOL_CHAR} or {@code EOL_CHAR_2}, otherwise {@code false}.
 */
public static boolean isEolChar( char c )
{
    if ( c == EOL_CHAR )
    {
        return true;
    }
    return c == EOL_CHAR_2;
}
}
Expand Up @@ -60,6 +60,11 @@ public interface CharReadable extends Closeable, SourceTraceability
* Reads characters into the given array starting at {@code offset}, reading {@code length} number of characters.
*
* Similar to {@link Reader#read(char[], int, int)}
* @param into char[] to read the data into.
* @param offset offset to start reading into the char[].
* @param length maximum number of characters to read.
* @return number of characters read, or 0 if there were no characters read and end of readable is reached.
* @throws IOException on read error.
*/
int read( char[] into, int offset, int length ) throws IOException;

Expand All @@ -84,12 +89,6 @@ public long position()
return 0;
}

@Override
public long lineNumber()
{
return 0;
}

@Override
public String sourceDescription()
{
Expand Down
@@ -0,0 +1,144 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.csv.reader;

import java.io.IOException;
import java.util.Arrays;

import org.neo4j.csv.reader.Source.Chunk;

/**
 * Chunks up a {@link CharReadable}.
 *
 * In addition to the reader and the chunk size, this class maintains a "back buffer": characters handed to
 * {@link #storeInBackBuffer(char[], int, int)} accumulate there until the next call to
 * {@link #fillFromBackBuffer(char[])} copies them to the start of the supplied array and empties the buffer.
 */
public abstract class CharReadableChunker implements Chunker
{
    protected final CharReadable reader;
    protected final int chunkSize;
    protected volatile long position;
    private char[] backBuffer; // grows on demand
    private int backBufferCursor; // number of characters currently held in backBuffer

    /**
     * @param reader {@link CharReadable} which {@link #close()} will close.
     * @param chunkSize size of the char[] allocated for every chunk created by {@link #newChunk()}.
     * The back buffer initially gets a sixteenth of this size.
     */
    public CharReadableChunker( CharReadable reader, int chunkSize )
    {
        this.reader = reader;
        this.chunkSize = chunkSize;
        this.backBuffer = new char[chunkSize >> 4];
    }

    /**
     * @return a new {@link ChunkImpl} backed by a freshly allocated char[] of {@code chunkSize} characters.
     */
    @Override
    public ChunkImpl newChunk()
    {
        char[] data = new char[chunkSize];
        return new ChunkImpl( data );
    }

    /**
     * Closes the underlying reader.
     *
     * @throws IOException if closing the reader fails.
     */
    @Override
    public void close() throws IOException
    {
        reader.close();
    }

    /**
     * @return the current value of the {@code position} field.
     */
    public long position()
    {
        return position;
    }

    /**
     * Moves everything currently held in the back buffer to the beginning of {@code into} and empties
     * the back buffer.
     *
     * @param into char[] receiving the characters, starting at index 0.
     * @return number of characters copied, or 0 if the back buffer was empty.
     */
    protected int fillFromBackBuffer( char[] into )
    {
        if ( backBufferCursor <= 0 )
        {   // Nothing stored since the last call
            return 0;
        }

        assert backBufferCursor < chunkSize;
        int copied = backBufferCursor;
        System.arraycopy( backBuffer, 0, into, 0, copied );
        backBufferCursor = 0;
        return copied;
    }

    /**
     * Appends a range of characters to the back buffer, growing it if needed.
     *
     * @param data char[] to copy from.
     * @param offset index in {@code data} of the first character to copy.
     * @param length number of characters to copy.
     * @return {@code length}.
     */
    protected int storeInBackBuffer( char[] data, int offset, int length )
    {
        char[] target = ensureBackBufferCapacity( length );
        System.arraycopy( data, offset, target, backBufferCursor, length );
        backBufferCursor += length;
        return length;
    }

    /**
     * Ensures the back buffer has room for {@code length} more characters after the cursor,
     * reallocating to exactly the required size when it doesn't.
     *
     * @param length number of additional characters which must fit.
     * @return the (possibly reallocated) back buffer.
     */
    private char[] ensureBackBufferCapacity( int length )
    {
        int required = backBufferCursor + length;
        if ( backBuffer.length < required )
        {
            backBuffer = Arrays.copyOf( backBuffer, required );
        }
        return backBuffer;
    }

    /**
     * {@link Chunk} on top of a char[], where the amount of data and the source description are
     * assigned through {@link #initialize(int, String)}. Start and back positions are always 0.
     */
    public static class ChunkImpl implements Chunk
    {
        final char[] buffer;
        private int length;
        private String sourceDescription;

        /**
         * @param buffer char[] exposed through {@link #data()}.
         */
        public ChunkImpl( char[] buffer )
        {
            this.buffer = buffer;
        }

        /**
         * Assigns the values reported by {@link #length()} and {@link #sourceDescription()}.
         *
         * @param length value to report as length of this chunk.
         * @param sourceDescription value to report as source description of this chunk.
         */
        public void initialize( int length, String sourceDescription )
        {
            this.sourceDescription = sourceDescription;
            this.length = length;
        }

        @Override
        public int startPosition()
        {   // Always the start of the buffer
            return 0;
        }

        @Override
        public String sourceDescription()
        {
            return sourceDescription;
        }

        @Override
        public int maxFieldSize()
        {   // The full capacity of the buffer
            return buffer.length;
        }

        @Override
        public int length()
        {
            return length;
        }

        @Override
        public char[] data()
        {
            return buffer;
        }

        @Override
        public int backPosition()
        {
            return 0;
        }
    }
}

0 comments on commit 4a0e402

Please sign in to comment.