Processes for Event-based data loading and pre-processing #514

Closed · wants to merge 42 commits

Commits (42)
452e68e
Adding first round of Event Data Processes
Dec 2, 2022
77dc965
Adding Flattening implementation
Dec 2, 2022
d1ac4f5
flattening proc, pm + all unit tests
j0shcd Dec 2, 2022
5a89aa6
flattening unit tests
j0shcd Dec 2, 2022
9018466
addressed PR comments, still TODOs
j0shcd Dec 5, 2022
18fff7d
addressed remaining PR comments, still TODOs
j0shcd Dec 5, 2022
ddb3bb7
Applied PR comments for other processes
j0shcd Dec 5, 2022
877ed67
Applied PR comments for other tests, still TODOs
j0shcd Dec 5, 2022
093605f
doc strings for AedatDataLoader (+tests), events utils, PR comments
j0shcd Dec 6, 2022
ee81533
restructuring Processes
Dec 7, 2022
57c8929
add beginning of input process
SveaMeyer13 Dec 7, 2022
6b15db9
Merge branch 'dev/event_data_processes' of https://github.com/lava-nc…
SveaMeyer13 Dec 7, 2022
1bd6346
refactored tests
j0shcd Dec 7, 2022
be276bd
WIP
j0shcd Dec 7, 2022
4869e31
Merge branch 'dev/event_data_processes' of https://github.com/lava-nc…
j0shcd Dec 7, 2022
3554e89
refactoring event data processes (WIP)
Dec 7, 2022
fc79a24
Merge branch 'dev/event_data_processes' of https://github.com/lava-nc…
Dec 7, 2022
0c54139
added tests for data loader + b-to-u
j0shcd Dec 7, 2022
1bbcc66
adding complete unit test suite for MaxPooling
Dec 7, 2022
5e48595
Merge branch 'dev/event_data_processes' of https://github.com/lava-nc…
Dec 7, 2022
34e752a
polished docstrings, split tests
Dec 7, 2022
08415dc
Merge branch 'main' into dev/event_data_processes
gkarray Dec 7, 2022
dc02db1
Cleaned up DvStream and tests.
mathisrichter Dec 8, 2022
1581cac
small changes to doc strings + modifications sub_sampling
j0shcd Dec 8, 2022
5fbac8b
Cleaned BinaryToUnary Process and unit tests.
mathisrichter Dec 8, 2022
4ab5d1e
Merge branch 'dev/event_data_processes' of https://github.com/lava-nc…
mathisrichter Dec 8, 2022
082c9c9
Simplified the ProcModel of the BinaryToUnary Process.
mathisrichter Dec 8, 2022
7c4f1b4
Fixed docstring.
mathisrichter Dec 8, 2022
099259f
Reviewed EventsToFrame Process and unit tests.
mathisrichter Dec 8, 2022
3f5ee2d
Reviewed EventsToFrame ProcessModel and unit tests.
mathisrichter Dec 8, 2022
283b296
dv_stream without subsampling
SveaMeyer13 Dec 8, 2022
6d8a4dc
updated dv_stream + tests
j0shcd Dec 9, 2022
2617193
moved encoding method into utils
j0shcd Dec 9, 2022
a6778a4
sub-sampling and polishing other tests
j0shcd Dec 9, 2022
ffac5f6
Reviewed AedatStream unit tests.
mathisrichter Dec 9, 2022
5a8ceb2
Added missing empty line.
mathisrichter Dec 9, 2022
354340a
Merge branch 'dev/event_data_processes' of https://github.com/lava-nc…
mathisrichter Dec 9, 2022
d795530
Merge branch 'main' of https://github.com/lava-nc/lava into dev/event…
mathisrichter Dec 9, 2022
20a85bf
^testtesttest
SveaMeyer13 Dec 15, 2022
b358462
Revert "^testtesttest"
SveaMeyer13 Dec 16, 2022
d1c7e89
Merge branch 'main' into dev/event_data_processes
SveaMeyer13 May 3, 2023
fbe3653
Merge branch 'main' into dev/event_data_processes
SveaMeyer13 May 3, 2023

Files changed

1 change: 1 addition & 0 deletions .gitattributes
@@ -1 +1,2 @@
*.npy* filter=lfs diff=lfs merge=lfs -text
*.aedat4 filter=lfs diff=lfs merge=lfs -text
1 change: 1 addition & 0 deletions pyproject.toml
@@ -51,6 +51,7 @@ numpy = "^1.22.2"
scipy = "^1.8.0"
networkx = "<=2.8.7"
asteval = "^0.9.27"
dv = "^1.0.10"

[tool.poetry.dev-dependencies]
bandit = "1.7.2"
137 changes: 137 additions & 0 deletions src/lava/proc/event_data/event_data_loader/aedat_data_loader.py
@@ -0,0 +1,137 @@
# Copyright (C) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

import typing as ty

from dv import AedatFile
import numpy as np
import random
from operator import itemgetter
import os

from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.process.ports.ports import OutPort

from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
from lava.magma.core.model.py.ports import PyOutPort
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.core.resources import CPU
from lava.magma.core.decorator import implements, requires
from lava.magma.core.model.py.model import PyLoihiProcessModel


class AedatDataLoader(AbstractProcess):
def __init__(self,
file_path: str,
shape_out: tuple,
Contributor: This has to be a tuple with a single entry, right? With typing you could do

    shape_out: ty.Tuple[int]

Not sure whether that also works with regular types as type annotations.

Reply: Implemented this; it does indeed show a warning if we give a tuple with more than one entry. Do you think the validation step that checks whether the shape is invalid can be deleted then?

Contributor: Leave it in for now. The shape always having to be one-dimensional is a bit odd anyway, so better make sure that users get an error if they miss this. The warning only shows up in IDEs that support it and has no impact on running the code.

seed_sub_sampling: int = None,
**kwargs) -> None:
Contributor: Remove kwargs if you are not using them.

super().__init__(file_path=file_path,
shape_out=shape_out,
seed_sub_sampling=seed_sub_sampling,
**kwargs)

self._validate_file_path(file_path)
self._validate_shape_out(shape_out)

self.out_port = OutPort(shape=shape_out)

@staticmethod
def _validate_file_path(file_path):
Contributor: Suggested signature:

    def _validate_file_path(file_path: str) -> None:

# Checking file extension
if not file_path[-7:] == ".aedat4":
raise ValueError(f"Given file should be an .aedat4 file. "
f"{file_path} given.")

try:
# Checking file size
if os.stat(file_path).st_size > 0:
return file_path
except FileNotFoundError:
# Checking file exists
raise FileNotFoundError(f"File not found. {file_path} given.")

return file_path

@staticmethod
def _validate_shape_out(shape_out):
Contributor: Missing type annotations:

    def _validate_shape_out(shape_out: ty.Tuple[int]) -> None:

Reply: done

if not isinstance(shape_out[0], int):
raise ValueError(f"Max number of events should be an integer."
f"{shape_out} given.")
Contributor: In Lava, we usually do not validate types. We use type annotations instead.

Reply: Done; implemented type annotations everywhere they were missing.

Contributor: Keep in mind, though, that type annotations are just hints; they do not check anything at runtime (see my other comment above). So wherever you think it is crucial, I would still add a validation that throws an exception. Otherwise, just leave it out.

Reply: For now, the code throws an error if the user passes an invalid dimension or negative values in the shapes. I guess we can keep it like this for now.
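
To illustrate the point about annotations (a minimal hypothetical snippet, not part of this PR): the annotation is silently ignored at runtime, while the explicit check raises.

import typing as ty


def set_shape(shape_out: ty.Tuple[int]) -> None:
    pass


def set_shape_validated(shape_out: ty.Tuple[int]) -> None:
    if len(shape_out) != 1:
        raise ValueError(f"Shape of the OutPort should be 1D. "
                         f"{shape_out} given.")


set_shape((640, 480))  # annotation suggests a 1-tuple, but this runs fine

try:
    set_shape_validated((640, 480))
except ValueError as e:
    print(e)  # Shape of the OutPort should be 1D. (640, 480) given.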


if shape_out[0] <= 0:
raise ValueError(f"Max number of events should be positive. "
f"{shape_out} given.")

if len(shape_out) != 1:
raise ValueError(f"Shape of the OutPort should be 1D. "
f"{shape_out} given.")

return shape_out


@implements(proc=AedatDataLoader, protocol=LoihiProtocol)
@requires(CPU)
class AedatDataLoaderPM(PyLoihiProcessModel):
out_port: PyOutPort = LavaPyType(PyOutPort.VEC_SPARSE, int)

def __init__(self, proc_params: dict) -> None:
super().__init__(proc_params)
self._file_path = proc_params["file_path"]
self._shape_out = proc_params["shape_out"]

self._init_aedat_file()
self._frame_shape = (self._file["events"].size_x,
self._file["events"].size_y)

seed_sub_sampling = proc_params["seed_sub_sampling"]
self._random_rng = np.random.default_rng(seed_sub_sampling)

def _init_aedat_file(self) -> None:
self._file = AedatFile(file_name=self._file_path)
self._stream = self._file["events"].numpy()

def run_spk(self) -> None:
events = self._get_next_event_batch()

xs, ys, ps = events['x'], events['y'], events['polarity']
Contributor: I would move this into the self._encode_data_and_indices method. You don't need the split-up data in the run_spk method again.

Reply: Done; events is passed into self._encode_data_and_indices as a parameter, but we're not sure of the type of "events" and will have to look into this further.

Contributor: Looks like some sort of dict, maybe typing.Dict[str, int].

Reply: That's what we put in for now, yes. I guess it will do, but Ghassen thinks it is some numpy-specific type (something like a structured array?). It shouldn't really matter, though, as the data is accessed the same way as in a dict.
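
For reference, numpy structured arrays do behave as described: fields are accessed by name, just like a dict. A small sketch (field names match the code above; the exact dtypes are illustrative guesses):

import numpy as np

# Field names match the event data used above; dtypes are assumptions.
events = np.array([(10, 20, 1), (11, 21, 0)],
                  dtype=[("x", "<u2"), ("y", "<u2"), ("polarity", "<i1")])

xs, ys, ps = events["x"], events["y"], events["polarity"]  # dict-like access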


data, indices = self._encode_data_and_indices(xs, ys, ps)

if data.shape[0] > self._shape_out[0]:
# If we have more data than our shape allows, subsample
data, indices = self._sub_sample(data, indices)

self.out_port.send(data, indices)

def _get_next_event_batch(self):
try:
events = self._stream.__next__()
except StopIteration:
self._init_aedat_file()
events = self._stream.__next__()
Contributor: What does this do?

Reply: This makes it so that once we reach the end of the file, we loop back to the beginning. Added a comment for clarity.

Contributor: Ah, nice! This should also be documented later in the docstring, in particular since this behavior cannot be switched off at the moment. In the future, I would expect a loop: bool flag in the Process, but we can add that feature later.

Reply: Sounds good!
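
A rough sketch of the loop flag the reviewer anticipates (hypothetical; not implemented in this PR):

def _get_next_event_batch(self):
    try:
        events = self._stream.__next__()
    except StopIteration:
        if not self._loop:  # hypothetical flag, passed in via proc_params
            raise
        # End of file reached: reopen the file and restart from the beginning.
        self._init_aedat_file()
        events = self._stream.__next__()
    return events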


return events

def _encode_data_and_indices(self,
xs: np.ndarray,
ys: np.ndarray,
ps: np.ndarray) \
-> ty.Tuple[np.ndarray, np.ndarray]:
data = ps
indices = np.ravel_multi_index((xs, ys), self._frame_shape)

return data, indices

def _sub_sample(self,
Contributor: This seems like a function that could be used for any event data. Can we make this a function instead, one that takes data, indices, and a max_events, and then does the subsampling independently of the AedatDataLoader class? You can then also remove the random number generator from the class.

Reply: _sub_sample has been made a function (sub_sample) that takes data, indices, max_events, and seed_random. It is called in the run_spk of the aedat_data_loader and should also be available outside the class. Should it stay in aedat_data_loader.py, or should it be moved somewhere else?

Contributor: That will have to be moved somewhere else, maybe a new module lava/src/lava/utils/events.py? (A sketch of such a function follows the method below.)

data: np.ndarray,
indices: np.ndarray) \
-> ty.Tuple[np.ndarray, np.ndarray]:
# TODO: print a warning if subsampling, say how much data has been lost
Contributor: Please add the warning and remove the TODO.

Reply: Done; the warning is:

    warnings.warn(f"Read {data.shape[0]} events. Maximum number of events is "
                  f"{max_events}. Removed {data.shape[0] - max_events} events "
                  f"by subsampling.")

Contributor: Maybe it would be nice to add the percentage of removed events to the warning.

Reply: Done
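
The warning extended with the suggested percentage might look like this (a sketch; the exact final wording is not shown in this thread):

import warnings

import numpy as np


def warn_if_sub_sampling(data: np.ndarray, max_events: int) -> None:
    # Sketch: warn with absolute and relative numbers of removed events.
    num_events = data.shape[0]
    if num_events > max_events:
        removed = num_events - max_events
        warnings.warn(f"Read {num_events} events. Maximum number of events is "
                      f"{max_events}. Removed {removed} events "
                      f"({removed / num_events:.1%}) by subsampling.")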

data_idx_array = np.arange(0, data.shape[0])
sampled_idx = self._random_rng.choice(data_idx_array,
self._shape_out[0],
replace=False)

return data[sampled_idx], indices[sampled_idx]
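
The standalone function the earlier thread converges on might look roughly like this (a sketch based on the method above; the reply names the signature data, indices, max_events, seed_random and a utils module as its eventual home):

import typing as ty

import numpy as np


def sub_sample(data: np.ndarray,
               indices: np.ndarray,
               max_events: int,
               seed_random: ty.Optional[int] = None) \
        -> ty.Tuple[np.ndarray, np.ndarray]:
    # Callers are expected to check data.shape[0] > max_events first,
    # as run_spk does above; rng.choice would otherwise raise.
    rng = np.random.default_rng(seed_random)
    sampled_idx = rng.choice(np.arange(0, data.shape[0]),
                             max_events,
                             replace=False)
    return data[sampled_idx], indices[sampled_idx]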
194 changes: 194 additions & 0 deletions
@@ -0,0 +1,194 @@
# Copyright (C) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

import typing as ty

import numpy as np
from scipy import signal
from numpy.lib.stride_tricks import as_strided

from lava.proc.event_data.event_pre_processor.utils import \
DownSamplingMethodDense

from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.process.ports.ports import InPort, OutPort

from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
from lava.magma.core.model.py.ports import PyInPort, PyOutPort
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.core.resources import CPU
from lava.magma.core.decorator import implements, requires
from lava.magma.core.model.py.model import PyLoihiProcessModel


class DownSamplingDense(AbstractProcess):
def __init__(self,
shape_in: tuple,
down_sampling_method: DownSamplingMethodDense,
down_sampling_factor: int,
**kwargs) -> None:
super().__init__(shape_in=shape_in,
down_sampling_method=down_sampling_method,
down_sampling_factor=down_sampling_factor,
**kwargs)

self._validate_shape_in(shape_in)
self._validate_down_sampling_method(down_sampling_method)
self._validate_down_sampling_factor(down_sampling_factor)
# test invalid shape in (negative/decimal values, 1d, 4+d, 3rd dim not 2)
# test for invalid down sampling factor (negative values)
# test for invalid type given to down sampling method

shape_out = (shape_in[0] // down_sampling_factor,
shape_in[1] // down_sampling_factor)
self.in_port = InPort(shape=shape_in)
self.out_port = OutPort(shape=shape_out)

@staticmethod
def _validate_shape_in(shape_in):
if not (len(shape_in) == 2 or len(shape_in) == 3):
raise ValueError(f"shape_in should be 2 or 3 dimensional. "
f"{shape_in} given.")

if not isinstance(shape_in[0], int) or not isinstance(shape_in[1], int):
raise ValueError(f"Width and height of shape_in should be integers."
f"{shape_in} given.")
if len(shape_in) == 3:
if shape_in[2] != 2:
raise ValueError(f"Third dimension of shape_in should be "
f"equal to 2. "
f"{shape_in} given.")

if shape_in[0] <= 0 or shape_in[1] <= 0:
raise ValueError(f"Width and height of shape_in should be positive."
f"{shape_in} given.")

return shape_in

@staticmethod
def _validate_down_sampling_method(down_sampling_method):
if not isinstance(down_sampling_method, DownSamplingMethodDense):
raise (TypeError(
f"Down sampling methods for dense to dense down-sampling need to be "
f"selected using the DownSamplingMethodDense Enum."))
# TODO: mention that it's an enum in error message?

@staticmethod
def _validate_down_sampling_factor(down_sampling_factor):
# TODO: should the down sampling factor be a float or an int?
if not isinstance(down_sampling_factor, int):
raise (ValueError(f"Down sampling factor should be an integer."
f"{down_sampling_factor} given."))

if down_sampling_factor <= 0:
raise ValueError(f"Down sampling factor should be positive."
f"{down_sampling_factor} given.")


@implements(proc=DownSamplingDense, protocol=LoihiProtocol)
@requires(CPU)
class DownSamplingDensePM(PyLoihiProcessModel):
in_port: PyInPort = LavaPyType(PyInPort.VEC_DENSE, int)
out_port: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, int)

def __init__(self, proc_params: dict) -> None:
super().__init__(proc_params)
self._shape_in = proc_params["shape_in"]
self._down_sampling_method = proc_params["down_sampling_method"]
self._down_sampling_factor = proc_params["down_sampling_factor"]

self._shape_out = (self._shape_in[0] // self._down_sampling_factor,
self._shape_in[1] // self._down_sampling_factor)

def run_spk(self) -> None:
data = self.in_port.recv()

down_sampled_data = self._down_sample(data)

self.out_port.send(down_sampled_data)

def _down_sample(self, data: np.ndarray) -> np.ndarray:
if self._down_sampling_method == DownSamplingMethodDense.SKIPPING:
down_sampled_data = \
self._down_sample_skipping(data,
self._down_sampling_factor,
self._shape_out[0],
self._shape_out[1])

elif self._down_sampling_method == DownSamplingMethodDense.MAX_POOLING:
down_sampled_data = \
self._down_sample_max_pooling(data,
self._down_sampling_factor,
self._shape_out[0],
self._shape_out[1])

elif self._down_sampling_method == DownSamplingMethodDense.CONVOLUTION:
down_sampled_data = \
self._down_sample_convolution(data,
self._down_sampling_factor,
self._shape_out[0],
self._shape_out[1])

else:
# TODO : Remove since validation is taking care of this ?
raise ValueError(f"Unknown down_sample_mode."
f"{self._down_sampling_method=} given.")

return down_sampled_data

@staticmethod
def _down_sample_skipping(data: np.ndarray,
down_sampling_factor: int,
down_sampled_width: int,
down_sampled_height: int) -> np.ndarray:
down_sampled_data = \
data[::down_sampling_factor, ::down_sampling_factor]

down_sampled_data = \
down_sampled_data[:down_sampled_width, :down_sampled_height]

return down_sampled_data

@staticmethod
def _down_sample_max_pooling(data: np.ndarray,
down_sampling_factor: int,
down_sampled_width: int,
down_sampled_height: int) -> np.ndarray:
output_shape = \
((data.shape[0] - down_sampling_factor) // down_sampling_factor + 1,
(data.shape[1] - down_sampling_factor) // down_sampling_factor + 1)

shape_w = (output_shape[0],
output_shape[1],
down_sampling_factor,
down_sampling_factor)
strides_w = (down_sampling_factor * data.strides[0],
down_sampling_factor * data.strides[1],
data.strides[0],
data.strides[1])

down_sampled_data = as_strided(data, shape_w, strides_w)
down_sampled_data = down_sampled_data.max(axis=(2, 3))

# TODO: Is this really needed ?
down_sampled_data = \
down_sampled_data[:down_sampled_width, :down_sampled_height]

return down_sampled_data
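
The as_strided view above is a standard trick for window pooling without copying. For intuition, an equivalent reshape-based version (a sketch; assumes both dimensions are divisible by the factor):

import numpy as np

factor = 2
data = np.arange(16).reshape(4, 4)

# Group each (factor x factor) block into its own pair of axes, then reduce.
pooled = data.reshape(4 // factor, factor, 4 // factor, factor).max(axis=(1, 3))
print(pooled)  # [[ 5  7]
               #  [13 15]]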

@staticmethod
def _down_sample_convolution(data: np.ndarray,
down_sampling_factor: int,
down_sampled_width: int,
down_sampled_height: int) -> np.ndarray:
kernel = np.ones((down_sampling_factor, down_sampling_factor))
data_convolved = signal.convolve2d(data, kernel)

down_sampled_data = \
data_convolved[::down_sampling_factor, ::down_sampling_factor]

down_sampled_data = \
down_sampled_data[:down_sampled_width, :down_sampled_height]

return down_sampled_data
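
Finally, a hypothetical usage sketch for the Process defined above (shape and factor are illustrative; actually running it requires the usual Lava runtime setup, which is omitted here):

from lava.proc.event_data.event_pre_processor.utils import \
    DownSamplingMethodDense

# Down-sample a 640x480 input by a factor of 4 using max pooling.
down_sampler = DownSamplingDense(
    shape_in=(640, 480),
    down_sampling_method=DownSamplingMethodDense.MAX_POOLING,
    down_sampling_factor=4)

# in_port / out_port can then be connected to other Processes;
# the out_port shape here is (160, 120).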