diff --git a/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala b/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala index 9eb61e5..f4d6d6e 100644 --- a/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala +++ b/src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala @@ -205,25 +205,32 @@ object DicomFlows { */ def validateFlowWithContext(contexts: Seq[ValidationContext]) = Flow[ByteString].via(new DicomValidateFlow(Some(contexts))) + /** + * Class used to specify modifications to individual attributes of a dataset + * @param tag tag number + * @param modification a modification function + * @param insert if tag is absent in dataset it will be created and inserted when `true` + */ + case class TagModification(tag: Int, modification: ByteString => ByteString, insert: Boolean) + /** * Simple transformation flow for inserting or overwriting the values of specified attributes. Only modifies or * inserts attributes in the root dataset, not inside sequences. When inserting a new attribute, the corresponding * transform will be called with an empty `ByteString`. * - * @param insertIfAbsent Determine if absent attributes with specified transforms will be inserted or if this flow - * modfies attributes only. - * @param transforms Each transform is a 2-tuple of tag number and value transform - * @return the transformed flow of DICOM parts + * @param modifications Any number of `TagModification`s each specifying a tag number, a modification function, and + * a Boolean indicating whether absent values will be inserted or skipped. + * @return the modified flow of DICOM parts */ - def modifyFlow(insertIfAbsent: Boolean, transforms: (Int, ByteString => ByteString)*): Flow[DicomPart, DicomPart, NotUsed] = + def modifyFlow(modifications: TagModification*): Flow[DicomPart, DicomPart, NotUsed] = Flow[DicomPart] .concat(Source.single(DicomEndMarker)) .statefulMapConcat { () => - var tagsToProcess = transforms.map(_._1) + var modificationsLeft = modifications.toList var value = ByteString.empty - var headerMaybe: Option[DicomHeader] = None - var transformMaybe: Option[ByteString => ByteString] = None + var currentHeader: Option[DicomHeader] = None + var currentModification: Option[TagModification] = None var sequenceDepth = 0 var bigEndian = false var explicitVR = true @@ -234,41 +241,49 @@ object DicomFlows { } def headerAndValue(tag: Int): List[DicomPart] = { - val transform = transforms.find(_._1 == tag).map(_._2).head - val valueBytes = transform(ByteString.empty) + val modification = modifications.find(_.tag == tag).map(_.modification).head + val valueBytes = modification(ByteString.empty) val dictVr = StandardElementDictionary.INSTANCE.vrOf(tag) if (dictVr == VR.SQ) throw new IllegalArgumentException("Cannot insert sequence attributes") val vr = if (dictVr == VR.UN) VR.LO else dictVr val isFmi = DicomParsing.isFileMetaInformation(tag) val header = DicomHeader(tag, vr, valueBytes.length, isFmi, bigEndian, explicitVR) val value = DicomValueChunk(bigEndian, valueBytes, last = true) - tagsToProcess = tagsToProcess.filterNot(_ == tag) + modificationsLeft = modificationsLeft.filterNot(_.tag == tag) header :: value :: Nil } { - case header: DicomHeader if sequenceDepth == 0 && insertIfAbsent && tagsToProcess.exists(_ < header.tag) => - updateSyntax(header) - val tag = tagsToProcess.find(_ < header.tag).head - headerAndValue(tag) ::: header :: Nil - case header: DicomHeader if sequenceDepth == 0 && tagsToProcess.contains(header.tag) => - updateSyntax(header) - headerMaybe = Some(header) - value = ByteString.empty - transformMaybe = transforms.find(_._1 == header.tag).map(_._2) - tagsToProcess = tagsToProcess.filterNot(_ == header.tag) - Nil case header: DicomHeader => updateSyntax(header) - header :: Nil - case chunk: DicomValueChunk if transformMaybe.isDefined && headerMaybe.isDefined => + if (sequenceDepth == 0) + modificationsLeft + .find(_.tag < header.tag) + .filter(_.insert) + .map(modification => headerAndValue(modification.tag) ::: header :: Nil) + .getOrElse(modificationsLeft + .find(_.tag == header.tag) + .map { modification => + currentHeader = Some(header) + value = ByteString.empty + currentModification = Some(modification) + modificationsLeft = modificationsLeft.filterNot(_.tag == header.tag) + Nil + } + .getOrElse( + header :: Nil + ) + ) + else + header :: Nil + case chunk: DicomValueChunk if currentModification.isDefined && currentHeader.isDefined => value = value ++ chunk.bytes if (chunk.last) { - val newValue = transformMaybe.map(t => t(value)).getOrElse(value) - val newHeader = headerMaybe.get.withUpdatedLength(newValue.length.toShort) - transformMaybe = None - headerMaybe = None - newHeader :: DicomValueChunk(chunk.bigEndian, newValue, last = true) :: Nil + val newValue = currentModification.get.modification(value) + val newHeader = currentHeader.get.withUpdatedLength(newValue.length.toShort) + currentModification = None + currentHeader = None + newHeader :: DicomValueChunk(bigEndian, newValue, last = true) :: Nil } else Nil case s: DicomSequence => @@ -277,10 +292,10 @@ object DicomFlows { case s: DicomSequenceDelimitation => sequenceDepth -= 1 s :: Nil - case DicomEndMarker if insertIfAbsent => - tagsToProcess.flatMap(tag => headerAndValue(tag)).toList case DicomEndMarker => - Nil + modificationsLeft + .filter(_.insert) + .flatMap(modification => headerAndValue(modification.tag)) case part => part :: Nil } diff --git a/src/test/scala/se/nimsa/dcm4che/streams/DicomFlowsTest.scala b/src/test/scala/se/nimsa/dcm4che/streams/DicomFlowsTest.scala index 8b34e30..e76a775 100644 --- a/src/test/scala/se/nimsa/dcm4che/streams/DicomFlowsTest.scala +++ b/src/test/scala/se/nimsa/dcm4che/streams/DicomFlowsTest.scala @@ -254,7 +254,9 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit val source = Source.single(bytes) .via(new DicomPartFlow()) - .via(modifyFlow(insertIfAbsent = false, (Tag.PatientName, _ => mikeBytes), (Tag.StudyDate, _ => ByteString.empty))) + .via(modifyFlow( + TagModification(Tag.PatientName, _ => mikeBytes, insert = false), + TagModification(Tag.StudyDate, _ => ByteString.empty, insert = false))) source.runWith(TestSink.probe[DicomPart]) .expectHeader(Tag.PatientName, VR.PN, mikeBytes.length) @@ -271,7 +273,7 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit val source = Source.single(bytes) .via(new DicomPartFlow()) - .via(modifyFlow(insertIfAbsent = false, (Tag.PatientName, _ => mikeBytes))) + .via(modifyFlow(TagModification(Tag.PatientName, _ => mikeBytes, insert = false))) source.runWith(TestSink.probe[DicomPart]) .expectSequence(Tag.DerivationCodeSequence) @@ -290,7 +292,7 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit val source = Source.single(bytes) .via(new DicomPartFlow()) - .via(modifyFlow(insertIfAbsent = true, (Tag.StudyDate, _ => studyDate.drop(8)))) + .via(modifyFlow(TagModification(Tag.StudyDate, _ => studyDate.drop(8), insert = true))) source.runWith(TestSink.probe[DicomPart]) .expectHeader(Tag.StudyDate, VR.DA, studyDate.length - 8) @@ -306,7 +308,7 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit val source = Source.single(bytes) .via(new DicomPartFlow()) - .via(modifyFlow(insertIfAbsent = true, (Tag.PatientName, _ => patientNameJohnDoe.drop(8)))) + .via(modifyFlow(TagModification(Tag.PatientName, _ => patientNameJohnDoe.drop(8), insert = true))) source.runWith(TestSink.probe[DicomPart]) .expectHeader(Tag.StudyDate, VR.DA, studyDate.length - 8) @@ -324,9 +326,8 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomAttributesSinkSpec")) wit .via(new DicomPartFlow()) .via(deflateDatasetFlow()) .via(modifyFlow( - insertIfAbsent = false, - (Tag.FileMetaInformationGroupLength, _ => fmiGroupLength(tsuidDeflatedExplicitLE)), - (Tag.TransferSyntaxUID, _ => tsuidDeflatedExplicitLE.drop(8)))) + TagModification(Tag.FileMetaInformationGroupLength, _ => fmiGroupLength(tsuidDeflatedExplicitLE), insert = false), + TagModification(Tag.TransferSyntaxUID, _ => tsuidDeflatedExplicitLE.drop(8), insert = false))) .map(_.bytes) .via(new DicomPartFlow())