Merge 1d4231d into 44a072b

kobmic committed Oct 17, 2017
2 parents 44a072b + 1d4231d, commit c7b0250

Showing 28 changed files with 2,882 additions and 1,025 deletions.
92 changes: 61 additions & 31 deletions README.md
@@ -21,60 +21,90 @@ The following example reads a DICOM file from disk, validates that it is a DICOM file,
and writes it to a new file.

```scala
import akka.stream.scaladsl.FileIO
import java.nio.file.Paths
import se.nimsa.dcm4che.streams.DicomFlows._
import se.nimsa.dcm4che.streams.DicomParseFlow._
import se.nimsa.dcm4che.streams.DicomParsing.isPrivateAttribute

FileIO.fromPath(Paths.get("source-file.dcm"))
  .via(validateFlow)
  .via(parseFlow)
  .via(tagFilter(_ => true)(tagPath => tagPath.toList.map(_.tag).exists(isPrivateAttribute))) // no private attributes anywhere on tag path
  .map(_.bytes)
  .runWith(FileIO.toPath(Paths.get("target-file.dcm")))
```

Care should be taken when modifying DICOM data so that the resulting data is still valid. For instance, group length
tags may need to be removed or updated after modifying attributes. Here is an example that modifies the `PatientName`
and `SOPInstanceUID` attributes. To ensure the resulting data is valid, group length tags in the dataset are removed and
the meta information group length tag is updated.

```scala
import akka.stream.scaladsl.FileIO
import akka.util.ByteString
import java.nio.file.Paths
import org.dcm4che3.data.{Tag, VR}
import se.nimsa.dcm4che.streams.DicomFlows._
import se.nimsa.dcm4che.streams.DicomParseFlow._
// TagPath, TagModification, modifyFlow, padToEvenLength and createUID are
// provided by the library and dcm4che utilities; their imports are elided here

val updatedSOPInstanceUID = padToEvenLength(ByteString(createUID()), VR.UI)

FileIO.fromPath(Paths.get("source-file.dcm"))
  .via(validateFlow)
  .via(parseFlow)
  .via(groupLengthDiscardFilter) // discard group length attributes in dataset
  .via(modifyFlow(
    TagModification.endsWith(TagPath.fromTag(Tag.PatientName), _ => padToEvenLength(ByteString("John Doe"), VR.PN), insert = false),
    TagModification.endsWith(TagPath.fromTag(Tag.MediaStorageSOPInstanceUID), _ => updatedSOPInstanceUID, insert = false),
    TagModification.endsWith(TagPath.fromTag(Tag.SOPInstanceUID), _ => updatedSOPInstanceUID, insert = true)
  ))
  .via(fmiGroupLengthFlow()) // update group length in meta information, if present
  .map(_.bytes)
  .runWith(FileIO.toPath(Paths.get("target-file.dcm")))
```


The next example materializes a stream as dcm4che `Attributes` objects instead of writing data to disk.

```scala
import akka.stream.scaladsl.FileIO
import java.nio.file.Paths
import org.dcm4che3.data.{Attributes, Tag}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import se.nimsa.dcm4che.streams.DicomAttributesSink._
import se.nimsa.dcm4che.streams.DicomFlows._
import se.nimsa.dcm4che.streams.DicomParseFlow._

val futureAttributes: Future[(Option[Attributes], Option[Attributes])] =
  FileIO.fromPath(Paths.get("source-file.dcm"))
    .via(validateFlow)
    .via(parseFlow)
    .via(whitelistFilter(Seq(Tag.PatientName, Tag.PatientID)))
    .via(attributeFlow) // must turn headers + chunks into complete attributes before materializing
    .runWith(attributesSink)

futureAttributes.map {
  case (maybeMetaInformation, maybeDataset) => ??? // do something with attributes here
}
```
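To give an idea of what can be done with the materialized objects, here is a small usage sketch; it assumes nothing
beyond dcm4che's standard `Attributes.getString` accessor and the `futureAttributes` value above:

```scala
futureAttributes.map {
  case (_, Some(dataset)) =>
    // read the string value of an attribute that passed the whitelist filter
    val patientName = dataset.getString(Tag.PatientName)
    println(s"Patient name: $patientName")
  case _ =>
    println("no dataset found")
}
```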

New, non-trivial DICOM flows can be built from a modular system of capabilities that are mixed in as appropriate with a
core class implementing a common base interface. The base interface for DICOM flows is `DicomFlow`, and new flows are
created using the `DicomFlowFactory.create` method. The `DicomFlow` interface defines a series of events, one for each
type of `DicomPart` produced when parsing DICOM data with `DicomParseFlow`. The core events are:
```scala
def onPreamble(part: DicomPreamble): List[DicomPart]
def onHeader(part: DicomHeader): List[DicomPart]
def onValueChunk(part: DicomValueChunk): List[DicomPart]
def onSequenceStart(part: DicomSequence): List[DicomPart]
def onSequenceEnd(part: DicomSequenceDelimitation): List[DicomPart]
def onFragmentsStart(part: DicomFragments): List[DicomPart]
def onFragmentsEnd(part: DicomFragmentsDelimitation): List[DicomPart]
def onSequenceItemStart(part: DicomSequenceItem): List[DicomPart]
def onSequenceItemEnd(part: DicomSequenceItemDelimitation): List[DicomPart]
def onFragmentsItemStart(part: DicomFragmentsItem): List[DicomPart]
def onDeflatedChunk(part: DicomDeflatedChunk): List[DicomPart]
def onUnknownPart(part: DicomUnknownPart): List[DicomPart]
def onPart(part: DicomPart): List[DicomPart]
```
Default behavior for these events is implemented in core classes. The most natural behavior is to simply pass parts on
down the stream, e.g.
```scala
def onPreamble(part: DicomPreamble): List[DicomPart] = part :: Nil
def onHeader(part: DicomHeader): List[DicomPart] = part :: Nil
...
```
This behavior is implemented in the `IdentityFlow` core class. Another option is to defer handling to the `onPart`
method, which is implemented in the `DeferToPartFlow` core class. This is appropriate for flows that define common
behavior for all part types.
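For instance, here is a minimal sketch, using only the event API and core classes described above, of a flow that
removes the 128-byte preamble and passes all other parts through unchanged (the name `preambleDiscardFlow` is
illustrative, not part of the library):

```scala
def preambleDiscardFlow() = DicomFlowFactory.create(new IdentityFlow {
  // emit nothing for the preamble; all other events keep the pass-through behavior
  override def onPreamble(part: DicomPreamble): List[DicomPart] = Nil
})
```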

To give an example of a custom flow, here is the implementation of a filter that removes nested sequences from a
dataset. A nested dataset is one contained in a sequence at `depth > 1`, given that the root dataset has `depth = 0`.
```scala
def nestedSequencesFilter() = DicomFlowFactory.create(new DeferToPartFlow with TagPathTracking {
  override def onPart(part: DicomPart): List[DicomPart] = if (tagPath.depth() > 1) Nil else part :: Nil
})
```
In this example, we chose `DeferToPartFlow` as the core class and mixed in the `TagPathTracking` capability, which
provides a `tagPath: TagPath` variable that is automatically updated as the flow progresses. Note that flows with
internal state should be defined as methods (`def`) rather than as constants or variables (`val`/`var`) to avoid
shared state within or between flows; a usage sketch follows below.
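As a usage sketch, assuming only the flows introduced earlier in this README, the custom filter slots into a pipeline
like any other stage; calling the `def` gives each stream its own instance:

```scala
FileIO.fromPath(Paths.get("source-file.dcm"))
  .via(validateFlow)
  .via(parseFlow)
  .via(nestedSequencesFilter()) // fresh instance per stream, no shared state
  .map(_.bytes)
  .runWith(FileIO.toPath(Paths.get("target-file.dcm")))
```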
11 changes: 7 additions & 4 deletions build.sbt
@@ -1,8 +1,11 @@
name := "dcm4che-streams"
version := "0.4"
version := "0.5"
organization := "se.nimsa"
scalaVersion := "2.12.2"
scalaVersion := "2.12.3"
scalacOptions := Seq("-encoding", "UTF-8", "-Xlint", "-deprecation", "-unchecked", "-feature", "-target:jvm-1.8")
+scalacOptions in (Compile, doc) ++= Seq(
+  "-no-link-warnings" // Suppresses problems with Scaladoc @throws links
+)

// repos

@@ -13,13 +16,13 @@ resolvers ++= Seq(
// deps

libraryDependencies ++= {
val akkaVersion = "2.5.2"
val akkaVersion = "2.5.6"
Seq(
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"org.slf4j" % "slf4j-simple" % "1.7.25",
"org.dcm4che" % "dcm4che-core" % "3.3.8" % "provided",
"org.scalatest" %% "scalatest" % "3.0.3" % "test",
"org.scalatest" %% "scalatest" % "3.0.4" % "test",
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % "test"
)
}
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=0.13.13
+sbt.version=0.13.16
6 changes: 3 additions & 3 deletions project/plugins.sbt
@@ -2,8 +2,8 @@ resolvers += Resolver.typesafeRepo("releases")

resolvers += Classpaths.typesafeReleases

addSbtPlugin("de.heikoseeberger" % "sbt-header" % "2.0.0")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "3.0.2")

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")

addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.1.0")
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.2")
12 changes: 6 additions & 6 deletions src/main/scala/se/nimsa/dcm4che/streams/ByteStringParser.scala
@@ -37,10 +37,10 @@ abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]]

import ByteStringParser._

-protected val bytesIn = Inlet[ByteString]("bytesIn")
-protected val objOut = Outlet[T]("objOut")
+protected val bytesIn: Inlet[ByteString] = Inlet[ByteString]("bytesIn")
+protected val objOut: Outlet[T] = Outlet[T]("objOut")

-override def initialAttributes = Attributes.name("ByteStringParser")
+override def initialAttributes: Attributes = Attributes.name("ByteStringParser")

final override val shape = FlowShape(bytesIn, objOut)

@@ -54,14 +54,14 @@ abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]]

final protected def startWith(step: ParseStep[T]): Unit = current = step

-final protected def startInflating(inflateData: InflateData, reader: ByteReader) = {
+final protected def startInflating(inflateData: InflateData, reader: ByteReader): Unit = {
if (this.inflateData.isDefined)
throw new IllegalStateException("Inflating can only be started once")
this.inflateData = Some(inflateData)
deflatedBuffer = reader.takeAll()
}

-final protected def isInflating = inflateData.isDefined
+final protected def isInflating: Boolean = inflateData.isDefined

protected def recursionLimit: Int = 100000

@@ -234,7 +234,7 @@ object ByteStringParser {

private[this] var off = 0

-def setInput(input: ByteString) = {
+def setInput(input: ByteString): Unit = {
this.input = input
off = 0
}
src/main/scala/se/nimsa/dcm4che/streams/DicomAttributesSink.scala
@@ -63,13 +63,13 @@ object DicomAttributesSink {
}

/**
-* Creates a <code>Sink</code> which ingests DICOM parts as output by the <code>DicomPartFlow</code> followed by the
+* Creates a <code>Sink</code> which ingests DICOM parts as output by the <code>DicomParseFlow</code> followed by the
* <code>DicomFlows.attributeFlow</code> and materializes into two dcm4che <code>Attributes</code> objects, one for
* meta data and one for the dataset.
*
* Based heavily and exclusively on the dcm4che
-* <a href="https://github.com/dcm4che/dcm4che/blob/master/dcm4che-core/src/test/java/org/dcm4che3/io/DicomInputStreamTest.java">DicomInputStream</a>
-* class (complementing what is not covered by <code>DicomPartFlow</code>.
+* <a href="https://github.com/dcm4che/dcm4che/blob/master/dcm4che-core/src/main/java/org/dcm4che3/io/DicomInputStream.java">DicomInputStream</a>
+* class (complementing what is not covered by <code>DicomParseFlow</code>).
*
* @param ec an implicit ExecutionContext
* @return a <code>Sink</code> for materializing a flow of DICOM parts into dcm4che <code>Attribute</code>s.
@@ -140,7 +140,7 @@ object DicomAttributesSink {
}
attributesSinkData.copy(maybeAttributesData = attributesSinkData.maybeAttributesData.map(_.copy(currentFragments = None)))

-case _: DicomItem =>
+case _: DicomSequenceItem =>
val maybeAttributesData = attributesSinkData.maybeAttributesData.flatMap { attributesData =>
attributesData.sequenceStack.headOption.map { seq =>
val attributes = new Attributes(seq.getParent.bigEndian)
@@ -150,7 +150,7 @@ object DicomAttributesSink {
}
attributesSinkData.copy(maybeAttributesData = maybeAttributesData)

-case _: DicomItemDelimitation =>
+case _: DicomSequenceItemDelimitation =>
val maybeAttributesData = attributesSinkData.maybeAttributesData.map { attributesData =>
attributesData.attributesStack.headOption
.map { attributes =>
