Skip to content

Commit

Permalink
Merge 43fb25d into 6fc83cc
Browse files Browse the repository at this point in the history
  • Loading branch information
KarlSjostrand committed Nov 7, 2017
2 parents 6fc83cc + 43fb25d commit 1fd7673
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 100 deletions.
4 changes: 2 additions & 2 deletions build.sbt
@@ -1,7 +1,7 @@
name := "dcm4che-streams"
version := "0.5"
version := "0.6"
organization := "se.nimsa"
scalaVersion := "2.12.3"
scalaVersion := "2.12.4"
scalacOptions := Seq("-encoding", "UTF-8", "-Xlint", "-deprecation", "-unchecked", "-feature", "-target:jvm-1.8")
scalacOptions in (Compile, doc) ++= Seq(
"-no-link-warnings" // Suppresses problems with Scaladoc @throws links
Expand Down
17 changes: 10 additions & 7 deletions src/main/scala/se/nimsa/dcm4che/streams/DicomFlows.scala
Expand Up @@ -23,6 +23,7 @@ import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import org.dcm4che3.data.{Tag, VR}
import org.dcm4che3.io.DicomStreamException
import se.nimsa.dcm4che.streams.DicomParsing.{dicomInfo, Info}
import se.nimsa.dcm4che.streams.DicomParts._

/**
Expand Down Expand Up @@ -179,14 +180,14 @@ object DicomFlows {
* attribute found
*/
val validateFlow: Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString].via(new DicomValidateFlow(None))
Flow[ByteString].via(new DicomValidateFlow(None, drainIncoming = false))

/**
* A flow which passes on the input bytes unchanged, fails for non-DICOM files, validates for DICOM files with supported
* Media Storage SOP Class UID, Transfer Syntax UID combination passed as context
*/
def validateFlowWithContext(contexts: Seq[ValidationContext]): Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString].via(new DicomValidateFlow(Some(contexts)))
def validateFlowWithContext(contexts: Seq[ValidationContext], drainIncoming: Boolean): Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString].via(new DicomValidateFlow(Some(contexts), drainIncoming))

/**
* A flow which deflates the dataset but leaves the meta information intact. Useful when the dicom parsing in `DicomParseFlow`
Expand Down Expand Up @@ -444,11 +445,13 @@ object DicomFlows {
{
case fmiAttributes: DicomAttributes =>
if (fmiAttributes.attributes.nonEmpty) {
val fmiAttributesNoLength = fmiAttributes.attributes
.filter(_.header.tag != Tag.FileMetaInformationGroupLength)
val info = dicomInfo(fmiAttributes.attributes.head.header.bytes).getOrElse(Info(bigEndian = false, explicitVR = true, hasFmi = true))
val fmiAttributesNoLength = fmiAttributes.attributes.filter(_.header.tag != Tag.FileMetaInformationGroupLength)
val length = fmiAttributesNoLength.map(_.bytes.length).sum
val lengthHeader = DicomHeader(Tag.FileMetaInformationGroupLength, VR.OB, 4, isFmi = true, bigEndian = false, explicitVR = true, ByteString(2, 0, 0, 0, 85, 76, 4, 0))
val lengthChunk = DicomValueChunk(bigEndian = false, intToBytesLE(length), last = true)
val lengthBytes = tagToBytes(Tag.FileMetaInformationGroupLength, info.bigEndian) ++
(if (info.explicitVR) ByteString("UL") ++ shortToBytes(4, info.bigEndian) else intToBytes(4, info.bigEndian))
val lengthHeader = DicomHeader(Tag.FileMetaInformationGroupLength, VR.UL, 4, isFmi = true, info.bigEndian, info.explicitVR, lengthBytes)
val lengthChunk = DicomValueChunk(info.bigEndian, intToBytes(length, info.bigEndian), last = true)
val fmiParts = fmiAttributesNoLength.toList.flatMap(attribute => attribute.header :: attribute.valueChunks.toList)
fmi = lengthHeader :: lengthChunk :: fmiParts
}
Expand Down
155 changes: 83 additions & 72 deletions src/main/scala/se/nimsa/dcm4che/streams/DicomValidateFlow.scala
Expand Up @@ -27,114 +27,118 @@ import org.dcm4che3.data.Tag._
/**
* A flow which passes on the input bytes unchanged, fails for non-DICOM files, validates for DICOM files with supported
* Media Storage SOP Class UID, Transfer Syntax UID combination passed as context.
*
* @param contexts supported MediaStorageSOPClassUID, TransferSynatxUID combinations
*/
class DicomValidateFlow(contexts: Option[Seq[ValidationContext]]) extends GraphStage[FlowShape[ByteString, ByteString]] {
class DicomValidateFlow(contexts: Option[Seq[ValidationContext]], drainIncoming: Boolean) extends GraphStage[FlowShape[ByteString, ByteString]] {
val in: Inlet[ByteString] = Inlet[ByteString]("DicomValidateFlow.in")
val out: Outlet[ByteString] = Outlet[ByteString]("DicomValidateFlow.out")
override val shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var buffer: ByteString = ByteString.empty
var isValidated = false
// 378 bytes should be enough when passing context - Media Storage SOP Class UID and Transfer Syntax UID
var isValidated: Option[Boolean] = None
var failException: Option[Throwable] = None

// 378 bytes should be enough when passing context - Media Storage SOP Class UID and Transfer Syntax UID
// otherwise read to first header
val maxBufferLength: Int = if (contexts.isDefined) 512 else 140

setHandlers(in, out, new InHandler with OutHandler {

override def onPull(): Unit = {
override def onPull(): Unit =
pull(in)
}

override def onPush(): Unit = {
val chunk = grab(in)
if (isValidated)
push(out, chunk)
else {
buffer = buffer ++ chunk

if (buffer.length >= maxBufferLength) {
if (isPreamble(buffer)) {
if (DicomParsing.isHeader(buffer.drop(dicomPreambleLength))) {
isValidated match {
case None =>
buffer = buffer ++ chunk

if (buffer.length >= maxBufferLength)
if (isPreamble(buffer))
if (DicomParsing.isHeader(buffer.drop(dicomPreambleLength)))
if (contexts.isDefined) {
val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get
validateFileMetaInformation(buffer.drop(dicomPreambleLength), info, upstreamHasFinished = false)
} else
setValidated()
else {
setFailed(new DicomStreamException("Not a DICOM stream"), upstreamHasFinished = false)
}
else if (DicomParsing.isHeader(buffer))
if (contexts.isDefined) {
val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get
validateFileMetaInformation(buffer.drop(dicomPreambleLength), info)
} else {
val info = DicomParsing.dicomInfo(buffer).get
validateSOPClassUID(buffer, info, upstreamHasFinished = false)
} else
setValidated()
}
} else {
setFailed()
else {
setFailed(new DicomStreamException("Not a DICOM stream"), upstreamHasFinished = false)
}
} else if (DicomParsing.isHeader(buffer)) {
if (contexts.isDefined) {
val info = DicomParsing.dicomInfo(buffer).get
validateSOPClassUID(buffer, info)
} else {
setValidated()
}
} else {
setFailed()
}
} else {
else
pull(in)
case Some(true) =>
push(out, chunk)
case Some(false) =>
pull(in)
}
}
}

override def onUpstreamFinish(): Unit = {
if (!isValidated)
if (contexts.isDefined) {
if (buffer.length >= dicomPreambleLength && isPreamble(buffer)) {
val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get
validateFileMetaInformation(buffer.drop(dicomPreambleLength), info)
} else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) {
val info = DicomParsing.dicomInfo(buffer).get
validateSOPClassUID(buffer, info)
} else {
setFailed()
}
} else {
if (buffer.length == dicomPreambleLength && isPreamble(buffer))
isValidated match {
case None =>
if (contexts.isDefined)
if (buffer.length >= dicomPreambleLength && isPreamble(buffer)) {
val info = DicomParsing.dicomInfo(buffer.drop(dicomPreambleLength)).get
validateFileMetaInformation(buffer.drop(dicomPreambleLength), info, upstreamHasFinished = true)
} else if (buffer.length >= 8 && DicomParsing.isHeader(buffer)) {
val info = DicomParsing.dicomInfo(buffer).get
validateSOPClassUID(buffer, info, upstreamHasFinished = true)
} else
setFailed(new DicomStreamException("Not a DICOM stream"), upstreamHasFinished = true)
else if (buffer.length == dicomPreambleLength && isPreamble(buffer))
setValidated()
else if (buffer.length >= 8 && DicomParsing.isHeader(buffer))
setValidated()
else
setFailed()
}
completeStage()
setFailed(new DicomStreamException("Not a DICOM stream"), upstreamHasFinished = true)
completeStage()
case Some(true) =>
completeStage()
case Some(false) =>
failStage(failException.get)
}
}


// Find and validate MediaSOPClassUID and TranferSyntaxUID
private def validateFileMetaInformation(data: ByteString, info: Info): Unit = {
private def validateFileMetaInformation(data: ByteString, info: Info, upstreamHasFinished: Boolean): Unit = {
var currentData = data

val (failed, tailData) = findAndValidateField(data, info, "MediaStorageSOPClassUID", (tag: Int) => tag == MediaStorageSOPClassUID, (tag: Int) => (tag & 0xFFFF0000) > 0x00020000)
val (failed, tailData) = findAndValidateField(data, info, "MediaStorageSOPClassUID", (tag: Int) => tag == MediaStorageSOPClassUID, (tag: Int) => (tag & 0xFFFF0000) > 0x00020000, upstreamHasFinished)
if (!failed) {
currentData = tailData
val mscu = DicomParsing.parseUIDAttribute(currentData, info.explicitVR, info.bigEndian)

val (nextFailed, nextTailData) = findAndValidateField(currentData, info, "TransferSyntaxUID", (tag: Int) => tag == TransferSyntaxUID, (tag: Int) => (tag & 0xFFFF0000) > 0x00020000)
val (nextFailed, nextTailData) = findAndValidateField(currentData, info, "TransferSyntaxUID", (tag: Int) => tag == TransferSyntaxUID, (tag: Int) => (tag & 0xFFFF0000) > 0x00020000, upstreamHasFinished)
if (!nextFailed) {
currentData = nextTailData
val tsuid = DicomParsing.parseUIDAttribute(currentData, info.explicitVR, info.bigEndian)

val currentContext = ValidationContext(mscu.value.utf8String, tsuid.value.utf8String)
if (contexts.get.contains(currentContext)) {
if (contexts.get.contains(currentContext))
setValidated()
} else {
failStage(new DicomStreamException(s"The presentation context [SOPClassUID = ${mscu.value.utf8String}, TransferSyntaxUID = ${tsuid.value.utf8String}] is not supported"))
}
else
setFailed(new DicomStreamException(s"The presentation context [SOPClassUID = ${mscu.value.utf8String}, TransferSyntaxUID = ${tsuid.value.utf8String}] is not supported"), upstreamHasFinished)
}
}
}


// Find and validate SOPCLassUID
private def validateSOPClassUID(data: ByteString, info: Info): Unit = {
private def validateSOPClassUID(data: ByteString, info: Info, upstreamHasFinished: Boolean): Unit = {

val (failed, tailData) = findAndValidateField(data, info, "SOPClassUID", (tag: Int) => tag == SOPClassUID, (tag: Int) => tag > SOPClassUID)
val (failed, tailData) = findAndValidateField(data, info, "SOPClassUID", (tag: Int) => tag == SOPClassUID, (tag: Int) => tag > SOPClassUID, upstreamHasFinished)

if (!failed) {
// SOP CLass UID
Expand All @@ -144,27 +148,28 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]]) extends GraphS
val tsuid = info.assumedTransferSyntax

val currentContext = ValidationContext(scuid.value.utf8String, tsuid)
if (contexts.get.contains(currentContext)) {
if (contexts.get.contains(currentContext))
setValidated()
} else {
failStage(new DicomStreamException(s"The presentation context [SOPClassUID = ${scuid.value.utf8String}, TransferSyntaxUID = $tsuid] is not supported"))
}
else
setFailed(new DicomStreamException(s"The presentation context [SOPClassUID = ${scuid.value.utf8String}, TransferSyntaxUID = $tsuid] is not supported"), upstreamHasFinished)
}
}

/**
* Utility method. Search after DICOM Header in byte stream.
* @param data bytes stream
* @param info info object obtained earlier
* @param fieldName dicom field name, used for error log
* @param found field found condition, typically comparison of dicom tags
*
* @param data bytes stream
* @param info info object obtained earlier
* @param fieldName dicom field name, used for error log
* @param found field found condition, typically comparison of dicom tags
* @param stopSearching stop condition
* @return
*/
def findAndValidateField(data: ByteString, info: Info, fieldName: String, found: (Int) => Boolean, stopSearching: (Int) => Boolean ): (Boolean, ByteString) = {
def findAndValidateField(data: ByteString, info: Info, fieldName: String, found: Int => Boolean, stopSearching: Int => Boolean, upstreamHasFinished: Boolean): (Boolean, ByteString) = {
var currentTag = -1
var failed = false
var currentData = data

def takeMax8(buffer: ByteString) = if (buffer.size >= 8) buffer.take(8) else buffer

while (!found(currentTag) && !failed) {
Expand All @@ -173,33 +178,39 @@ class DicomValidateFlow(contexts: Option[Seq[ValidationContext]]) extends GraphS
val (tag, _, headerLength, length) = maybeHeader.get
if (tag < currentTag) {
failed = true
failStage(new DicomStreamException(s"Parse error. Invalid tag order or invalid DICOM header: ${takeMax8(currentData)}."))
setFailed(new DicomStreamException(s"Parse error. Invalid tag order or invalid DICOM header: ${takeMax8(currentData)}."), upstreamHasFinished)
} else {
currentTag = tag
if (!found(tag)) {
if (!found(tag))
currentData = currentData.drop(headerLength + length.toInt)
}
if (stopSearching(currentTag) || currentData.size < 8) {
// read past stop criteria without finding tag, or not enough data left in buffer
failed = true
failStage(new DicomStreamException(s"Not a valid DICOM file. Could not find $fieldName."))
setFailed(new DicomStreamException(s"Not a valid DICOM file. Could not find $fieldName."), upstreamHasFinished)
}
}
} else {
// could not parse header
failed = true
failStage(new DicomStreamException(s"Parse error. Invalid DICOM header: ${takeMax8(currentData)}."))
setFailed(new DicomStreamException(s"Parse error. Invalid DICOM header: ${takeMax8(currentData)}."), upstreamHasFinished)
}
}
(failed, currentData)
}

def setValidated(): Unit = {
isValidated = true
isValidated = Some(true)
push(out, buffer)
}

def setFailed(): Unit = failStage(new DicomStreamException("Not a DICOM stream"))
def setFailed(e: Throwable, upstreamHasFinished: Boolean): Unit = {
isValidated = Some(false)
failException = Some(e)
if (!upstreamHasFinished && drainIncoming)
pull(in)
else
failStage(failException.get)
}

})
}
Expand Down

0 comments on commit 1fd7673

Please sign in to comment.