Skip to content

Commit

Permalink
Lets CharReadable/CharSeeker able to provide source information
Browse files Browse the repository at this point in the history
such as name of file, if such would be the current data source, as well as
position and line number. Meant for human consumption, for inclusion in
error messages and similar.

This information is used by the import tool to display information on in
which file and line number f.ex. duplicate node ids are defined.
  • Loading branch information
tinwelint committed Feb 10, 2015
1 parent be56eb4 commit 9117710
Show file tree
Hide file tree
Showing 28 changed files with 619 additions and 212 deletions.
Expand Up @@ -31,7 +31,7 @@
/**
* Much like a {@link BufferedReader} for a {@link Reader}.
*/
public class BufferedCharSeeker implements CharSeeker
public class BufferedCharSeeker implements CharSeeker, SourceMonitor
{
private static final int KB = 1024, MB = KB * KB;
public static final int DEFAULT_BUFFER_SIZE = 16 * MB;
Expand All @@ -50,9 +50,11 @@ public class BufferedCharSeeker implements CharSeeker
private final CharBuffer charBuffer;

private int bufferPos;
private int bufferEnd;
private long lineStartPos;
private int seekStartPos;
private int lineNumber = 1;
private int lineNumber;
private int intermediaryNewLines;
private boolean eof;
private final char quoteChar;

Expand All @@ -73,6 +75,8 @@ public BufferedCharSeeker( CharReadable reader, int bufferSize, char quoteChar )
this.charBuffer = CharBuffer.wrap( buffer );
this.bufferPos = bufferSize;
this.quoteChar = quoteChar;
this.lineStartPos = this.bufferPos;
reader.addSourceMonitor( this );
}

@Override
Expand All @@ -91,6 +95,18 @@ public boolean seek( Mark mark, int[] untilOneOfChars ) throws IOException
int skippedChars = 0;
int quoteDepth = 0;
boolean isQuoted = false;

if ( lineStartPos == bufferPos )
{
int skippedEolChars = skipEolChars();
seekStartPos += skippedEolChars;
if ( eof )
{
return eof( mark );
}
lineNumber++;
}

while ( !eof )
{
ch = nextChar( skippedChars );
Expand All @@ -112,7 +128,7 @@ else if ( isNewLine( ch ) )
{
if ( ch == untilOneOfChars[i] )
{ // We found a delimiter, set marker and return true
mark.set( lineNumber, seekStartPos, bufferPos - endOffset - skippedChars, ch, isQuoted );
mark.set( seekStartPos, bufferPos - endOffset - skippedChars, ch, isQuoted );
return true;
}
}
Expand All @@ -138,6 +154,7 @@ else if ( isNewLine( ch ) )
}
else if ( (ch == EOL_CHAR || ch == EOL_CHAR_2) )
{ // Found a new line, just keep going
intermediaryNewLines++;
nextChar( skippedChars );
}
else if ( ch == BACK_SLASH )
Expand All @@ -158,9 +175,7 @@ else if ( ch == BACK_SLASH )
}

// We found the last value of the line or stream
skippedChars += skipEolChars();
mark.set( lineNumber, seekStartPos, bufferPos - endOffset - skippedChars, END_OF_LINE_CHARACTER, isQuoted );
lineNumber++;
mark.set( seekStartPos, bufferPos - endOffset - skippedChars, END_OF_LINE_CHARACTER, isQuoted );
lineStartPos = bufferPos;
return true;
}
Expand All @@ -187,7 +202,7 @@ private int peekChar() throws IOException

private boolean eof( Mark mark )
{
mark.set( lineNumber, -1, -1, Mark.END_OF_LINE_CHARACTER, false );
mark.set( -1, -1, Mark.END_OF_LINE_CHARACTER, false );
return false;
}

Expand Down Expand Up @@ -238,22 +253,25 @@ private int nextChar( int skippedChars ) throws IOException

private void fillBufferIfWeHaveExhaustedIt() throws IOException
{
if ( bufferPos >= buffer.length )
if ( bufferPos >= bufferEnd )
{
if ( seekStartPos == 0 )
if ( seekStartPos == 0 && bufferEnd == charBuffer.capacity() )
{
throw new IllegalStateException( "Tried to read in a value larger than buffer size " + buffer.length );
}
charBuffer.position( seekStartPos );
charBuffer.limit( bufferPos );
charBuffer.compact();
charBuffer.limit( charBuffer.capacity() );
int remaining = charBuffer.remaining();
int read = reader.read( buffer, charBuffer.position(), remaining );
if ( read < remaining )
if ( read <= 0 )
{
buffer[charBuffer.position() + max( read, 0 )] = EOF_CHAR;
}
bufferPos = charBuffer.position();
seekStartPos = 0;
bufferEnd = bufferPos + max( read, 0 );
}
}

Expand All @@ -269,10 +287,16 @@ public long position()
return reader.position();
}

@Override
public void notify( String sourceDescription )
{
lineNumber = 0;
intermediaryNewLines = 0;
}

@Override
public String toString()
{
return getClass().getSimpleName() + "[buffer:" + charBuffer +
", seekPos:" + seekStartPos + ", line:" + lineNumber + "]";
return reader + ":" + (lineNumber+intermediaryNewLines);
}
}
25 changes: 25 additions & 0 deletions community/csv/src/main/java/org/neo4j/csv/reader/CharReadable.java
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.Reader;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;

/**
* A {@link Readable}, but focused on {@code char[]} instead of {@link CharBuffer}, with the main reaon
Expand Down Expand Up @@ -54,4 +56,27 @@ public interface CharReadable extends Closeable
* @return a low-level byte-like position of f.ex. total number of read bytes.
*/
long position();

/**
* Add {@link SourceMonitor} to listen for when this readable potentially moves over to new data sources.
*
* @param sourceMonitor notifies about when this readable potentially moves over to new data sources.
*/
void addSourceMonitor( SourceMonitor sourceMonitor );

public static abstract class Adapter implements CharReadable
{
protected final List<SourceMonitor> monitors = new ArrayList<>();

@Override
public void addSourceMonitor( SourceMonitor sourceMonitor )
{
monitors.add( sourceMonitor );
}

@Override
public void close() throws IOException
{ // Nothing to close
}
}
}
Expand Up @@ -42,7 +42,8 @@ public class CharSeekers
* @param quotationCharacter character to interpret quotation character.
* @return a {@link CharSeeker} with optional {@link ThreadAheadReadable read-ahead} capability.
*/
public static CharSeeker charSeeker( CharReadable reader, int bufferSize, boolean readAhead, char quotationCharacter )
public static CharSeeker charSeeker( CharReadable reader, int bufferSize, boolean readAhead,
char quotationCharacter )
{
if ( readAhead )
{ // Thread that always has one buffer read ahead
Expand All @@ -60,7 +61,8 @@ public static CharSeeker charSeeker( CharReadable reader, int bufferSize, boolea
* @return {@link CharSeeker} reading and parsing data from {@code file}.
* @throws FileNotFoundException if the specified {@code file} doesn't exist.
*/
public static CharSeeker charSeeker( CharReadable reader, char quotationCharacter ) throws FileNotFoundException
public static CharSeeker charSeeker( CharReadable reader, char quotationCharacter )
throws FileNotFoundException
{
return charSeeker( reader, DEFAULT_BUFFER_SIZE, true, quotationCharacter );
}
Expand Down
11 changes: 2 additions & 9 deletions community/csv/src/main/java/org/neo4j/csv/reader/Mark.java
Expand Up @@ -30,7 +30,6 @@ public class Mark
{
public static int END_OF_LINE_CHARACTER = -1;

private int lineNumber;
private long startPosition;
private long position;
private int character;
Expand All @@ -42,9 +41,8 @@ public class Mark
* @param character use {@code -1} to denote that the matching character was an end-of-line or end-of-file
* @param quoted whether or not the original data was quoted.
*/
void set( int lineNumber, long startPosition, long position, int character, boolean quoted )
void set( long startPosition, long position, int character, boolean quoted )
{
this.lineNumber = lineNumber;
this.startPosition = startPosition;
this.position = position;
this.character = character;
Expand All @@ -62,11 +60,6 @@ public boolean isEndOfLine()
return character == -1;
}

public int lineNumber()
{
return lineNumber;
}

public boolean isQuoted()
{
return quoted;
Expand All @@ -93,6 +86,6 @@ long startPosition()
@Override
public String toString()
{
return format( "Mark[line:%d, from:%d, to:%d, qutoed:%b]", lineNumber, startPosition, position, quoted);
return format( "Mark[from:%d, to:%d, qutoed:%b]", startPosition, position, quoted);
}
}
Expand Up @@ -29,16 +29,20 @@
* Have multiple {@link CharReadable} instances look like one. The provided {@link CharReadable readables} should
* be opened lazily, in {@link Iterator#next()}, and will be closed in here, if they implement {@link Closeable}.
*/
public class MultiReadable implements CharReadable, Closeable
public class MultiReadable extends CharReadable.Adapter implements Closeable
{
private final RawIterator<CharReadable,IOException> actual;
private CharReadable current = Readables.EMPTY;
private int readFromCurrent;
private long previousReadersCollectivePosition;
private boolean lastEndedInNewLine = true;

public MultiReadable( RawIterator<CharReadable,IOException> actual )
public MultiReadable( RawIterator<CharReadable,IOException> actual ) throws IOException
{
this.actual = actual;
if ( actual.hasNext() )
{
current = actual.next();
}
}

@Override
Expand All @@ -50,21 +54,29 @@ public int read( char[] buffer, int offset, int length ) throws IOException
int readThisTime = current.read( buffer, offset + read, length - read );
if ( readThisTime == -1 )
{
if ( actual.hasNext() )
// Check if we've read anything at all before moving over to the new one.
// We do that so that we can get a "clean" move to the new source, so that
// information about progress and current source can be correctly provided by
// the caller of this method.
if ( read == 0 && actual.hasNext() )
{
previousReadersCollectivePosition += current.position();
closeCurrent();
current = actual.next();
for ( SourceMonitor monitor : monitors )
{
monitor.notify( toString() );
}

// Even if there's no line-ending at the end of this source we should introduce one
// otherwise the last line of this source and the first line of the next source will
// look like one long line.
if ( readFromCurrent > 0 )
if ( !lastEndedInNewLine )
{
buffer[offset + read++] = '\n';
previousReadersCollectivePosition++;
readFromCurrent = 0;
}
lastEndedInNewLine = false;
}
else
{
Expand All @@ -74,7 +86,7 @@ public int read( char[] buffer, int offset, int length ) throws IOException
else
{
read += readThisTime;
readFromCurrent += readThisTime;
lastEndedInNewLine = buffer[readThisTime-1] == '\n' || buffer[readThisTime-1] == '\r';
}
}
return read == 0 ? -1 : read;
Expand All @@ -96,4 +108,10 @@ public long position()
{
return previousReadersCollectivePosition + current.position();
}

@Override
public String toString()
{
return current.toString();
}
}

0 comments on commit 9117710

Please sign in to comment.