Skip to content

Commit

Permalink
first complete
Browse files Browse the repository at this point in the history
  • Loading branch information
osiegmar committed Aug 18, 2023
1 parent 649b1a3 commit ad15324
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 106 deletions.
35 changes: 26 additions & 9 deletions src/example/java/example/RandomAccessCsvReaderExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import de.siegmar.fastcsv.reader.CommentStrategy;
import de.siegmar.fastcsv.reader.CountingStatusListener;
import de.siegmar.fastcsv.reader.CsvRow;
import de.siegmar.fastcsv.reader.RandomAccessCsvReader;
import de.siegmar.fastcsv.reader.StatusMonitor;
import de.siegmar.fastcsv.reader.StatusListener;
import de.siegmar.fastcsv.writer.CsvWriter;

@SuppressWarnings("PMD.SystemPrintln")
Expand Down Expand Up @@ -97,16 +100,30 @@ private static void multiple(final Path file) throws IOException, ExecutionExcep
private static void statusMonitor(final Path file) throws IOException, InterruptedException, ExecutionException {
System.out.printf("# Read file with a total of %,d bytes%n", Files.size(file));

try (RandomAccessCsvReader csv = RandomAccessCsvReader.builder().build(file)) {
// Indexing takes place in background – we can easily monitor the current status without blocking
final StatusMonitor statusMonitor = csv.getStatusMonitor();
final StatusListener statusListener = new CountingStatusListener();

do {
// Print current status
System.out.println(statusMonitor);
} while (!csv.awaitIndex(250, TimeUnit.MILLISECONDS));
final RandomAccessCsvReader csv = RandomAccessCsvReader.builder()
.statusListener(statusListener)
.build(file);

System.out.printf("Finished reading file with a total of %,d rows%n%n", csv.size().get());
try (csv) {
// Indexing takes place in background – we can easily monitor the current status without blocking
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(() -> System.out.println(statusListener),
0, 250, TimeUnit.MILLISECONDS);

final CompletableFuture<Integer> future = csv.size()
.whenComplete((size, err) -> {
executor.shutdown();
if (err != null) {
err.printStackTrace(System.err);
} else {
System.out.printf("Finished reading file with a total of %,d rows%n%n", size);
}
});

// Wait for the completion of the future
future.join();
}
}

Expand Down
61 changes: 44 additions & 17 deletions src/intTest/java/blackbox/reader/RandomAccessCsvReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.assertj.core.api.InstanceOfAssertFactory;
Expand All @@ -31,7 +32,7 @@
import de.siegmar.fastcsv.reader.CommentStrategy;
import de.siegmar.fastcsv.reader.CsvRow;
import de.siegmar.fastcsv.reader.RandomAccessCsvReader;
import de.siegmar.fastcsv.reader.StatusMonitor;
import de.siegmar.fastcsv.reader.StatusListener;
import testutil.CsvRowAssert;

@ExtendWith(SoftAssertionsExtension.class)
Expand Down Expand Up @@ -77,7 +78,22 @@ void readerToString() throws IOException {
file, UTF_8, ',', '"', CommentStrategy.NONE, '#');
}

// TODO binary data test
// Softly does not work (IllegalAccessException: module org.assertj.core does not read module common)
@Test
void unicode() throws IOException {
final Path file = prepareTestFile("abc\nüöä\nabc");
assertThat(builder().build(file).readRow(0))
.succeedsWithin(TIMEOUT, CSV_ROW)
.fields().singleElement().isEqualTo("abc");

assertThat(builder().build(file).readRow(1))
.succeedsWithin(TIMEOUT, CSV_ROW)
.fields().singleElement().isEqualTo("üöä");

assertThat(builder().build(file).readRow(2))
.succeedsWithin(TIMEOUT, CSV_ROW)
.fields().singleElement().isEqualTo("abc");
}

private RandomAccessCsvReader build(final String data) throws IOException {
return builder().build(prepareTestFile(data));
Expand All @@ -88,13 +104,7 @@ private static RandomAccessCsvReader.RandomAccessCsvReaderBuilder builder() {
}

private Path prepareTestFile(final String s) throws IOException {
final byte[] data = s
.replaceAll("␊", "\n")
.replaceAll("␍", "\r")
.replaceAll("'", "\"")
.getBytes(UTF_8);

return prepareTestFile(data);
return prepareTestFile(s.getBytes(UTF_8));
}

private Path prepareTestFile(final byte[] data) throws IOException {
Expand Down Expand Up @@ -182,7 +192,7 @@ void nullFile() {
void negativePosition() {
assertThatThrownBy(() -> build(TEST_STRING).readRow(-1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Record# must be >= 0");
.hasMessage("Row# must be >= 0");
}

@Test
Expand Down Expand Up @@ -211,12 +221,29 @@ void awaitDuration() throws IOException, ExecutionException, InterruptedExceptio

@Test
void finalStatus() throws IOException, ExecutionException, InterruptedException {
try (RandomAccessCsvReader reader = build("foo␊bar")) {
final AtomicInteger rowCount = new AtomicInteger();
final AtomicInteger bytesRead = new AtomicInteger();

final StatusListener statusListener = new StatusListener() {
@Override
public void readRow() {
rowCount.incrementAndGet();
}

@Override
public void readBytes(final int bytes) {
bytesRead.addAndGet(bytes);
}
};

final RandomAccessCsvReader reader = builder()
.statusListener(statusListener)
.build(prepareTestFile("foo\nbar"));

try (reader) {
assertThat(reader.awaitIndex(1, TimeUnit.SECONDS)).isTrue();
assertThat(reader.getStatusMonitor())
.returns(2L, StatusMonitor::getRowCount)
.returns(7L, StatusMonitor::getReadBytes)
.asString().isEqualTo("Read 2 lines (7 bytes)");
assertThat(rowCount).hasValue(2);
assertThat(bytesRead).hasValue(7);
}
}

Expand Down Expand Up @@ -244,7 +271,7 @@ void oneLine() throws IOException {
// Softly does not work (IllegalAccessException: module org.assertj.core does not read module common)
@Test
void twoLines() throws IOException {
try (RandomAccessCsvReader reader = build("012,foo␊345,bar")) {
try (RandomAccessCsvReader reader = build("012,foo\n345,bar")) {

assertThat(reader.size())
.succeedsWithin(TIMEOUT)
Expand Down Expand Up @@ -286,7 +313,7 @@ void start1End2() throws IOException, ExecutionException, InterruptedException,
private List<CsvRow> readRows(final int firstRecord, final int maxRecords)
throws InterruptedException, ExecutionException, TimeoutException, IOException {

return build("1␊2␊3␊4␊5")
return build("1\n2\n3\n4\n5")
.readRows(firstRecord, maxRecords)
.get(1, TimeUnit.SECONDS)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package de.siegmar.fastcsv.reader;

import java.util.concurrent.atomic.AtomicLong;

/**
* Simple implementation of {@link StatusListener} to count read rows and bytes.
*/
public class CountingStatusListener implements StatusListener {

protected volatile long totalSize;
protected final AtomicLong rowCnt = new AtomicLong();
protected final AtomicLong byteCnt = new AtomicLong();

@SuppressWarnings("checkstyle:HiddenField")
@Override
public void initialize(final long totalSize) {
this.totalSize = totalSize;
}

@Override
public void readRow() {
rowCnt.incrementAndGet();
}

@Override
public void readBytes(final int bytes) {
byteCnt.addAndGet(bytes);
}

@Override
public String toString() {
final long byteCntVal = this.byteCnt.longValue();
final double percentage = byteCntVal * 100.0 / totalSize;
return String.format("Read %,d rows and %,d of %,d bytes (%.2f %%)",
rowCnt.longValue(), byteCntVal, totalSize, percentage);
}

}
1 change: 1 addition & 0 deletions src/main/java/de/siegmar/fastcsv/reader/CsvScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ private void consumeCommentedRow() throws IOException {
}
}

@SuppressWarnings("PMD.AssignmentInOperand")
private boolean consumeQuotedField() throws IOException {
int d;
while ((d = buf.get()) != -1) {
Expand Down
Loading

0 comments on commit ad15324

Please sign in to comment.