Skip to content

Commit

Permalink
Better API for modifyFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
KarlSjostrand committed May 25, 2017
1 parent 09a08f7 commit e7a6ee2
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 39 deletions.
79 changes: 47 additions & 32 deletions src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,25 +205,32 @@ object DicomFlows {
*/
def validateFlowWithContext(contexts: Seq[ValidationContext]) = Flow[ByteString].via(new DicomValidateFlow(Some(contexts)))

/**
* Class used to specify modifications to individual attributes of a dataset
* @param tag tag number
* @param modification a modification function
* @param insert if tag is absent in dataset it will be created and inserted when `true`
*/
case class TagModification(tag: Int, modification: ByteString => ByteString, insert: Boolean)

/**
* Simple transformation flow for inserting or overwriting the values of specified attributes. Only modifies or
* inserts attributes in the root dataset, not inside sequences. When inserting a new attribute, the corresponding
* transform will be called with an empty `ByteString`.
*
* @param insertIfAbsent Determine if absent attributes with specified transforms will be inserted or if this flow
* modfies attributes only.
* @param transforms Each transform is a 2-tuple of tag number and value transform
* @return the transformed flow of DICOM parts
* @param modifications Any number of `TagModification`s each specifying a tag number, a modification function, and
* a Boolean indicating whether absent values will be inserted or skipped.
* @return the modified flow of DICOM parts
*/
def modifyFlow(insertIfAbsent: Boolean, transforms: (Int, ByteString => ByteString)*): Flow[DicomPart, DicomPart, NotUsed] =
def modifyFlow(modifications: TagModification*): Flow[DicomPart, DicomPart, NotUsed] =
Flow[DicomPart]
.concat(Source.single(DicomEndMarker))
.statefulMapConcat {
() =>
var tagsToProcess = transforms.map(_._1)
var modificationsLeft = modifications.toList
var value = ByteString.empty
var headerMaybe: Option[DicomHeader] = None
var transformMaybe: Option[ByteString => ByteString] = None
var currentHeader: Option[DicomHeader] = None
var currentModification: Option[TagModification] = None
var sequenceDepth = 0
var bigEndian = false
var explicitVR = true
Expand All @@ -234,41 +241,49 @@ object DicomFlows {
}

def headerAndValue(tag: Int): List[DicomPart] = {
val transform = transforms.find(_._1 == tag).map(_._2).head
val valueBytes = transform(ByteString.empty)
val modification = modifications.find(_.tag == tag).map(_.modification).head
val valueBytes = modification(ByteString.empty)
val dictVr = StandardElementDictionary.INSTANCE.vrOf(tag)
if (dictVr == VR.SQ) throw new IllegalArgumentException("Cannot insert sequence attributes")
val vr = if (dictVr == VR.UN) VR.LO else dictVr
val isFmi = DicomParsing.isFileMetaInformation(tag)
val header = DicomHeader(tag, vr, valueBytes.length, isFmi, bigEndian, explicitVR)
val value = DicomValueChunk(bigEndian, valueBytes, last = true)
tagsToProcess = tagsToProcess.filterNot(_ == tag)
modificationsLeft = modificationsLeft.filterNot(_.tag == tag)
header :: value :: Nil
}

{
case header: DicomHeader if sequenceDepth == 0 && insertIfAbsent && tagsToProcess.exists(_ < header.tag) =>
updateSyntax(header)
val tag = tagsToProcess.find(_ < header.tag).head
headerAndValue(tag) ::: header :: Nil
case header: DicomHeader if sequenceDepth == 0 && tagsToProcess.contains(header.tag) =>
updateSyntax(header)
headerMaybe = Some(header)
value = ByteString.empty
transformMaybe = transforms.find(_._1 == header.tag).map(_._2)
tagsToProcess = tagsToProcess.filterNot(_ == header.tag)
Nil
case header: DicomHeader =>
updateSyntax(header)
header :: Nil
case chunk: DicomValueChunk if transformMaybe.isDefined && headerMaybe.isDefined =>
if (sequenceDepth == 0)
modificationsLeft
.find(_.tag < header.tag)
.filter(_.insert)
.map(modification => headerAndValue(modification.tag) ::: header :: Nil)
.getOrElse(modificationsLeft
.find(_.tag == header.tag)
.map { modification =>
currentHeader = Some(header)
value = ByteString.empty
currentModification = Some(modification)
modificationsLeft = modificationsLeft.filterNot(_.tag == header.tag)
Nil
}
.getOrElse(
header :: Nil
)
)
else
header :: Nil
case chunk: DicomValueChunk if currentModification.isDefined && currentHeader.isDefined =>
value = value ++ chunk.bytes
if (chunk.last) {
val newValue = transformMaybe.map(t => t(value)).getOrElse(value)
val newHeader = headerMaybe.get.withUpdatedLength(newValue.length.toShort)
transformMaybe = None
headerMaybe = None
newHeader :: DicomValueChunk(chunk.bigEndian, newValue, last = true) :: Nil
val newValue = currentModification.get.modification(value)
val newHeader = currentHeader.get.withUpdatedLength(newValue.length.toShort)
currentModification = None
currentHeader = None
newHeader :: DicomValueChunk(bigEndian, newValue, last = true) :: Nil
} else
Nil
case s: DicomSequence =>
Expand All @@ -277,10 +292,10 @@ object DicomFlows {
case s: DicomSequenceDelimitation =>
sequenceDepth -= 1
s :: Nil
case DicomEndMarker if insertIfAbsent =>
tagsToProcess.flatMap(tag => headerAndValue(tag)).toList
case DicomEndMarker =>
Nil
modificationsLeft
.filter(_.insert)
.flatMap(modification => headerAndValue(modification.tag))
case part =>
part :: Nil
}
Expand Down
15 changes: 8 additions & 7 deletions src/test/scala/se/nimsa/dcm4che/streams/DicomFlowsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit

val source = Source.single(bytes)
.via(new DicomPartFlow())
.via(modifyFlow(insertIfAbsent = false, (Tag.PatientName, _ => mikeBytes), (Tag.StudyDate, _ => ByteString.empty)))
.via(modifyFlow(
TagModification(Tag.PatientName, _ => mikeBytes, insert = false),
TagModification(Tag.StudyDate, _ => ByteString.empty, insert = false)))

source.runWith(TestSink.probe[DicomPart])
.expectHeader(Tag.PatientName, VR.PN, mikeBytes.length)
Expand All @@ -271,7 +273,7 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit

val source = Source.single(bytes)
.via(new DicomPartFlow())
.via(modifyFlow(insertIfAbsent = false, (Tag.PatientName, _ => mikeBytes)))
.via(modifyFlow(TagModification(Tag.PatientName, _ => mikeBytes, insert = false)))

source.runWith(TestSink.probe[DicomPart])
.expectSequence(Tag.DerivationCodeSequence)
Expand All @@ -290,7 +292,7 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit

val source = Source.single(bytes)
.via(new DicomPartFlow())
.via(modifyFlow(insertIfAbsent = true, (Tag.StudyDate, _ => studyDate.drop(8))))
.via(modifyFlow(TagModification(Tag.StudyDate, _ => studyDate.drop(8), insert = true)))

source.runWith(TestSink.probe[DicomPart])
.expectHeader(Tag.StudyDate, VR.DA, studyDate.length - 8)
Expand All @@ -306,7 +308,7 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit

val source = Source.single(bytes)
.via(new DicomPartFlow())
.via(modifyFlow(insertIfAbsent = true, (Tag.PatientName, _ => patientNameJohnDoe.drop(8))))
.via(modifyFlow(TagModification(Tag.PatientName, _ => patientNameJohnDoe.drop(8), insert = true)))

source.runWith(TestSink.probe[DicomPart])
.expectHeader(Tag.StudyDate, VR.DA, studyDate.length - 8)
Expand All @@ -324,9 +326,8 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit
.via(new DicomPartFlow())
.via(deflateDatasetFlow())
.via(modifyFlow(
insertIfAbsent = false,
(Tag.FileMetaInformationGroupLength, _ => fmiGroupLength(tsuidDeflatedExplicitLE)),
(Tag.TransferSyntaxUID, _ => tsuidDeflatedExplicitLE.drop(8))))
TagModification(Tag.FileMetaInformationGroupLength, _ => fmiGroupLength(tsuidDeflatedExplicitLE), insert = false),
TagModification(Tag.TransferSyntaxUID, _ => tsuidDeflatedExplicitLE.drop(8), insert = false)))
.map(_.bytes)
.via(new DicomPartFlow())

Expand Down

0 comments on commit e7a6ee2

Please sign in to comment.