-
Notifications
You must be signed in to change notification settings - Fork 5
/
art-bio.py
159 lines (131 loc) · 4.69 KB
/
art-bio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
readme = '''
🐍 rt-bio : A Real-Time BIOsignal feature extraction tool 🦆
https://github.com/vatte/rt-bio
sources:
ecg (electrocardiogram) sends /ecg/ibi
recg (same as above + /ecg/raw)
eeg (electroencephalogram sends /eeg/alpha, /eeg/beta /eeg/gamma, /eeg/theta)
reeg (same as above + /eeg/raw)
emg (electromyogram) sends /emg/power
remg (same as above + /emg/raw)
eda (electrodermal activity) sends /eda/edr([length, amplitude])
reda (same as above + /eda/raw)
example usage:
art-bio -c ecg osc -c reeg file
command line arguments:
-d, --device [device_type (default: bitalino)]
choose biosignal acquisition device
[device_type] - bitalino, (openbci soon i promise...)
-f, --freq [sampling_frequency (default: 100)]
set sampling frequency for biosignal data acquisition
-l, --list
lists available devices
-i, --index [device_index (default: 0)]
set the correct device from available devices, see --list
-c, --connect [sourcetype] [destinationtype]
makes connections for biosignal sources to data destinations
[source]: ecg, eda, eeg, emg
[destination]: osc, file, digital
--osc-address [destination_address]
[destination_address (default: 127.0.0.1:4810)] IP_ADDRESS:PORT
--osc-prefix [prefix]
[prefix (default: /rtbio)]
--filename [filename]
[filename (default: temp.txt)]
'''
import sys
import configparser
import argparser as parser
from devices import *
from init_sources import init_sources
from router import Router
print("art-bio - A Real-Time BIOsignal feature extraction tool")
router = Router()
#set default argument values
device_name = 'bitalino'
sampling_frequency = 100
device_index = 0
router.osc_address = '127.0.0.1:4810'
router.osc_prefix = '/rtbio'
router.filename = 'temp.txt'
args = sys.argv
#get devicename from args
device_name_arg = parser.getDeviceName(args)
if device_name_arg:
device_name = device_name_arg
#get device index from args
device_index_arg = parser.getDeviceIndex(args)
if device_index_arg:
device_index = device_index_arg
#list devices and exit if --list
if '-l' in args or '--list' in args:
print('list of ' + device_name + ' devices: ')
if(device_name == 'bitalino'):
device_list = Bitalino.list_devices(None)
for i, dev in enumerate(device_list):
print('[{}] {}'.format(i, dev))
elif(device_name == 'openbci'):
device_list = OpenBCI.list_devices(None)
for i, dev in enumerate(device_list):
print('[{}] {}'.format(i, dev))
else:
raise ValueError('No such device: ' + device_name)
print('exiting...')
sys.exit(0)
#get osc_address from args
osc_address_arg = parser.getOSCAddress(args)
if osc_address_arg:
router.osc_address = osc_address_arg
#get osc_prefix from args
osc_prefix_arg = parser.getOSCPrefix(args)
if osc_prefix_arg:
router.osc_prefix = osc_prefix_arg
#get filename from args
filename_arg = parser.getFilename(args)
if filename_arg:
router.filename = filename_arg
#get sampling_frequency from args
sampling_frequency_arg = parser.getSamplingFrequency(args)
if sampling_frequency_arg:
sampling_frequency = sampling_frequency_arg
#get connections from args
connections = parser.getConnections(args)
#if no connections set show help
if not len(connections):
print(readme)
sys.exit(0)
#read config file if it exists
config = configparser.ConfigParser()
config.read('config.ini')
#initialize device
if device_name == 'bitalino':
device = Bitalino(device_index)
if 'bitalino' in config:
channels = {}
for k in config['bitalino']:
channels[k] = [ int(c) for c in config['bitalino'][k].split(',') ]
device.channel_map = channels
router.digital_out_func = device.digital_trigger
elif device_name == 'openbci':
device = OpenBCI(device_index)
def noDigital():
raise ValueError('openbci has no digital output defined')
router.digital_out_func = noDigital
else:
raise ValueError('No such device: ' + device_name)
#initialize sources and destinations
sources = init_sources(connections, device.channel_map, sampling_frequency)
router.init_destinations(connections)
#start streaming
device.start(sampling_frequency, sources.keys())
try:
while True:
# Read samples
samples = device.read()
for source in samples.keys():
features = []
for i, channel in enumerate(samples[source]):
features.append(sources[source][i].add_data(channel))
router.route_data(source, connections, features, samples[source])
except KeyboardInterrupt:
device.close()