-
Notifications
You must be signed in to change notification settings - Fork 2
/
import.clj
441 lines (390 loc) · 17.1 KB
/
import.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
(ns com.eldrix.dmd.import
"Support the UK NHS dm+d XML data files.
This namespace provides a thin wrapper over the data files, keeping the
original structures as much as possible and thus facilitating adapting
to changes in those definitions as they occur.
For more information see
https://www.nhsbsa.nhs.uk/sites/default/files/2021-02/dm%2Bd%20data%20model%20%28V2%29%2002.2021.pdf
https://www.nhsbsa.nhs.uk/sites/default/files/2017-02/Technical_Specification_of_data_files_R2_v3.1_May_2015.pdf
https://www.nhsbsa.nhs.uk/sites/default/files/2017-02/Data_Model_R2_v3.1_May_2015.pdf"
(:require [clojure.core.async :as a]
[clojure.data.xml :as xml]
[clojure.data.zip.xml :as zx]
[clojure.java.io :as io]
[clojure.pprint]
[clojure.string :as str]
[clojure.tools.logging.readable :as log]
[clojure.zip])
(:import [java.time LocalDate]
(java.time.format DateTimeFormatter DateTimeParseException)
(java.util List)))
(set! *warn-on-reflection* true)
;; dm+d date format = CCYY-MM-DD
(defn- parse-date ^LocalDate [^String s] (try (LocalDate/parse s DateTimeFormatter/ISO_LOCAL_DATE) (catch DateTimeParseException _)))
(defn- unsafe-parse-long ^Long [^String s] (Long/parseLong s))
(defn- parse-integer ^Integer [^String s] (Integer/parseInt s))
(defn- parse-flag ^Boolean [^String s] (boolean (= 1 (Integer/parseInt s)))) ;; just for fun, they sometimes use "1" or "0001" for flags...
(defn- unsafe-parse-double ^Double [^String s] (Double/parseDouble s))
(def ^:private file-ordering
"Order of file import for relational integrity, if needed."
[:LOOKUP :INGREDIENT :VTM :VMP :AMP :VMPP :AMPP :GTIN :BNF])
(def ^DateTimeFormatter df
(DateTimeFormatter/ofPattern "ddMMyy"))
(def ^:private file-matcher
"There is no formal specification for filename structure, but this is the de
facto standard."
#"^f_([a-z]*)\d_\d(\d{6})\.xml$")
(defn ^:private parse-dmd-filename
"Parse a dm+d filename if possible."
[f]
(let [f2 (clojure.java.io/as-file f)]
(when-let [[_ nm date] (re-matches file-matcher (.getName f2))]
(let [kw (keyword (str/upper-case nm))]
{:type kw
:date (LocalDate/parse date df)
:order (.indexOf ^List file-ordering kw)
:file f2}))))
(defn should-include?
[include exclude file-type]
(when (or (nil? include) (contains? include file-type))
(not (contains? exclude file-type))))
(defn dmd-file-seq
"Return an ordered sequence of dm+d files from the directory specified.
Components are returned in an order to support referential integrity.
Each result is a map containing :type, :date, :order and :file.
Optionally takes a set of file types to include or exclude."
([dir]
(->> dir
clojure.java.io/file
file-seq
(map parse-dmd-filename)
(filter some?)
(sort-by :order)))
([dir & {:keys [include exclude]}]
(filter #(should-include? include exclude (:type %)) (dmd-file-seq dir))))
(defn get-release-metadata
"Return release metadata from the directory specified.
Unfortunately, the dm+d distribution does not include a metadata file
containing release information, so version information is derived from
the filenames within the release.
Parameters:
- dir : directory to examine
Result:
- a map containing release information:
|- :release-date - date of the release (java.time.LocalDate)
As far as I am aware, there is no formal specification for dm+d filenames,
but currently the last six digits of the filename are a date of format
'ddMMyy' so we use the latest date as the date of the release."
[dir]
(when-let [release-date (last (sort (map :date (dmd-file-seq dir))))]
{:release-date release-date}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;; Generic dm+d parsing functionality
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^:private property-parsers
{;; VTM properties
:VTMID unsafe-parse-long
:INVALID parse-flag
:VTMIDPREV unsafe-parse-long
:VTMIDDT parse-date
;; VMP properties
:VPID unsafe-parse-long
:VPIDDT parse-date
:VPIDPREV unsafe-parse-long
:BASISCD unsafe-parse-long
:NMDT parse-date
:BASIS_PREVCD unsafe-parse-long
:NMCHANGECD unsafe-parse-long
:COMBPRODCD unsafe-parse-long
:PRES_STATCD unsafe-parse-long
:SUG_F parse-flag
:GLU_F parse-flag
:PRES_F parse-flag
:CFC_F parse-flag
:NON_AVAILCD unsafe-parse-long
:NON_AVAILDT parse-date
:DF_INDCD unsafe-parse-long
:UDFS unsafe-parse-double
:UDFS_UOMCD unsafe-parse-long
:UNIT_DOSE_UOMCD unsafe-parse-long
:ISID unsafe-parse-long
:BASIS_STRNTCD unsafe-parse-long
:BS_SUBID unsafe-parse-long
:STRNT_NMRTR_VAL unsafe-parse-double
:STRNT_NMRTR_UOMCD unsafe-parse-long
:STRNT_DNMTR_VAL unsafe-parse-double
:STRNT_DNMTR_UOMCD unsafe-parse-long
:FORMCD unsafe-parse-long
:ROUTECD unsafe-parse-long
:CATCD unsafe-parse-long
:CATDT parse-date
:CAT_PREVCD unsafe-parse-long
;; AMP properties
:APID unsafe-parse-long
:SUPPCD unsafe-parse-long
:LIC_AUTHCD unsafe-parse-long
:LIC_AUTH_PREVCD unsafe-parse-long
:LIC_AUTHCHANGECD unsafe-parse-long
:LIC_AUTHCHANGEDT parse-date
:FLAVOURCD unsafe-parse-long
:EMA parse-flag
:PARALLEL_IMPORT parse-flag
:AVAIL_RESTRICTCD unsafe-parse-long
:STRNTH unsafe-parse-double
:UOMCD unsafe-parse-long
:COLOURCD unsafe-parse-long
;; VMPP properties
:VPPID unsafe-parse-long
:QTYVAL unsafe-parse-double
:QTY_UOMCD unsafe-parse-long
:COMBPACKCD unsafe-parse-long
:PAY_CATCD unsafe-parse-long
:PRICE parse-integer
:DT parse-date
:PREVPRICE parse-integer
:PRNTVPPID unsafe-parse-long
:CHLDVPPID unsafe-parse-long
;; AMPP properties
:APPID unsafe-parse-long
:LEGAL_CATCD unsafe-parse-long
:DISCCD unsafe-parse-long
:DISCDT parse-date
:REIMB_STATCD unsafe-parse-long
:REIMB_STATDT parse-date
:REIMB_STATPREVCD unsafe-parse-long
:SCHED_2 parse-flag
:ACBS parse-flag
:PADM parse-flag
:FP10_MDA parse-flag
:SCHED_1 parse-flag
:HOSP parse-flag
:NURSE_F parse-flag
:ENURSE_F parse-flag
:DENT_F parse-flag
:PRICEDT parse-date
:PRICE_BASISCD unsafe-parse-long
:PX_CHRGS parse-flag
:DISP_FEES parse-flag ;; unlike the documentation, this is actually a flag (1 or omitted).
:BB parse-flag
:CAL_PACK parse-flag
:SPEC_CONTCD unsafe-parse-long
:FP34D parse-flag
:PRNTAPPID unsafe-parse-long
:CHLDAPPID unsafe-parse-long
;; ingredients
:ISIDPREV unsafe-parse-long
:ISIDDT parse-date
;; lookups
:CD unsafe-parse-long
:CDPREV unsafe-parse-long
:CDDT parse-date
;; BNF / extras
:DDD_UOMCD unsafe-parse-long
:DDD unsafe-parse-double
:STARTDT parse-date
:ENDDT parse-date})
(defn- parse-property [k v]
(if-let [parser (get property-parsers k)]
{k (parser v)}
{k v}))
(defn- parse-dmd-component
"Parse a fragment of XML.
Does not process nested XML but that is not required for the dm+d XML."
([node] (parse-dmd-component nil node))
([kind node]
(reduce into (if kind {:TYPE kind} {})
(map #(parse-property (:tag %) (first (:content %))) (:content node)))))
(defn- stream-flat-dmd
"Streams dm+d components from a flat XML file; blocking.
This expects top-level tags to represent the components themselves.
Suitable for parsing dm+d VTM and INGREDIENT file.
Each component has TYPE information added in the form
[file-type component-type].
For example: `[:VTM :VTM]`"
[root ch file-type close?]
(let [kind [file-type file-type]]
(a/<!! (a/onto-chan!! ch (map (partial parse-dmd-component kind) (:content root)) close?))
(when close? (a/close! ch))))
(defn- stream-nested-dmd
"Stream dm+d components from a nested dm+d distribution file; blocking.
A nested file contains multiple components; for example VMP contains VMPS
as well as VIRTUAL_PRODUCT_INGREDIENT and ONT_DRUG_FORM. Unfortunately
the naming is inconsistent with some in the plural and some in the singular.
Each component has TYPE information added in the form
[file-type component-type].
For example: `[:VMP :DRUG_FORM]`"
[root ch file-type close?]
(loop [components (:content root)]
(when-let [component (first components)]
(let [[_ subtag] (first component)
subtag' (if (= (str (name file-type) "S") (name subtag)) file-type subtag) ;; fix inconsistent naming of plural components
kind [file-type subtag']]
(a/<!! (a/onto-chan!! ch (map (partial parse-dmd-component kind) (:content component)) false)))
(recur (next components))))
(when close? (a/close! ch)))
(defn- stream-lookup-xml
[root ch file-type close?]
(loop [lookups (:content root)]
(if-let [lookup (first lookups)]
(let [tag (:tag lookup)
result (->> (:content lookup)
(map (partial parse-dmd-component [file-type tag])))]
(a/<!! (a/onto-chan!! ch result false))
(recur (next lookups)))
(when close? (a/close! ch)))))
(defn parse-gtin
"Note: unlike other AMPP related components, this uses :AMPPID as the key!"
[loc]
(let [gtin (zx/xml1-> loc :GTINDATA :GTIN zx/text)
startdt (zx/xml1-> loc :GTINDATA :STARTDT zx/text)
enddt (zx/xml1-> loc :GTINDATA :ENDDT zx/text)]
(cond-> {:TYPE [:GTIN :AMPP]
:AMPPID (Long/parseLong (zx/xml1-> loc :AMPPID zx/text))}
gtin
(assoc :GTIN (Long/parseLong gtin))
startdt
(assoc :STARTDT (parse-date startdt))
enddt
(assoc :ENDDT (parse-date enddt)))))
(defn stream-gtin
[root ch _ close?]
(let [gtins (zx/xml-> (clojure.zip/xml-zip root) :GTIN_DETAILS :AMPPS :AMPP parse-gtin)]
(a/<!! (a/onto-chan!! ch gtins false))
(when close? (a/close! ch))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;; High-level dm+d processing functionality
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^:private streamers
{:VTM stream-flat-dmd
:VMP stream-nested-dmd
:AMP stream-nested-dmd
:VMPP stream-nested-dmd
:AMPP stream-nested-dmd
:INGREDIENT stream-flat-dmd
:LOOKUP stream-lookup-xml
:GTIN stream-gtin
:BNF stream-nested-dmd})
(defn- stream-dmd-file [ch close? {:keys [type file] :as dmd-file}]
(if-let [streamer (get streamers type)]
(with-open [rdr (io/reader file)]
(log/info "importing" dmd-file)
(let [root (xml/parse rdr :skip-whitespace true)]
(streamer root ch type close?)))
(log/info "skipping" dmd-file)))
(defn stream-dmd
"Streams dm+d components from the directory to the channel.
Components are ordered to maintain relational integrity should it be required.
This does minimal processing; streamed data is a close representation of the
dm+d data structures.Some properties are parsed such as dates (xxxDT) to a
java.time.LocalDate, flags (e.g. invalidity) to booleans, and identifiers to
'long's. Each component is labelled with its type as :TYPE. This is a tuple of
file and entity: e.g. [:VMP :DRUG_FORM].
Parameters:
- dir : directory containing dm+d distribution
- ch : clojure.core.async channel
- close? : close the channel when done?, default true
- include : a set of dm+d file types to include (e.g. #{:VTM})
- exclude : a set of dm+d file types to exclude (e.g. #{:VTM})"
[dir ch & {:keys [_include _exclude close?] :or {close? true} :as opts}]
(log/info "Importing from " dir)
(let [files (dmd-file-seq dir opts)]
(log/info "files found in directory " dir ":" files)
(doseq [dmd-file files]
(stream-dmd-file ch false dmd-file))
(when close? (a/close! ch))))
(defn statistics-dmd
"Return statistics for dm+d data in the specified directory."
[dir]
(let [ch (a/chan)]
(a/thread (stream-dmd dir ch))
(loop [item (a/<!! ch)
counts {}]
(if-not item
counts
(recur (a/<!! ch)
(update counts (:TYPE item) (fnil inc 0)))))))
(defn ^:private cardinalities-for-product [dir product-kind product-identifier]
(let [ch (a/chan 1 (filter #(not= (:TYPE %) [product-kind product-kind])))]
(a/thread (stream-dmd dir ch :include #{product-kind}))
(let [counts (loop [result {}]
(let [item (a/<!! ch)]
(if-not item
result
(recur (update-in result [(:TYPE item) (get item product-identifier)] (fnil inc 0))))))]
(reduce-kv (fn [result k v]
(let [max-cardinality (apply max (vals v))]
(conj result {:TYPE k
:MAX-CARDINALITY max-cardinality
:CARDINALITY (if (> max-cardinality 1) :TO-MANY :TO-ONE)}))) [] counts))))
(defn cardinalities
"Determines the cardinalities for the different product components."
[dir]
(map (fn [[kind id]] (cardinalities-for-product dir kind id))
[[:VMP :VPID]
[:AMP :APID]
[:VMPP :VPPID]
[:AMPP :APPID]
[:GTIN :AMPPID]]))
(defn print-cardinalities [{:keys [dir]}]
(println "Processing " dir)
(dorun (map clojure.pprint/print-table (cardinalities (str dir)))))
(comment
(map clojure.pprint/print-table (cardinalities "/Users/mark/Downloads/nhsbsa_dmd_3.4.0_20210329000001")))
(defn- ch->seq*
[ch]
(when-let [item (a/<!! ch)]
(cons item (lazy-seq (ch->seq* ch)))))
(defn ch->seq
"Turns a clojure core.async channel into a lazy sequence."
[ch]
(lazy-seq (ch->seq* ch)))
(defn get-component
"Convenience function to stream only the specified component.
Useful for testing.
Parameters:
- dir : directory from which to load dm+d files
- file-type : dm+d type e.g. :LOOKUP
- component-type : component type e.g. :COMBINATION_PACK_IND
`file-type` and `component-type` are keywords representing the names from the
dm+d specification.
:VTM :VTM - returns all VTMs
:VMP :VMP - returns all VMPs
:VMP :ONT_DRUG_FORM - returns all ONT_DRUG_FORMS from the VMP file."
[dir file-type component-type]
(let [ch (a/chan 200 (filter #(= [file-type component-type] (:TYPE %))))]
(a/thread (stream-dmd dir ch :include #{file-type}))
(ch->seq ch)))
(comment
(dmd-file-seq "/Users/mark/Downloads/nhsbsa_dmd_12.1.0_20201214000001")
(dmd-file-seq "/Users/mark/Downloads/week272021-r2_3-BNF")
(get-release-metadata "/Users/mark/Downloads/week272021-r2_3-BNF")
(get-component "/Users/mark/Downloads/week272021-r2_3-BNF" :BNF :BNF)
(get-component "/var/folders/w_/s108lpdd1bn84sntjbghwz3w0000gn/T/trud15801406225560397483/week352021-r2_3-GTIN-zip" :GTIN :AMPP)
(dmd-file-seq "/Users/mark/Downloads/nhsbsa_dmd_12.1.0_20201214000001" :include #{:VTM :VMP} :exclude #{:VMP})
(dmd-file-seq "/Users/mark/Downloads/nhsbsa_dmd_12.1.0_20201214000001" :include #{:VTM})
(dmd-file-seq "/Users/mark/Downloads/nhsbsa_dmd_12.1.0_20201214000001" :exclude #{:VTM})
(get-release-metadata "/Users/mark/Downloads/nhsbsa_dmd_12.1.0_20201214000001")
(get-release-metadata "/Users/mark/Downloads/nhsbsa_dmd_3.4.0_20210329000001")
(statistics-dmd "/Users/mark/Downloads/nhsbsa_dmd_12.1.0_20201214000001")
(statistics-dmd "/Users/mark/Downloads/week272021-r2_3-BNF")
(get-component "/Users/mark/Downloads/nhsbsa_dmd_12.1.0_20201214000001" :LOOKUP :LEGAL_CATEGORY)
(get-component "/Users/mark/Downloads/nhsbsa_dmd_12.1.0_20201214000001" :VTM :VTM)
(get-component "/Users/mark/Downloads/nhsbsa_dmd_12.1.0_20201214000001" :VMP :VMP)
(def ri (get-component "/tmp/trud6772717631287944974" :AMPP :REIMBURSEMENT_INFO))
(frequencies (map :DISP_FEES ri))
(take 20 ri)
(filter #(nil? (:DISP_FEES %)) ri)
(get-component (io/resource "dmd-2021-08-26") :AMP :AP_INGREDIENT)
(def rdr (io/reader file))
(def root (xml/parse rdr :skip-whitespace true))
(def ch (a/chan))
(a/thread (stream-nested-dmd root ch :BNF true))
(dotimes [_ 1000] (a/<!! ch))
(a/<!! ch)
(def file "/var/folders/w_/s108lpdd1bn84sntjbghwz3w0000gn/T/trud15801406225560397483/week352021-r2_3-GTIN-zip/f_gtin2_0260821.xml")
(def rdr (io/reader file))
(def root (xml/parse rdr :skip-whitespace true))
(def ch (a/chan))
(a/thread (stream-gtin root ch nil true))
(a/<!! ch)
(map clojure.pprint/print-table (cardinalities "/var/folders/w_/s108lpdd1bn84sntjbghwz3w0000gn/T/trud2465267306253668332/")))