From 3e6b3c5d904f2b351f6f17fc41c59c2a52a3b8a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karl=20Sj=C3=B6strand?= Date: Sat, 13 May 2017 21:50:30 -0400 Subject: [PATCH] Added a flow for collecting a number of attributes --- .../se/nimsa/dcm4che/streams/DicomFlows.scala | 201 +++++++++++++----- .../se/nimsa/dcm4che/streams/DicomParts.scala | 18 +- .../streams/DicomAttributesSinkTest.scala | 38 +++- .../se/nimsa/dcm4che/streams/DicomData.scala | 6 + .../dcm4che/streams/DicomFlowsTest.scala | 89 +++++++- 5 files changed, 281 insertions(+), 71 deletions(-) diff --git a/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala b/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala index 63ff06f..74ae9a2 100644 --- a/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala +++ b/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala @@ -21,6 +21,7 @@ import java.util.zip.Deflater import akka.NotUsed import akka.stream.scaladsl.{Flow, Source} import akka.util.ByteString +import org.dcm4che3.io.DicomStreamException import se.nimsa.dcm4che.streams.DicomParts._ /** @@ -96,18 +97,21 @@ object DicomFlows { * Filter a stream of dicom parts such that all attributes that are group length elements except * file meta information group length, will be discarded. Group Length (gggg,0000) Standard Data Elements * have been retired in the standard. + * * @return the associated filter Flow */ def groupLengthDiscardFilter = blacklistFilter((tag: Int) => DicomParsing.isGroupLength(tag) && !DicomParsing.isFileMetaInformation(tag)) /** * Discards the file meta information. + * * @return the associated filter Flow */ def fmiDiscardFilter = blacklistFilter((tag: Int) => DicomParsing.isFileMetaInformation(tag), keepPreamble = false) /** * Blacklist filter for DICOM parts. + * * @param tagCondition blacklist tag condition * @param keepPreamble true if preamble should be kept, else false * @return Flow of filtered parts @@ -116,11 +120,12 @@ object DicomFlows { /** * Tag based whitelist filter for DICOM parts. + * * @param tagCondition whitelist condition * @param keepPreamble true if preamble should be kept, else false * @return Flow of filtered parts */ - def whitelistFilter(tagCondition: (Int) => Boolean, keepPreamble: Boolean = false): Flow[DicomPart, DicomPart, NotUsed] = tagFilter(tagCondition, isWhitelist = true, keepPreamble) + def whitelistFilter(tagCondition: (Int) => Boolean, keepPreamble: Boolean = false): Flow[DicomPart, DicomPart, NotUsed] = tagFilter(tagCondition, isWhitelist = true, keepPreamble) private def tagFilter(tagCondition: (Int) => Boolean, isWhitelist: Boolean, keepPreamble: Boolean) = Flow[DicomPart].statefulMapConcat { @@ -166,9 +171,9 @@ object DicomFlows { case _: DicomFragmentsDelimitation if discarding => Nil case _: DicomSequence if discarding => Nil - case _: DicomSequenceDelimitation if discarding => Nil + case _: DicomSequenceDelimitation if discarding => Nil - case dicomAttribute: DicomAttribute => + case dicomAttribute: DicomAttribute => discarding = shouldDiscard(dicomAttribute.header.tag, isWhitelist) if (discarding) { Nil @@ -241,72 +246,154 @@ object DicomFlows { * * @return */ - def deflateDatasetFlow() = { + def deflateDatasetFlow() = Flow[DicomPart] + .concat(Source.single(DicomEndMarker)) + .statefulMapConcat { + () => + var inFmi = false + val buffer = new Array[Byte](2048) + val deflater = new Deflater(-1, true) + + def deflate(dicomPart: DicomPart) = { + val input = dicomPart.bytes + deflater.setInput(input.toArray) + var output = ByteString.empty + while (!deflater.needsInput) { + val bytesDeflated = deflater.deflate(buffer) + output = output ++ ByteString(buffer.take(bytesDeflated)) + } + if (output.isEmpty) Nil else DicomDeflatedChunk(dicomPart.bigEndian, output) :: Nil + } - case object DicomEndMarker extends DicomPart { - def bigEndian = false - def bytes = ByteString.empty + def finishDeflating() = { + deflater.finish() + var output = ByteString.empty + var done = false + while (!done) { + val bytesDeflated = deflater.deflate(buffer) + if (bytesDeflated == 0) + done = true + else + output = output ++ ByteString(buffer.take(bytesDeflated)) + } + deflater.end() + if (output.isEmpty) Nil else DicomDeflatedChunk(bigEndian = false, output) :: Nil + } + + { + case preamble: DicomPreamble => // preamble, do not deflate + preamble :: Nil + case DicomEndMarker => // end of stream, make sure deflater writes final bytes if deflating has occurred + if (deflater.getBytesRead > 0) + finishDeflating() + else + Nil + case deflatedChunk: DicomDeflatedChunk => // already deflated, pass as-is + deflatedChunk :: Nil + case header: DicomHeader if header.isFmi => // FMI, do not deflate and remember we are in FMI + inFmi = true + header :: Nil + case attribute: DicomAttribute if attribute.header.isFmi => // Whole FMI attribute, same as above + inFmi = true + attribute :: Nil + case header: DicomHeader => // Dataset header, remember we are no longer in FMI, deflate + inFmi = false + deflate(header) + case dicomPart if inFmi => // For any dicom part within FMI, do not deflate + dicomPart :: Nil + case dicomPart => // For all other cases, deflate + deflate(dicomPart) + } } + /** + * Collect the attributes specified by the input set of tags while buffering all elements of the stream. When the + * stream has moved past the last attribute to collect, a DicomAttributesElement is emitted containing a list of + * DicomAttributeParts with the collected information, followed by all buffered elements. Remaining elements in the + * stream are immediately emitted downstream without buffering. + * + * This flow is used when there is a need to "look ahead" for certain information in the stream so that streamed + * elements can be processed correctly according to this information. As an example, an implementation may have + * different graph paths for different modalities and the modality must be known before any elements are processed. + * + * @param tags tag numbers of attributes to collect. Collection (and hence buffering) will end when the + * stream moves past the highest tag number + * @param maxBufferSize the maximum allowed size of the buffer (to avoid running out of memory). The flow will fail + * if this limit is exceed. Set to 0 for an unlimited buffer size + * @return A DicomPart Flow which will begin with a DicomAttributesPart followed by the input elements + */ + def collectAttributesFlow(tags: Set[Int], maxBufferSize: Int = 1000000): Flow[DicomPart, DicomPart, NotUsed] = Flow[DicomPart] .concat(Source.single(DicomEndMarker)) .statefulMapConcat { + val stopTag = if (tags.isEmpty) 0 else tags.max + () => - var inFmi = false - val buffer = new Array[Byte](2048) - val deflater = new Deflater(-1, true) - - def deflate(dicomPart: DicomPart) = { - val input = dicomPart.bytes - deflater.setInput(input.toArray) - var output = ByteString.empty - while (!deflater.needsInput) { - val bytesDeflated = deflater.deflate(buffer) - output = output ++ ByteString(buffer.take(bytesDeflated)) - } - if (output.isEmpty) Nil else DicomDeflatedChunk(dicomPart.bigEndian, output) :: Nil - } + var reachedEnd = false + var currentBufferSize = 0 + var currentAttribute: Option[DicomAttribute] = None + var buffer: List[DicomPart] = Nil + var attributes = Seq.empty[DicomAttribute] - def finishDeflating() = { - deflater.finish() - var output = ByteString.empty - var done = false - while (!done) { - val bytesDeflated = deflater.deflate(buffer) - if (bytesDeflated == 0) - done = true - else - output = output ++ ByteString(buffer.take(bytesDeflated)) - } - deflater.end() - if (output.isEmpty) Nil else DicomDeflatedChunk(bigEndian = false, output) :: Nil + def attributesAndBuffer() = { + val parts = DicomAttributes(attributes) :: buffer + + reachedEnd = true + buffer = Nil + currentBufferSize = 0 + + parts } { - case preamble: DicomPreamble => // preamble, do not deflate - preamble :: Nil - case DicomEndMarker => // end of stream, make sure deflater writes final bytes if deflating has occurred - if (deflater.getBytesRead > 0) - finishDeflating() - else - Nil - case deflatedChunk: DicomDeflatedChunk => // already deflated, pass as-is - deflatedChunk :: Nil - case header: DicomHeader if header.isFmi => // FMI, do not deflate and remember we are in FMI - inFmi = true - header :: Nil - case attribute: DicomAttribute if attribute.header.isFmi => // Whole FMI attribute, same as above - inFmi = true - attribute :: Nil - case header: DicomHeader => // Dataset header, remember we are no longer in FMI, deflate - inFmi = false - deflate(header) - case dicomPart if inFmi => // For any dicom part within FMI, do not deflate - dicomPart :: Nil - case dicomPart => // For all other cases, deflate - deflate(dicomPart) + case DicomEndMarker if reachedEnd => + Nil + + case DicomEndMarker => + attributesAndBuffer() + + case part if reachedEnd => + part :: Nil + + case part => + currentBufferSize = currentBufferSize + part.bytes.size + if (maxBufferSize > 0 && currentBufferSize > maxBufferSize) { + throw new DicomStreamException("Error collecting attributes: max buffer size exceeded") + } + + buffer = buffer :+ part + + part match { + case header: DicomHeader if tags.contains(header.tag) => + currentAttribute = Some(DicomAttribute(header, Seq.empty)) + Nil + + case header: DicomHeader => + currentAttribute = None + Nil + + case valueChunk: DicomValueChunk => + + currentAttribute match { + case Some(attribute) => + val updatedAttribute = attribute.copy(valueChunks = attribute.valueChunks :+ valueChunk) + currentAttribute = Some(updatedAttribute) + if (valueChunk.last) { + attributes = attributes :+ updatedAttribute + currentAttribute = None + if (attributes.head.header.tag >= stopTag) { + attributesAndBuffer() + } else + Nil + } else + Nil + + case None => Nil + } + + case _ => Nil + } } } - } } diff --git a/src/main/scala/se/nimsa/dcm4che/streams/DicomParts.scala b/src/main/scala/se/nimsa/dcm4che/streams/DicomParts.scala index 051ed7f..5f0cb79 100644 --- a/src/main/scala/se/nimsa/dcm4che/streams/DicomParts.scala +++ b/src/main/scala/se/nimsa/dcm4che/streams/DicomParts.scala @@ -37,9 +37,9 @@ object DicomParts { def withUpdatedLength(newLength: Int) : DicomHeader = { - val updated = if ((bytes.size >= 8) && (explicitVR) && (vr.headerLength == 8)) { //explicit vr + val updated = if ((bytes.size >= 8) && explicitVR && (vr.headerLength == 8)) { //explicit vr bytes.take(6) ++ DicomParsing.shortToBytes(newLength.toShort, bigEndian) - } else if ((bytes.size >= 12) && (explicitVR) && (vr.headerLength == 12)) { //explicit vr + } else if ((bytes.size >= 12) && explicitVR && (vr.headerLength == 12)) { //explicit vr bytes.take(8) ++ DicomParsing.intToBytes(newLength, bigEndian) } else { //implicit vr bytes.take(4) ++ DicomParsing.intToBytes(newLength, bigEndian) @@ -98,17 +98,27 @@ object DicomParts { } else { ByteString.fromArray(newBytes) } - DicomAttribute(updatedHeader, Seq(DicomValueChunk(header.bigEndian, updatedValue, true))) + DicomAttribute(updatedHeader, Seq(DicomValueChunk(header.bigEndian, updatedValue, last = true))) } // DA: A string of characters of the format YYYYMMDD, 8 bytes fixed def withUpdatedDateValue(newValue: String, cs: SpecificCharacterSet = SpecificCharacterSet.ASCII): DicomAttribute = { val newBytes = header.vr.toBytes(newValue, cs) val updatedValue = ByteString.fromArray(newBytes) - DicomAttribute(header, Seq(DicomValueChunk(header.bigEndian, updatedValue, true))) + DicomAttribute(header, Seq(DicomValueChunk(header.bigEndian, updatedValue, last = true))) } def asDicomParts: Seq[DicomPart] = header +: valueChunks } + case object DicomEndMarker extends DicomPart { + def bigEndian = false + def bytes = ByteString.empty + } + + case class DicomAttributes(attributes: Seq[DicomAttribute]) extends DicomPart { + def bigEndian = attributes.headOption.exists(_.bigEndian) + def bytes = attributes.map(_.bytes).reduce(_ ++ _) + } + } diff --git a/src/test/scala/se/nimsa/dcm4che/streams/DicomAttributesSinkTest.scala b/src/test/scala/se/nimsa/dcm4che/streams/DicomAttributesSinkTest.scala index 8ec807e..0cf6e4c 100644 --- a/src/test/scala/se/nimsa/dcm4che/streams/DicomAttributesSinkTest.scala +++ b/src/test/scala/se/nimsa/dcm4che/streams/DicomAttributesSinkTest.scala @@ -1,6 +1,6 @@ package se.nimsa.dcm4che.streams -import java.io.ByteArrayInputStream +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.util import akka.actor.ActorSystem @@ -8,8 +8,8 @@ import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import akka.testkit.TestKit import akka.util.ByteString -import org.dcm4che3.data.{Attributes, Fragments, Tag} -import org.dcm4che3.io.{DicomInputStream, DicomStreamException} +import org.dcm4che3.data._ +import org.dcm4che3.io.{DicomInputStream, DicomOutputStream, DicomStreamException} import org.scalatest.{AsyncFlatSpecLike, Matchers} import scala.concurrent.Future @@ -30,11 +30,18 @@ class DicomAttributesSinkTest extends TestKit(ActorSystem("DicomAttributesSinkSp (fmi, dataset) } + def toBytes(attributes: Attributes, tsuid: String): ByteString = { + val baos = new ByteArrayOutputStream() + val dos = new DicomOutputStream(baos, tsuid) + dos.writeDataset(null, attributes) + dos.close() + ByteString(baos.toByteArray) + } + def toFlowAttributes(bytes: ByteString): Future[(Option[Attributes], Option[Attributes])] = Source.single(bytes) .via(new DicomPartFlow()) .via(attributeFlow) - //.via(printFlow) .runWith(attributesSink) def assertEquivalentToDcm4che(bytes: ByteString) = { @@ -56,6 +63,11 @@ class DicomAttributesSinkTest extends TestKit(ActorSystem("DicomAttributesSinkSp assertEquivalentToDcm4che(bytes) } + it should "be equivalent to dcm4che for a file with big endian encoding" in { + val bytes = patientNameJohnDoeBE + assertEquivalentToDcm4che(bytes) + } + it should "accept but not produce FMI nor dataset attributes for a file consisting of a preamble only" in { val bytes = preamble @@ -137,4 +149,22 @@ class DicomAttributesSinkTest extends TestKit(ActorSystem("DicomAttributesSinkSp val bytes = seqStart ++ itemNoLength ++ seqStart ++ itemNoLength ++ patientNameJohnDoe ++ itemEnd ++ seqEnd ++ studyDate ++ itemEnd ++ seqEnd assertEquivalentToDcm4che(bytes) } + + it should "not handle non-standard encodings when specific character set is not specified" in { + val attr = new Attributes() + attr.setString(Tag.PatientName, VR.PN, "Ö₯") + val bytes = toBytes(attr, UID.ExplicitVRLittleEndian) + val attr2 = toAttributes(bytes)._2.get + attr2.getString(Tag.PatientName) should not be "Ö₯" + } + + it should "handle non-standard encodings" in { + val attr = new Attributes() + attr.setSpecificCharacterSet("ISO 2022 IR 100", "ISO 2022 IR 126") + attr.setString(Tag.PatientName, VR.PN, "Ö₯") + val bytes = toBytes(attr, UID.ExplicitVRLittleEndian) + val attr2 = toAttributes(bytes)._2.get + attr2.getString(Tag.PatientName) shouldBe "Ö₯" + } + } diff --git a/src/test/scala/se/nimsa/dcm4che/streams/DicomData.scala b/src/test/scala/se/nimsa/dcm4che/streams/DicomData.scala index 2ebfb2d..4f73898 100644 --- a/src/test/scala/se/nimsa/dcm4che/streams/DicomData.scala +++ b/src/test/scala/se/nimsa/dcm4che/streams/DicomData.scala @@ -170,6 +170,12 @@ object DicomData { def expectDicomError() = probe .request(1) .expectError() + + def expectAttributesPart(attributesPart: DicomAttributes) = probe + .request(1) + .expectNextChainingPF { + case p: DicomAttributes => p == attributesPart + } } } diff --git a/src/test/scala/se/nimsa/dcm4che/streams/DicomFlowsTest.scala b/src/test/scala/se/nimsa/dcm4che/streams/DicomFlowsTest.scala index b0d5ccd..2959943 100644 --- a/src/test/scala/se/nimsa/dcm4che/streams/DicomFlowsTest.scala +++ b/src/test/scala/se/nimsa/dcm4che/streams/DicomFlowsTest.scala @@ -8,9 +8,10 @@ import akka.stream.scaladsl.{FileIO, Source} import akka.stream.testkit.scaladsl.TestSink import akka.testkit.TestKit import akka.util.ByteString -import org.dcm4che3.data.{Tag, VR} +import org.dcm4che3.data._ import org.scalatest.{FlatSpecLike, Matchers} -import se.nimsa.dcm4che.streams.DicomParts.DicomPart +import se.nimsa.dcm4che.streams.DicomPartFlow.partFlow +import se.nimsa.dcm4che.streams.DicomParts.{DicomAttributes, DicomPart} class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) with FlatSpecLike with Matchers { @@ -195,7 +196,7 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit val file = new File(getClass.getResource("CT0055.dcm").toURI) val source = FileIO.fromPath(file.toPath) .via(new DicomPartFlow()) - .via(blacklistFilter((tag: Int) => DicomParsing.isFileMetaInformation(tag) , keepPreamble = false)) + .via(blacklistFilter((tag: Int) => DicomParsing.isFileMetaInformation(tag), keepPreamble = false)) source.runWith(TestSink.probe[DicomPart]) .expectHeader(Tag.SpecificCharacterSet) @@ -208,8 +209,7 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit val file = new File(getClass.getResource("CT0055.dcm").toURI) val source = FileIO.fromPath(file.toPath) .via(new DicomPartFlow()) - .via(blacklistFilter(DicomParsing.isPrivateAttribute(_))) - .via(printFlow[DicomPart]) + .via(blacklistFilter(DicomParsing.isPrivateAttribute)) source.runWith(TestSink.probe[DicomPart]) .expectPreamble() @@ -240,7 +240,6 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit val source = FileIO.fromPath(file.toPath) .via(DicomPartFlow.partFlow) .via(whitelistFilter(_ == Tag.PatientName)) - .via(printFlow[DicomPart]) source.runWith(TestSink.probe[DicomPart]) .expectHeader(Tag.PatientName) @@ -335,5 +334,83 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit source.runWith(TestSink.probe[DicomPart]) .expectDicomComplete() } + + "A collect attributes flow" should "first produce an attributes part followed by the input dicom parts" in { + val bytes = studyDate ++ patientNameJohnDoe + val tags = Set(Tag.StudyDate, Tag.PatientName) + val source = Source.single(bytes) + .via(partFlow) + .via(collectAttributesFlow(tags)) + + source.runWith(TestSink.probe[DicomPart]) + .request(1) + .expectNextChainingPF { + case DicomAttributes(attributes) => + attributes should have length 2 + attributes.head.header.tag shouldBe Tag.StudyDate + attributes(1).header.tag shouldBe Tag.PatientName + } + .expectHeader(Tag.StudyDate) + .expectValueChunk() + .expectHeader(Tag.PatientName) + .expectValueChunk() + .expectDicomComplete() + } + + it should "produce an empty attributes part when stream is empty" in { + val bytes = ByteString.empty + + val source = Source.single(bytes) + .via(partFlow) + .via(collectAttributesFlow(Set.empty)) + + source.runWith(TestSink.probe[DicomPart]) + .request(1) + .expectNextChainingPF { + case DicomAttributes(attributes) => attributes shouldBe empty + } + .expectDicomComplete() + } + + it should "produce an empty attributes part when no relevant attributes are present" in { + val bytes = patientNameJohnDoe ++ studyDate + + val source = Source.single(bytes) + .via(partFlow) + .via(collectAttributesFlow(Set(Tag.Modality, Tag.SeriesInstanceUID))) + + source.runWith(TestSink.probe[DicomPart]) + .request(1) + .expectNextChainingPF { + case DicomAttributes(attributes) => attributes shouldBe empty + } + .expectHeader(Tag.PatientName) + .expectValueChunk() + .expectHeader(Tag.StudyDate) + .expectValueChunk() + .expectDicomComplete() + } + + it should "apply the stop tag appropriately" in { + val bytes = studyDate ++ patientNameJohnDoe + + val source = Source.single(bytes) + .via(partFlow) + .via(collectAttributesFlow(Set(Tag.StudyDate))) + + source.runWith(TestSink.probe[DicomPart]) + .request(1) + .expectNextChainingPF { + case DicomAttributes(attributes) => + attributes should have length 1 + attributes.head.header.tag shouldBe Tag.StudyDate + } + .expectHeader(Tag.StudyDate) + .expectValueChunk() + .expectHeader(Tag.PatientName) + .expectValueChunk() + .expectDicomComplete() + } + }