In [1]:
import numpy as np
from pythonosc.osc_server import BlockingOSCUDPServer
from pythonosc.dispatcher import Dispatcher
from pythonosc.udp_client import SimpleUDPClient
import struct

# Python OSC Example

This notebook reads EEG data in real time from Bonsai using python-osc. Documentation for python-osc can be found at https://python-osc.readthedocs.io/en/latest/.

To change the type of data being received, write your own handler function and use struct.unpack()

In [2]:
# Port names
ip = '127.0.0.1'
receive_port = 2323
send_port = 7531

# Make sure the expected data shape matches the OpenCV matrix size
data_shape = (3, 256)

In [3]:
# This class packages together the data and handler function
class DataBlock:
    
    def __init__(self, n_rows, n_cols):
        self.data = np.zeros((n_rows, n_cols))
    
    # This is the handler function that will be called for each message
    def data_handler(self, address, byte_array):
        
        float_list = struct.unpack(self.data.size*'f', byte_array)
        self.data = np.array(float_list).reshape(*self.data.shape)

# The default handler prints unhandled messages
def default_handler(address, *args):
    print(f'DEFAULT {address}: {args}')

In [4]:
# Instantiate dispatcher
dispatcher = Dispatcher()

# Setup DataBlock class
data_block = DataBlock(*data_shape)

# Map handler functions to message types
dispatcher.map('/eeg', data_block.data_handler)
dispatcher.set_default_handler(default_handler)

# Setup server and client
server = BlockingOSCUDPServer((ip, receive_port), dispatcher)
client = SimpleUDPClient(ip, send_port)

# Read 20 blocks
for i in range(20):
    server.handle_request()
    # Real-time plotting in Jupyter is hard. Do something here to verify data is being received.
    # print(np.mean(data_block.data))

# Example message send
client.send_message("/number", 1)

NameError: name 'datablock' is not defined