In [1]:
import dv_processing as dv
import cv2
import numpy as np
import time
import torch

from PIL import Image
from torchvision import transforms
from datetime import timedelta
from models.cnn_model import *

objc[5455]: Class CaptureDelegate is implemented in both /Users/winirrr/pytorch-test/env/lib/python3.9/site-packages/dv_processing.dylibs/libopencv_videoio.4.8.0.dylib (0x109d00880) and /Users/winirrr/pytorch-test/env/lib/libopencv_videoio.4.6.0.dylib (0x11dec9240). One of the two will be used. Which one is undefined.


In [2]:
import dv_processing as dv

# Ask dv-processing for the cameras it can currently discover.
cameras = dv.io.discoverDevices()

# Report how many devices were found, then print one line per discovered device.
print(f"Device discovery: found {len(cameras)} devices.")
for camera_name in cameras:
    print(f"Detected device [{camera_name}]")

Device discovery: found 1 devices.
Detected device [DVXplorer_DXA00417]


In [None]:
# Open one specific camera by name (the device listed by the discovery cell).
capture = dv.io.CameraCapture(cameraName="DVXplorer_DXA00417")

# Preprocessing applied to every accumulated frame before it reaches the model.
# NOTE(review): presumably this must mirror the preprocessing used at training
# time (32x32, single channel, same mean/std) -- confirm against the training code.
transform = transforms.Compose([
    transforms.Resize((32, 32)),                      # Resize to the 32x32 input size
    transforms.Grayscale(num_output_channels=1),      # Force a single channel
    transforms.ToTensor(),                            # Convert PIL image to tensor
    transforms.Normalize(mean=[0.485], std=[0.229]),  # Normalize the single channel
])

# Device agnostic code
device = "cuda" if torch.cuda.is_available() else "cpu"

# Location of the trained weights.
# NOTE(review): absolute, machine-specific path -- consider making it configurable.
MODEL_WEIGHTS_PATH = "/Users/winirrr/Documents/EventBased_Project/models_save/dv_df_11ms_aug_LeNet_0.pth"

# Load pre-trained model.
# map_location remaps the stored tensors onto the device selected above; without
# it, a checkpoint saved from a CUDA device cannot be loaded on a CPU-only host.
loaded_model = LeNet()
loaded_model.load_state_dict(torch.load(MODEL_WEIGHTS_PATH, map_location=device))
loaded_model.to(device)
loaded_model.eval()  # inference mode for layers such as dropout / batch norm

# Maps the model's predicted class index to a human-readable label.
predicted_class = {0: "no_press",
                   1: "press"}

# Initialize an accumulator matching the camera's event resolution
accumulator = dv.Accumulator(capture.getEventResolution())

# Apply configuration, these values can be modified to taste
accumulator.setMinPotential(0.0)
accumulator.setMaxPotential(1.0)
accumulator.setNeutralPotential(0.5)
accumulator.setEventContribution(0.15)
accumulator.setDecayFunction(dv.Accumulator.Decay.EXPONENTIAL)
# NOTE(review): presumably the exponential time constant in microseconds -- confirm
# against the dv-processing Accumulator documentation.
accumulator.setDecayParam(1e+6)
accumulator.setIgnorePolarity(False)
accumulator.setSynchronousDecay(True)

# Slicer that will cut the incoming event stream into chunks for the callback.
slicer = dv.EventStreamSlicer()

def slicing_callback(events: dv.EventStore):
    """Classify one slice of events and show the annotated preview.

    The events are fed into the module-level ``accumulator``, the resulting
    frame is run through ``transform`` and ``loaded_model``, and the predicted
    label (looked up in ``predicted_class``) is drawn onto a BGR copy of the
    frame that is displayed in the "Preview" window.

    Args:
        events: The event slice handed over by the stream slicer.

    Returns:
        None. The only effects are updating the accumulator and the window.
    """
    # Integrate the new events and render the current accumulator state.
    accumulator.accept(events)
    gray_image = accumulator.generateFrame().image

    # Colour copy used for display, so the overlay text can be drawn in colour.
    preview = cv2.cvtColor(gray_image, cv2.COLOR_GRAY2BGR)

    # Build a single-sample batch on the inference device.
    model_input = transform(Image.fromarray(gray_image))
    model_input = model_input.unsqueeze(0).to(device)

    # Forward pass without gradient tracking; keep the index of the top score.
    with torch.no_grad():
        scores = loaded_model(model_input)
        prediction = torch.max(scores, 1)[1].item()

    # Overlay the predicted label on the preview image.
    cv2.putText(
        preview,
        f'Prediction: {predicted_class[prediction]}',
        (10, 30),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (255, 0, 0),
        2,
        cv2.LINE_AA,
    )

    # Display the annotated frame; the caller is responsible for cv2.waitKey().
    cv2.imshow("Preview", preview)

# Invoke slicing_callback once for every 11 ms worth of events.
# NOTE(review): presumably chosen to match the "11ms" window in the model file
# name -- confirm against the training setup.
slicer.doEveryTimeInterval(timedelta(milliseconds=11), slicing_callback)

try:
    # Run the loop while camera is still connected
    while capture.isRunning():
        # Read batch of events.
        # The method does not wait for data to arrive: it returns immediately
        # with the latest available data, or `None` if no data is available.
        events = capture.getNextEventBatch()
        if events is not None:
            # Hand the batch to the slicer, which triggers the callback as needed
            slicer.accept(events)

        # waitKey lets HighGUI process window events (so the preview repaints)
        # and waits about 1 ms per iteration; pressing 'q' stops the loop.
        if (cv2.waitKey(1) & 0xFF) == ord('q'):
            break
finally:
    # Always close the preview window, including when the cell is interrupted
    # or the callback raises.
    cv2.destroyAllWindows()