-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy patheventslicer.py
More file actions
166 lines (144 loc) · 6.16 KB
/
eventslicer.py
File metadata and controls
166 lines (144 loc) · 6.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import math
from typing import Dict, Tuple
import h5py
from numba import jit
import numpy as np
class EventSlicer:
def __init__(self, h5f: h5py.File):
self.h5f = h5f
self.events = dict()
for dset_str in ['p', 'x', 'y', 't']:
self.events[dset_str] = self.h5f['events/{}'.format(dset_str)]
# This is the mapping from milliseconds to event index:
# It is defined such that
# (1) t[ms_to_idx[ms]] >= ms*1000, for ms > 0
# (2) t[ms_to_idx[ms] - 1] < ms*1000, for ms > 0
# (3) ms_to_idx[0] == 0
# , where 'ms' is the time in milliseconds and 't' the event timestamps in microseconds.
#
# As an example, given 't' and 'ms':
# t: 0 500 2100 5000 5000 7100 7200 7200 8100 9000
# ms: 0 1 2 3 4 5 6 7 8 9
#
# we get
#
# ms_to_idx:
# 0 2 2 3 3 3 5 5 8 9
self.ms_to_idx = np.asarray(self.h5f['ms_to_idx'], dtype='int64')
if "t_offset" in list(h5f.keys()):
self.t_offset = int(h5f['t_offset'][()])
else:
self.t_offset = 0
self.t_final = int(self.events['t'][-1]) + self.t_offset
def get_start_time_us(self):
return self.t_offset
def get_final_time_us(self):
return self.t_final
def get_events(self, t_start_us: int, t_end_us: int) -> Dict[str, np.ndarray]:
"""Get events (p, x, y, t) within the specified time window
Parameters
----------
t_start_us: start time in microseconds
t_end_us: end time in microseconds
Returns
-------
events: dictionary of (p, x, y, t) or None if the time window cannot be retrieved
"""
assert t_start_us < t_end_us
# We assume that the times are top-off-day, hence subtract offset:
t_start_us -= self.t_offset
t_end_us -= self.t_offset
t_start_ms, t_end_ms = self.get_conservative_window_ms(t_start_us, t_end_us)
t_start_ms_idx = self.ms2idx(t_start_ms)
t_end_ms_idx = self.ms2idx(t_end_ms)
if t_start_ms_idx is None or t_end_ms_idx is None:
# Cannot guarantee window size anymore
return None
events = dict()
time_array_conservative = np.asarray(self.events['t'][t_start_ms_idx:t_end_ms_idx])
idx_start_offset, idx_end_offset = self.get_time_indices_offsets(time_array_conservative, t_start_us, t_end_us)
t_start_us_idx = t_start_ms_idx + idx_start_offset
t_end_us_idx = t_start_ms_idx + idx_end_offset
# Again add t_offset to get gps time
events['t'] = time_array_conservative[idx_start_offset:idx_end_offset] + self.t_offset
for dset_str in ['p', 'x', 'y']:
events[dset_str] = np.asarray(self.events[dset_str][t_start_us_idx:t_end_us_idx])
assert events[dset_str].size == events['t'].size
return events
@staticmethod
def get_conservative_window_ms(ts_start_us: int, ts_end_us) -> Tuple[int, int]:
"""Compute a conservative time window of time with millisecond resolution.
We have a time to index mapping for each millisecond. Hence, we need
to compute the lower and upper millisecond to retrieve events.
Parameters
----------
ts_start_us: start time in microseconds
ts_end_us: end time in microseconds
Returns
-------
window_start_ms: conservative start time in milliseconds
window_end_ms: conservative end time in milliseconds
"""
assert ts_end_us > ts_start_us
window_start_ms = math.floor(ts_start_us/1000)
window_end_ms = math.ceil(ts_end_us/1000)
return window_start_ms, window_end_ms
@staticmethod
@jit(nopython=True)
def get_time_indices_offsets(
time_array: np.ndarray,
time_start_us: int,
time_end_us: int) -> Tuple[int, int]:
"""Compute index offset of start and end timestamps in microseconds
Parameters
----------
time_array: timestamps (in us) of the events
time_start_us: start timestamp (in us)
time_end_us: end timestamp (in us)
Returns
-------
idx_start: Index within this array corresponding to time_start_us
idx_end: Index within this array corresponding to time_end_us
such that (in non-edge cases)
time_array[idx_start] >= time_start_us
time_array[idx_end] >= time_end_us
time_array[idx_start - 1] < time_start_us
time_array[idx_end - 1] < time_end_us
this means that
time_start_us <= time_array[idx_start:idx_end] < time_end_us
"""
assert time_array.ndim == 1
idx_start = -1
if time_array[-1] < time_start_us:
# This can happen in extreme corner cases. E.g.
# time_array[0] = 1016
# time_array[-1] = 1984
# time_start_us = 1990
# time_end_us = 2000
# Return same index twice: array[x:x] is empty.
return time_array.size, time_array.size
else:
for idx_from_start in range(0, time_array.size, 1):
if time_array[idx_from_start] >= time_start_us:
idx_start = idx_from_start
break
assert idx_start >= 0
idx_end = time_array.size
for idx_from_end in range(time_array.size - 1, -1, -1):
if time_array[idx_from_end] >= time_end_us:
idx_end = idx_from_end
else:
break
assert time_array[idx_start] >= time_start_us
if idx_end < time_array.size:
assert time_array[idx_end] >= time_end_us
if idx_start > 0:
assert time_array[idx_start - 1] < time_start_us
if idx_end > 0:
assert time_array[idx_end - 1] < time_end_us
return idx_start, idx_end
def ms2idx(self, time_ms: int) -> int:
assert time_ms >= 0
if time_ms >= self.ms_to_idx.size:
return None
return self.ms_to_idx[time_ms]