Navigation Menu

Skip to content

Commit

Permalink
Added constructor to support finite set of NMEA input
Browse files Browse the repository at this point in the history
  • Loading branch information
tbsalling committed Feb 3, 2020
1 parent 24f69fe commit c6f3b3a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 19 deletions.
21 changes: 13 additions & 8 deletions src/main/java/dk/tbsalling/aismessages/AISInputStreamReader.java
Expand Up @@ -20,8 +20,8 @@
import dk.tbsalling.aismessages.nmea.NMEAMessageHandler;
import dk.tbsalling.aismessages.nmea.NMEAMessageInputStreamReader;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.function.Consumer;

/**
Expand All @@ -35,22 +35,27 @@
*/
public class AISInputStreamReader {

public AISInputStreamReader(InputStream inputStream, Consumer<? super AISMessage> aisMessageConsumer) {
public AISInputStreamReader(List<String> stringQueue, Consumer<? super AISMessage> aisMessageConsumer) {
this.nmeaMessageHandler = new NMEAMessageHandler("SRC", aisMessageConsumer);
this.nmeaMessageInputStreamReader = new NMEAMessageInputStreamReader(stringQueue, this.nmeaMessageHandler::accept);
}

public AISInputStreamReader(InputStream inputStream, Consumer<? super AISMessage> aisMessageConsumer) {
this.nmeaMessageHandler = new NMEAMessageHandler("SRC", aisMessageConsumer);
this.nmeaMessageInputStreamReader = new NMEAMessageInputStreamReader(inputStream, this.nmeaMessageHandler::accept);
}
}

public final void requestStop() {
this.nmeaMessageInputStreamReader.requestStop();
}
public final void requestStop() {
this.nmeaMessageInputStreamReader.requestStop();
}

public final boolean isStopRequested() {
return this.nmeaMessageInputStreamReader.isStopRequested();
}

public void run() throws IOException {
public void run() {
this.nmeaMessageInputStreamReader.run();
}
}

private final NMEAMessageHandler nmeaMessageHandler;
private final NMEAMessageInputStreamReader nmeaMessageInputStreamReader;
Expand Down
Expand Up @@ -26,31 +26,60 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static java.lang.System.Logger.Level.*;

public class NMEAMessageInputStreamReader {

private static final System.Logger LOG = System.getLogger(NMEAMessageInputStreamReader.class.getName());

public NMEAMessageInputStreamReader(List<String> nmeaStrings, Consumer<? super NMEAMessage> nmeaMessageHandler) {
Objects.requireNonNull(nmeaStrings, "nmeaStrings cannot be null.");
Objects.requireNonNull(nmeaMessageHandler, "nmeaMessageHandler cannot be null.");

if (nmeaStrings instanceof Queue)
this.stringSupplier = () -> ((Queue<String>) nmeaStrings).poll();
else {
final Queue<String> nmeaStringsQueue = new LinkedList<>(nmeaStrings);
this.stringSupplier = () -> nmeaStringsQueue.poll();
}

this.nmeaMessageHandler = nmeaMessageHandler;
}

public NMEAMessageInputStreamReader(InputStream inputStream, Consumer<? super NMEAMessage> nmeaMessageHandler) {
this.nmeaMessageHandler = nmeaMessageHandler;
this.inputStream = inputStream;

InputStreamReader reader = new InputStreamReader(inputStream, Charset.defaultCharset());
BufferedReader bufferedReader = new BufferedReader(reader);
this.stringSupplier = () -> {
try {
return bufferedReader.readLine();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
};
}

public final void requestStop() {
this.stopRequested.set(true);
}

public void run() throws IOException {
LOG.log(INFO, "NMEAMessageInputStreamReader running.");
public void run() {
LOG.log(INFO, "NMEAMessageInputStreamReader running.");

InputStreamReader reader = new InputStreamReader(inputStream, Charset.defaultCharset());
BufferedReader bufferedReader = new BufferedReader(reader);
String string;
while ((string = bufferedReader.readLine()) != null && !isStopRequested()) {
while ((string = stringSupplier.get()) != null) {
if (isStopRequested())
break;

try {
NMEAMessage nmea = NMEAMessage.fromString(string);
nmeaMessageHandler.accept(nmea);
Expand All @@ -72,6 +101,6 @@ public final Boolean isStopRequested() {
}

private final AtomicBoolean stopRequested = new AtomicBoolean(false);
private final InputStream inputStream;
private final Supplier<String> stringSupplier;
private final Consumer<? super NMEAMessage> nmeaMessageHandler;
}
Expand Up @@ -8,6 +8,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

public class NMEAMessageInputStreamReaderTest {
Expand All @@ -20,14 +23,37 @@ public static void setUp() {
}

@Test
public void catchesInvalidMessageExceptions() throws IOException {
public void catchesInvalidMessageExceptionsInInputStream() throws IOException {
String nmeaStream =
"!AIVDM,1,1,,B,402=481uaUcf;OQ55JS9ITi025Jp,0*2B\n" +
"!AIVDM,1,1,,B,58LAM242B9POUKWWW<0a>0<4E<58,0*6E\n" + // invalid
"!AIVDM,1,1,,A,33nr7t001f13KNTOahh2@QpF00vh,0*58\n";
"!AIVDM,1,1,,B,402=481uaUcf;OQ55JS9ITi025Jp,0*2B\n" +
"!AIVDM,1,1,,B,58LAM242B9POUKWWW<0a>0<4E<58,0*6E\n" + // invalid
"!AIVDM,1,1,,A,33nr7t001f13KNTOahh2@QpF00vh,0*58\n";

InputStream inputStream = new ByteArrayInputStream(nmeaStream.getBytes(StandardCharsets.UTF_8));

new NMEAMessageInputStreamReader(inputStream, nmeaMessageHandler).run();
}

@Test
public void catchesInvalidMessageExceptionsInList() throws IOException {
List<String> nmeaQueue = new ArrayList<>(List.of(
"!AIVDM,1,1,,B,402=481uaUcf;OQ55JS9ITi025Jp,0*2B",
"!AIVDM,1,1,,B,58LAM242B9POUKWWW<0a>0<4E<58,0*6E", // invalid
"!AIVDM,1,1,,A,33nr7t001f13KNTOahh2@QpF00vh,0*58"
));

new NMEAMessageInputStreamReader(nmeaQueue, nmeaMessageHandler).run();
}

@Test
public void catchesInvalidMessageExceptionsInQueue() throws IOException {
List<String> nmeaQueue = new LinkedList<>(List.of(
"!AIVDM,1,1,,B,402=481uaUcf;OQ55JS9ITi025Jp,0*2B",
"!AIVDM,1,1,,B,58LAM242B9POUKWWW<0a>0<4E<58,0*6E", // invalid
"!AIVDM,1,1,,A,33nr7t001f13KNTOahh2@QpF00vh,0*58"
));

new NMEAMessageInputStreamReader(nmeaQueue, nmeaMessageHandler).run();
}

}

0 comments on commit c6f3b3a

Please sign in to comment.