forked from ratal/mdfreader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mdf3reader.py
1266 lines (1120 loc) · 52.5 KB
/
mdf3reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
""" Measured Data Format file reader module for version 3.x
Platform and python version
----------------------------------------
With Unix and Windows for python 2.6+ and 3.2+
:Author: `Aymeric Rateau <https://github.com/ratal/mdfreader>`__
Created on Sun Oct 10 12:57:28 2010
Dependencies
-------------------
- Python >2.6, >3.2 <http://www.python.org>
- Numpy >1.6 <http://numpy.scipy.org>
- Sympy to convert channels with formula
Attributes
--------------
PythonVersion : float
Python version currently running, needed for compatibility of both
python 2.6+ and 3.2+
mdf3reader module
--------------------------
"""
from __future__ import print_function
from numpy import right_shift, bitwise_and, interp
from numpy import max as npmax, min as npmin
from numpy import asarray, zeros, recarray, array, searchsorted
from numpy.core.records import fromfile, fromarrays
from numpy.core.defchararray import encode as ncode
from collections import defaultdict
from math import log, exp
from time import strftime, time, gmtime
from struct import pack, Struct
from io import open # for python 3 and 2 consistency
from sys import platform, exc_info, version_info, stderr, path
from os.path import dirname, abspath
import os
_root = dirname(abspath(__file__))
path.append(_root)
from mdf import mdf_skeleton, _open_MDF, _bits_to_bytes, _convertName,\
dataField, conversionField, compressed_data
from mdfinfo3 import info3
from channel import Channel3
PythonVersion = version_info
PythonVersion = PythonVersion[0]
def linearConv(data, conv):  # 0 Parametric, Linear: Physical =Integer*P2 + P1
    """ apply linear conversion to data

    Parameters
    ----------------
    data : numpy 1D array
        raw data to be converted to physical value
    conv : mdfinfo3.info3 conversion block ('CCBlock') dict

    Returns
    -----------
    converted data to physical value
    """
    gain = conv['P2']
    offset = conv['P1']
    # identity conversion: hand back the raw array untouched so the
    # (usually more compact) original dtype is preserved
    if gain == 1.0 and offset == 0.0:
        return data
    return data * gain + offset
def tabInterpConv(data, conv):  # 1 Tabular with interpolation
    """ apply Tabular interpolation conversion to data

    Parameters
    ----------------
    data : numpy 1D array
        raw data to be converted to physical value
    conv : mdfinfo3.info3 conversion block ('CCBlock') dict

    Returns
    -----------
    converted data to physical value
    """
    # split the conversion table into its raw and physical columns,
    # then interpolate linearly between table points
    raw_points = [pair['int'] for pair in conv.values()]
    phys_points = [pair['phys'] for pair in conv.values()]
    return interp(data, raw_points, phys_points)
def tabConv(data, conv):  # 2 Tabular
    """ apply Tabular conversion to data

    Parameters
    ----------------
    data : numpy 1D array
        raw data to be converted to physical value
    conv : mdfinfo3.info3 conversion block ('CCBlock') dict

    Returns
    -----------
    converted data to physical value
    """
    # look each raw value up in the sorted raw column and take the
    # physical value stored at the matching position
    raw_points = array([pair['int'] for pair in conv.values()])
    phys_points = array([pair['phys'] for pair in conv.values()])
    positions = searchsorted(raw_points, data)
    return phys_points[positions]
def polyConv(data, conv):  # 6 Polynomial
    """ apply polynomial conversion to data

    Parameters
    ----------------
    data : numpy 1D array
        raw data to be converted to physical value
    conv : mdfinfo3.info3 conversion block ('CCBlock') dict

    Returns
    -----------
    converted data to physical value
    """
    # factor out the common shifted term (data - P5 - P6) used by both
    # numerator and denominator of the MDF3 polynomial formula
    shifted = data - conv['P5'] - conv['P6']
    numerator = conv['P2'] - conv['P4'] * shifted
    denominator = conv['P3'] * shifted - conv['P1']
    return numerator / denominator
def expConv(data, conv):  # 7 Exponential
    """ apply exponential conversion to data

    Parameters
    ----------------
    data : numpy 1D array
        raw data to be converted to physical value
    conv : mdfinfo3.info3 conversion block ('CCBlock') dict

    Returns
    -----------
    converted data to physical value, or None (with a message on stderr)
    when the parameter combination is not a valid MDF3 exponential set
    """
    # use numpy's element-wise exp: math.exp (imported at module level)
    # raises TypeError for any array longer than one element, although
    # this function is documented to receive numpy 1D arrays
    from numpy import exp
    if conv['P4'] == 0 and conv['P1'] != 0 and conv['P2'] != 0:
        return exp(((data - conv['P7']) * conv['P6'] - conv['P3'])
                   / conv['P1']) / conv['P2']
    elif conv['P1'] == 0 and conv['P4'] != 0 and conv['P5'] != 0:
        return exp((conv['P3'] / (data - conv['P7']) - conv['P6'])
                   / conv['P4']) / conv['P5']
    else:
        print('Non possible conversion parameters for channel ', file=stderr)
def logConv(data, conv):  # 8 Logarithmic
    """ apply logarithmic conversion to data

    Parameters
    ----------------
    data : numpy 1D array
        raw data to be converted to physical value
    conv : mdfinfo3.info3 conversion block ('CCBlock') dict

    Returns
    -----------
    converted data to physical value, or None (with a message on stderr)
    when the parameter combination is not a valid MDF3 logarithmic set
    """
    # use numpy's element-wise log: math.log (imported at module level)
    # raises TypeError for any array longer than one element, although
    # this function is documented to receive numpy 1D arrays
    from numpy import log
    if conv['P4'] == 0 and conv['P1'] != 0 and conv['P2'] != 0:
        return log(((data - conv['P7']) * conv['P6'] - conv['P3'])
                   / conv['P1']) / conv['P2']
    elif conv['P1'] == 0 and conv['P4'] != 0 and conv['P5'] != 0:
        return log((conv['P3'] / (data - conv['P7']) - conv['P6'])
                   / conv['P4']) / conv['P5']
    else:
        print('Non possible conversion parameters for channel ', file=stderr)
def rationalConv(data, conv):  # 9 rational
    """ apply rational conversion to data

    Parameters
    ----------------
    data : numpy 1D array
        raw data to be converted to physical value
    conv : mdfinfo3.info3 conversion block ('CCBlock') dict

    Returns
    -----------
    converted data to physical value
    """
    # quotient of two quadratic polynomials in the raw value
    square = data * data
    numerator = conv['P1'] * square + conv['P2'] * data + conv['P3']
    denominator = conv['P4'] * square + conv['P5'] * data + conv['P6']
    return numerator / denominator
def formulaConv(data, conv):  # 10 Text Formula
    """ apply formula conversion to data

    Parameters
    ----------------
    data : numpy 1D array
        raw data to be converted to physical value
    conv : mdfinfo3.info3 conversion block ('CCBlock') dict

    Returns
    -----------
    converted data to physical value, or None (with a message on stderr)
    if sympy is missing or the formula cannot be evaluated

    Notes
    --------
    Requires sympy module
    """
    try:
        from sympy import lambdify, symbols
    except ImportError:
        print('Please install sympy to convert channel ', file=stderr)
        print('Failed to convert formulae ' + conv['textFormula'], file=stderr)
        return None
    try:
        X = symbols('X')  # variable is X
        formula = conv['textFormula']
        # remove trailing text after the NUL terminator, if present.
        # str.find returns -1 when '\x00' is absent, and slicing with -1
        # would wrongly drop the last character of the formula
        null_pos = formula.find('\x00')
        if null_pos != -1:
            formula = formula[:null_pos]
        # adapt ASAM-MCD2 syntax to sympy
        formula = formula.replace('pow(', 'power(')
        # formula to function for evaluation
        expr = lambdify(X, formula, modules='numpy', dummify=False)
        return expr(data)
    except Exception:
        print('Failed to convert formulae ' + conv['textFormula'], file=stderr)
def textRangeTableConv(data, conv):  # 12 Text range table
    """ apply text range table conversion to data

    Parameters
    ----------------
    data : numpy 1D array
        raw data to be converted to physical value
    conv : mdfinfo3.info3 conversion block ('CCBlock') dict
        keyed 0..n-1; each entry holds 'lowerRange', 'upperRange' and
        'Textrange'; pair 0 carries the default text

    Returns
    -----------
    converted data to physical value (numpy array of text if possible,
    otherwise a plain list), or None on failure (message on stderr)
    """
    try:
        npair = len(conv)
        lower = [conv[pair]['lowerRange'] for pair in range(npair)]
        upper = [conv[pair]['upperRange'] for pair in range(npair)]
        text = [conv[pair]['Textrange'] for pair in range(npair)]
        temp = []
        for raw_value in data:
            value = text[0]  # pair 0 is the default text
            # first matching range (pairs 1..n-1) wins
            for pair in range(1, npair):
                if lower[pair] <= raw_value <= upper[pair]:
                    value = text[pair]
                    break
            temp.append(value)
        try:
            temp = asarray(temp)  # try to convert to numpy
        except Exception:
            # keep the plain list if numpy cannot represent the texts
            pass
        return temp
    except Exception:
        print('Failed to convert text to range table', file=stderr)
class record(list):
    """ record class lists Channel classes,
    it is representing a channel group

    Attributes
    --------------
    CGrecordLength : int
        length of record from channel group block information in Byte
    recordLength : int
        length of record from channels information in Byte
    numberOfRecords : int
        number of records in data block
    recordID : int
        recordID corresponding to channel group
    recordIDnumber : int
        size of recordID
    dataGroup : int:
        data group number
    channelGroup : int
        channel group number
    numpyDataRecordFormat : list
        list of numpy (dtype) for each channel
    dataRecordName : list
        list of channel names used for recarray attribute definition
    master : dict
        define name and number of master channel
    recordToChannelMatching : dict
        helps to identify nested bits in byte
    channelNames : set
        channel names to be stored, useful for low memory consumption but slow
    hiddenBytes : Bool, False by default
        flag in case of non declared channels in record
    byte_aligned : Bool, True by default
        flag for byte aligned record

    Methods
    ------------
    addChannel(info, channelNumber)
    loadInfo(info)
    readSortedRecord(fid, pointer, channelSet=None)
    readRecordBuf(buf, channelSet=None)
    readRecordBits(bita, channelSet=None)
    changeChannelName(channelName)
    """

    def __init__(self, dataGroup, channelGroup):
        # sizes are filled in later by loadInfo() from the info3 blocks
        self.CGrecordLength = 0
        self.recordLength = 0
        self.dataBlockLength = 0
        self.numberOfRecords = 0
        self.recordID = 0
        self.recordIDnumber = 0
        self.dataGroup = dataGroup
        self.channelGroup = channelGroup
        self.numpyDataRecordFormat = []
        self.dataRecordName = []
        self.master = {}
        # default master name; overwritten in loadInfo() when a channel
        # with channelType == 1 (time/master) is found in the group
        self.master['name'] = 'master_{}'.format(dataGroup)
        self.master['number'] = None
        self.recordToChannelMatching = {}
        self.channelNames = set()
        self.hiddenBytes = False
        self.byte_aligned = True

    def __repr__(self):
        # human readable summary: channel names, datagroup, master, dtypes
        output = list()
        output.append('Channels :\n')
        for chan in self.channelNames:
            output.append(''.join([chan, '\n']))
        output.append('Datagroup number : {}\n'.format(self.dataGroup))
        if self.master['name'] is not None:
            output.append(''.join(['Master channel : ', self.master['name'], '\n']))
        output.append('Numpy records format : \n')
        # NOTE(review): loop variable shadows the class name 'record'
        for record in self.numpyDataRecordFormat:
            output.append('{}\n'.format(record))
        return ''.join(output)

    def addChannel(self, info, channelNumber):
        """ add a channel in class

        Parameters
        ----------------
        info : mdfinfo3.info3 class
        channelNumber : int
            channel number in mdfinfo3.info3 class
        """
        self.append(Channel3(info, self.dataGroup, self.channelGroup,
                             channelNumber, self.recordIDnumber))
        self.channelNames.add(self[-1].name)

    def loadInfo(self, info):
        """ gathers records related from info class

        Builds the numpy record dtype for the channel group and detects
        channels whose bits are embedded inside a previous channel's bytes
        (bit-packed records), hidden bytes and non byte-aligned layouts.

        Parameters
        ----------------
        info : mdfinfo3.info3 class
        """
        self.recordIDnumber = info['DGBlock'][self.dataGroup]['numberOfRecordIDs']
        self.recordID = info['CGBlock'][self.dataGroup][self.channelGroup]['recordID']
        self.CGrecordLength = info['CGBlock'][self.dataGroup][self.channelGroup]['dataRecordSize']
        self.numberOfRecords = info['CGBlock'][self.dataGroup][self.channelGroup]['numberOfRecords']
        self.dataBlockLength = self.CGrecordLength * self.numberOfRecords
        if self.recordIDnumber > 0:  # record ID existing at beginning of record
            self.dataRecordName.append('RecordID{}'.format(self.channelGroup))
            format = ('{}_title'.format(self.dataRecordName[-1]), self.dataRecordName[-1])
            self.numpyDataRecordFormat.append((format, 'uint8'))
            # one extra record-ID byte per record
            self.dataBlockLength = (self.CGrecordLength + 1) * self.numberOfRecords
        embedding_channel = None
        for channelNumber in range(info['CGBlock'][self.dataGroup][self.channelGroup]['numberOfChannels']):
            channel = Channel3(info, self.dataGroup, self.channelGroup,
                               channelNumber, self.recordIDnumber)
            if self.master['number'] is None or channel.channelType == 1:  # master channel found
                self.master['name'] = channel.name
                self.master['number'] = channelNumber
            self.append(channel)  # adds channel in record list
            self.channelNames.add(channel.name)
            # Checking if several channels are embedded in bytes
            if len(self) > 1:
                # all channels are already ordered in record based on byte_offset and bit_offset
                # so just comparing with previous channel
                prev_chan = self[-2]
                prev_chan_includes_curr_chan = channel.posBitBeg >= 8 * prev_chan.byteOffset \
                    and channel.posBitEnd <= 8 * (prev_chan.byteOffset + prev_chan.nBytes)
                if embedding_channel is not None:
                    embedding_channel_includes_curr_chan = \
                        channel.posBitEnd <= embedding_channel.posByteEnd * 8
                else:
                    embedding_channel_includes_curr_chan = False
                if channel.byteOffset >= prev_chan.byteOffset and \
                        channel.posBitBeg < 8 * (prev_chan.byteOffset + prev_chan.nBytes) and \
                        channel.posBitEnd > 8 * (prev_chan.byteOffset + prev_chan.nBytes):
                    # not byte aligned
                    self.byte_aligned = False
                if embedding_channel is not None and \
                        channel.posBitEnd > embedding_channel.posByteEnd * 8:
                    # current channel ends past the embedding channel
                    embedding_channel = None
                if prev_chan_includes_curr_chan or \
                        embedding_channel_includes_curr_chan:  # bit(s) in byte(s)
                    if embedding_channel is None and prev_chan_includes_curr_chan:
                        embedding_channel = prev_chan  # new embedding channel detected
                    if self.recordToChannelMatching:  # not first channel
                        # map embedded channel onto the record field of its container
                        self.recordToChannelMatching[channel.recAttributeName] = \
                            self.recordToChannelMatching[prev_chan.recAttributeName]
                        channel.embedding_channel_bitOffset = \
                            channel.posBitBeg - embedding_channel.posBitBeg
                    else:  # first channels
                        self.recordToChannelMatching[channel.recAttributeName] = \
                            channel.recAttributeName
                        self.numpyDataRecordFormat.append(channel.RecordFormat)
                        self.dataRecordName.append(channel.recAttributeName)
                        self.recordLength += channel.nBytes
            if embedding_channel is None:  # adding bytes
                self.recordToChannelMatching[channel.recAttributeName] = \
                    channel.recAttributeName
                self.numpyDataRecordFormat.append(channel.RecordFormat)
                self.dataRecordName.append(channel.recAttributeName)
                self.recordLength += channel.nBytes
        if self.recordIDnumber == 2:  # second record ID at end of record
            self.dataRecordName.append('RecordID{}_2'.format(self.channelGroup))
            format = ('{}_title'.format(self.dataRecordName[-1]), self.dataRecordName[-1])
            self.numpyDataRecordFormat.append((format, 'uint8'))
            self.dataBlockLength = (self.CGrecordLength + 2) * self.numberOfRecords
        # check for hidden bytes
        if self.CGrecordLength > self.recordLength:
            self.hiddenBytes = True
        # check record length consitency
        elif self.CGrecordLength < self.recordLength:
            # forces to use dataRead instead of numpy records.
            self.byte_aligned = False

    def readSortedRecord(self, fid, pointer, channelSet=None):
        """ reads record, only one channel group per datagroup

        Parameters
        ----------------
        fid : float
            file identifier
        pointer
            position in file of data block beginning
        channelSet : Set of str, optional
            list of channel to read

        Returns
        -----------
        rec : numpy recarray
            contains a matrix of raw data in a recarray (attributes
            corresponding to channel name)

        Notes
        --------
        If channelSet is None, read data using numpy.core.records.fromfile
        that is rather quick. However, in case of large file, you can use
        channelSet to load only interesting channels or only one channel
        on demand, but be aware it might be much slower.
        """
        fid.seek(pointer)
        if channelSet is None and not self.hiddenBytes and self.byte_aligned:
            # reads all, quickest but memory consuming
            return fromfile(fid, dtype=self.numpyDataRecordFormat,
                            shape=self.numberOfRecords,
                            names=self.dataRecordName)
        else:  # reads only some channels from a sorted data block
            if channelSet is None:
                channelSet = self.channelNames
            # memory efficient but takes time
            # are channelSet in this dataGroup
            if len(channelSet & self.channelNames) > 0:
                # check if master channel is in the list
                if not self.master['name'] in channelSet:
                    channelSet.add(self.master['name'])  # adds master channel
                try:  # use rather cython compiled code for performance
                    from dataRead import dataRead
                    # converts data type from mdf 3.x to 4.x
                    convertDataType3to4 = {0: 0, 1: 2, 2: 4, 3: 4,
                                           7: 6, 8: 10,
                                           9: 1, 10: 3, 11: 5, 12: 5,
                                           13: 0, 14: 2, 15: 4, 16: 4}
                    bita = fid.read(self.dataBlockLength)
                    format = []
                    for channel in self:
                        if channel.recAttributeName in channelSet:
                            format.append(channel.nativeRecordFormat)
                    buf = recarray(self.numberOfRecords, format)
                    for chan in range(len(self)):
                        if self[chan].recAttributeName in channelSet:
                            buf[self[chan].recAttributeName] = \
                                dataRead(bytes(bita),
                                         self[chan].bitCount,
                                         convertDataType3to4[self[chan].signalDataType],
                                         self[chan].nativeRecordFormat[1],
                                         self.numberOfRecords,
                                         self.CGrecordLength,
                                         self[chan].bitOffset,
                                         self[chan].posByteBeg,
                                         self[chan].posByteEnd)
                    return buf
                except:
                    # NOTE(review): bare except, and the file position has
                    # advanced past the data block here — the fallback below
                    # then reads from the wrong offset; verify against upstream
                    print('Unexpected error:', exc_info(), file=stderr)
                    print('dataRead crashed, back to python data reading',
                          file=stderr)
                # pure python fallback: unpack each channel record by record
                rec = {}
                recChan = []
                numpyDataRecordFormat = []
                for channel in channelSet:  # initialise data structure
                    rec[channel] = 0
                for channel in self:  # list of Channels from channelSet
                    if channel.recAttributeName in channelSet:
                        recChan.append(channel)
                        numpyDataRecordFormat.append(channel.RecordFormat)
                rec = zeros((self.numberOfRecords, ), dtype=numpyDataRecordFormat)
                recordLength = self.recordIDnumber + self.CGrecordLength
                for r in range(self.numberOfRecords):  # for each record,
                    buf = fid.read(recordLength)
                    for channel in recChan:
                        rec[channel.recAttributeName][r] = \
                            channel.CFormat.unpack(buf[channel.posByteBeg:
                                                       channel.posByteEnd])[0]
                return rec.view(recarray)

    def readRecordBuf(self, buf, channelSet=None):
        """ read stream of record bytes

        Parameters
        ----------------
        buf : stream
            stream of bytes read in file
        channelSet : Set of str, optional
            list of channel to read

        Returns
        -----------
        rec : dict
            returns dictionary of channel with its corresponding values
        """
        temp = {}
        if channelSet is None:
            channelSet = self.channelNames
        for Channel in self:  # list of channel classes from channelSet
            if Channel.name in channelSet:
                temp[self.recordToChannelMatching[Channel.recAttributeName]] = \
                    Channel.CFormat.unpack(buf[Channel.posByteBeg:
                                               Channel.posByteEnd])[0]
        return temp  # returns dictionary of channel with its corresponding values

    def readRecordBits(self, bita, channelSet=None):
        """ read stream of record bits by bits in case of not aligned or hidden bytes

        Parameters
        ----------------
        bita : stream
            stream of bytes read in file
        channelSet : Set of str, optional
            list of channel to read

        Returns
        -----------
        rec : dict
            returns dictionary of channel with its corresponding values
        """
        from bitarray import bitarray
        B = bitarray(endian="little")  # little endian by default
        B.frombytes(bytes(bita))

        def signedInt(temp, extension):
            """ extend bits of signed data managing two's complement
            """
            extension.setall(False)
            extensionInv = bitarray(extension, endian='little')
            extensionInv.setall(True)
            signBit = temp[-1]
            if not signBit:  # positive value, extend with 0
                temp.extend(extension)
            else:  # negative value, extend with 1
                signBit = temp.pop(-1)
                temp.extend(extensionInv)
                temp.append(signBit)
            return temp
        # read data
        temp = {}
        if channelSet is None:
            channelSet = self.channelNames
        for Channel in self:  # list of channel classes from channelSet
            if Channel.name in channelSet:
                # slice the channel's bits out of the whole record bit stream
                temp[Channel.recAttributeName] = B[Channel.posBitBeg: Channel.posBitEnd]
                nbytes = len(temp[Channel.recAttributeName].tobytes())
                if not nbytes == Channel.nBytes:
                    # pad up to the C type width before unpacking
                    byte = bitarray(8 * (Channel.nBytes - nbytes), endian='little')
                    byte.setall(False)
                    if Channel.signalDataType not in (1, 10, 14):  # not signed integer
                        temp[Channel.recAttributeName].extend(byte)
                    else:  # signed integer (two's complement), keep sign bit and extend with bytes
                        temp[Channel.recAttributeName] = signedInt(temp[Channel.recAttributeName], byte)
                nTrailBits = Channel.nBytes*8 - Channel.bitCount
                if Channel.signalDataType in (1, 10, 14) and \
                        nbytes == Channel.nBytes and \
                        nTrailBits > 0:  # Ctype byte length but signed integer
                    trailBits = bitarray(nTrailBits, endian='little')
                    temp[Channel.recAttributeName] = signedInt(temp[Channel.recAttributeName], trailBits)
                if 's' not in Channel.dataFormat:
                    temp[Channel.recAttributeName] = Channel.CFormat.unpack(temp[Channel.recAttributeName].tobytes())[0]
                else:
                    # string channel: keep the raw bytes
                    temp[Channel.recAttributeName] = temp[Channel.recAttributeName].tobytes()
        return temp  # returns dictionary of channel with its corresponding values
class DATA(dict):
    """ DATA class is organizing record classes itself made of channel.
    This class inherits from dict. Keys are corresponding to channel
    group recordID. A DATA class corresponds to a data block, a dict
    of record classes (one per channel group). Each record class contains
    a list of channel class representing the structure of channel record.

    Attributes
    --------------
    fid : io.open
        file identifier
    pointerToData : int
        position of Data block in mdf file
    BlockLength : int
        total size of data block

    Methods
    ------------
    addRecord(record)
        Adds a new record in DATA class dict
    read(channelSet)
        Reads data block
    loadSorted(record, nameList=None)
        Reads sorted data block from record definition
    loadUnSorted(nameList=None)
        Reads unsorted data block, not yet implemented
    """

    def __init__(self, fid, pointer):
        # data is read lazily by read()/loadSorted()/loadUnSorted()
        self.fid = fid
        self.pointerToData = pointer
        self.BlockLength = 0

    def addRecord(self, record):
        """Adds a new record in DATA class dict

        Parameters
        ----------------
        record class
            channel group definition listing record channel classes
        """
        self[record.recordID] = {}
        self[record.recordID]['record'] = record
        # block length accumulates over all channel groups sharing this block
        self.BlockLength += record.dataBlockLength

    def read(self, channelSet):
        """Reads data block

        Parameters
        ----------------
        channelSet : set of str, optional
            list of channel names
        """
        if len(self) == 1:  # sorted dataGroup
            recordID = list(self.keys())[0]
            self[recordID]['data'] = \
                self.loadSorted(self[recordID]['record'],
                                nameList=channelSet)
        elif len(self) >= 2:  # unsorted DataGroup
            data = self.loadUnSorted(nameList=channelSet)
            # redistribute the flat channel dict per recordID
            for recordID in list(self.keys()):
                self[recordID]['data'] = {}
                for channel in self[recordID]['record']:
                    self[recordID]['data'][channel.recAttributeName] = \
                        data[self[recordID]['record'].
                             recordToChannelMatching[channel.recAttributeName]]
        else:  # empty data group
            pass

    def loadSorted(self, record, nameList=None):  # reads sorted data
        """Reads sorted data block from record definition

        Parameters
        ----------------
        record class
            channel group definition listing record channel classes
        nameList : set of str, optional
            list of channel names

        Returns
        -----------
        numpy recarray of data
        """
        return record.readSortedRecord(self.fid, self.pointerToData, nameList)

    def loadUnSorted(self, nameList=None):
        """Reads unsorted data block from record definition

        Walks the interleaved stream record by record, dispatching each
        record to its channel group based on the leading record ID byte.

        Parameters
        ----------------
        nameList : set of str, optional
            list of channel names

        Returns
        -----------
        dict of channel name to numpy array of data
        """
        self.fid.seek(self.pointerToData)
        stream = self.fid.read(self.BlockLength)
        # reads only the channels using offset functions, channel by channel.
        buf = defaultdict(list)
        position = 0
        recordIdCFormat = Struct('B')
        # initialise data structure
        for recordID in self:
            for channelName in self[recordID]['record'].dataRecordName:
                buf[channelName] = []
        # read data
        while position < len(stream):
            recordID = recordIdCFormat.unpack(stream[position:position + 1])[0]
            if not self[recordID]['record'].hiddenBytes and self[recordID]['record'].byte_aligned:
                temp = self[recordID]['record'].readRecordBuf(stream[position:position + self[recordID]['record'].CGrecordLength + 1], nameList)
            else:  # do read bytes but bits in record
                temp = self[recordID]['record'].readRecordBits(stream[position:position + self[recordID]['record'].CGrecordLength + 1], nameList)
            # recordId is only unit8
            position += self[recordID]['record'].CGrecordLength + 1
            for channelName in temp:
                buf[channelName].append(temp[channelName])  # to remove append
        # convert list to array
        for chan in buf:
            buf[chan] = array(buf[chan])
        return buf
class mdf3(mdf_skeleton):
""" mdf file version 3.0 to 3.3 class
Attributes
--------------
fileName : str
file name
MDFVersionNumber : int
mdf file version number
masterChannelList : dict
Represents data structure: a key per master channel with corresponding value containing
a list of channels
One key or master channel represents then a data group having same sampling interval.
multiProc : bool
Flag to request channel conversion multi processed for performance improvement.
One thread per data group.
convertAfterRead : bool
flag to convert raw data to physical just after read
filterChannelNames : bool
flag to filter long channel names from its module names separated by '.'
file_metadata : dict
file metadata with minimum keys: author, organisation, project, subject, comment, time, date
Methods
------------
read3( fileName=None, info=None, multiProc=False, channelList=None, convertAfterRead=True)
Reads mdf 3.x file data and stores it in dict
_getChannelData3(channelName)
Returns channel numpy array
_convertChannel3(channelName)
converts specific channel from raw to physical data according to CCBlock information
_convertAllChannel3()
Converts all channels from raw data to converted data according to CCBlock information
write3(fileName=None)
Writes simple mdf 3.3 file
"""
    def read3(self, fileName=None, info=None, multiProc=False, channelList=None,
              convertAfterRead=True, filterChannelNames=False, compression=False):
        """ Reads mdf 3.x file data and stores it in dict

        Parameters
        ----------------
        fileName : str, optional
            file name
        info : mdfinfo3.info3 class
            info3 class containing all MDF Blocks
        multiProc : bool
            flag to activate multiprocessing of channel data conversion
        channelList : list of str, optional
            list of channel names to be read
            If you use channelList, reading might be much slower but it will save you memory.
            Can be used to read big files
        convertAfterRead : bool, optional
            flag to convert channel after read, True by default
            If you use convertAfterRead by setting it to false, all data from channels
            will be kept raw, no conversion applied.
            If many float are stored in file, you can gain from 3 to 4 times memory footprint
            To calculate value from channel, you can then use method .getChannelData()
        filterChannelNames : bool, optional
            flag to filter long channel names (currently unused in this method)
        compression : bool, optional
            falg to activate data compression with blosc
        """
        self.multiProc = multiProc
        if platform == 'win32':
            self.multiProc = False  # no multiprocessing for windows platform
        if self.fileName is None and info is not None:
            self.fileName = info.fileName
        elif fileName is not None and self.fileName is None:
            self.fileName = fileName
        if channelList is None:
            channelSetFile = None
        else:
            channelSetFile = set(channelList)
        # Read information block from file
        if info is None:
            if self.info is None:
                info = info3(self.fileName, None, False, True)
            else:
                info = self.info
        if info.fid is None or info.fid.closed:
            try:
                info.fid = open(self.fileName, 'rb')
            except IOError:
                raise Exception('Can not find file ' + self.fileName)
        minimal = 2  # always reads minimum info by default
        # reads metadata
        try:
            comment = info['HDBlock']['TXBlock']['Text']
        except:
            comment = ''
        # converts date to be compatible with ISO8601
        day, month, year = info['HDBlock']['Date'].split(':')
        ddate = '-'.join([year, month, day])
        self.add_metadata(author=info['HDBlock']['Author'],
                          organisation=info['HDBlock']['Organization'],
                          project=info['HDBlock']['ProjectName'],
                          subject=info['HDBlock']['Subject'], comment=comment,
                          date=ddate, time=info['HDBlock']['Time'])
        # Read data from file
        for dataGroup in info['DGBlock']:
            channelSet = channelSetFile
            if info['DGBlock'][dataGroup]['numberOfChannelGroups'] > 0 and \
                    (channelSet is None or
                     len(channelSet & info['ChannelNamesByDG'][dataGroup]) > 0):  # data exists
                if not self._noDataLoading:  # load CG, CN and CC block info
                    info.readCGBlock(info.fid, dataGroup, minimal=minimal)
                # Pointer to data block
                pointerToData = info['DGBlock'][dataGroup]['pointerToDataRecords']
                buf = DATA(info.fid, pointerToData)
                for channelGroup in range(info['DGBlock'][dataGroup]['numberOfChannelGroups']):
                    temp = record(dataGroup, channelGroup)  # create record class
                    temp.loadInfo(info)  # load all info related to record
                    if temp.numberOfRecords != 0:  # continue if there are at least some records
                        buf.addRecord(temp)
                        if self._noDataLoading and channelSet is not None and len(channelSet &
                                buf[temp.recordID]['record'].channelNames) > 0:
                            channelSet = None  # will load complete datagroup
                buf.read(channelSet)  # reads datablock potentially containing several channel groups
                for recordID in buf:
                    if 'record' in buf[recordID]:
                        master_channel = buf[recordID]['record'].master['name']
                        # disambiguate master name if already used by another data group
                        if master_channel in self and self[master_channel][dataField] is not None:
                            master_channel = ''.join([master_channel, '_{}'.format(dataGroup)])
                        channels = (c for c in buf[recordID]['record']
                                    if channelSet is None or c.name in channelSet)
                        for chan in channels:  # for each recordchannel
                            # in case record is used for several channels
                            recordName = buf[recordID]['record'].\
                                recordToChannelMatching[chan.recAttributeName]
                            temp = buf[recordID]['data'][recordName]
                            if len(temp) != 0:
                                # Process concatenated bits inside uint8
                                if not chan.bitCount // 8.0 == chan.bitCount / 8.0 \
                                        and not buf[recordID]['record'].hiddenBytes \
                                        and buf[recordID]['record'].byte_aligned:
                                    # if channel data do not use complete bytes
                                    if chan.signalDataType in (0, 1, 9, 10, 13, 14):  # integers
                                        # shift out lower bits, then mask to bitCount width
                                        temp = right_shift(temp, chan.embedding_channel_bitOffset)
                                        mask = int(pow(2, chan.bitCount) - 1)  # masks isBitUint8
                                        temp = bitwise_and(temp, mask)
                                    else:  # should not happen
                                        print('bit count and offset not applied \
to correct data type', file=stderr)
                                self.add_channel(dataGroup, chan.name, temp,
                                                 master_channel,
                                                 master_type=1,
                                                 unit=chan.unit,
                                                 description=chan.desc,
                                                 conversion=chan.conversion,
                                                 info=None,
                                                 compression=compression)
                del buf
                if not self._noDataLoading:
                    # clean CN, CC and CG info to free memory
                    info.cleanDGinfo(dataGroup)
        info.fid.close()  # close file
        if convertAfterRead and not compression:
            self._noDataLoading = False
            self._convertAllChannel3()
def _getChannelData3(self, channelName):
"""Returns channel numpy array
Parameters
----------------
channelName : str
channel name
Returns:
-----------
numpy array
converted, if not already done, data corresponding to channel name
Notes
------
This method is the safest to get channel data as numpy array from 'data' dict key might contain raw data
"""
if channelName in self:
vect = self.getChannel(channelName)[dataField]
if vect is None: # noDataLoading reading argument flag activated
if self.info.fid is None or (self.info.fid is not None and self.info.fid.closed):
(self.info.fid, self.info.fileName, zipfile) = _open_MDF(self.fileName)
self.read3(fileName=None, info=self.info, channelList=[channelName], convertAfterRead=False)
return self._convert3(channelName)
else:
return None
def _convert3(self, channelName):
"""converts specific channel from raw to physical data according to CCBlock information
Parameters
----------------
channelName : str
Name of channel
Returns
-----------
numpy array
returns numpy array converted to physical values according to conversion type
"""
if self[channelName][dataField] is None:
vect = self[channelName][dataField]
else:
if isinstance(self[channelName][dataField], compressed_data):
vect = self[channelName][dataField].decompression() # uncompress blosc
else:
vect = self[channelName][dataField][:] # to have bcolz uncompressed data
if conversionField in self[channelName]: # there is conversion property
conversion = self[channelName][conversionField]
if conversion['type'] == 0:
return linearConv(vect, conversion['parameters'])
elif conversion['type'] == 1:
return tabInterpConv(vect, conversion['parameters'])
elif conversion['type'] == 2:
return tabConv(vect, conversion['parameters'])
elif conversion['type'] == 6:
return polyConv(vect, conversion['parameters'])
elif conversion['type'] == 7:
return expConv(vect, conversion['parameters'])
elif conversion['type'] == 8:
return logConv(vect, conversion['parameters'])
elif conversion['type'] == 9:
return rationalConv(vect, conversion['parameters'])
elif conversion['type'] == 10:
return formulaConv(vect, conversion['parameters'])
elif conversion['type'] == 12:
return textRangeTableConv(vect, conversion['parameters'])
else:
return vect
else:
return vect
def _convertChannel3(self, channelName):
"""converts specific channel from raw to physical data according to CCBlock information
Parameters
----------------
channelName : str
Name of channel
"""
self.setChannelData(channelName, self._convert3(channelName))
self.remove_channel_conversion(channelName)