diff --git a/examples/nidaqmx/nidaqmx_analog_output_voltage/README.md b/examples/nidaqmx/nidaqmx_analog_output_voltage/README.md new file mode 100644 index 00000000..a25d7ac6 --- /dev/null +++ b/examples/nidaqmx/nidaqmx_analog_output_voltage/README.md @@ -0,0 +1,22 @@ +Prerequisites +=============== +Requires a Physical or Simulated Device : https://github.com/ni/nidaqmx-python/blob/master/README.rst (Getting Started Section) + +## Sample + +This is a nipanel example that displays an interactive Streamlit app and updates continuous analog output examples. + +### Feature + +- Supports various data types + +### Required Software + +- Python 3.9 or later + +### Usage + +```pwsh +poetry install --with examples +poetry run examples\nidaqmx\nidaqmx_analog_output_voltage\nidaqmx_analog_output_voltage.py +``` \ No newline at end of file diff --git a/examples/nidaqmx/nidaqmx_analog_output_voltage/nidaqmx_analog_output_voltage.py b/examples/nidaqmx/nidaqmx_analog_output_voltage/nidaqmx_analog_output_voltage.py new file mode 100644 index 00000000..e46dfb12 --- /dev/null +++ b/examples/nidaqmx/nidaqmx_analog_output_voltage/nidaqmx_analog_output_voltage.py @@ -0,0 +1,107 @@ +"""Data acquisition script that continuously generates analog output data.""" + +import time +from pathlib import Path + +import nidaqmx +import nidaqmx.stream_writers +import nidaqmx.system +import numpy as np +from nidaqmx.constants import AcquisitionType, Edge +from nidaqmx.errors import DaqError + +import nipanel + +panel_script_path = Path(__file__).with_name("nidaqmx_analog_output_voltage_panel.py") +panel = nipanel.create_streamlit_panel(panel_script_path) + +system = nidaqmx.system.System.local() + +available_channel_names = [] +for dev in system.devices: + for chan in dev.ao_physical_chans: + available_channel_names.append(chan.name) +panel.set_value("available_channel_names", available_channel_names) + +available_trigger_sources = [""] +for dev in system.devices: + if hasattr(dev, "terminals"): + for term in dev.terminals: + available_trigger_sources.append(term) +panel.set_value("available_trigger_sources", available_trigger_sources) +panel.set_value("run_button", False) +try: + panel.set_value("daq_error", "") + print(f"Panel URL: {panel.panel_url}") + print(f"Waiting for the 'Run' button to be pressed...") + print(f"(Press Ctrl + C to quit)") + while True: + panel.set_value("is_running", False) + while not panel.get_value("run_button", False): + time.sleep(0.1) + # How to use nidaqmx: https://nidaqmx-python.readthedocs.io/en/stable/ + with nidaqmx.Task() as task: + chan = task.ao_channels.add_ao_voltage_chan( + panel.get_value("physical_channel", ""), + max_val=panel.get_value("max_value_voltage", 5.0), + min_val=panel.get_value("min_value_voltage", -5.0), + ) + + sample_rate = panel.get_value("rate", 1000.0) + num_samples = panel.get_value("total_samples", 1000) + frequency = panel.get_value("frequency", 10.0) + amplitude = panel.get_value("amplitude", 1.0) + + task.timing.cfg_samp_clk_timing( + source=panel.get_value("source", ""), # "" - OnboardClock + rate=sample_rate, + sample_mode=AcquisitionType.CONTINUOUS, + ) + # Not all hardware supports all trigger types. + # Refer to your device documentation for more information. + trigger_type = panel.get_value("trigger_type") + + if trigger_type == 2: + task.triggers.start_trigger.cfg_dig_edge_start_trig( + trigger_source=panel.get_value("digital_source", ""), + trigger_edge=panel.get_value("edge", Edge.FALLING), + ) + else: + pass + + panel.set_value("sample_rate", task.timing.samp_clk_rate) + t = np.arange(num_samples) / sample_rate + wave_type = panel.get_value("wave_type", "Sine Wave") + + if wave_type == "Sine Wave": + waveform = amplitude * np.sin(2 * np.pi * frequency * t) + elif wave_type == "Square Wave": + waveform = amplitude * np.sign(np.sin(2 * np.pi * frequency * t)) + else: + waveform = amplitude * (2 * np.abs(2 * (t * frequency % 1) - 1) - 1) + + writer = nidaqmx.stream_writers.AnalogSingleChannelWriter( + task.out_stream, auto_start=False # pyright: ignore[reportArgumentType] + ) + writer.write_many_sample(waveform) + panel.set_value("data", waveform.tolist()) + try: + task.start() + panel.set_value("is_running", True) + panel.set_value("stop_button", False) + while not panel.get_value("stop_button", False): + time.sleep(0.1) + + except KeyboardInterrupt: + break + finally: + task.stop() + panel.set_value("is_running", False) + +except DaqError as e: + daq_error = str(e) + print(daq_error) + panel.set_value("daq_error", daq_error) + +except KeyboardInterrupt: + pass diff --git a/examples/nidaqmx/nidaqmx_analog_output_voltage/nidaqmx_analog_output_voltage_panel.py b/examples/nidaqmx/nidaqmx_analog_output_voltage/nidaqmx_analog_output_voltage_panel.py new file mode 100644 index 00000000..cee0292f --- /dev/null +++ b/examples/nidaqmx/nidaqmx_analog_output_voltage/nidaqmx_analog_output_voltage_panel.py @@ -0,0 +1,205 @@ +"""Streamlit visualization script to display data acquired by nidaqmx_analog_output_voltage.py.""" + +import extra_streamlit_components as stx # type: ignore[import-untyped] +import streamlit as st +from nidaqmx.constants import Edge +from streamlit_echarts import st_echarts + +import nipanel +from nipanel.controls import enum_selectbox + +st.set_page_config(page_title="Voltage - Continuous Output", page_icon="📈", layout="wide") +st.title("Voltage - Continuous Output") +panel = nipanel.get_streamlit_panel_accessor() + +left_col, right_col = st.columns(2) + +st.markdown( + """ + + + """, + unsafe_allow_html=True, +) + + +with left_col: + with st.container(border=True): + is_running = panel.get_value("is_running", False) + if is_running: + st.button("Stop", key="stop_button") + elif not is_running and panel.get_value("daq_error", "") == "": + run_button = st.button("Run", key="run_button") + else: + st.error( + f"There was an error running the script. Fix the issue and re-run nidaqmx_analog_output_voltage.py \n\n {panel.get_value('daq_error', '')}" + ) + + st.title("Channel Settings") + physical_channel = st.selectbox( + options=panel.get_value("available_channel_names", ["Mod2/ai0"]), + index=0, + label="Physical Channels", + disabled=panel.get_value("is_running", False), + ) + panel.set_value("physical_channel", physical_channel) + + max_value_voltage = st.number_input( + "Max Value", + value=5.0, + step=1.0, + disabled=panel.get_value("is_running", False), + key="max_value_voltage", + ) + + min_value_voltage = st.number_input( + "Min Value", + value=-5.0, + step=1.0, + disabled=panel.get_value("is_running", False), + key="min_value_voltage", + ) + st.title("Timing and Buffer Settings") + + source = st.selectbox( + "Sample Clock Source", + options=panel.get_value("available_trigger_sources", [""]), + index=0, + disabled=panel.get_value("is_running", False), + ) + panel.set_value("source", source) + st.number_input( + "Sample Rate", + value=1000.0, + min_value=1.0, + step=1.0, + disabled=panel.get_value("is_running", False), + key="rate", + ) + st.number_input( + "Number of Samples", + value=1000, + min_value=1, + step=1, + disabled=panel.get_value("is_running", False), + key="total_samples", + ) + st.number_input( + "Actual Sample Rate", + value=panel.get_value("actual_sample_rate", 1000.0), + key="actual_sample_rate", + step=1.0, + disabled=True, + ) + st.title("Waveform Settings") + st.number_input( + "Frequency", + value=panel.get_value("frequency", 10.0), + key="frequency", + step=1.0, + disabled=panel.get_value("is_running", False), + ) + st.number_input( + "Amplitude", + value=panel.get_value("amplitude", 1.0), + key="amplitude", + step=1.0, + disabled=panel.get_value("is_running", False), + ) + wave_type = st.selectbox( + label="Wave Type", + options=["Sine Wave", "Triangle Wave", "Square Wave"], + key="wave_type", + disabled=panel.get_value("is_running", False), + ) + panel.set_value("wave_type", wave_type) + +with right_col: + with st.container(border=True): + st.title("Output") + acquired_data = panel.get_value("data", [0.0]) + sample_rate = panel.get_value("sample_rate", 1000.0) + acquired_data_graph = { + "animation": False, + "tooltip": {"trigger": "axis"}, + "legend": {"data": ["Voltage (V)"]}, + "xAxis": { + "type": "category", + "data": [x / sample_rate for x in range(len(acquired_data))], + "name": "Time", + "nameLocation": "center", + "nameGap": 40, + }, + "yAxis": { + "type": "value", + "name": "Amplitude", + "nameRotate": 90, + "nameLocation": "center", + "nameGap": 40, + }, + "series": [ + { + "name": "voltage_amplitude", + "type": "line", + "data": acquired_data, + "emphasis": {"focus": "series"}, + "smooth": True, + "seriesLayoutBy": "row", + }, + ], + } + st_echarts(options=acquired_data_graph, height="400px", key="graph", width="100%") + + with st.container(border=True): + st.title("Trigger Settings") + trigger_type = stx.tab_bar( + data=[ + stx.TabBarItemData(id=1, title="No Trigger", description=""), + stx.TabBarItemData(id=2, title="Digital Start", description=""), + stx.TabBarItemData(id=3, title="Digital Pause", description=""), + stx.TabBarItemData(id=4, title="Digital Reference", description=""), + stx.TabBarItemData(id=5, title="Analog Start", description=""), + stx.TabBarItemData(id=6, title="Analog Pause", description=""), + stx.TabBarItemData(id=7, title="Analog Reference", description=""), + ], + default=1, + ) + trigger_type = int(trigger_type) # pyright: ignore[reportArgumentType] + panel.set_value("trigger_type", trigger_type) + + if trigger_type == 2: + with st.container(border=True): + source = st.selectbox( + "Source:", options=panel.get_value("available_trigger_sources", [""]) + ) + panel.set_value("digital_source", source) + enum_selectbox( + panel, + label="Edge", + value=Edge.FALLING, + disabled=panel.get_value("is_running", False), + key="edge", + ) + elif trigger_type == 1: + with st.container(border=True): + st.write( + "To enable triggers, select a tab above, and configure the settings. Not all hardware supports all trigger types. Refer to your device documentation for more information." + ) + + else: + st.write( + "This trigger type is not supported in output tasks. Refer to your device documentation for more information on which triggers are supported." + )