/
DicomPartFlow.scala
275 lines (244 loc) · 12.1 KB
/
DicomPartFlow.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
/*
* Copyright 2017 Lars Edenbrandt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package se.nimsa.dcm4che.streams
import java.util.zip.Inflater
import akka.stream._
import akka.stream.stage._
import akka.util.ByteString
import org.dcm4che3.data.{ElementDictionary, VR}
import org.dcm4che3.io.DicomStreamException
import org.dcm4che3.util.TagUtils
import se.nimsa.dcm4che.streams.DicomParts._
/**
* Flow which ingests a stream of bytes and outputs a stream of DICOM file parts such as specified by the <code>DicomPart</code>
* trait. Example DICOM parts are the preamble, headers (tag, VR, length), value chunks (the data in an attribute divided into chunks),
* items, sequences and fragments.
*
* This class is heavily and exclusively based on the dcm4che
 * <a href="https://github.com/dcm4che/dcm4che/blob/master/dcm4che-core/src/main/java/org/dcm4che3/io/DicomInputStream.java">DicomInputStream</a>
 * class, but adapted to output streaming results using Akka Streams.
*
* @param chunkSize the maximum size of a DICOM attribute data chunk
* @param stopTag optional stop tag (exclusive) after which reading of incoming data bytes is stopped
 * @param inflate indicates whether deflated DICOM data should be inflated and parsed or passed on as deflated data chunks.
*/
class DicomPartFlow(chunkSize: Int = 8192, stopTag: Option[Int] = None, inflate: Boolean = true) extends ByteStringParser[DicomPart] with DicomParsing {

  import ByteStringParser._

  // Upper bound on the length of a transfer syntax UID value; a (0002,0010) value at or above
  // this size is assumed to be bogus and is skipped rather than buffered.
  val transferSyntaxLengthLimit = 1024

  protected class DicomParsingLogic extends ParsingLogic with StageLogging {

    // Common interface for parser states that must know the byte order and VR encoding
    // of the bytes currently being parsed.
    sealed trait HeaderState {
      val bigEndian: Boolean
      val explicitVR: Boolean
    }

    // State while reading regular dataset attribute headers.
    case class DatasetHeaderState(bigEndian: Boolean, explicitVR: Boolean) extends HeaderState

    // State while reading file meta information (group 0002) attributes.
    // tsuid     - transfer syntax UID, once attribute (0002,0010) has been read
    // pos       - number of FMI bytes consumed so far
    // fmiEndPos - end position of the FMI, derived from (0002,0000) when seen
    case class FmiHeaderState(tsuid: Option[String], bigEndian: Boolean, explicitVR: Boolean, hasFmi: Boolean, pos: Long, fmiEndPos: Option[Long]) extends HeaderState

    // State while streaming an attribute value in chunks; when bytesLeft reaches zero,
    // parsing resumes at nextStep.
    case class ValueState(bigEndian: Boolean, bytesLeft: Int, nextStep: ParseStep[DicomPart])

    // State while reading the items of an encapsulated (fragments) pixel data attribute.
    case class FragmentsState(bigEndian: Boolean, explicitVR: Boolean) extends HeaderState

    // Base class for all parse steps: running out of input mid-parse means the DICOM
    // stream is truncated, which is always an error (except in deflated data, see below).
    abstract class DicomParseStep extends ParseStep[DicomPart] {
      override def onTruncation(reader: ByteReader): Unit = throw new DicomStreamException("DICOM file is truncated")
    }

    // Initial step: optionally consume the 128-byte preamble + "DICM" marker, then sniff the
    // next 8 bytes to determine endianness, VR encoding, and whether FMI (group 0002) is present.
    case object AtBeginning extends DicomParseStep {
      def parse(reader: ByteReader) = {
        val maybePreamble =
          // only wait for a full preamble if upstream may still deliver more bytes,
          // or if enough bytes are already buffered
          if (!isUpstreamClosed || reader.remainingSize >= DICOM_PREAMBLE_LENGTH) {
            reader.ensure(DICOM_PREAMBLE_LENGTH)
            if (DicomParsing.isPreamble(reader.remainingData.take(DICOM_PREAMBLE_LENGTH)))
              Some(DicomPreamble(bytes = reader.take(DICOM_PREAMBLE_LENGTH)))
            else None
          }
          else None
        if (maybePreamble.isDefined && !reader.hasRemaining && isUpstreamClosed)
          // the stream consisted of a preamble only
          ParseResult(maybePreamble, FinishedParser)
        else {
          reader.ensure(8)
          DicomParsing.dicomInfo(reader.remainingData.take(8)).map { info =>
            val nextState = if (info.hasFmi)
              InFmiHeader(FmiHeaderState(None, info.bigEndian, info.explicitVR, info.hasFmi, 0, None))
            else
              InDatasetHeader(DatasetHeaderState(info.bigEndian, info.explicitVR), None)
            ParseResult(maybePreamble, nextState)
          }.getOrElse(throw new DicomStreamException("Not a DICOM stream"))
        }
      }
    }

    // Parses one file meta information attribute header per invocation, emitting a DicomHeader
    // part; the value bytes are streamed by the InValue step scheduled as the next step.
    // Tracks position within the FMI so that the switch to dataset parsing (and the transfer
    // syntax it implies) happens exactly at the FMI end position given by (0002,0000).
    case class InFmiHeader(state: FmiHeaderState) extends DicomParseStep {
      def parse(reader: ByteReader) = {
        val (tag, vr, headerLength, valueLength) = readHeader(reader, state)
        if (DicomParsing.groupNumber(tag) != 2) {
          log.warning("Missing or wrong File Meta Information Group Length (0002,0000)")
          // not an FMI attribute after all; peek past this value to pick the dataset step
          reader.ensure(valueLength + 2)
          ParseResult(None, toDatasetStep(reader.remainingData.drop(valueLength).take(2), state))
        } else {
          // no meta attributes can lead to vr = null
          val updatedVr = if (vr == VR.UN) ElementDictionary.getStandardElementDictionary.vrOf(tag) else vr
          val bytes = reader.take(headerLength)
          val updatedPos = state.pos + headerLength + valueLength
          val updatedState = tag match {
            case 0x00020000 => // meta info length
              reader.ensure(4)
              val valueBytes = reader.remainingData.take(4)
              state.copy(pos = updatedPos, fmiEndPos = Some(updatedPos + DicomParsing.bytesToInt(valueBytes, state.bigEndian)))
            case 0x00020010 => // transfer syntax
              if (valueLength < transferSyntaxLengthLimit) {
                reader.ensure(valueLength)
                val valueBytes = reader.remainingData.take(valueLength)
                state.copy(tsuid = Some(valueBytes.utf8String.trim), pos = updatedPos)
              } else {
                log.warning("Transfer syntax data is very large, skipping")
                state.copy(pos = updatedPos)
              }
            case _ =>
              state.copy(pos = updatedPos)
          }
          val part = Some(DicomHeader(tag, updatedVr, valueLength, isFmi = true, state.bigEndian, state.explicitVR, bytes))
          // if FMI ends at (or within) this attribute's value, continue with the dataset afterwards
          val nextStep = updatedState.fmiEndPos.filter(_ <= updatedPos) match {
            case Some(_) =>
              reader.ensure(valueLength + 2)
              toDatasetStep(reader.remainingData.drop(valueLength).take(2), updatedState)
            case None =>
              InFmiHeader(updatedState)
          }
          ParseResult(part, InValue(ValueState(updatedState.bigEndian, valueLength, nextStep)))
        }
      }
    }

    // Parses dataset attribute headers and sequence/item (de)limiters. Plain attributes are
    // followed by an InValue step for their data; encapsulated pixel data switches to InFragments.
    // The optional inflater is carried along so deflated datasets keep being inflated.
    case class InDatasetHeader(state: DatasetHeaderState, inflater: Option[InflateData]) extends DicomParseStep {
      def parse(reader: ByteReader) = {
        val attribute = readDatasetHeader(reader, state)
        val nextState = attribute.map {
          case DicomHeader(_, _, length, _, bigEndian, _, _) => InValue(ValueState(bigEndian, length, InDatasetHeader(state, inflater)))
          case DicomFragments(_, _, bigEndian, _) => InFragments(FragmentsState(bigEndian, state.explicitVR), inflater)
          case _ => InDatasetHeader(state, inflater)
        }.getOrElse(FinishedParser) // readDatasetHeader returns None at the stop tag
        ParseResult(attribute, nextState)
      }
    }

    // Streams an attribute value as DicomValueChunk parts of at most chunkSize bytes each;
    // the chunk carrying the final bytes is marked last = true.
    case class InValue(state: ValueState) extends DicomParseStep {
      def parse(reader: ByteReader) = {
        val parseResult =
          if (state.bytesLeft <= chunkSize)
            ParseResult(Some(DicomValueChunk(state.bigEndian, reader.take(state.bytesLeft), last = true)), state.nextStep)
          else
            ParseResult(Some(DicomValueChunk(state.bigEndian, reader.take(chunkSize), last = false)), InValue(state.copy(bytesLeft = state.bytesLeft - chunkSize)))
        // if the upcoming dataset is deflated, switch the reader to inflating mode before
        // the next step starts consuming bytes
        state.nextStep match {
          case ds: InDatasetHeader if ds.inflater.isDefined && !isInflating =>
            startInflating(ds.inflater.get, reader)
          case _ =>
        }
        parseResult
      }
    }

    // Parses the items of an encapsulated pixel data attribute: item headers (FFFE,E000),
    // the sequence delimitation item (FFFE,E0DD) that ends the fragments, and anything
    // unexpected, which is passed through as a DicomUnknownPart.
    case class InFragments(state: FragmentsState, inflater: Option[InflateData]) extends DicomParseStep {
      def parse(reader: ByteReader) = {
        val (tag, _, headerLength, valueLength) = readHeader(reader, state)
        tag match {
          case 0xFFFEE000 => // begin item
            ParseResult(Some(DicomItem(valueLength, state.bigEndian, reader.take(headerLength))), InValue(ValueState(state.bigEndian, valueLength, this)))
          case 0xFFFEE0DD => // end fragments
            if (valueLength != 0) {
              log.warning(s"Unexpected fragments delimitation length $valueLength")
            }
            ParseResult(Some(DicomFragmentsDelimitation(state.bigEndian, reader.take(headerLength))), InDatasetHeader(DatasetHeaderState(state.bigEndian, state.explicitVR), inflater))
          case _ =>
            log.warning(s"Unexpected attribute (${TagUtils.toHexString(tag)}) in fragments with length=$valueLength")
            ParseResult(Some(DicomUnknownPart(state.bigEndian, reader.take(headerLength + valueLength))), this)
        }
      }
    }

    // Pass-through step used when inflate = false: deflated bytes are forwarded unparsed
    // in chunkSize pieces. Truncation is expected here (deflated data has no length marker),
    // so it flushes the remainder and completes instead of failing.
    case class InDeflatedData(bigEndian: Boolean) extends DicomParseStep {
      def parse(reader: ByteReader) = ParseResult(Some(DicomDeflatedChunk(bigEndian, reader.take(chunkSize))), this)
      override def onTruncation(reader: ByteReader): Unit = {
        emit(objOut, DicomDeflatedChunk(bigEndian, reader.takeAll()))
        completeStage()
      }
    }

    // Chooses the parse step for the dataset based on the transfer syntax collected in FMI.
    // firstTwoBytes are the first bytes after FMI, used only to detect a zlib header in
    // deflated streams. Falls back to Explicit VR Little Endian when (0002,0010) was missing.
    def toDatasetStep(firstTwoBytes: ByteString, state: FmiHeaderState): DicomParseStep = {
      val tsuid = state.tsuid.getOrElse {
        log.warning("Missing Transfer Syntax (0002,0010) - assume Explicit VR Little Endian")
        "1.2.840.10008.1.2.1"
      }
      // deflated explicit VR LE, or JPIP referenced deflate
      val deflatedTs = tsuid == "1.2.840.10008.1.2.1.99" || tsuid == "1.2.840.10008.1.2.4.95"
      if (deflatedTs)
        if (inflate) {
          val inflater =
            if (hasZLIBHeader(firstTwoBytes)) {
              // non-conformant but seen in practice: deflated data wrapped in a zlib envelope
              log.warning("Deflated DICOM Stream with ZLIB Header")
              new Inflater()
            }
            else
              new Inflater(true) // raw deflate, as the standard specifies
          InDatasetHeader(DatasetHeaderState(
            bigEndian = tsuid == "1.2.840.10008.1.2.2",
            explicitVR = tsuid != "1.2.840.10008.1.2"),
            Some(InflateData(inflater, new Array[Byte](chunkSize))))
        }
        else
          InDeflatedData(state.bigEndian)
      else
        InDatasetHeader(DatasetHeaderState(
          bigEndian = tsuid == "1.2.840.10008.1.2.2",    // explicit VR big endian
          explicitVR = tsuid != "1.2.840.10008.1.2"),    // implicit VR little endian
          None)
    }

    // True when the bytes start with the common zlib stream header 0x78 0x9C.
    private def hasZLIBHeader(firstTwoBytes: ByteString): Boolean = {
      bytesToUShortBE(firstTwoBytes) == 0x789C
    }

    // Reads (but does not consume) an attribute header, returning
    // (tag, vr, headerLength, valueLength). vr is null for tags without a VR in the
    // encoding (item/delimitation tags); headerLength is 8 or, for explicit VR with a
    // 12-byte header form, 12. Ensures enough bytes are buffered before peeking.
    def readHeader(reader: ByteReader, dicomState: HeaderState): (Int, VR, Int, Int) = {
      reader.ensure(8)
      val tagVr = reader.remainingData.take(8)
      val (tag, vr) = DicomParsing.tagVr(tagVr, dicomState.bigEndian, dicomState.explicitVR)
      if (vr == null)
        (tag, vr, 8, bytesToInt(tagVr.drop(4), dicomState.bigEndian))
      else if (dicomState.explicitVR)
        if (vr.headerLength == 8)
          (tag, vr, 8, bytesToUShort(tagVr.drop(6), dicomState.bigEndian))
        else {
          reader.ensure(12)
          (tag, vr, 12, bytesToInt(reader.remainingData.drop(8), dicomState.bigEndian))
        }
      else
        (tag, VR.UN, 8, bytesToInt(tagVr.drop(4), dicomState.bigEndian))
    }

    // Consumes one dataset attribute header and maps it to the proper DicomPart. Returns None
    // at the (exclusive) stop tag to end parsing. Sequences and fragments are recognized here;
    // undefined length (-1) with VR UN is reinterpreted as a sequence.
    def readDatasetHeader(reader: ByteReader, state: DatasetHeaderState): Option[DicomPart] = {
      val (tag, vr, headerLength, valueLength) = readHeader(reader, state)
      // println(s"$tag $vr $headerLength $valueLength")
      if (stopTag.isDefined && tag == stopTag.get)
        None
      else if (vr != null) {
        val updatedVr1 = if (vr == VR.UN) ElementDictionary.getStandardElementDictionary.vrOf(tag) else vr
        val updatedVr2 = if ((updatedVr1 == VR.UN) && valueLength == -1) VR.SQ else updatedVr1
        val bytes = reader.take(headerLength)
        if (vr == VR.SQ)
          Some(DicomSequence(tag, state.bigEndian, bytes))
        else if (valueLength == -1)
          // undefined length with a non-SQ VR: encapsulated (fragments) data
          Some(DicomFragments(tag, vr, state.bigEndian, bytes))
        else
          Some(DicomHeader(tag, updatedVr2, valueLength, isFmi = false, state.bigEndian, state.explicitVR, bytes))
      } else
        tag match {
          case 0xFFFEE000 => Some(DicomItem(valueLength, state.bigEndian, reader.take(8)))
          case 0xFFFEE00D => Some(DicomItemDelimitation(state.bigEndian, reader.take(8)))
          case 0xFFFEE0DD => Some(DicomSequenceDelimitation(state.bigEndian, reader.take(8)))
          case _ => Some(DicomUnknownPart(state.bigEndian, reader.take(headerLength))) // cannot happen
        }
    }

    startWith(AtBeginning)
  }

  override def createLogic(attr: Attributes) = new DicomParsingLogic()
}
object DicomPartFlow {

  /**
    * A shared part flow instance with default settings: 8192-byte value chunks, no stop tag,
    * and inflation of deflated DICOM data enabled.
    */
  val partFlow: DicomPartFlow = new DicomPartFlow()
}