Skip to content

Commit

Permalink
Added a flow for collecting a number of attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
KarlSjostrand committed May 14, 2017
1 parent 72fca51 commit 3e6b3c5
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 71 deletions.
201 changes: 144 additions & 57 deletions src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala
Expand Up @@ -21,6 +21,7 @@ import java.util.zip.Deflater
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import org.dcm4che3.io.DicomStreamException
import se.nimsa.dcm4che.streams.DicomParts._

/**
Expand Down Expand Up @@ -96,18 +97,21 @@ object DicomFlows {
* Filter a stream of dicom parts such that all attributes that are group length elements except
* file meta information group length, will be discarded. Group Length (gggg,0000) Standard Data Elements
* have been retired in the standard.
*
* @return the associated filter Flow
*/
def groupLengthDiscardFilter = {
  // A tag is dropped when it is a group length element that is not part of the file meta information.
  val isDiscardableGroupLength = (tag: Int) =>
    DicomParsing.isGroupLength(tag) && !DicomParsing.isFileMetaInformation(tag)

  blacklistFilter(isDiscardableGroupLength)
}

/**
* Discards the file meta information.
*
* @return the associated filter Flow
*/
def fmiDiscardFilter = {
  // Every file meta information tag is blacklisted; the preamble is dropped as well.
  val isFmiTag = (tag: Int) => DicomParsing.isFileMetaInformation(tag)

  blacklistFilter(isFmiTag, keepPreamble = false)
}

/**
* Blacklist filter for DICOM parts.
*
* @param tagCondition blacklist tag condition
* @param keepPreamble true if preamble should be kept, else false
* @return Flow of filtered parts
Expand All @@ -116,11 +120,12 @@ object DicomFlows {

/**
* Tag based whitelist filter for DICOM parts.
*
* @param tagCondition whitelist condition
* @param keepPreamble true if preamble should be kept, else false
* @return Flow of filtered parts
*/
// Delegates to tagFilter in whitelist mode; only parts whose tag satisfies tagCondition are kept.
def whitelistFilter(tagCondition: (Int) => Boolean, keepPreamble: Boolean = false): Flow[DicomPart, DicomPart, NotUsed] =
  tagFilter(tagCondition, isWhitelist = true, keepPreamble)


private def tagFilter(tagCondition: (Int) => Boolean, isWhitelist: Boolean, keepPreamble: Boolean) = Flow[DicomPart].statefulMapConcat {
Expand Down Expand Up @@ -166,9 +171,9 @@ object DicomFlows {
case _: DicomFragmentsDelimitation if discarding => Nil

case _: DicomSequence if discarding => Nil
case _: DicomSequenceDelimitation if discarding => Nil
case _: DicomSequenceDelimitation if discarding => Nil

case dicomAttribute: DicomAttribute =>
case dicomAttribute: DicomAttribute =>
discarding = shouldDiscard(dicomAttribute.header.tag, isWhitelist)
if (discarding) {
Nil
Expand Down Expand Up @@ -241,72 +246,154 @@ object DicomFlows {
*
* @return
*/
def deflateDatasetFlow() = Flow[DicomPart]
  // A trailing DicomEndMarker lets the stage below see the end of the stream and flush the deflater.
  .concat(Source.single(DicomEndMarker))
  .statefulMapConcat {
    () =>
      // Per-materialization state: whether we are inside the FMI, a scratch buffer, and a raw (no zlib header) deflater.
      var inFmi = false
      val buffer = new Array[Byte](2048)
      val deflater = new Deflater(-1, true)

      // Feed one part to the deflater and drain whatever compressed output is available so far.
      def deflate(dicomPart: DicomPart) = {
        val input = dicomPart.bytes
        deflater.setInput(input.toArray)
        var output = ByteString.empty
        while (!deflater.needsInput) {
          val bytesDeflated = deflater.deflate(buffer)
          output = output ++ ByteString(buffer.take(bytesDeflated))
        }
        if (output.isEmpty) Nil else DicomDeflatedChunk(dicomPart.bigEndian, output) :: Nil
      }

      // Signal end of input, drain the remaining compressed bytes, then release the deflater.
      def finishDeflating() = {
        deflater.finish()
        var output = ByteString.empty
        var done = false
        while (!done) {
          val bytesDeflated = deflater.deflate(buffer)
          if (bytesDeflated == 0)
            done = true
          else
            output = output ++ ByteString(buffer.take(bytesDeflated))
        }
        deflater.end()
        if (output.isEmpty) Nil else DicomDeflatedChunk(bigEndian = false, output) :: Nil
      }

      {
        case preamble: DicomPreamble => // preamble, do not deflate
          preamble :: Nil
        case DicomEndMarker => // end of stream, make sure deflater writes final bytes if deflating has occurred
          if (deflater.getBytesRead > 0)
            finishDeflating()
          else {
            // Nothing was deflated, but the deflater must still be released; otherwise it is never ended.
            deflater.end()
            Nil
          }
        case deflatedChunk: DicomDeflatedChunk => // already deflated, pass as-is
          deflatedChunk :: Nil
        case header: DicomHeader if header.isFmi => // FMI, do not deflate and remember we are in FMI
          inFmi = true
          header :: Nil
        case attribute: DicomAttribute if attribute.header.isFmi => // Whole FMI attribute, same as above
          inFmi = true
          attribute :: Nil
        case header: DicomHeader => // Dataset header, remember we are no longer in FMI, deflate
          inFmi = false
          deflate(header)
        case dicomPart if inFmi => // For any dicom part within FMI, do not deflate
          dicomPart :: Nil
        case dicomPart => // For all other cases, deflate
          deflate(dicomPart)
      }
  }

/**
  * Collect the attributes specified by the input set of tags while buffering all elements of the stream. When the
  * last attribute to collect (the one with the highest tag number) has been read completely, a DicomAttributes part
  * is emitted containing the collected DicomAttribute instances, followed by all buffered elements. Remaining
  * elements in the stream are immediately emitted downstream without buffering. If the end of the stream is reached
  * before that, the DicomAttributes part (possibly empty) and the buffered elements are emitted at that point.
  *
  * This flow is used when there is a need to "look ahead" for certain information in the stream so that streamed
  * elements can be processed correctly according to this information. As an example, an implementation may have
  * different graph paths for different modalities and the modality must be known before any elements are processed.
  *
  * @param tags          tag numbers of attributes to collect. Collection (and hence buffering) will end when the
  *                      attribute with the highest tag number has been collected, or at the end of the stream
  * @param maxBufferSize the maximum allowed size of the buffer in bytes (to avoid running out of memory). The flow
  *                      will fail with a DicomStreamException if this limit is exceeded. Set to 0 for an unlimited
  *                      buffer size
  * @return A DicomPart Flow which will begin with a DicomAttributes part followed by the input elements
  */
def collectAttributesFlow(tags: Set[Int], maxBufferSize: Int = 1000000): Flow[DicomPart, DicomPart, NotUsed] =
  Flow[DicomPart]
    // A trailing DicomEndMarker lets the stage flush its buffer if the stream ends before collection completes.
    .concat(Source.single(DicomEndMarker))
    .statefulMapConcat {
      val stopTag = if (tags.isEmpty) 0 else tags.max

      () =>
        var reachedEnd = false
        var currentBufferSize = 0
        var currentAttribute: Option[DicomAttribute] = None
        // Vector gives cheap appends; the buffer grows by one element per incoming part.
        var buffer = Vector.empty[DicomPart]
        var attributes = Seq.empty[DicomAttribute]

        // Emit the collected attributes followed by everything buffered so far, then switch to pass-through mode.
        def attributesAndBuffer(): List[DicomPart] = {
          val parts = DicomAttributes(attributes) :: buffer.toList

          reachedEnd = true
          buffer = Vector.empty
          currentBufferSize = 0

          parts
        }

        {
          case DicomEndMarker if reachedEnd =>
            Nil

          case DicomEndMarker =>
            attributesAndBuffer()

          case part if reachedEnd =>
            part :: Nil

          case part =>
            currentBufferSize = currentBufferSize + part.bytes.size
            if (maxBufferSize > 0 && currentBufferSize > maxBufferSize) {
              throw new DicomStreamException("Error collecting attributes: max buffer size exceeded")
            }

            buffer = buffer :+ part

            part match {
              case header: DicomHeader if tags.contains(header.tag) =>
                currentAttribute = Some(DicomAttribute(header, Seq.empty))
                Nil

              case header: DicomHeader =>
                currentAttribute = None
                Nil

              case valueChunk: DicomValueChunk =>
                currentAttribute match {
                  case Some(attribute) =>
                    val updatedAttribute = attribute.copy(valueChunks = attribute.valueChunks :+ valueChunk)
                    currentAttribute = Some(updatedAttribute)
                    if (valueChunk.last) {
                      attributes = attributes :+ updatedAttribute
                      currentAttribute = None
                      // Compare the attribute just completed, not the first one collected: with several
                      // tags the first collected attribute never reaches stopTag and buffering would not end.
                      if (updatedAttribute.header.tag >= stopTag)
                        attributesAndBuffer()
                      else
                        Nil
                    } else
                      Nil

                  case None => Nil
                }

              case _ => Nil
            }
        }
    }

}
18 changes: 14 additions & 4 deletions src/main/scala/se/nimsa/dcm4che/streams/DicomParts.scala
Expand Up @@ -37,9 +37,9 @@ object DicomParts {

def withUpdatedLength(newLength: Int) : DicomHeader = {

val updated = if ((bytes.size >= 8) && (explicitVR) && (vr.headerLength == 8)) { //explicit vr
val updated = if ((bytes.size >= 8) && explicitVR && (vr.headerLength == 8)) { //explicit vr
bytes.take(6) ++ DicomParsing.shortToBytes(newLength.toShort, bigEndian)
} else if ((bytes.size >= 12) && (explicitVR) && (vr.headerLength == 12)) { //explicit vr
} else if ((bytes.size >= 12) && explicitVR && (vr.headerLength == 12)) { //explicit vr
bytes.take(8) ++ DicomParsing.intToBytes(newLength, bigEndian)
} else { //implicit vr
bytes.take(4) ++ DicomParsing.intToBytes(newLength, bigEndian)
Expand Down Expand Up @@ -98,17 +98,27 @@ object DicomParts {
} else {
ByteString.fromArray(newBytes)
}
DicomAttribute(updatedHeader, Seq(DicomValueChunk(header.bigEndian, updatedValue, true)))
DicomAttribute(updatedHeader, Seq(DicomValueChunk(header.bigEndian, updatedValue, last = true)))
}

// DA: A string of characters of the format YYYYMMDD, 8 bytes fixed
def withUpdatedDateValue(newValue: String, cs: SpecificCharacterSet = SpecificCharacterSet.ASCII): DicomAttribute = {
  // Encode the new value with this attribute's VR and the given character set.
  val newBytes = header.vr.toBytes(newValue, cs)
  val updatedValue = ByteString.fromArray(newBytes)
  // The existing header is reused as-is; the value is carried in a single chunk marked as the last one.
  DicomAttribute(header, Seq(DicomValueChunk(header.bigEndian, updatedValue, last = true)))
}

/** The parts making up this attribute: the header first, then its value chunks in order. */
def asDicomParts: Seq[DicomPart] = {
  val leadingPart: DicomPart = header
  leadingPart +: valueChunks
}
}

/** Marker part with no bytes of its own; flows append it to a stream so a stage can detect the end of input. */
case object DicomEndMarker extends DicomPart {
  def bigEndian: Boolean = false

  def bytes: ByteString = ByteString.empty
}

/**
  * A part wrapping a list of complete attributes.
  *
  * @param attributes the wrapped attributes; may be empty
  */
case class DicomAttributes(attributes: Seq[DicomAttribute]) extends DicomPart {
  /** Endianness of the first attribute, or false when there are no attributes. */
  def bigEndian: Boolean = attributes.headOption.exists(_.bigEndian)

  /** Concatenated bytes of all attributes; empty when there are no attributes (reduce would throw here). */
  def bytes: ByteString = attributes.map(_.bytes).foldLeft(ByteString.empty)(_ ++ _)
}

}
@@ -1,15 +1,15 @@
package se.nimsa.dcm4che.streams

import java.io.ByteArrayInputStream
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.testkit.TestKit
import akka.util.ByteString
import org.dcm4che3.data.{Attributes, Fragments, Tag}
import org.dcm4che3.io.{DicomInputStream, DicomStreamException}
import org.dcm4che3.data._
import org.dcm4che3.io.{DicomInputStream, DicomOutputStream, DicomStreamException}
import org.scalatest.{AsyncFlatSpecLike, Matchers}

import scala.concurrent.Future
Expand All @@ -30,11 +30,18 @@ class DicomAttributesSinkTest extends TestKit(ActorSystem("DicomAttributesSinkSp
(fmi, dataset)
}

// Serializes the given attributes as a dataset (no file meta information) using the supplied transfer syntax UID.
def toBytes(attributes: Attributes, tsuid: String): ByteString = {
  val sink = new ByteArrayOutputStream()
  val writer = new DicomOutputStream(sink, tsuid)
  writer.writeDataset(null, attributes)
  writer.close()
  val encoded = sink.toByteArray
  ByteString(encoded)
}

// Runs the bytes through the part flow and the attribute flow, materializing the result with attributesSink.
def toFlowAttributes(bytes: ByteString): Future[(Option[Attributes], Option[Attributes])] = {
  val parts = Source.single(bytes).via(new DicomPartFlow())
  parts
    .via(attributeFlow)
    .runWith(attributesSink)
}

def assertEquivalentToDcm4che(bytes: ByteString) = {
Expand All @@ -56,6 +63,11 @@ class DicomAttributesSinkTest extends TestKit(ActorSystem("DicomAttributesSinkSp
assertEquivalentToDcm4che(bytes)
}

it should "be equivalent to dcm4che for a file with big endian encoding" in {
  // Same equivalence check as the sibling tests, fed with the big endian patient name fixture.
  assertEquivalentToDcm4che(patientNameJohnDoeBE)
}

it should "accept but not produce FMI nor dataset attributes for a file consisting of a preamble only" in {
val bytes = preamble

Expand Down Expand Up @@ -137,4 +149,22 @@ class DicomAttributesSinkTest extends TestKit(ActorSystem("DicomAttributesSinkSp
val bytes = seqStart ++ itemNoLength ++ seqStart ++ itemNoLength ++ patientNameJohnDoe ++ itemEnd ++ seqEnd ++ studyDate ++ itemEnd ++ seqEnd
assertEquivalentToDcm4che(bytes)
}

// Negative case: the patient name is written without setting a specific character set on the dataset first.
it should "not handle non-standard encodings when specific character set is not specified" in {
val attr = new Attributes()
attr.setString(Tag.PatientName, VR.PN, "Ö₯")
// Serialize with dcm4che, then read the bytes back through toAttributes.
val bytes = toBytes(attr, UID.ExplicitVRLittleEndian)
// _2 is presumably the dataset half of the (fmi, dataset) pair — confirm against toAttributes.
val attr2 = toAttributes(bytes)._2.get
// The round-tripped name is expected to differ from the string that was written.
attr2.getString(Tag.PatientName) should not be "Ö₯"
}

// Positive counterpart of the test above: the same name, but with specific character sets declared up front.
it should "handle non-standard encodings" in {
val attr = new Attributes()
// Two character sets are declared before the name is written.
attr.setSpecificCharacterSet("ISO 2022 IR 100", "ISO 2022 IR 126")
attr.setString(Tag.PatientName, VR.PN, "Ö₯")
val bytes = toBytes(attr, UID.ExplicitVRLittleEndian)
// _2 is presumably the dataset half of the (fmi, dataset) pair — confirm against toAttributes.
val attr2 = toAttributes(bytes)._2.get
// With the character sets declared, the name must survive the round trip unchanged.
attr2.getString(Tag.PatientName) shouldBe "Ö₯"
}

}
6 changes: 6 additions & 0 deletions src/test/scala/se/nimsa/dcm4che/streams/DicomData.scala
Expand Up @@ -170,6 +170,12 @@ object DicomData {
// Requests one element and expects the stream to signal an error instead.
def expectDicomError() = {
  val requested = probe.request(1)
  requested.expectError()
}

// Requests one element and expects it to be a DicomAttributes part equal to the given one.
def expectAttributesPart(attributesPart: DicomAttributes) = probe
  .request(1)
  .expectNextChainingPF {
    // The comparison must be a guard: expectNextChainingPF only checks that the partial function is
    // defined for the element and discards its result, so a Boolean body would accept any DicomAttributes.
    case p: DicomAttributes if p == attributesPart => true
  }
}

}

0 comments on commit 3e6b3c5

Please sign in to comment.