From e8c9efcf381f02f7f9099067c3383192908d645a Mon Sep 17 00:00:00 2001 From: kobmic Date: Tue, 17 Oct 2017 13:49:36 +0200 Subject: [PATCH 1/8] post release version bump --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index f9f9d48..267776b 100644 --- a/build.sbt +++ b/build.sbt @@ -1,5 +1,5 @@ name := "dcm4che-streams" -version := "0.5" +version := "0.6-SNAPSHOT" organization := "se.nimsa" scalaVersion := "2.12.3" scalacOptions := Seq("-encoding", "UTF-8", "-Xlint", "-deprecation", "-unchecked", "-feature", "-target:jvm-1.8") From 9e03dd5f9a5c1aca00524645e1a114242a4cf2fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karl=20Sj=C3=B6strand?= Date: Fri, 20 Oct 2017 10:25:10 -0400 Subject: [PATCH 2/8] Corrected FMI group length flow now supports big endian and implicit VR --- .../scala/se/nimsa/dcm4che/streams/DicomFlows.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala b/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala index 5791cb3..b596f69 100644 --- a/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala +++ b/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala @@ -23,6 +23,7 @@ import akka.stream.scaladsl.{Flow, Source} import akka.util.ByteString import org.dcm4che3.data.{Tag, VR} import org.dcm4che3.io.DicomStreamException +import se.nimsa.dcm4che.streams.DicomParsing.{dicomInfo, Info} import se.nimsa.dcm4che.streams.DicomParts._ /** @@ -444,11 +445,13 @@ object DicomFlows { { case fmiAttributes: DicomAttributes => if (fmiAttributes.attributes.nonEmpty) { - val fmiAttributesNoLength = fmiAttributes.attributes - .filter(_.header.tag != Tag.FileMetaInformationGroupLength) + val info = dicomInfo(fmiAttributes.attributes.head.header.bytes).getOrElse(Info(bigEndian = false, explicitVR = true, hasFmi = true)) + val fmiAttributesNoLength = fmiAttributes.attributes.filter(_.header.tag != Tag.FileMetaInformationGroupLength) val length = fmiAttributesNoLength.map(_.bytes.length).sum - val lengthHeader = DicomHeader(Tag.FileMetaInformationGroupLength, VR.OB, 4, isFmi = true, bigEndian = false, explicitVR = true, ByteString(2, 0, 0, 0, 85, 76, 4, 0)) - val lengthChunk = DicomValueChunk(bigEndian = false, intToBytesLE(length), last = true) + val lengthBytes = tagToBytes(Tag.FileMetaInformationGroupLength, info.bigEndian) ++ + (if (info.explicitVR) ByteString("UL") ++ shortToBytes(4, info.bigEndian) else intToBytes(4, info.bigEndian)) + val lengthHeader = DicomHeader(Tag.FileMetaInformationGroupLength, VR.UL, 4, isFmi = true, info.bigEndian, info.explicitVR, lengthBytes) + val lengthChunk = DicomValueChunk(info.bigEndian, intToBytes(length, info.bigEndian), last = true) val fmiParts = fmiAttributesNoLength.toList.flatMap(attribute => attribute.header :: attribute.valueChunks.toList) fmi = lengthHeader :: lengthChunk :: fmiParts } From c4829709a2f8c8f102e56ea54125d998ef931774 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karl=20Sj=C3=B6strand?= Date: Sun, 5 Nov 2017 18:17:57 -0500 Subject: [PATCH 3/8] Added option to drain upstream when dicom validation fails. May be beneficial when upstream is a http entity received from a client. If bytes are not drained/consumed, the connnection will be abruptly closed which may hurt performance --- .../se/nimsa/dcm4che/streams/DicomFlows.scala | 6 +- .../dcm4che/streams/DicomValidateFlow.scala | 137 ++++++++++-------- .../streams/DicomValidateFlowsTest.scala | 87 ++++++++--- 3 files changed, 148 insertions(+), 82 deletions(-) diff --git a/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala b/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala index b596f69..a20b7f0 100644 --- a/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala +++ b/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala @@ -180,14 +180,14 @@ object DicomFlows { * attribute found */ val validateFlow: Flow[ByteString, ByteString, NotUsed] = - Flow[ByteString].via(new DicomValidateFlow(None)) + Flow[ByteString].via(new DicomValidateFlow(None, drainIncoming = false)) /** * A flow which passes on the input bytes unchanged, fails for non-DICOM files, validates for DICOM files with supported * Media Storage SOP Class UID, Transfer Syntax UID combination passed as context */ - def validateFlowWithContext(contexts: Seq[ValidationContext]): Flow[ByteString, ByteString, NotUsed] = - Flow[ByteString].via(new DicomValidateFlow(Some(contexts))) + def validateFlowWithContext(contexts: Seq[ValidationContext], drainIncoming: Boolean): Flow[ByteString, ByteString, NotUsed] = + Flow[ByteString].via(new DicomValidateFlow(Some(contexts), drainIncoming)) /** * A flow which deflates the dataset but leaves the meta information intact. Useful when the dicom parsing in `DicomParseFlow` diff --git a/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala b/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala index b4a076c..cf228e2 100644 --- a/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala +++ b/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala @@ -27,17 +27,20 @@ import org.dcm4che3.data.Tag._ /** * A flow which passes on the input bytes unchanged, fails for non-DICOM files, validates for DICOM files with supported * Media Storage SOP Class UID, Transfer Syntax UID combination passed as context. + * * @param contexts supported MediaStorageSOPClassUID, TransferSynatxUID combinations */ -class DicomValidateFlow(contexts: Option[Seq[ValidationContext]]) extends GraphStage[FlowShape[ByteString, ByteString]] { +class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: Boolean) extends GraphStage[FlowShape[ByteString, ByteString]] { val in: Inlet[ByteString] = Inlet[ByteString]("DicomValidateFlow.in") val out: Outlet[ByteString] = Outlet[ByteString]("DicomValidateFlow.out") override val shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { var buffer: ByteString = ByteString.empty - var isValidated = false - // 378 bytes should be enough when passing context - Media Storage SOP Class UID and Transfer Syntax UID + var isValidated: Option[Boolean] = None + var failException: Option[Throwable] = None + + // 378 bytes should be enough when passing context - Media Storage SOP Class UID and Transfer Syntax UID // otherwise read to first header val maxBufferLength: Int = if (contexts.isDefined) 512 else 140 @@ -49,60 +52,65 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]]) extends GraphS override def onPush(): Unit = { val chunk = grab(in) - if (isValidated) - push(out, chunk) - else { - buffer = buffer ++ chunk - - if (buffer.length >= maxBufferLength) { - if (isPreamble(buffer)) { - if (DicomParsing.isHeader(buffer.drop(dicomPreambleLength))) { + isValidated match { + case None => + buffer = buffer ++ chunk + + if (buffer.length >= maxBufferLength) + if (isPreamble(buffer)) + if (DicomParsing.isHeader(buffer.drop(dicomPreambleLength))) + if (contexts.isDefined) { + val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get + validateFileMetaInformation(buffer.drop(dicomPreambleLength), info) + } else + setValidated() + else + setFailed(new DicomStreamException("Not a DICOM stream")) + else if (DicomParsing.isHeader(buffer)) if (contexts.isDefined) { - val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get - validateFileMetaInformation(buffer.drop(dicomPreambleLength), info) - } else { + val info = DicomParsing.dicomInfo(buffer).get + validateSOPClassUID(buffer, info) + } else setValidated() - } - } else { - setFailed() - } - } else if (DicomParsing.isHeader(buffer)) { - if (contexts.isDefined) { - val info = DicomParsing.dicomInfo(buffer).get - validateSOPClassUID(buffer, info) - } else { - setValidated() - } - } else { - setFailed() - } - } else { + else + setFailed(new DicomStreamException("Not a DICOM stream")) + else + pull(in) + case Some(true) => + push(out, chunk) + case Some(false) => + println("draining...") pull(in) - } } } override def onUpstreamFinish(): Unit = { - if (!isValidated) - if (contexts.isDefined) { - if (buffer.length >= dicomPreambleLength && isPreamble(buffer)) { - val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get - validateFileMetaInformation(buffer.drop(dicomPreambleLength), info) - } else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) { - val info = DicomParsing.dicomInfo(buffer).get - validateSOPClassUID(buffer, info) + isValidated match { + case None => + if (contexts.isDefined) { + if (buffer.length >= dicomPreambleLength && isPreamble(buffer)) { + val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get + validateFileMetaInformation(buffer.drop(dicomPreambleLength), info) + } else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) { + val info = DicomParsing.dicomInfo(buffer).get + validateSOPClassUID(buffer, info) + } else { + setFailed(new DicomStreamException("Not a DICOM stream")) + } } else { - setFailed() + if (buffer.length == dicomPreambleLength && isPreamble(buffer)) + setValidated() + else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) + setValidated() + else + setFailed(new DicomStreamException("Not a DICOM stream")) } - } else { - if (buffer.length == dicomPreambleLength && isPreamble(buffer)) - setValidated() - else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) - setValidated() - else - setFailed() - } - completeStage() + completeStage() + case Some(true) => + completeStage() + case Some(false) => + failStage(failException.get) + } } @@ -124,7 +132,7 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]]) extends GraphS if (contexts.get.contains(currentContext)) { setValidated() } else { - failStage(new DicomStreamException(s"The presentation context [SOPClassUID = ${mscu.value.utf8String}, TransferSyntaxUID = ${tsuid.value.utf8String}] is not supported")) + setFailed(new DicomStreamException(s"The presentation context [SOPClassUID = ${mscu.value.utf8String}, TransferSyntaxUID = ${tsuid.value.utf8String}] is not supported")) } } } @@ -147,24 +155,26 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]]) extends GraphS if (contexts.get.contains(currentContext)) { setValidated() } else { - failStage(new DicomStreamException(s"The presentation context [SOPClassUID = ${scuid.value.utf8String}, TransferSyntaxUID = $tsuid] is not supported")) + setFailed(new DicomStreamException(s"The presentation context [SOPClassUID = ${scuid.value.utf8String}, TransferSyntaxUID = $tsuid] is not supported")) } } } /** * Utility method. Search after DICOM Header in byte stream. - * @param data bytes stream - * @param info info object obtained earlier - * @param fieldName dicom field name, used for error log - * @param found field found condition, typically comparison of dicom tags + * + * @param data bytes stream + * @param info info object obtained earlier + * @param fieldName dicom field name, used for error log + * @param found field found condition, typically comparison of dicom tags * @param stopSearching stop condition * @return */ - def findAndValidateField(data: ByteString, info: Info, fieldName: String, found: (Int) => Boolean, stopSearching: (Int) => Boolean ): (Boolean, ByteString) = { + def findAndValidateField(data: ByteString, info: Info, fieldName: String, found: (Int) => Boolean, stopSearching: (Int) => Boolean): (Boolean, ByteString) = { var currentTag = -1 var failed = false var currentData = data + def takeMax8(buffer: ByteString) = if (buffer.size >= 8) buffer.take(8) else buffer while (!found(currentTag) && !failed) { @@ -173,7 +183,7 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]]) extends GraphS val (tag, _, headerLength, length) = maybeHeader.get if (tag < currentTag) { failed = true - failStage(new DicomStreamException(s"Parse error. Invalid tag order or invalid DICOM header: ${takeMax8(currentData)}.")) + setFailed(new DicomStreamException(s"Parse error. Invalid tag order or invalid DICOM header: ${takeMax8(currentData)}.")) } else { currentTag = tag if (!found(tag)) { @@ -182,24 +192,31 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]]) extends GraphS if (stopSearching(currentTag) || currentData.size < 8) { // read past stop criteria without finding tag, or not enough data left in buffer failed = true - failStage(new DicomStreamException(s"Not a valid DICOM file. Could not find $fieldName.")) + setFailed(new DicomStreamException(s"Not a valid DICOM file. Could not find $fieldName.")) } } } else { // could not parse header failed = true - failStage(new DicomStreamException(s"Parse error. Invalid DICOM header: ${takeMax8(currentData)}.")) + setFailed(new DicomStreamException(s"Parse error. Invalid DICOM header: ${takeMax8(currentData)}.")) } } (failed, currentData) } def setValidated(): Unit = { - isValidated = true + isValidated = Some(true) push(out, buffer) } - def setFailed(): Unit = failStage(new DicomStreamException("Not a DICOM stream")) + def setFailed(e: Throwable): Unit = { + isValidated = Some(false) + failException = Some(e) + if (drainIncoming) + pull(in) + else + failStage(failException.get) + } }) } diff --git a/src/test/scala/se/nimsa/dcm4che/streams/DicomValidateFlowsTest.scala b/src/test/scala/se/nimsa/dcm4che/streams/DicomValidateFlowsTest.scala index 01d587a..dc6b4e4 100644 --- a/src/test/scala/se/nimsa/dcm4che/streams/DicomValidateFlowsTest.scala +++ b/src/test/scala/se/nimsa/dcm4che/streams/DicomValidateFlowsTest.scala @@ -2,14 +2,15 @@ package se.nimsa.dcm4che.streams import akka.actor.ActorSystem import akka.stream.ActorMaterializer -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{Sink, Source} import akka.stream.testkit.scaladsl.TestSink import akka.testkit.TestKit import akka.util.ByteString import org.dcm4che3.data.UID._ import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} -import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.{Await, ExecutionContextExecutor} +import scala.concurrent.duration.DurationInt class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec")) with FlatSpecLike with Matchers with BeforeAndAfterAll { @@ -100,7 +101,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec val source = Source.single(bytes) .via(new Chunker(1)) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -125,7 +126,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with more than 512 bytes var source = Source.single(moreThan512Bytes) .via(new Chunker(1)) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -136,7 +137,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with less than 512 bytes source = Source.single(bytes) .via(new Chunker(1)) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -161,7 +162,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with more than 512 bytes var source = Source.single(moreThan512Bytes) .via(new Chunker(1)) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -170,7 +171,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with less than 512 bytes source = Source.single(bytes) .via(new Chunker(1)) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -192,7 +193,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with more than 512 bytes var source = Source.single(moreThan512Bytes) .via(new Chunker(1)) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -203,7 +204,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with less than 512 bytes source = Source.single(bytes) .via(new Chunker(1)) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -224,7 +225,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with more than 512 bytes var source = Source.single(moreThan512Bytes) .via(new Chunker(1)) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -235,7 +236,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with less than 512 bytes source = Source.single(bytes) .via(new Chunker(1)) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -263,7 +264,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with more than 512 bytes var source = Source.single(moreThan512Bytes) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -271,7 +272,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with less than 512 bytes source = Source.single(bytes) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -287,7 +288,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with more than 512 bytes var source = Source.single(moreThan512Bytes) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -295,7 +296,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with less than 512 bytes source = Source.single(bytes) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -312,7 +313,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with more than 512 bytes var source = Source.single(moreThan512Bytes) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -320,7 +321,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with less than 512 bytes source = Source.single(bytes) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -337,7 +338,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with more than 512 bytes var source = Source.single(moreThan512Bytes) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -346,7 +347,7 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec // test with less than 512 bytes source = Source.single(bytes) - .via(validateFlowWithContext(contexts)) + .via(validateFlowWithContext(contexts, drainIncoming = false)) source.runWith(TestSink.probe[ByteString]) .request(1) @@ -354,4 +355,52 @@ class DicomValidateFlowsTest extends TestKit(ActorSystem("DicomValidateFlowsSpec .expectComplete() } + it should "stop requesting data once validation has failed if asked to not drain incoming data" in { + var nItems = 0 + + val bytesSource = Source.fromIterator(() => (1 to 10000) + .map(_.toByte).iterator) + .grouped(1000) + .map(bytes => ByteString(bytes.toArray)) + .map(bs => { + nItems += 1 + bs + }) + + val f = bytesSource + .via(new DicomValidateFlow(None, drainIncoming = false)) + .runWith(Sink.ignore) + + Await.ready(f, 5.seconds) + + nItems shouldBe 1 + } + + it should "keep requesting data until finished after validation failed, but not emit more data when asked to drain incoming data" in { + var nItemsRequested = 0 + var nItemsEmitted = 0 + + val bytesSource = Source.fromIterator(() => (1 to 10000) + .map(_.toByte).iterator) + .grouped(1000) + .map(bytes => ByteString(bytes.toArray)) + .map(bs => { + nItemsRequested += 1 + bs + }) + + val f = bytesSource + .via(new DicomValidateFlow(None, drainIncoming = true)) + .map(bs => { + nItemsEmitted += 1 + bs + }) + .runWith(Sink.ignore) + + Await.ready(f, 5.seconds) + + nItemsRequested shouldBe 10 + nItemsEmitted shouldBe 0 + } + } From 0a99cae99d15be90692a9d946b88ae19551d13d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karl=20Sj=C3=B6strand?= Date: Sun, 5 Nov 2017 18:41:54 -0500 Subject: [PATCH 4/8] Formatting changes --- .../dcm4che/streams/DicomValidateFlow.scala | 35 ++++++++----------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala b/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala index cf228e2..49811eb 100644 --- a/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala +++ b/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala @@ -46,9 +46,8 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: setHandlers(in, out, new InHandler with OutHandler { - override def onPull(): Unit = { + override def onPull(): Unit = pull(in) - } override def onPush(): Unit = { val chunk = grab(in) @@ -87,24 +86,21 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: override def onUpstreamFinish(): Unit = { isValidated match { case None => - if (contexts.isDefined) { + if (contexts.isDefined) if (buffer.length >= dicomPreambleLength && isPreamble(buffer)) { val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get validateFileMetaInformation(buffer.drop(dicomPreambleLength), info) } else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) { val info = DicomParsing.dicomInfo(buffer).get validateSOPClassUID(buffer, info) - } else { - setFailed(new DicomStreamException("Not a DICOM stream")) - } - } else { - if (buffer.length == dicomPreambleLength && isPreamble(buffer)) - setValidated() - else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) - setValidated() - else + } else setFailed(new DicomStreamException("Not a DICOM stream")) - } + else if (buffer.length == dicomPreambleLength && isPreamble(buffer)) + setValidated() + else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) + setValidated() + else + setFailed(new DicomStreamException("Not a DICOM stream")) completeStage() case Some(true) => completeStage() @@ -129,11 +125,10 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: val tsuid = DicomParsing.parseUIDAttribute(currentData, info.explicitVR, info.bigEndian) val currentContext = ValidationContext(mscu.value.utf8String, tsuid.value.utf8String) - if (contexts.get.contains(currentContext)) { + if (contexts.get.contains(currentContext)) setValidated() - } else { + else setFailed(new DicomStreamException(s"The presentation context [SOPClassUID = ${mscu.value.utf8String}, TransferSyntaxUID = ${tsuid.value.utf8String}] is not supported")) - } } } } @@ -152,11 +147,10 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: val tsuid = info.assumedTransferSyntax val currentContext = ValidationContext(scuid.value.utf8String, tsuid) - if (contexts.get.contains(currentContext)) { + if (contexts.get.contains(currentContext)) setValidated() - } else { + else setFailed(new DicomStreamException(s"The presentation context [SOPClassUID = ${scuid.value.utf8String}, TransferSyntaxUID = $tsuid] is not supported")) - } } } @@ -186,9 +180,8 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: setFailed(new DicomStreamException(s"Parse error. Invalid tag order or invalid DICOM header: ${takeMax8(currentData)}.")) } else { currentTag = tag - if (!found(tag)) { + if (!found(tag)) currentData = currentData.drop(headerLength + length.toInt) - } if (stopSearching(currentTag) || currentData.size < 8) { // read past stop criteria without finding tag, or not enough data left in buffer failed = true From 787504672ddccbd09e101080fdd0f4603f52a340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karl=20Sj=C3=B6strand?= Date: Sun, 5 Nov 2017 18:50:06 -0500 Subject: [PATCH 5/8] No longer attempting to pull more data when upstream is already finished --- .../scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala b/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala index 49811eb..dc4c140 100644 --- a/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala +++ b/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala @@ -21,7 +21,7 @@ import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} import akka.util.ByteString import org.dcm4che3.io.DicomStreamException import se.nimsa.dcm4che.streams.DicomFlows.ValidationContext -import se.nimsa.dcm4che.streams.DicomParsing.{Info, isPreamble, dicomPreambleLength} +import se.nimsa.dcm4che.streams.DicomParsing.{Info, dicomPreambleLength, isPreamble} import org.dcm4che3.data.Tag._ /** @@ -94,13 +94,13 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: val info = DicomParsing.dicomInfo(buffer).get validateSOPClassUID(buffer, info) } else - setFailed(new DicomStreamException("Not a DICOM stream")) + failStage(new DicomStreamException("Not a DICOM stream")) else if (buffer.length == dicomPreambleLength && isPreamble(buffer)) setValidated() else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) setValidated() else - setFailed(new DicomStreamException("Not a DICOM stream")) + failStage(new DicomStreamException("Not a DICOM stream")) completeStage() case Some(true) => completeStage() From 911d305dbcef6e96078cc8dc6f99de8065ffa88c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karl=20Sj=C3=B6strand?= Date: Sun, 5 Nov 2017 19:02:35 -0500 Subject: [PATCH 6/8] Trying not to pull incoming after upstream has finished --- .../dcm4che/streams/DicomValidateFlow.scala | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala b/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala index dc4c140..0d18dc8 100644 --- a/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala +++ b/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala @@ -21,7 +21,7 @@ import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} import akka.util.ByteString import org.dcm4che3.io.DicomStreamException import se.nimsa.dcm4che.streams.DicomFlows.ValidationContext -import se.nimsa.dcm4che.streams.DicomParsing.{Info, dicomPreambleLength, isPreamble} +import se.nimsa.dcm4che.streams.DicomParsing.{Info, isPreamble, dicomPreambleLength} import org.dcm4che3.data.Tag._ /** @@ -61,24 +61,33 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: if (contexts.isDefined) { val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get validateFileMetaInformation(buffer.drop(dicomPreambleLength), info) + if (!isValidated.getOrElse(true) && drainIncoming) + pull(in) } else setValidated() - else + else { setFailed(new DicomStreamException("Not a DICOM stream")) + if (drainIncoming) + pull(in) + } else if (DicomParsing.isHeader(buffer)) if (contexts.isDefined) { val info = DicomParsing.dicomInfo(buffer).get validateSOPClassUID(buffer, info) + if (!isValidated.getOrElse(true) && drainIncoming) + pull(in) } else setValidated() - else + else { setFailed(new DicomStreamException("Not a DICOM stream")) + if (drainIncoming) + pull(in) + } else pull(in) case Some(true) => push(out, chunk) case Some(false) => - println("draining...") pull(in) } } @@ -94,13 +103,13 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: val info = DicomParsing.dicomInfo(buffer).get validateSOPClassUID(buffer, info) } else - failStage(new DicomStreamException("Not a DICOM stream")) + setFailed(new DicomStreamException("Not a DICOM stream")) else if (buffer.length == dicomPreambleLength && isPreamble(buffer)) setValidated() else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) setValidated() else - failStage(new DicomStreamException("Not a DICOM stream")) + setFailed(new DicomStreamException("Not a DICOM stream")) completeStage() case Some(true) => completeStage() @@ -205,9 +214,7 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: def setFailed(e: Throwable): Unit = { isValidated = Some(false) failException = Some(e) - if (drainIncoming) - pull(in) - else + if (!drainIncoming) failStage(failException.get) } From 95c1f758e70fb85900769efdbb493b595b3eb019 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karl=20Sj=C3=B6strand?= Date: Sun, 5 Nov 2017 19:13:05 -0500 Subject: [PATCH 7/8] Trying not to pull incoming after upstream has finished --- .../dcm4che/streams/DicomValidateFlow.scala | 52 ++++++++----------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala b/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala index 0d18dc8..c27eac3 100644 --- a/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala +++ b/src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala @@ -60,28 +60,20 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: if (DicomParsing.isHeader(buffer.drop(dicomPreambleLength))) if (contexts.isDefined) { val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get - validateFileMetaInformation(buffer.drop(dicomPreambleLength), info) - if (!isValidated.getOrElse(true) && drainIncoming) - pull(in) + validateFileMetaInformation(buffer.drop(dicomPreambleLength), info, upstreamHasFinished = false) } else setValidated() else { - setFailed(new DicomStreamException("Not a DICOM stream")) - if (drainIncoming) - pull(in) + setFailed(new DicomStreamException("Not a DICOM stream"), upstreamHasFinished = false) } else if (DicomParsing.isHeader(buffer)) if (contexts.isDefined) { val info = DicomParsing.dicomInfo(buffer).get - validateSOPClassUID(buffer, info) - if (!isValidated.getOrElse(true) && drainIncoming) - pull(in) + validateSOPClassUID(buffer, info, upstreamHasFinished = false) } else setValidated() else { - setFailed(new DicomStreamException("Not a DICOM stream")) - if (drainIncoming) - pull(in) + setFailed(new DicomStreamException("Not a DICOM stream"), upstreamHasFinished = false) } else pull(in) @@ -98,18 +90,18 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: if (contexts.isDefined) if (buffer.length >= dicomPreambleLength && isPreamble(buffer)) { val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get - validateFileMetaInformation(buffer.drop(dicomPreambleLength), info) + validateFileMetaInformation(buffer.drop(dicomPreambleLength), info, upstreamHasFinished = true) } else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) { val info = DicomParsing.dicomInfo(buffer).get - validateSOPClassUID(buffer, info) + validateSOPClassUID(buffer, info, upstreamHasFinished = true) } else - setFailed(new DicomStreamException("Not a DICOM stream")) + setFailed(new DicomStreamException("Not a DICOM stream"), upstreamHasFinished = true) else if (buffer.length == dicomPreambleLength && isPreamble(buffer)) setValidated() else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) setValidated() else - setFailed(new DicomStreamException("Not a DICOM stream")) + setFailed(new DicomStreamException("Not a DICOM stream"), upstreamHasFinished = true) completeStage() case Some(true) => completeStage() @@ -120,15 +112,15 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: // Find and validate MediaSOPClassUID and TranferSyntaxUID - private def validateFileMetaInformation(data: ByteString, info: Info): Unit = { + private def validateFileMetaInformation(data: ByteString, info: Info, upstreamHasFinished: Boolean): Unit = { var currentData = data - val (failed, tailData) = findAndValidateField(data, info, "MediaStorageSOPClassUID", (tag: Int) => tag == MediaStorageSOPClassUID, (tag: Int) => (tag & 0xFFFF0000) > 0x00020000) + val (failed, tailData) = findAndValidateField(data, info, "MediaStorageSOPClassUID", (tag: Int) => tag == MediaStorageSOPClassUID, (tag: Int) => (tag & 0xFFFF0000) > 0x00020000, upstreamHasFinished) if (!failed) { currentData = tailData val mscu = DicomParsing.parseUIDAttribute(currentData, info.explicitVR, info.bigEndian) - val (nextFailed, nextTailData) = findAndValidateField(currentData, info, "TransferSyntaxUID", (tag: Int) => tag == TransferSyntaxUID, (tag: Int) => (tag & 0xFFFF0000) > 0x00020000) + val (nextFailed, nextTailData) = findAndValidateField(currentData, info, "TransferSyntaxUID", (tag: Int) => tag == TransferSyntaxUID, (tag: Int) => (tag & 0xFFFF0000) > 0x00020000, upstreamHasFinished) if (!nextFailed) { currentData = nextTailData val tsuid = DicomParsing.parseUIDAttribute(currentData, info.explicitVR, info.bigEndian) @@ -137,16 +129,16 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: if (contexts.get.contains(currentContext)) setValidated() else - setFailed(new DicomStreamException(s"The presentation context [SOPClassUID = ${mscu.value.utf8String}, TransferSyntaxUID = ${tsuid.value.utf8String}] is not supported")) + setFailed(new DicomStreamException(s"The presentation context [SOPClassUID = ${mscu.value.utf8String}, TransferSyntaxUID = ${tsuid.value.utf8String}] is not supported"), upstreamHasFinished) } } } // Find and validate SOPCLassUID - private def validateSOPClassUID(data: ByteString, info: Info): Unit = { + private def validateSOPClassUID(data: ByteString, info: Info, upstreamHasFinished: Boolean): Unit = { - val (failed, tailData) = findAndValidateField(data, info, "SOPClassUID", (tag: Int) => tag == SOPClassUID, (tag: Int) => tag > SOPClassUID) + val (failed, tailData) = findAndValidateField(data, info, "SOPClassUID", (tag: Int) => tag == SOPClassUID, (tag: Int) => tag > SOPClassUID, upstreamHasFinished) if (!failed) { // SOP CLass UID @@ -159,7 +151,7 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: if (contexts.get.contains(currentContext)) setValidated() else - setFailed(new DicomStreamException(s"The presentation context [SOPClassUID = ${scuid.value.utf8String}, TransferSyntaxUID = $tsuid] is not supported")) + setFailed(new DicomStreamException(s"The presentation context [SOPClassUID = ${scuid.value.utf8String}, TransferSyntaxUID = $tsuid] is not supported"), upstreamHasFinished) } } @@ -173,7 +165,7 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: * @param stopSearching stop condition * @return */ - def findAndValidateField(data: ByteString, info: Info, fieldName: String, found: (Int) => Boolean, stopSearching: (Int) => Boolean): (Boolean, ByteString) = { + def findAndValidateField(data: ByteString, info: Info, fieldName: String, found: Int => Boolean, stopSearching: Int => Boolean, upstreamHasFinished: Boolean): (Boolean, ByteString) = { var currentTag = -1 var failed = false var currentData = data @@ -186,7 +178,7 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: val (tag, _, headerLength, length) = maybeHeader.get if (tag < currentTag) { failed = true - setFailed(new DicomStreamException(s"Parse error. Invalid tag order or invalid DICOM header: ${takeMax8(currentData)}.")) + setFailed(new DicomStreamException(s"Parse error. Invalid tag order or invalid DICOM header: ${takeMax8(currentData)}."), upstreamHasFinished) } else { currentTag = tag if (!found(tag)) @@ -194,13 +186,13 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: if (stopSearching(currentTag) || currentData.size < 8) { // read past stop criteria without finding tag, or not enough data left in buffer failed = true - setFailed(new DicomStreamException(s"Not a valid DICOM file. Could not find $fieldName.")) + setFailed(new DicomStreamException(s"Not a valid DICOM file. Could not find $fieldName."), upstreamHasFinished) } } } else { // could not parse header failed = true - setFailed(new DicomStreamException(s"Parse error. Invalid DICOM header: ${takeMax8(currentData)}.")) + setFailed(new DicomStreamException(s"Parse error. Invalid DICOM header: ${takeMax8(currentData)}."), upstreamHasFinished) } } (failed, currentData) @@ -211,10 +203,12 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: push(out, buffer) } - def setFailed(e: Throwable): Unit = { + def setFailed(e: Throwable, upstreamHasFinished: Boolean): Unit = { isValidated = Some(false) failException = Some(e) - if (!drainIncoming) + if (!upstreamHasFinished && drainIncoming) + pull(in) + else failStage(failException.get) } From 43fb25d2149229153d64628e6b959659f6cce50e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karl=20Sj=C3=B6strand?= Date: Mon, 6 Nov 2017 19:58:57 -0500 Subject: [PATCH 8/8] Prepare for release 0.6 --- build.sbt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 267776b..6336beb 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ name := "dcm4che-streams" -version := "0.6-SNAPSHOT" +version := "0.6" organization := "se.nimsa" -scalaVersion := "2.12.3" +scalaVersion := "2.12.4" scalacOptions := Seq("-encoding", "UTF-8", "-Xlint", "-deprecation", "-unchecked", "-feature", "-target:jvm-1.8") scalacOptions in (Compile, doc) ++= Seq( "-no-link-warnings" // Suppresses problems with Scaladoc @throws links