Skip to content

Commit

Permalink
Merge 6a0058e into 586763c
Browse files Browse the repository at this point in the history
  • Loading branch information
KarlSjostrand committed Jul 25, 2017
2 parents 586763c + 6a0058e commit 6610890
Show file tree
Hide file tree
Showing 13 changed files with 1,032 additions and 259 deletions.
18 changes: 7 additions & 11 deletions build.sbt
@@ -1,15 +1,9 @@
import de.heikoseeberger.sbtheader.license.Apache2_0
name := "dcm4che-streams"
version := "0.3"
version := "0.4"
organization := "se.nimsa"
scalaVersion := "2.12.2"
crossScalaVersions := Seq("2.11.8", "2.12.2")
scalacOptions := Seq("-encoding", "UTF-8", "-Xlint", "-deprecation", "-unchecked", "-feature", "-target:jvm-1.8")

// define the project

lazy val root = (project in file(".")).enablePlugins(GitBranchPrompt)

// repos

resolvers ++= Seq(
Expand All @@ -19,13 +13,13 @@ resolvers ++= Seq(
// deps

libraryDependencies ++= {
val akkaVersion = "2.4.17"
val akkaVersion = "2.5.2"
Seq(
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"org.slf4j" % "slf4j-simple" % "1.7.22",
"org.slf4j" % "slf4j-simple" % "1.7.25",
"org.dcm4che" % "dcm4che-core" % "3.3.8" % "provided",
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"org.scalatest" %% "scalatest" % "3.0.3" % "test",
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % "test"
)
}
Expand All @@ -34,7 +28,9 @@ updateOptions := updateOptions.value.withCachedResolution(true)

// for automatic license stub generation

headers := Map("scala" -> Apache2_0("2017", "Lars Edenbrandt"))
organizationName := "Lars Edenbrandt"
startYear := Some(2017)
licenses += ("Apache-2.0", new URL("https://www.apache.org/licenses/LICENSE-2.0.txt"))

// publish
publishMavenStyle := true
Expand Down
6 changes: 1 addition & 5 deletions project/plugins.sbt
Expand Up @@ -2,11 +2,7 @@ resolvers += Resolver.typesafeRepo("releases")

resolvers += Classpaths.typesafeReleases

addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.8.5")

addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.2.0")

addSbtPlugin("de.heikoseeberger" % "sbt-header" % "1.5.1")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "2.0.0")

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")

Expand Down
164 changes: 61 additions & 103 deletions src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala
Expand Up @@ -21,7 +21,7 @@ import java.util.zip.Deflater
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import org.dcm4che3.data.{StandardElementDictionary, VR}
import org.dcm4che3.data.Tag
import org.dcm4che3.io.DicomStreamException
import se.nimsa.dcm4che.streams.DicomParts._

Expand Down Expand Up @@ -94,6 +94,14 @@ object DicomFlows {
*/
def whitelistFilter(tagsWhitelist: Seq[Int]): Flow[DicomPart, DicomPart, NotUsed] = whitelistFilter(tagsWhitelist.contains(_))

/**
 * Filter a stream of dicom parts, discarding every attribute whose tag appears in the given black list.
 * All other parts pass through unchanged.
 *
 * @param tagsBlacklist list of tags to discard.
 * @return the associated filter Flow
 */
def blacklistFilter(tagsBlacklist: Seq[Int]): Flow[DicomPart, DicomPart, NotUsed] =
blacklistFilter(tag => tagsBlacklist.contains(tag))

/**
* Filter a stream of dicom parts such that all attributes that are group length elements except
* file meta information group length, will be discarded. Group Length (gggg,0000) Standard Data Elements
Expand Down Expand Up @@ -124,7 +132,7 @@ object DicomFlows {
*
* @param tagCondition whitelist condition
* @param keepPreamble true if preamble should be kept, else false
* @return Flow of filtered parts
* @return Flow of filtered parts
*/
def whitelistFilter(tagCondition: (Int) => Boolean, keepPreamble: Boolean = false): Flow[DicomPart, DicomPart, NotUsed] = tagFilter(tagCondition, isWhitelist = true, keepPreamble)

Expand Down Expand Up @@ -205,113 +213,13 @@ object DicomFlows {
*/
def validateFlowWithContext(contexts: Seq[ValidationContext]) = Flow[ByteString].via(new DicomValidateFlow(Some(contexts)))

/**
 * Class used to specify modifications to individual attributes of a dataset
 *
 * @param tag          tag number of the attribute to modify
 * @param modification a modification function from the attribute's current value bytes to its replacement
 *                     bytes; called with an empty `ByteString` when a new attribute is being inserted
 * @param insert       if tag is absent in dataset it will be created and inserted when `true`
 */
case class TagModification(tag: Int, modification: ByteString => ByteString, insert: Boolean)

/**
 * Simple modification flow for inserting or overwriting the values of specified attributes. Only modifies or
 * inserts attributes in the root dataset, not inside sequences. When inserting a new attribute, the corresponding
 * modification function will be called with an empty `ByteString`.
 *
 * @param modifications Any number of `TagModification`s each specifying a tag number, a modification function, and
 *                      a Boolean indicating whether absent values will be inserted or skipped.
 * @return the modified flow of DICOM parts
 */
def modifyFlow(modifications: TagModification*): Flow[DicomPart, DicomPart, NotUsed] =
Flow[DicomPart]
// append a marker so pending insertions can be flushed after the last upstream part
.concat(Source.single(DicomEndMarker))
.statefulMapConcat {
() =>
// modifications not yet applied; entries are removed once matched or inserted
var modificationsLeft = modifications.toList
// accumulated value bytes of the attribute currently being modified
var value = ByteString.empty
// header of the attribute currently being modified, if any
var currentHeader: Option[DicomHeader] = None
// the modification applying to the current attribute, if any
var currentModification: Option[TagModification] = None
// nesting depth; 0 means root dataset (only root attributes are modified/inserted)
var sequenceDepth = 0
// encoding of the most recently seen header, used when synthesizing inserted attributes
var bigEndian = false
var explicitVR = true
// true once at least one header has been seen; guards end-of-stream insertion
var hasAttributes = false

def updateSyntax(header: DicomHeader): Unit = {
bigEndian = header.bigEndian
explicitVR = header.explicitVR
hasAttributes = true
}

// Synthesize header + value parts for an inserted attribute with the given tag.
// Invariant: only called with tags taken from `modificationsLeft`, so `find` always succeeds
// (`.head` here relies on the Option-to-Iterable implicit conversion).
def headerAndValue(tag: Int): List[DicomPart] = {
val modification = modifications.find(_.tag == tag).map(_.modification).head
val valueBytes = modification(ByteString.empty)
val dictVr = StandardElementDictionary.INSTANCE.vrOf(tag)
if (dictVr == VR.SQ) throw new IllegalArgumentException("Cannot insert sequence attributes")
// fall back to LO when the dictionary does not know the tag
val vr = if (dictVr == VR.UN) VR.LO else dictVr
val isFmi = DicomParsing.isFileMetaInformation(tag)
val header = DicomHeader(tag, vr, valueBytes.length, isFmi, bigEndian, explicitVR)
val value = DicomValueChunk(bigEndian, valueBytes, last = true)
modificationsLeft = modificationsLeft.filterNot(_.tag == tag)
header :: value :: Nil
}

{
case header: DicomHeader =>
updateSyntax(header)
if (sequenceDepth == 0) {
// emit any pending insertions with tags ordered before this header's tag
val inserts = modificationsLeft
.filter(_.insert)
.filter(_.tag < header.tag)
.sortBy(_.tag)
.flatMap(modification => headerAndValue(modification.tag))
// if this tag itself is modified, swallow the header and buffer the value chunks
val modify = modificationsLeft
.find(_.tag == header.tag)
.map { modification =>
currentHeader = Some(header)
value = ByteString.empty
currentModification = Some(modification)
modificationsLeft = modificationsLeft.filterNot(_.tag == header.tag)
Nil
}
.getOrElse(header :: Nil)
inserts ::: modify
} else
header :: Nil
case chunk: DicomValueChunk if currentModification.isDefined && currentHeader.isDefined =>
// buffer chunks until the last one, then emit the rewritten header + value
value = value ++ chunk.bytes
if (chunk.last) {
val newValue = currentModification.get.modification(value)
// NOTE(review): `.toShort` truncates value lengths above Short.MaxValue — confirm
// `withUpdatedLength` is intended to take a Short and that long values cannot occur here
val newHeader = currentHeader.get.withUpdatedLength(newValue.length.toShort)
currentModification = None
currentHeader = None
newHeader :: DicomValueChunk(bigEndian, newValue, last = true) :: Nil
} else
Nil
case s: DicomSequence =>
sequenceDepth += 1
s :: Nil
case s: DicomSequenceDelimitation =>
sequenceDepth -= 1
s :: Nil
case DicomEndMarker if hasAttributes =>
// flush remaining insertions (tags greater than all seen) at end of stream
modificationsLeft
.filter(_.insert)
.sortBy(_.tag)
.flatMap(modification => headerAndValue(modification.tag))
case DicomEndMarker =>
Nil
case part =>
part :: Nil
}
}

/**
* A flow which deflates the dataset but leaves the meta information intact. Useful when the dicom parsing in DicomPartFlow
* has inflated a deflated (1.2.840.10008.1.2.1.99 or 1.2.840.10008.1.2.4.95) file, and analyzed and possibly transformed its
* attributes. At that stage, in order to maintain valid DICOM information, one can either change the transfer syntax to
* an appropriate value for non-deflated data, or deflate the data again. This flow helps with the latter.
*
* @return
* @return the associated DicomPart Flow
*/
def deflateDatasetFlow() = Flow[DicomPart]
.concat(Source.single(DicomEndMarker))
Expand Down Expand Up @@ -463,4 +371,54 @@ object DicomFlows {
}
}

/**
 * Remove attributes from the stream that may contain large quantities of data (bulk data).
 *
 * Rules ported from [[https://github.com/dcm4che/dcm4che/blob/3.3.8/dcm4che-core/src/main/java/org/dcm4che3/io/BulkDataDescriptor.java#L58 dcm4che]].
 * Defined [[http://dicom.nema.org/medical/dicom/current/output/html/part04.html#table_Z.1-1 here in the DICOM standard]].
 *
 * @return the associated DicomPart Flow
 */
val bulkDataFilter = Flow[DicomPart]
.statefulMapConcat {

// Curve data (50xx,...) and overlay data (60xx,...) are repeating groups: map any such tag to
// its base group so it can be compared against the dictionary constants below.
def normalizeRepeatingGroup(tag: Int) = {
val gg000000 = tag & 0xffe00000
if (gg000000 == 0x50000000 || gg000000 == 0x60000000) tag & 0xffe0ffff else tag
}

() =>
// sequences enclosing the current position, innermost first; empty means root dataset
var sequenceStack = Seq.empty[DicomSequence]
// true while the current attribute (its header and following value chunks) is being dropped
var discarding = false

{
case sq: DicomSequence =>
sequenceStack = sq +: sequenceStack
sq :: Nil
case sqd: DicomSequenceDelimitation =>
sequenceStack = sequenceStack.drop(1)
sqd :: Nil
case dh: DicomHeader =>
// decide per header whether this attribute is bulk data; some tags only count
// depending on nesting (PixelData at root, WaveformData directly in a WaveformSequence)
discarding =
normalizeRepeatingGroup(dh.tag) match {
case Tag.PixelDataProviderURL => true
case Tag.AudioSampleData => true
case Tag.CurveData => true
case Tag.SpectroscopyData => true
case Tag.OverlayData => true
case Tag.EncapsulatedDocument => true
case Tag.FloatPixelData => true
case Tag.DoubleFloatPixelData => true
case Tag.PixelData => sequenceStack.isEmpty
case Tag.WaveformData => sequenceStack.length == 1 && sequenceStack.head.tag == Tag.WaveformSequence
case _ => false
}
if (discarding) Nil else dh :: Nil
case dvc: DicomValueChunk =>
// value chunks always follow their header; drop them while discarding
if (discarding) Nil else dvc :: Nil
case p: DicomPart =>
// any other part type ends the attribute currently being discarded
discarding = false
p :: Nil
}
}

}
129 changes: 129 additions & 0 deletions src/main/scala/se/nimsa/dcm4che/streams/DicomModifyFlow.scala
@@ -0,0 +1,129 @@
package se.nimsa.dcm4che.streams

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import org.dcm4che3.data.{StandardElementDictionary, VR}
import se.nimsa.dcm4che.streams.DicomParts._
import se.nimsa.dcm4che.streams.TagPath.{TagPathSequence, TagPathTag}

object DicomModifyFlow {

/**
 * Class used to specify modifications to individual attributes of a dataset
 *
 * @param tagPath      tag path pointing to the attribute to modify; may contain sequence wildcards
 * @param modification a modification function from the attribute's current value bytes to its
 *                     replacement bytes; called with an empty `ByteString` when inserting
 * @param insert       if tag is absent in dataset it will be created and inserted when `true`
 */
case class TagModification(tagPath: TagPathTag, modification: ByteString => ByteString, insert: Boolean)

/**
 * Modification flow for inserting or overwriting the values of specified attributes. When inserting a new attribute,
 * the corresponding modification function will be called with an empty `ByteString`. A modification is specified by
 * a tag path, a modification function from current value bytes to their replacement, and a flag specifying whether
 * the attribute should be inserted if not present. Attributes are inserted only if they point to existing datasets.
 * That means that sequences are never inserted, only modified. Insertion works according to the `TagPathTag#contains`
 * method, meaning that if sequence wildcards are used in modifications they will apply to all items in a sequence.
 *
 * @param modifications Any number of `TagModification`s each specifying a tag path, a modification function, and
 *                      a Boolean indicating whether absent values will be inserted or skipped.
 * @return the modified flow of DICOM parts
 */
def modifyFlow(modifications: TagModification*): Flow[DicomPart, DicomPart, NotUsed] =
Flow[DicomPart]
// append a marker so pending root-level insertions can be flushed after the last upstream part
.concat(Source.single(DicomEndMarker))
.statefulMapConcat {
() =>
// modifications sorted by tag path so insertions are emitted in ascending order
val sortedModifications = modifications.toList.sortWith((a, b) => a.tagPath < b.tagPath)

var currentModification: Option[TagModification] = None // current modification
var currentHeader: Option[DicomHeader] = None // header of current attribute being modified
var currentTagPath: Option[TagPath] = None // tag path of the most recently seen header
var value = ByteString.empty // value of current attribute being modified
var bigEndian = false // endianness of current attribute
var explicitVR = true // VR representation of current attribute

var tagPathSequence: Option[TagPathSequence] = None // current sequence path. None = currently in root dataset

// remember encoding from the latest header; used when synthesizing inserted attributes
def updateSyntax(header: DicomHeader): Unit = {
bigEndian = header.bigEndian
explicitVR = header.explicitVR
}

// Synthesize header + value parts for an inserted attribute at the given tag path.
// Fails fast for tags whose VR cannot be determined and for sequence tags.
def headerAndValueParts(tagPath: TagPath, modification: ByteString => ByteString): List[DicomPart] = {
val valueBytes = modification(ByteString.empty)
val vr = StandardElementDictionary.INSTANCE.vrOf(tagPath.tag)
if (vr == VR.UN) throw new IllegalArgumentException("Tag is not present in dictionary, cannot determine value representation")
if (vr == VR.SQ) throw new IllegalArgumentException("Cannot insert sequence attributes")
val isFmi = DicomParsing.isFileMetaInformation(tagPath.tag)
val header = DicomHeader(tagPath.tag, vr, valueBytes.length, isFmi, bigEndian, explicitVR)
val value = DicomValueChunk(bigEndian, valueBytes, last = true)
header :: value :: Nil
}

// true when tagToTest lies strictly between the previously seen tag path (if any) and upperTag
def isBetween(tagToTest: TagPath, upperTag: TagPath, lowerTagMaybe: Option[TagPath]) =
tagToTest < upperTag && lowerTagMaybe.forall(_ < tagToTest)

// true when tagToTest points into the dataset we are currently positioned in
def isInDataset(tagToTest: TagPath, sequenceMaybe: Option[TagPathSequence]) =
sequenceMaybe.map(tagToTest.contains).getOrElse(tagToTest.isRoot)

{
case header: DicomHeader =>
updateSyntax(header)
val tagPath = tagPathSequence.map(_.thenTag(header.tag)).getOrElse(TagPath.fromTag(header.tag))
// emit pending insertions whose paths sort before this header within the current dataset
val insertParts = sortedModifications
.filter(_.insert)
.filter(m => isBetween(m.tagPath, tagPath, currentTagPath))
.filter(m => isInDataset(m.tagPath, tagPathSequence))
.flatMap(m => headerAndValueParts(m.tagPath, m.modification))
// if a modification matches this header, swallow it and buffer the value chunks
val modifyPart = sortedModifications
.find(_.tagPath.contains(tagPath))
.map { tagModification =>
currentHeader = Some(header)
value = ByteString.empty
currentModification = Some(tagModification)
Nil
}
.getOrElse(header :: Nil)
currentTagPath = Some(tagPath)
insertParts ::: modifyPart
case chunk: DicomValueChunk if currentModification.isDefined && currentHeader.isDefined =>
// buffer chunks until the last one, then emit the rewritten header + value
value = value ++ chunk.bytes
if (chunk.last) {
val newValue = currentModification.get.modification(value)
// NOTE(review): `.toShort` truncates value lengths above Short.MaxValue — confirm
// `withUpdatedLength` is meant to take a Short and that large values cannot occur here
val newHeader = currentHeader.get.withUpdatedLength(newValue.length.toShort)
currentModification = None
currentHeader = None
newHeader :: DicomValueChunk(bigEndian, newValue, last = true) :: Nil
} else
Nil
case sequence: DicomSequence =>
// descend into sequence: extend current sequence path
tagPathSequence = tagPathSequence.map(_.thenSequence(sequence.tag)).orElse(Some(TagPath.fromSequence(sequence.tag)))
sequence :: Nil
case sequenceDelimitation: DicomSequenceDelimitation =>
// ascend out of sequence
tagPathSequence = tagPathSequence.flatMap(_.previous)
sequenceDelimitation :: Nil
case fragments: DicomFragments =>
// fragments are tracked like a sequence for tag path purposes
tagPathSequence = tagPathSequence.map(_.thenSequence(fragments.tag)).orElse(Some(TagPath.fromSequence(fragments.tag)))
fragments :: Nil
case fragmentsDelimitation: DicomFragmentsDelimitation =>
tagPathSequence = tagPathSequence.flatMap(_.previous)
fragmentsDelimitation :: Nil
case item: DicomItem =>
// replace the innermost sequence path element with one carrying this item's index
tagPathSequence = tagPathSequence.flatMap(s => s.previous.map(_.thenSequence(s.tag, item.index)).orElse(Some(TagPath.fromSequence(s.tag, item.index))))
item :: Nil
case itemDelimitation: DicomItemDelimitation =>
itemDelimitation :: Nil
case DicomEndMarker =>
// flush remaining root-level insertions whose tags sort after everything seen
sortedModifications
.filter(_.insert)
.filter(_.tagPath.isRoot)
.filter(m => currentTagPath.exists(_ < m.tagPath))
.flatMap(m => headerAndValueParts(m.tagPath, m.modification))
case part =>
part :: Nil
}
}

}

0 comments on commit 6610890

Please sign in to comment.