-
Notifications
You must be signed in to change notification settings - Fork 1
/
load_intan_rhd_format.py
219 lines (167 loc) · 10.3 KB
/
load_intan_rhd_format.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#! /bin/env python
#
# Michael Gibson 17 July 2015
# Modified Adrian Foy Sep 2018
import sys, struct, math, os, time
import numpy as np
from intanutil.read_header import read_header
from intanutil.get_bytes_per_data_block import get_bytes_per_data_block
from intanutil.read_one_data_block import read_one_data_block
from intanutil.notch_filter import notch_filter
from intanutil.data_to_result import data_to_result
def read_data(filename):
"""Reads Intan Technologies RHD2000 data file generated by evaluation board GUI.
Data are returned in a dictionary, for future extensibility.
"""
tic = time.time()
fid = open(filename, 'rb')
filesize = os.path.getsize(filename)
header = read_header(fid)
print('Found {} amplifier channel{}.'.format(header['num_amplifier_channels'], plural(header['num_amplifier_channels'])))
print('Found {} auxiliary input channel{}.'.format(header['num_aux_input_channels'], plural(header['num_aux_input_channels'])))
print('Found {} supply voltage channel{}.'.format(header['num_supply_voltage_channels'], plural(header['num_supply_voltage_channels'])))
print('Found {} board ADC channel{}.'.format(header['num_board_adc_channels'], plural(header['num_board_adc_channels'])))
print('Found {} board digital input channel{}.'.format(header['num_board_dig_in_channels'], plural(header['num_board_dig_in_channels'])))
print('Found {} board digital output channel{}.'.format(header['num_board_dig_out_channels'], plural(header['num_board_dig_out_channels'])))
print('Found {} temperature sensors channel{}.'.format(header['num_temp_sensor_channels'], plural(header['num_temp_sensor_channels'])))
print('')
# Determine how many samples the data file contains.
bytes_per_block = get_bytes_per_data_block(header)
# How many data blocks remain in this file?
data_present = False
bytes_remaining = filesize - fid.tell()
if bytes_remaining > 0:
data_present = True
if bytes_remaining % bytes_per_block != 0:
raise Exception('Something is wrong with file size : should have a whole number of data blocks')
num_data_blocks = int(bytes_remaining / bytes_per_block)
num_amplifier_samples = header['num_samples_per_data_block'] * num_data_blocks
num_aux_input_samples = int((header['num_samples_per_data_block'] / 4) * num_data_blocks)
num_supply_voltage_samples = 1 * num_data_blocks
num_board_adc_samples = header['num_samples_per_data_block'] * num_data_blocks
num_board_dig_in_samples = header['num_samples_per_data_block'] * num_data_blocks
num_board_dig_out_samples = header['num_samples_per_data_block'] * num_data_blocks
record_time = num_amplifier_samples / header['sample_rate']
if data_present:
print('File contains {:0.3f} seconds of data. Amplifiers were sampled at {:0.2f} kS/s.'.format(record_time, header['sample_rate'] / 1000))
else:
print('Header file contains no data. Amplifiers were sampled at {:0.2f} kS/s.'.format(header['sample_rate'] / 1000))
if data_present:
# Pre-allocate memory for data.
print('')
print('Allocating memory for data...')
data = {}
if (header['version']['major'] == 1 and header['version']['minor'] >= 2) or (header['version']['major'] > 1):
data['t_amplifier'] = np.zeros(num_amplifier_samples, dtype=np.int_)
else:
data['t_amplifier'] = np.zeros(num_amplifier_samples, dtype=np.uint)
data['amplifier_data'] = np.zeros([header['num_amplifier_channels'], num_amplifier_samples], dtype=np.uint)
data['aux_input_data'] = np.zeros([header['num_aux_input_channels'], num_aux_input_samples], dtype=np.uint)
data['supply_voltage_data'] = np.zeros([header['num_supply_voltage_channels'], num_supply_voltage_samples], dtype=np.uint)
data['temp_sensor_data'] = np.zeros([header['num_temp_sensor_channels'], num_supply_voltage_samples], dtype=np.uint)
data['board_adc_data'] = np.zeros([header['num_board_adc_channels'], num_board_adc_samples], dtype=np.uint)
# by default, this script interprets digital events (digital inputs and outputs) as booleans
# if unsigned int values are preferred(0 for False, 1 for True), replace the 'dtype=np.bool' argument with 'dtype=np.uint' as shown
# the commented line below illustrates this for digital input data; the same can be done for digital out
#data['board_dig_in_data'] = np.zeros([header['num_board_dig_in_channels'], num_board_dig_in_samples], dtype=np.uint)
data['board_dig_in_data'] = np.zeros([header['num_board_dig_in_channels'], num_board_dig_in_samples], dtype=np.bool_)
data['board_dig_in_raw'] = np.zeros(num_board_dig_in_samples, dtype=np.uint)
data['board_dig_out_data'] = np.zeros([header['num_board_dig_out_channels'], num_board_dig_out_samples], dtype=np.bool_)
data['board_dig_out_raw'] = np.zeros(num_board_dig_out_samples, dtype=np.uint)
# Read sampled data from file.
print('Reading data from file...')
# Initialize indices used in looping
indices = {}
indices['amplifier'] = 0
indices['aux_input'] = 0
indices['supply_voltage'] = 0
indices['board_adc'] = 0
indices['board_dig_in'] = 0
indices['board_dig_out'] = 0
print_increment = 10
percent_done = print_increment
for i in range(num_data_blocks):
read_one_data_block(data, header, indices, fid)
# Increment indices
indices['amplifier'] += header['num_samples_per_data_block']
indices['aux_input'] += int(header['num_samples_per_data_block'] / 4)
indices['supply_voltage'] += 1
indices['board_adc'] += header['num_samples_per_data_block']
indices['board_dig_in'] += header['num_samples_per_data_block']
indices['board_dig_out'] += header['num_samples_per_data_block']
fraction_done = 100 * (1.0 * i / num_data_blocks)
if fraction_done >= percent_done:
print('{}% done...'.format(percent_done))
percent_done = percent_done + print_increment
# Make sure we have read exactly the right amount of data.
bytes_remaining = filesize - fid.tell()
if bytes_remaining != 0: raise Exception('Error: End of file not reached.')
# Close data file.
fid.close()
if (data_present):
print('Parsing data...')
# Extract digital input channels to separate variables.
for i in range(header['num_board_dig_in_channels']):
data['board_dig_in_data'][i, :] = np.not_equal(np.bitwise_and(data['board_dig_in_raw'], (1 << header['board_dig_in_channels'][i]['native_order'])), 0)
# Extract digital output channels to separate variables.
for i in range(header['num_board_dig_out_channels']):
data['board_dig_out_data'][i, :] = np.not_equal(np.bitwise_and(data['board_dig_out_raw'], (1 << header['board_dig_out_channels'][i]['native_order'])), 0)
# Scale voltage levels appropriately.
data['amplifier_data'] = np.multiply(0.195, (data['amplifier_data'].astype(np.int32) - 32768)) # units = microvolts
data['aux_input_data'] = np.multiply(37.4e-6, data['aux_input_data']) # units = volts
data['supply_voltage_data'] = np.multiply(74.8e-6, data['supply_voltage_data']) # units = volts
if header['eval_board_mode'] == 1:
data['board_adc_data'] = np.multiply(152.59e-6, (data['board_adc_data'].astype(np.int32) - 32768)) # units = volts
elif header['eval_board_mode'] == 13:
data['board_adc_data'] = np.multiply(312.5e-6, (data['board_adc_data'].astype(np.int32) - 32768)) # units = volts
else:
data['board_adc_data'] = np.multiply(50.354e-6, data['board_adc_data']) # units = volts
data['temp_sensor_data'] = np.multiply(0.01, data['temp_sensor_data']) # units = deg C
# Check for gaps in timestamps.
num_gaps = np.sum(np.not_equal(data['t_amplifier'][1:]-data['t_amplifier'][:-1], 1))
if num_gaps == 0:
print('No missing timestamps in data.')
else:
print('Warning: {0} gaps in timestamp data found. Time scale will not be uniform!'.format(num_gaps))
# Scale time steps (units = seconds).
data['t_amplifier'] = data['t_amplifier'] / header['sample_rate']
data['t_aux_input'] = data['t_amplifier'][range(0, len(data['t_amplifier']), 4)]
data['t_supply_voltage'] = data['t_amplifier'][range(0, len(data['t_amplifier']), header['num_samples_per_data_block'])]
data['t_board_adc'] = data['t_amplifier']
data['t_dig'] = data['t_amplifier']
data['t_temp_sensor'] = data['t_supply_voltage']
# If the software notch filter was selected during the recording, apply the
# same notch filter to amplifier data here.
if header['notch_filter_frequency'] > 0:
print('Applying notch filter...')
print_increment = 10
percent_done = print_increment
for i in range(header['num_amplifier_channels']):
data['amplifier_data'][i,:] = notch_filter(data['amplifier_data'][i,:], header['sample_rate'], header['notch_filter_frequency'], 10)
fraction_done = 100 * (i / header['num_amplifier_channels'])
if fraction_done >= percent_done:
print('{}% done...'.format(percent_done))
percent_done += print_increment
else:
data = [];
# Move variables to result struct.
result = data_to_result(header, data, data_present)
print('Done! Elapsed time: {0:0.1f} seconds'.format(time.time() - tic))
return result
def plural(n):
"""Utility function to optionally pluralize words based on the value of n.
"""
if n == 1:
return ''
else:
return 's'
if __name__ == '__main__':
#a=read_data(sys.argv[1])
rhd = read_data('D:/data/cage_data/20191002/20191002_Greyson_Cage__191002_162617.rhd')
fs = rhd['frequency_parameters']['amplifier_sample_rate']
d1 = rhd['amplifier_data'][6, :240000]
d2 = rhd['amplifier_data'][9, :240000]
d1 = notch_filter(d1, fs, 60, 10)
d2 = notch_filter(d2, fs, 60, 10)
dd = d1-d2
dd = notch_filter(dd,fs,60,10)