Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] nidaqmx: Add new version of event registration methods #474

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 212 additions & 11 deletions generated/nidaqmx/task.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import numpy
import warnings
from nidaqmx import utils
from typing import Callable, Optional

import deprecation
import numpy

from nidaqmx import utils
from nidaqmx._task_modules.channels.channel import Channel
from nidaqmx._task_modules.export_signals import ExportSignals
from nidaqmx._task_modules.in_stream import InStream
Expand All @@ -22,7 +25,7 @@
DOChannelCollection)
from nidaqmx.constants import (
AcquisitionType, ChannelType, FillMode, UsageTypeAI, UsageTypeCI, EveryNSamplesEventType,
READ_ALL_AVAILABLE, UsageTypeCO, _Save)
READ_ALL_AVAILABLE, Signal, UsageTypeCO, _Save)
from nidaqmx.error_codes import DAQmxErrors
from nidaqmx.errors import (
DaqError, DaqResourceWarning)
Expand All @@ -48,6 +51,15 @@ class UnsetAutoStartSentinel:
del UnsetAutoStartSentinel


DONE_EVENT_CALLBACK = Callable[[object, int, object], int]
EVERY_N_SAMPLES_EVENT_CALLBACK = Callable[[object, int, int, object], int]
SIGNAL_EVENT_CALLBACK = Callable[[object, int, object], int]

DONE_EVENT_CALLBACK_EX = Callable[["Task", int], None]
EVERY_N_SAMPLES_EVENT_CALLBACK_EX = Callable[["Task", EveryNSamplesEventType, int], None]
SIGNAL_EVENT_CALLBACK_EX = Callable[["Task", Signal], None]


class Task:
"""
Represents a DAQmx Task.
Expand Down Expand Up @@ -622,7 +634,8 @@ def read(self, number_of_samples_per_channel=NUM_SAMPLES_UNSET,

return data.tolist()

def register_done_event(self, callback_method):
@deprecation.deprecated(deprecated_in="0.8.0", details="Use register_done_event_ex instead.")
def register_done_event(self, callback_method: Optional[DONE_EVENT_CALLBACK]) -> None:
"""
Registers a callback function to receive an event when a task stops due
to an error or when a finite acquisition task or finite generation task
Expand Down Expand Up @@ -651,8 +664,12 @@ def register_done_event(self, callback_method):
"""
self._interpreter.register_done_event(self._handle, 0, callback_method, None)

@deprecation.deprecated(
deprecated_in="0.8.0", details="Use register_every_n_samples_acquired_into_buffer_event_ex instead."
)
def register_every_n_samples_acquired_into_buffer_event(
self, sample_interval, callback_method):
self, sample_interval: int, callback_method: Optional[EVERY_N_SAMPLES_EVENT_CALLBACK]
) -> None:
"""
Registers a callback function to receive an event when the specified
number of samples is written from the device to the buffer. This
Expand Down Expand Up @@ -685,11 +702,16 @@ def register_every_n_samples_acquired_into_buffer_event(
Passing None for this parameter unregisters the event callback
function.
"""
self._interpreter.register_every_n_samples_event(self._handle, EveryNSamplesEventType.ACQUIRED_INTO_BUFFER.value,
sample_interval, 0, callback_method, None)
self._interpreter.register_every_n_samples_event(
self._handle, EveryNSamplesEventType.ACQUIRED_INTO_BUFFER.value,
sample_interval, 0, callback_method, None)

@deprecation.deprecated(
deprecated_in="0.8.0", details="Use register_every_n_samples_transferred_from_buffer_event_ex instead."
)
def register_every_n_samples_transferred_from_buffer_event(
self, sample_interval, callback_method):
self, sample_interval: int, callback_method: Optional[EVERY_N_SAMPLES_EVENT_CALLBACK]
) -> None:
"""
Registers a callback function to receive an event when the specified
number of samples is written from the buffer to the device. This
Expand Down Expand Up @@ -722,10 +744,14 @@ def register_every_n_samples_transferred_from_buffer_event(
Passing None for this parameter unregisters the event callback
function.
"""
self._interpreter.register_every_n_samples_event(self._handle, EveryNSamplesEventType.TRANSFERRED_FROM_BUFFER.value,
sample_interval, 0, callback_method, None)
self._interpreter.register_every_n_samples_event(
self._handle, EveryNSamplesEventType.TRANSFERRED_FROM_BUFFER.value,
sample_interval, 0, callback_method, None)

def register_signal_event(self, signal_type, callback_method):
@deprecation.deprecated(deprecated_in="0.8.0", details="Use register_signal_event_ex instead.")
def register_signal_event(
self, signal_type: Signal, callback_method: Optional[SIGNAL_EVENT_CALLBACK]
) -> None:
"""
Registers a callback function to receive an event when the specified
hardware event occurs.
Expand Down Expand Up @@ -756,6 +782,181 @@ def register_signal_event(self, signal_type, callback_method):
"""
self._interpreter.register_signal_event(self._handle, signal_type.value, 0, callback_method, None)

def register_done_event_ex(self, callback_method: Optional[DONE_EVENT_CALLBACK_EX]) -> None:
"""
Registers a callback function to receive an event when a task stops due
to an error or when a finite acquisition task or finite generation task
completes execution. A Done event does not occur when a task is stopped
explicitly, such as by calling DAQmx Stop Task.

Args:
callback_method (function): Specifies the function that you want
DAQmx to call when the event occurs. The function you pass in
this parameter must have the following prototype:

>>> def callback(task: nidaqmx.Task, status: int) -> None:
>>> pass

Upon entry to the callback, the task parameter contains
the task on which the event occurred. The status
parameter contains the status of the task when the event
occurred. If the status value is negative, it indicates an
error. If the status value is zero, it indicates no error.
If the status value is positive, it indicates a warning. The
callbackData parameter contains the value you passed in the
callbackData parameter of this function.

Passing None for this parameter unregisters the event callback
function.
"""
def invoke_callback(task_handle: object, status: int, callback_data: object) -> int:
callback_method(self, status)
return 0

self._interpreter.register_done_event(self._handle, 0, invoke_callback, None)

def register_every_n_samples_acquired_into_buffer_event_ex(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure to update this example and any others that are registering callbacks to the new versions.

self, sample_interval: int, callback_method: Optional[EVERY_N_SAMPLES_EVENT_CALLBACK_EX]
) -> None:
"""
Registers a callback function to receive an event when the specified
number of samples is written from the device to the buffer. This
function only works with devices that support buffered tasks.

When you stop a task explicitly any pending events are discarded. For
example, if you call DAQmx Stop Task then you do not receive any
pending events.

Args:
sample_interval (int): Specifies the number of samples after
which each event should occur.
callback_method (function): Specifies the function that you want
DAQmx to call when the event occurs. The function you pass in
this parameter must have the following prototype:

>>> def callback(
>>> task: nidaqmx.Task,
>>> every_n_samples_event_type: nidaqmx.constants.EveryNSamplesEventType,
>>> number_of_samples: int
>>> ) -> None:
>>> pass

Upon entry to the callback, the task parameter contains
the task on which the event occurred. The
every_n_samples_event_type parameter contains the
EveryNSamplesEventType.ACQUIRED_INTO_BUFFER value. The
number_of_samples parameter contains the value you passed in
the sample_interval parameter of this function. The
callback_data parameter contains the value you passed in the
callback_data parameter of this function.

Passing None for this parameter unregisters the event callback
function.
"""
def invoke_callback(
task_handle: object,
every_n_samples_event_type: int,
number_of_samples: int,
callback_data: object
) -> int:
callback_method(self, EveryNSamplesEventType(every_n_samples_event_type), number_of_samples)
return 0

self._interpreter.register_every_n_samples_event(
self._handle, EveryNSamplesEventType.ACQUIRED_INTO_BUFFER.value,
sample_interval, 0, invoke_callback, None)

def register_every_n_samples_transferred_from_buffer_event_ex(
self, sample_interval: int, callback_method: Optional[EVERY_N_SAMPLES_EVENT_CALLBACK_EX]
) -> None:
"""
Registers a callback function to receive an event when the specified
number of samples is written from the buffer to the device. This
function only works with devices that support buffered tasks.

When you stop a task explicitly any pending events are discarded. For
example, if you call DAQmx Stop Task then you do not receive any
pending events.

Args:
sample_interval (int): Specifies the number of samples after
which each event should occur.
callback_method (function): Specifies the function that you want
DAQmx to call when the event occurs. The function you pass in
this parameter must have the following prototype:

>>> def callback(
>>> task: nidaqmx.Task,
>>> every_n_samples_event_type: nidaqmx.constants.EveryNSamplesEventType,
>>> number_of_samples: int
>>> ) -> None:
>>> pass

Upon entry to the callback, the task parameter contains
the task on which the event occurred. The
every_n_samples_event_type parameter contains the
EveryNSamplesEventType.TRANSFERRED_FROM_BUFFER value. The
number_of_samples parameter contains the value you passed in
the sample_interval parameter of this function. The
callback_data parameter contains the value you passed in the
callback_data parameter of this function.

Passing None for this parameter unregisters the event callback
function.
"""
def invoke_callback(
task_handle: object,
every_n_samples_event_type: int,
number_of_samples: int,
callback_data: object
) -> int:
callback_method(self, EveryNSamplesEventType(every_n_samples_event_type), number_of_samples)
return 0

self._interpreter.register_every_n_samples_event(
self._handle, EveryNSamplesEventType.TRANSFERRED_FROM_BUFFER.value,
sample_interval, 0, invoke_callback, None)

def register_signal_event_ex(
self, signal_type: Signal, callback_method: Optional[SIGNAL_EVENT_CALLBACK_EX]
) -> None:
"""
Registers a callback function to receive an event when the specified
hardware event occurs.

When you stop a task explicitly any pending events are discarded. For
example, if you call DAQmx Stop Task then you do not receive any
pending events.

Args:
signal_type (nidaqmx.constants.Signal): Specifies the type of
signal for which you want to receive results.
callback_method (function): Specifies the function that you want
DAQmx to call when the event occurs. The function you pass in
this parameter must have the following prototype:

>>> def callback(
>>> task: nidaqmx.Task,
>>> signal_type: nidaqmx.constants.Signal
>>> ) -> None:
>>> pass

Upon entry to the callback, the task parameter contains
the task on which the event occurred. The
signal_type parameter contains the nidaqmx.constants.Signal value you passed in
the signal_type parameter of this function. The callback_data
parameter contains the value you passed in the callback_data
parameter of this function.

Passing None for this parameter unregisters the event callback
function.
"""
def invoke_callback(task_handle: object, signal_type: int, callback_data: object) -> int:
callback_method(self, Signal(signal_type))
return 0

self._interpreter.register_signal_event(self._handle, signal_type.value, 0, invoke_callback, None)

def save(self, save_as="", author="", overwrite_existing_task=False,
allow_interactive_editing=True, allow_interactive_deletion=True):
"""
Expand Down
Loading