-
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PR on bin file iterator and first event visualization example (#253)
* Added bin file iterator. * Added IteratorBin test * Added example on 2D event visualization. * Added data types.
- Loading branch information
Showing
5 changed files
with
162 additions
and
0 deletions.
There are no files selected for viewing
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
# Example using N-Caltech101 https://www.garrickorchard.com/datasets/n-caltech101 | ||
import numpy as np | ||
from pathlib import Path | ||
from evlib.codec import fileformat | ||
from evlib.vis.view2d import events | ||
|
||
# Setup iterator reader | ||
bin_file_path = "./artifacts/sample_data/faces_easy_0001.bin" | ||
ev_iter = fileformat.IteratorBinEvent(bin_file_path) | ||
save_path = Path("./artifacts/results/faces_easy") | ||
save_path.mkdir(parents=True, exist_ok=True) | ||
|
||
# Iterate over the stream of events given a set temporal window | ||
# TODO: This should probably be inside the event iterator | ||
t_current = 0 | ||
t_window = 10e3 # 10 ms, as N-Caltech101 is in microseconds | ||
|
||
for iter_data in ev_iter: | ||
t_max = iter_data['timestamp'][-1] | ||
i = 1 | ||
while t_current < t_max: | ||
# Get t_window ms of the events stream and generate a plot | ||
mask = (iter_data['timestamp'] > t_current) & (iter_data['timestamp'] <= t_current + t_window) | ||
x_ = iter_data['x'][mask] | ||
y_ = iter_data['y'][mask] | ||
t_ = iter_data['t'][mask] | ||
p_ = iter_data['p'][mask] | ||
|
||
evs_np = np.vstack([y_, x_, t_, p_]).T | ||
image = events(evs_np, (ev_iter.size_y, ev_iter.size_x)) | ||
# Show / Save image | ||
img_path = f'{str(save_path)}/{str(i).zfill(6)}.jpg' | ||
# image.show() | ||
image.save(img_path) | ||
|
||
t_current += t_window | ||
i += 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
"""Bin format. | ||
Used in N-Caltech101, N-cars. | ||
""" | ||
import logging | ||
from typing import Any | ||
|
||
import numpy as np | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
from ...types import RawEvents | ||
from ._iterator_access import IteratorAccess | ||
|
||
|
||
# TODO make parser abstract and merge these classes. | ||
|
||
|
||
class IteratorBin(IteratorAccess): | ||
FORMAT = "bin" | ||
|
||
def __init__(self, binfile: str) -> None: | ||
# TODO add parse format option | ||
super().__init__(binfile) | ||
# Variables | ||
self.raw_evs = np.zeros((4, 1)) | ||
self.size_x = 0 | ||
self.size_y = 0 | ||
# Initialize variables | ||
self.file = open(self.file_name, 'rb') | ||
self._init_vars_() | ||
|
||
def __iter__(self) -> Any: | ||
self.count = 0 | ||
return self | ||
|
||
def _init_vars_(self) -> None: | ||
# Read event file | ||
self.raw_evs = self.read_ev_file() | ||
# Estimate width and height of sensor given the events in the stream | ||
self.size_x = self.raw_evs[:, 0].max() + 1 | ||
self.size_y = self.raw_evs[:, 1].max() + 1 | ||
|
||
def read_ev_file(self) -> np.ndarray: | ||
# From https://github.com/gorchard/event-Python/blob/master/eventvision.py#L532 | ||
# Change np.fromfile() to use "offset" if needed for future datasets | ||
raw_data = np.fromfile(self.file, dtype=np.uint8) | ||
self.file.close() | ||
raw_data = np.uint32(raw_data) | ||
raw_evs = self._transform_raw_to_evs_(raw_data) | ||
return raw_evs | ||
|
||
@staticmethod | ||
def _transform_raw_to_evs_(raw_data: np.ndarray) -> np.ndarray: | ||
all_y = raw_data[1::5] | ||
all_x = raw_data[0::5] | ||
all_p = (raw_data[2::5] & 128) >> 7 # bit 7 | ||
all_ts = ((raw_data[2::5] & 127) << 16) | (raw_data[3::5] << 8) | (raw_data[4::5]) | ||
|
||
# Process time stamp overflow events | ||
time_increment = 2 ** 13 | ||
overflow_indices = np.where(all_y == 240)[0] | ||
for overflow_index in overflow_indices: | ||
all_ts[overflow_index:] += time_increment | ||
|
||
# Everything else is a proper td spike | ||
td_indices = np.where(all_y != 240)[0] | ||
|
||
x_ = all_x[td_indices] | ||
y_ = all_y[td_indices] | ||
t_ = all_ts[td_indices] | ||
p_ = all_p[td_indices] | ||
|
||
raw_evs = np.vstack([x_, y_, t_, p_]).T | ||
return raw_evs | ||
|
||
|
||
class IteratorBinEvent(IteratorBin): | ||
def __next__(self) -> RawEvents: | ||
""" | ||
Returns: | ||
RawEvents: events | ||
""" | ||
raw_evs = self.raw_evs | ||
_l = len(raw_evs) | ||
|
||
# As we are reading the whole file at once this works. Re-check if we decide to go for chunk reading. | ||
if self.count >= _l: | ||
raise StopIteration | ||
|
||
x = raw_evs[:, 0].astype(np.int32) | ||
y = raw_evs[:, 1].astype(np.int32) | ||
t = raw_evs[:, 2].astype(np.float64) | ||
p = raw_evs[:, 3].astype(bool) | ||
|
||
self.count += _l | ||
return RawEvents(x=x, y=y, timestamp=t, polarity=p) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
import os | ||
import numpy as np | ||
|
||
from evlib.codec import fileformat | ||
from evlib.utils import basics as basic_utils | ||
from ...test_utils.misc import generate_random_event_text_file | ||
|
||
|
||
def test_iterator_bin_event_next_dtype(tmp_path): # type: ignore | ||
tmp_file_path = os.path.join(tmp_path, "event.bin") | ||
|
||
# Ideally we want to save it to a .bin file and load it, but the way N-Caltech builds the files is very convoluted | ||
# We will save a placeholder temporary file instead, and just ignore the loading of it | ||
evs = basic_utils.generate_events(200, 20, 40, 0.1, 0.3) | ||
evs.reshape(1, -1).tofile(tmp_file_path) | ||
iter_bin = fileformat.IteratorBinEvent(tmp_file_path) | ||
|
||
# Instead, ignore the file load and check that the iterator works properly | ||
iter_bin.raw_evs = evs | ||
iter_bin.size_x = iter_bin.raw_evs[:, 0].max() + 1 | ||
iter_bin.size_y = iter_bin.raw_evs[:, 1].max() + 1 | ||
|
||
for ev in iter_bin: | ||
assert ev["t"].dtype.type is np.float64 | ||
assert ev["x"].dtype.type is np.int32 | ||
assert ev["y"].dtype.type is np.int32 | ||
assert ev["p"].dtype.type is np.bool_ |