"""Acquire finite analog-input voltage samples from an NI-DAQmx task over gRPC.

The script creates a DAQmx task with one AI voltage channel, configures a
finite acquisition (1000 samples per channel at 100 Hz), registers
every-N-samples and done event streams, and reads 100 samples each time an
every-N-samples event arrives.

Usage:
    python <script> [server_address] [server_port] [physical_channel]

Defaults are ``localhost``, ``31763`` and ``Dev1/ai0``.
"""
import asyncio
import sys
import traceback
from typing import TypeVar

from grpclib.client import Channel
from grpclib.exceptions import GRPCError
from nidevice import nidaqmx_grpc

T = TypeVar("T")

server_address = "localhost"
server_port = 31763
physical_channel = "Dev1/ai0"

# Optional positional command-line overrides: address, port, physical channel.
if len(sys.argv) >= 2:
    server_address = sys.argv[1]
if len(sys.argv) >= 3:
    server_port = int(sys.argv[2])
if len(sys.argv) >= 4:
    physical_channel = sys.argv[3]


async def main():
    """Run the acquisition against the configured server and clean up.

    gRPC errors and any other exception raised during the acquisition are
    printed together with a traceback rather than re-raised. Whatever
    happens, the task (if one was created) is stopped and cleared and the
    channel is closed; an exception raised by that cleanup itself propagates
    to the caller.
    """
    # Create a gRPC channel + client.
    channel = Channel(host=server_address, port=server_port)
    daq_service = nidaqmx_grpc.NiDAQmxStub(channel)
    task = None

    async def raise_if_error(response: T) -> T:
        """Return ``response`` unchanged, or raise if its status is non-zero.

        On a non-zero status the matching error string is fetched from the
        server and raised as the message of an ``Exception``.
        """
        if response.status != 0:
            error_message_response = await daq_service.get_error_string(
                error_code=response.status
            )
            raise Exception(error_message_response.error_string)
        return response

    try:
        task_response = await raise_if_error(await daq_service.create_task())
        task = task_response.task
        print(f"Task created {task=}")

        await raise_if_error(
            await daq_service.create_a_i_voltage_chan(
                task=task,
                physical_channel=physical_channel,
                terminal_config_raw=nidaqmx_grpc.InputTermCfgWithDefault.INPUT_TERM_CFG_WITH_DEFAULT_CFG_DEFAULT,
                min_val=-10.0,
                max_val=10.0,
                units_raw=nidaqmx_grpc.VoltageUnits2.VOLTAGE_UNITS2_VOLTS,
            )
        )

        await raise_if_error(
            await daq_service.cfg_samp_clk_timing(
                task=task,
                rate=100.0,
                active_edge_raw=nidaqmx_grpc.Edge1.EDGE1_RISING,
                sample_mode_raw=nidaqmx_grpc.AcquisitionType.ACQUISITION_TYPE_FINITE_SAMPS,
                samps_per_chan=1000,
            )
        )

        every_n_samples_stream = daq_service.register_every_n_samples_event(
            task=task,
            n_samples=100,
            every_n_samples_event_type_raw=nidaqmx_grpc.EveryNSamplesEventType.EVERY_N_SAMPLES_EVENT_TYPE_ACQUIRED_INTO_BUFFER,
        )

        done_stream = daq_service.register_done_event(task=task)

        async def read_data():
            """Read and print 100 samples for each every-N-samples event."""
            async for every_n_samples_response in every_n_samples_stream:
                await raise_if_error(every_n_samples_response)
                read_response = await raise_if_error(
                    await daq_service.read_analog_f64(
                        task=task,
                        num_samps_per_chan=100,
                        fill_mode_raw=nidaqmx_grpc.GroupBy.GROUP_BY_GROUP_BY_CHANNEL,
                        array_size_in_samps=100,
                    )
                )
                print(
                    f"Acquired {len(read_response.read_array)} samples",
                    f"({read_response.samps_per_chan_read} samples per channel)",
                )
                print("Read Data (first 10 samples):", read_response.read_array[:10])
            print("read_data Finished")

        async def wait_for_done():
            """Close both event streams when the done event arrives."""
            async for done_response in done_stream:
                # Close the streams before checking the status so they are
                # released even when the done event reports an error.
                # NOTE(review): closing every_n_samples_stream presumably
                # ends the loop in read_data -- confirm against the stub.
                await every_n_samples_stream.aclose()
                await done_stream.aclose()
                await raise_if_error(done_response)

        print("Trying to register callbacks")
        acquisition = asyncio.gather(read_data(), wait_for_done())

        # NOTE(review): the pause presumably gives the event registrations
        # time to reach the server before the task starts -- confirm.
        print("Sleeping")
        await asyncio.sleep(2)

        print("Starting task")
        await raise_if_error(await daq_service.start_task(task=task))

        await acquisition
        print("Done")

    except GRPCError as e:
        print(f"GRPCError: {e}")
        if e.status.name == "UNIMPLEMENTED":
            print(
                "The operation is not implemented or is not supported/enabled on the server"
            )
        traceback.print_exc()
    except Exception as e:
        print(f"ERROR: {e}")
        traceback.print_exc()
    finally:
        # Nested try/finally: a failure while stopping the task must not
        # prevent clearing it, and neither failure may leave the channel open.
        try:
            if task:
                try:
                    await daq_service.stop_task(task=task)
                finally:
                    await daq_service.clear_task(task=task)
        finally:
            channel.close()


## Run main
if __name__ == "__main__":
    # asyncio.run creates the event loop, runs main() to completion, cancels
    # any tasks still pending and closes the loop. It replaces the deprecated
    # pattern of calling asyncio.get_event_loop() with no running loop.
    asyncio.run(main())