In [1]:
import matplotlib
%matplotlib inline

import numpy as np
import time
import cv2

import os
from lava.magma.core.run_configs import Loihi2SimCfg, Loihi2HwCfg
from lava.magma.core.run_conditions import RunSteps, RunContinuous
from lava.magma.core.decorator import implements, requires, tag
from lava.magma.core.model.py.model import PyLoihiProcessModel
from lava.magma.core.model.py.ports import PyOutPort, PyInPort
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.core.process.ports.ports import OutPort, InPort
from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.process.variable import Var
from lava.magma.core.resources import CPU
from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
from lava.proc.io.sink import RingBuffer

from lava.lib.peripherals.dvs.process import PropheseeCamera
from metavision_core.utils import get_sample
from metavision_sdk_cv import TrailFilterAlgorithm, ActivityNoiseFilterAlgorithm
from metavision_sdk_core import PolarityFilterAlgorithm

from multiprocessing import Pipe
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import IPython


from metavision_core.event_io import RawReader
from lava.lib.peripherals.dvs.transform import Compose, MirrorHorizontally, MirrorVertically, MergePolarities, Downsample
from metavision_ml.preprocessing import histo, histo_quantized
from metavision_core.event_io import RawReader

from lava.proc.embedded_io.spike import PyToNxAdapter, NxToPyAdapter
from lava.proc.lif.process import LIF, LogConfig
from lava.proc.dense.process import Dense
from lava.proc.sparse.process import Sparse
import logging
from lava.proc.dense.models import PyDenseModelBitAcc
from lava.proc.lif.models import PyLifModelBitAcc
from lava.proc.sparse.models import PySparseModelBitAcc

from utils import VisProcess, VisUpDownProcess
from scipy.sparse import csr_matrix

  def _viz_events(events, img):



## Download raw event recording

In [2]:
SEQUENCE_FILENAME_RAW = "traffic_monitoring.raw"
# if the file doesn't exist, it will be downloaded from Prophesee's public sample server
get_sample(SEQUENCE_FILENAME_RAW)

reader = RawReader(SEQUENCE_FILENAME_RAW)
height, width = reader.get_size()
del reader

## Basic usage

Initialize PropheseeCamera using the path to the recording, in order to use a live camera, replace the path with an empty string ("").

In [3]:

# Init Processes
frame_input = PropheseeCamera(device=SEQUENCE_FILENAME_RAW,
                              sensor_shape=(height, width),
                              num_output_time_bins=1)

recv = VisProcess(shape=frame_input.s_out.shape)

# Connect
frame_input.s_out.connect(recv.s_in)

# Run
num_steps = 200
run_cfg = Loihi2SimCfg()
run_cnd = RunSteps(num_steps=num_steps)

frame_input.run(condition=run_cnd, run_cfg=run_cfg)
frame_input.stop()

## Apply filters

As you see, the falling items in the recording have a trail. In order to remove the trails, we can apply the TrailFilterAlgorithm from the metavision_sdk. See the metavision documentation for more filters and their usage.

In [4]:
filters = [TrailFilterAlgorithm(width=width, height=height, threshold=100000),
           ActivityNoiseFilterAlgorithm(width=width, height=height, threshold=1000),]


# Init Processes
frame_input = PropheseeCamera(device=SEQUENCE_FILENAME_RAW,
                              filters=filters,
                              sensor_shape=(height, width),
                              num_output_time_bins=1)

recv = VisProcess(shape=frame_input.s_out.shape)

# Connect
frame_input.s_out.connect(recv.s_in)

# Run
num_steps = 200
run_cfg = Loihi2SimCfg()
run_cnd = RunSteps(num_steps=num_steps)

frame_input.run(condition=run_cnd, run_cfg=run_cfg)
frame_input.stop()

## Apply transformations



In [5]:
transformations = Compose(
    [
        Downsample(factor=0.5),
    ]
)

# Init Processes
frame_input = PropheseeCamera(device=SEQUENCE_FILENAME_RAW,
                              transformations=transformations,
                              sensor_shape=(height, width),
                              num_output_time_bins=1)

recv = VisProcess(shape=frame_input.s_out.shape)

# Connect
frame_input.s_out.connect(recv.s_in)

# Run
num_steps = 200
run_cfg = Loihi2SimCfg()
run_cnd = RunSteps(num_steps=num_steps)

frame_input.run(condition=run_cnd, run_cfg=run_cfg)
frame_input.stop()

## Custom transformation and manual output shape

We also provide the option to use tonic transformations on the data. See the tonic documentation to see a complete list and usage. If you need to use custom transformation, the automatic shape determination would not work. In that case, you can specify the output shape manually.

In [6]:
def expand_x_dim(events):
    events['x'] += 500
    return events
    

transformations = Compose(
    [
        expand_x_dim,
    ]
)

# Init Processes
frame_input = PropheseeCamera(device=SEQUENCE_FILENAME_RAW,
                              transformations=transformations,
                              sensor_shape=(height, width),
                              num_output_time_bins=1,
                              out_shape=(1, 2, height, width+500))

recv = VisProcess(shape=frame_input.s_out.shape)

# Connect
frame_input.s_out.connect(recv.s_in)

# Run
num_steps = 200
run_cfg = Loihi2SimCfg()
run_cnd = RunSteps(num_steps=num_steps)

frame_input.run(condition=run_cnd, run_cfg=run_cfg)
frame_input.stop()
                              



## Swipe detector

In the next section, we will create a simple hand gesture recognition model.

In [19]:

from scipy.sparse import csr_matrix

transformations = Compose(
    [
        MirrorHorizontally(height=height),
        MirrorVertically(width=width),
        Downsample(factor=0.3),
        MergePolarities(),
    ]
)


print("Create Cam")
# Init Processes
frame_input = PropheseeCamera(device=SEQUENCE_FILENAME_RAW,
                              transformations=transformations,
                              sensor_shape=(height, width),
                              num_output_time_bins=1)

num_neurons = np.prod(frame_input.s_out.shape)
_, _, scaled_height, scaled_width = frame_input.s_out.shape
flat_shape = (num_neurons, )


w_ff = 1000
w_rec = 5
kernel_size = 5

vth = 1000
du = dv = 2000


print("Create FF weights")

ff_weights = csr_matrix(np.eye(num_neurons) * w_ff)


print("Create down weights")
rec_weights_down = np.zeros((num_neurons, num_neurons)).reshape((scaled_height, scaled_width, scaled_height, scaled_width))
for i in range(scaled_height):
    for j in range(scaled_width):
        rec_weights_down[i+1:i+kernel_size, j-kernel_size:j+kernel_size, i, j-kernel_size:j+kernel_size] = w_rec
rec_weights_down = csr_matrix(rec_weights_down.reshape((num_neurons, num_neurons)))

print("Create up weights")
rec_weights_up = np.zeros((num_neurons, num_neurons)).reshape((scaled_height, scaled_width, scaled_height, scaled_width))
for i in range(scaled_height):
    for j in range(scaled_width):
        rec_weights_up[i-kernel_size:i, j-kernel_size:j+kernel_size, i, j-kernel_size:j+kernel_size] = w_rec
rec_weights_up = csr_matrix(rec_weights_up.reshape((num_neurons, num_neurons)))

print("Create left weights")
rec_weights_left = np.zeros((num_neurons, num_neurons)).reshape((scaled_height, scaled_width, scaled_height, scaled_width))
for i in range(scaled_height):
    for j in range(scaled_width):
        rec_weights_left[i-kernel_size:i+kernel_size, j-kernel_size:j, i-kernel_size:i+kernel_size, j] = w_rec

rec_weights_left = csr_matrix(rec_weights_left.reshape((num_neurons, num_neurons)))

print("Create right weights")
rec_weights_right = np.zeros((num_neurons, num_neurons)).reshape((scaled_height, scaled_width, scaled_height, scaled_width))
for i in range(scaled_height):
    for j in range(scaled_width):
        rec_weights_right[i-kernel_size:i+kernel_size, j+1:j+kernel_size, i-kernel_size:i+kernel_size, j] = w_rec
rec_weights_right = csr_matrix(rec_weights_right.reshape((num_neurons, num_neurons)))



Create Cam
Create FF weights
Create down weights
Create up weights
Create left weights
Create right weights


In [20]:


print("Create down processes")
ff_down = Sparse(weights=ff_weights)
rec_down = Sparse(weights=rec_weights_down)
lif_down = LIF(shape=frame_input.shape,
              du=du,
              dv=dv,
              vth=vth)

print("Create up processes")
ff_up = Sparse(weights=ff_weights)
rec_up = Sparse(weights=rec_weights_up)
lif_up = LIF(shape=frame_input.shape,
              du=du,
              dv=dv,
              vth=vth)

print("Create left processes")
ff_left = Sparse(weights=ff_weights)
rec_left = Sparse(weights=rec_weights_left)
lif_left = LIF(shape=frame_input.shape,
              du=du,
              dv=dv,
              vth=vth)

print("Create right processes")
ff_right = Sparse(weights=ff_weights)
rec_right = Sparse(weights=rec_weights_right)
lif_right = LIF(shape=frame_input.shape,
              du=du,
              dv=dv,
              vth=vth)

recv = VisUpDownProcess(shape=lif_down.s_out.shape)



Create down processes
Create up processes
Create left processes
Create right processes


In [21]:
print("Connect")
# Connect
frame_input.s_out.connect(recv.frame_in)

frame_input.s_out.flatten().connect(ff_down.s_in)
ff_down.a_out.reshape(lif_down.a_in.shape).connect(lif_down.a_in)
lif_down.s_out.flatten().connect(rec_down.s_in)
rec_down.a_out.reshape(lif_down.a_in.shape).connect(lif_down.a_in)
lif_down.s_out.connect(recv.down_in)
lif_down.s_out.connect(recv.down_frame_in)

frame_input.s_out.flatten().connect(ff_up.s_in)
ff_up.a_out.reshape(lif_up.a_in.shape).connect(lif_up.a_in)
lif_up.s_out.flatten().connect(rec_up.s_in)
rec_up.a_out.reshape(lif_up.a_in.shape).connect(lif_up.a_in)
lif_up.s_out.connect(recv.up_in)
lif_up.s_out.connect(recv.up_frame_in)

frame_input.s_out.flatten().connect(ff_left.s_in)
ff_left.a_out.reshape(lif_left.a_in.shape).connect(lif_left.a_in)
lif_left.s_out.flatten().connect(rec_left.s_in)
rec_left.a_out.reshape(lif_left.a_in.shape).connect(lif_left.a_in)
lif_left.s_out.connect(recv.left_in)
lif_left.s_out.connect(recv.left_frame_in)

frame_input.s_out.flatten().connect(ff_right.s_in)
ff_right.a_out.reshape(lif_right.a_in.shape).connect(lif_right.a_in)
lif_right.s_out.flatten().connect(rec_right.s_in)
rec_right.a_out.reshape(lif_right.a_in.shape).connect(lif_right.a_in)
lif_right.s_out.connect(recv.right_in)
lif_right.s_out.connect(recv.right_frame_in)



Connect


In [22]:
print("Run ")
# Run
num_steps = 2000
run_cfg = Loihi2SimCfg(exception_proc_model_map={Dense: PyDenseModelBitAcc, Sparse: PySparseModelBitAcc, LIF: PyLifModelBitAcc})
run_cnd = RunSteps(num_steps=num_steps)

frame_input.run(condition=run_cnd, run_cfg=run_cfg)
frame_input.stop()

Run 
