Skip to content

Commit

Permalink
Added a bulk data filter, with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
KarlSjostrand committed Jun 4, 2017
1 parent 55b0a43 commit 7d797b5
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 8 deletions.
68 changes: 62 additions & 6 deletions src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.zip.Deflater
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import org.dcm4che3.data.{StandardElementDictionary, VR}
import org.dcm4che3.data.{StandardElementDictionary, Tag, VR}
import org.dcm4che3.io.DicomStreamException
import se.nimsa.dcm4che.streams.DicomParts._

Expand Down Expand Up @@ -94,6 +94,14 @@ object DicomFlows {
*/
def whitelistFilter(tagsWhitelist: Seq[Int]): Flow[DicomPart, DicomPart, NotUsed] = {
  // The predicate is evaluated for the attributes flowing through the stream, so build the
  // lookup set once here instead of scanning the Seq linearly on every call.
  val allowedTags = tagsWhitelist.toSet
  // The parameter type is spelled out so that overload resolution unambiguously selects the
  // predicate-based whitelistFilter.
  whitelistFilter((tag: Int) => allowedTags.contains(tag))
}

/**
  * Filter a stream of dicom parts such that attributes with tags in the black list are discarded.
  *
  * The list is converted to a set once, when the flow is created, so that each tag check is a
  * set lookup rather than a linear scan of the list.
  *
  * @param tagsBlacklist list of tags to discard.
  * @return the associated filter Flow
  */
def blacklistFilter(tagsBlacklist: Seq[Int]): Flow[DicomPart, DicomPart, NotUsed] = {
  val discardedTags = tagsBlacklist.toSet
  // The parameter type is spelled out so that overload resolution unambiguously selects the
  // predicate-based blacklistFilter.
  blacklistFilter((tag: Int) => discardedTags.contains(tag))
}

/**
* Filter a stream of dicom parts such that all attributes that are group length elements except
* file meta information group length, will be discarded. Group Length (gggg,0000) Standard Data Elements
Expand Down Expand Up @@ -124,7 +132,7 @@ object DicomFlows {
*
* @param tagCondition whitelist condition
* @param keepPreamble true if preamble should be kept, else false
* @return Flow of filtered parts
* @return Flow of filtered parts
*/
def whitelistFilter(tagCondition: (Int) => Boolean, keepPreamble: Boolean = false): Flow[DicomPart, DicomPart, NotUsed] = {
  // Delegates to the shared tag filter, switched to whitelist mode.
  tagFilter(tagCondition, isWhitelist = true, keepPreamble)
}

Expand Down Expand Up @@ -207,9 +215,10 @@ object DicomFlows {

/**
  * Class used to specify modifications to individual attributes of a dataset
  *
  * @param tag          tag number
  * @param modification a modification function, mapping a `ByteString` to a new `ByteString`
  *                     (presumably the attribute's value bytes - confirm against the modify flow)
  * @param insert       if tag is absent in dataset it will be created and inserted when `true`
  */
case class TagModification(tag: Int, modification: ByteString => ByteString, insert: Boolean)

Expand Down Expand Up @@ -274,7 +283,7 @@ object DicomFlows {
Nil
}
.getOrElse(header :: Nil)
inserts ::: modify
inserts ::: modify
} else
header :: Nil
case chunk: DicomValueChunk if currentModification.isDefined && currentHeader.isDefined =>
Expand Down Expand Up @@ -311,7 +320,7 @@ object DicomFlows {
* attributes. At that stage, in order to maintain valid DICOM information, one can either change the transfer syntax to
* an appropriate value for non-deflated data, or deflate the data again. This flow helps with the latter.
*
* @return
* @return the associated DicomPart Flow
*/
def deflateDatasetFlow() = Flow[DicomPart]
.concat(Source.single(DicomEndMarker))
Expand Down Expand Up @@ -463,4 +472,51 @@ object DicomFlows {
}
}

/**
  * Flow removing attributes from the stream that may contain large quantities of data (bulk data).
  *
  * For a discarded attribute both its header and its subsequent value chunks are dropped. Tags are
  * first normalized with respect to repeating groups (50xx and 60xx). The following attributes are
  * always discarded: PixelDataProviderURL, AudioSampleData, CurveData, SpectroscopyData, OverlayData,
  * EncapsulatedDocument, FloatPixelData and DoubleFloatPixelData. In addition
  *  - PixelData is discarded only when it is not inside any sequence
  *  - WaveformData is discarded only when the one and only enclosing sequence is WaveformSequence
  *
  * All other parts pass through unchanged.
  */
val bulkDataFilter: Flow[DicomPart, DicomPart, NotUsed] = Flow[DicomPart]
  .statefulMapConcat {

    // Fold tags belonging to the repeating groups 50xx and 60xx onto group 5000 and 6000 respectively,
    // so that they can be compared against the constants in Tag. Other tags are returned as is.
    def normalizeRepeatingGroup(tag: Int): Int = {
      val gg000000 = tag & 0xffe00000
      if (gg000000 == 0x50000000 || gg000000 == 0x60000000) tag & 0xffe0ffff else tag
    }

    () =>
      // sequences currently open, innermost first
      var sequenceStack = List.empty[DicomSequence]
      // true while the value chunks of a discarded attribute are being skipped
      var discarding = false

      {
        case sq: DicomSequence =>
          sequenceStack = sq :: sequenceStack
          sq :: Nil
        case sqd: DicomSequenceDelimitation =>
          // drop(1) rather than tail: an unmatched delimitation must not throw on an empty stack
          sequenceStack = sequenceStack.drop(1)
          sqd :: Nil
        case dh: DicomHeader =>
          discarding =
            normalizeRepeatingGroup(dh.tag) match {
              case Tag.PixelDataProviderURL | Tag.AudioSampleData | Tag.CurveData | Tag.SpectroscopyData |
                   Tag.OverlayData | Tag.EncapsulatedDocument | Tag.FloatPixelData | Tag.DoubleFloatPixelData =>
                true
              case Tag.PixelData =>
                sequenceStack.isEmpty
              case Tag.WaveformData =>
                sequenceStack match {
                  case enclosing :: Nil => enclosing.tag == Tag.WaveformSequence
                  case _ => false
                }
              case _ =>
                false
            }
          if (discarding) Nil else dh :: Nil
        case dvc: DicomValueChunk =>
          if (discarding) Nil else dvc :: Nil
        case p: DicomPart =>
          // any other part type ends a run of discarded value chunks
          discarding = false
          p :: Nil
      }
  }

}
2 changes: 1 addition & 1 deletion src/main/scala/se/nimsa/dcm4che/streams/DicomParts.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object DicomParts {
}

/**
  * Dicom part carrying a chunk of value bytes.
  *
  * @param bigEndian flag shown as "(big endian)" in the string representation when `true`
  * @param bytes     the bytes of this chunk
  * @param last      flag shown as "(last)" in the string representation when `true`
  */
case class DicomValueChunk(bigEndian: Boolean, bytes: ByteString, last: Boolean) extends DicomPart {
  // Only one toString override can exist in the class; this is the version introduced by the commit,
  // which decodes the bytes via ByteString.utf8String instead of copying them to an array first.
  override def toString: String = s"DicomValueChunk ${if (last) "(last) " else ""}length = ${bytes.length} ${if (bigEndian) "(big endian) " else ""}ASCII = '${bytes.utf8String}' $bytes"
}

// Dicom part holding a chunk of bytes together with an endianness flag.
// NOTE(review): presumably the bytes are still-deflated dataset data - confirm against the part flow.
case class DicomDeflatedChunk(bigEndian: Boolean, bytes: ByteString) extends DicomPart
Expand Down
2 changes: 2 additions & 0 deletions src/test/scala/se/nimsa/dcm4che/streams/DicomData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ object DicomData {
// element header for tag (7FE0,0010) in little endian byte order, with the length field set to 0xFFFFFFFF
val pixeDataFragments = ByteString(224, 127, 16, 0, 79, 87, 0, 0, 255, 255, 255, 255) // VR = OW, length = -1

// sequence header for tag (0008,9215) in little endian byte order: VR = SQ, length = -1
val seqStart = ByteString(0x08, 0x00, 0x15, 0x92, 'S', 'Q', 0, 0, -1, -1, -1, -1)
// sequence header for tag (5400,0100) in little endian byte order: VR = SQ, length = -1
val waveformSeqStart = ByteString(0x00, 0x54, 0x00, 0x01, 'S', 'Q', 0, 0, -1, -1, -1, -1)

// file meta with wrong transfer syntax:
// implicit little endian (not conforming to standard)
Expand All @@ -73,6 +74,7 @@ object DicomData {
// element with tag (0002,0010) whose 4-byte length (20) directly follows the tag, i.e. without a VR field,
// and whose value is the UID 1.2.840.10008.1.2.1 padded with a trailing zero byte
val tsuidExplicitLEImplicitLE = ByteString(2, 0, 16, 0, 20, 0, 0, 0, '1', '.', '2', '.', '8', '4', '0', '.', '1', '0', '0', '0', '8', '.', '1', '.', '2', '.', '1', 0)

// Builds an element with tag (7FE0,0010) and VR = OB, followed by a little endian length field and
// `length` zero bytes of value data.
def pixelData(length: Int) = {
  val tagAndVr = ByteString(0xe0, 0x7f, 0x10, 0x00, 0x4f, 0x42, 0, 0)
  val valueLength = DicomParsing.intToBytes(length, bigEndian = false)
  val zeroedValue = ByteString(new Array[Byte](length))
  tagAndVr ++ valueLength ++ zeroedValue
}
// Builds an element with tag (5400,1010) and VR = OB, followed by a little endian length field and
// `length` zero bytes of value data.
def waveformData(length: Int) = {
  val tagAndVr = ByteString(0x00, 0x54, 0x10, 0x10, 0x4f, 0x42, 0, 0)
  val valueLength = DicomParsing.intToBytes(length, bigEndian = false)
  val zeroedValue = ByteString(new Array[Byte](length))
  tagAndVr ++ valueLength ++ zeroedValue
}


implicit class DicomPartProbe(probe: TestSubscriber.Probe[DicomPart]) {
Expand Down
60 changes: 59 additions & 1 deletion src/test/scala/se/nimsa/dcm4che/streams/DicomFlowsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit
val file = new File(getClass.getResource("CT0055.dcm").toURI)
val source = FileIO.fromPath(file.toPath)
.via(new DicomPartFlow())
.via(blacklistFilter(DicomParsing.isPrivateAttribute))
.via(blacklistFilter(DicomParsing.isPrivateAttribute _))

source.runWith(TestSink.probe[DicomPart])
.expectPreamble()
Expand Down Expand Up @@ -478,5 +478,63 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit
.expectDicomComplete()
}

"The bulk data filter flow" should "remove pixel data" in {
  // preamble and file meta information followed by a patient name and, outside of any sequence,
  // 1000 bytes of pixel data
  val bytes = preamble ++ fmiGroupLength(tsuidExplicitLE) ++ tsuidExplicitLE ++ patientNameJohnDoe ++ pixelData(1000)

  val source = Source.single(bytes)
    .via(DicomPartFlow.partFlow)
    .via(bulkDataFilter)

  // neither the pixel data header nor its value chunks are expected in the output
  source.runWith(TestSink.probe[DicomPart])
    .expectPreamble()
    .expectHeader(Tag.FileMetaInformationGroupLength)
    .expectValueChunk()
    .expectHeader(Tag.TransferSyntaxUID)
    .expectValueChunk()
    .expectHeader(Tag.PatientName)
    .expectValueChunk()
    .expectDicomComplete()
}

it should "not remove pixel data in sequences" in {
  // a sequence with a single item that holds a patient name and 100 bytes of pixel data
  val sequenceWithPixelData = seqStart ++ itemNoLength ++ patientNameJohnDoe ++ pixelData(100) ++ itemEnd ++ seqEnd

  val probe = Source.single(sequenceWithPixelData)
    .via(DicomPartFlow.partFlow)
    .via(bulkDataFilter)
    .runWith(TestSink.probe[DicomPart])

  // pixel data nested in a sequence must pass through the filter untouched
  probe
    .expectSequence(Tag.DerivationCodeSequence)
    .expectItem()
    .expectHeader(Tag.PatientName)
    .expectValueChunk()
    .expectHeader(Tag.PixelData)
    .expectValueChunk()
    .expectItemDelimitation()
    .expectSequenceDelimitation()
    .expectDicomComplete()
}

it should "only remove waveform data when inside waveform sequence" in {
  // waveform data occurs twice: once inside a waveform sequence item and once at the top level
  val insideWaveformSequence = waveformSeqStart ++ itemNoLength ++ patientNameJohnDoe ++ waveformData(100) ++ itemEnd ++ seqEnd
  val atTopLevel = patientNameJohnDoe ++ waveformData(100)

  val probe = Source.single(insideWaveformSequence ++ atTopLevel)
    .via(DicomPartFlow.partFlow)
    .via(bulkDataFilter)
    .runWith(TestSink.probe[DicomPart])

  // only the occurrence inside the waveform sequence is expected to be filtered out
  probe
    .expectSequence(Tag.WaveformSequence)
    .expectItem()
    .expectHeader(Tag.PatientName)
    .expectValueChunk()
    .expectItemDelimitation()
    .expectSequenceDelimitation()
    .expectHeader(Tag.PatientName)
    .expectValueChunk()
    .expectHeader(Tag.WaveformData)
    .expectValueChunk()
    .expectDicomComplete()
}

}

0 comments on commit 7d797b5

Please sign in to comment.