In [None]:
# Importing Libraries
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import scipy as sp
import numpy as np
import pandas as pd
import time
import datetime
import os
import sys
import pyvisa
import threading
import ipywidgets
import nidaqmx
from scipy import signal
from collections import deque
from nidaqmx.constants import AcquisitionType, TerminalConfiguration
from IPython.display import clear_output, display

print(f"All Libraries imported successfully at {datetime.datetime.now()}")

In [None]:
# Function for connecting to the NI DAQ device via USB
def connect_daq():
    """Function to connect to the DAQ device and return the system object."""
    system = nidaqmx.system.System.local()
    for device in system.devices:
        print(f"Device Name:       {device.name}")
        print(f"Product Category:  {device.product_category}")
        print(f"Product Type:      {device.product_type}")
        ao_chans = [chan.name for chan in device.ao_physical_chans]
        ai_chans = [chan.name for chan in device.ai_physical_chans]
        di_lines = [line.name for line in device.di_lines]
        print(f"Analog Output Channels: {len(ao_chans)}")
        print(f"Analog Input Channels: {len(ai_chans)}")
        print(f"Digital Input Lines: {len(di_lines)}")

connect_daq()
print(f"\nNI DAQ connected successfully at {datetime.datetime.now()}")

In [None]:
# Function to generate different waveform arrays
def generate_waveform(waveform, amplitude, phase_deg, offset, frequency, total_duration, sampling_rate):
    """waveform : 'sine', 'square', 'triangular', 'sawtooth'"""
    # Time vector
    t = np.arange(0, total_duration+(1/sampling_rate), 1/sampling_rate)
    # Phase (convert once; stop mixing degrees and radians in code)
    phase = np.deg2rad(phase_deg)
    if waveform == "sine":
        y = amplitude * np.sin(2*np.pi*frequency*t + phase) + offset
    elif waveform == "square":
        y = amplitude * signal.square(2*np.pi*frequency*t + phase) + offset
    elif waveform == "triangular":
        y = amplitude * signal.sawtooth(2*np.pi*frequency*t + phase, width=0.5) + offset
    elif waveform == "sawtooth":
        y = amplitude * signal.sawtooth(2*np.pi*frequency*t + phase) + offset
    else:
        print("Unsupported waveform type: Please choose from 'sine', 'square', 'triangular', 'sawtooth'.")
        sys.exit()
    return t, y

# Function for ramping voltage safely
def ramp_to_voltage(channel_name, target_voltage):
    """
    Safely ramps voltage to target voltage to avoid sudden jumps.
    """
    # 1. Check present voltage
    dev, chan = channel_name.split('/') # Split device and channel
    monitor_chan = f"{dev}/_{chan}_vs_aognd" # Internal monitoring channel
    with nidaqmx.Task() as read_task:
        read_task.ai_channels.add_ai_voltage_chan(monitor_chan) # Add AI channel for monitoring
        present_val = read_task.read() # Read current voltage

    # 2. Ramp to zero
    step_mv = 0.001  # 1 mV
    # step_time = 0.001 # 1 ms
    steps = int(abs(target_voltage-present_val) / step_mv) # calculate number of steps
    if steps == 0: steps = 1 
    ramp_values = np.linspace(present_val, target_voltage, steps)
    with nidaqmx.Task() as task:
        task.ao_channels.add_ao_voltage_chan(channel_name, min_val=-10.0, max_val=10.0)
        for v in ramp_values:
            task.write(v)
            # time.sleep(step_time)

# Function for reading ao channel value
def read_ao(channel_name):
    dev, chan = channel_name.split('/')
    monitor_chan = f"{dev}/_{chan}_vs_aognd"
    try:
        with nidaqmx.Task() as read_task:
            read_task.ai_channels.add_ai_voltage_chan(monitor_chan)
            current_val = read_task.read()
    except:
        current_val = None
    return current_val

# Function for creating path for saving file in a specific folder
def save_file(folder,filename):
    """It will create the folder if it does not exist and return the full path for saving file in that folder."""
    current_path = os.getcwd()
    folder_path = os.path.join(current_path, folder)
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    filename = f"{datetime.datetime.now().strftime('%y%m%d')}_{filename}"
    file_path = os.path.join(folder_path, filename)
    return file_path

# Function for creating path for loading file from a specific folder
def load_file(folder,filename):
    """It will return the full path for loading file from that folder."""
    current_path = os.getcwd()
    if folder == str(datetime.datetime.now().strftime('%y%m%d')):
        filename = f"{datetime.datetime.now().strftime('%y%m%d')}_{filename}"
    else:
        filename = f"{folder}_{filename}"
    file_path = os.path.join(current_path, folder, filename)
    return file_path

In [None]:
# Program for sending waveform to NI DAQ device and measuring the response
# --- single ao channel, single ai channel ---
""" Generate Waveform Data """
waveform = "triangular"  # 'sine', 'square', 'triangular', 'sawtooth'
amplitude = 5.0
phase_deg = 90
offset = 0
frequency = 5.0
total_duration = 1.0
sampling_rate = 10000.0
t_data, y_data = generate_waveform(waveform,amplitude,phase_deg,offset,frequency,total_duration,sampling_rate)
""" Measurement with NI DAQ """
ramp_to_voltage("Dev1/ao1", target_voltage=0)
with nidaqmx.Task() as ao_task, nidaqmx.Task() as ai_task:
    ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao1", min_val=-10.0, max_val=10.0)
    ao_task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.FINITE, samps_per_chan=len(y_data))

    ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai1",terminal_config=TerminalConfiguration.DIFF, min_val=-10.0, max_val=10.0)
    ai_task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.FINITE, samps_per_chan=len(y_data))

    try:
        ai_task.triggers.start_trigger.cfg_dig_edge_start_trig("/Dev1/ao/StartTrigger")
    except Exception as e:
        print(f"Warning: {e}.")

    ao_task.write(y_data, auto_start=False)
    ai_task.start()
    ao_task.start()
    ai_data = ai_task.read(number_of_samples_per_channel=len(y_data),timeout=total_duration+10)
    ao_task.wait_until_done()
ramp_to_voltage("Dev1/ao1", target_voltage=0)
plt.figure(figsize=(20, 6))
plt.plot(t_data, y_data, label='Output Sent (Source)', linestyle='--')
plt.plot(t_data, ai_data, label='Input Read (Measured)', alpha=0.7)
plt.title(f"NI USB-6341: Output-Input ({frequency} Hz, {sampling_rate} samples/s)")
plt.xlabel("Time (s)")
plt.ylabel("Voltage (V)")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Program for sending waveform to NI DAQ device and measuring the response
# --- 2 ao channels, 2 ai channels simultaneously ---
""" Generate Waveform Data """
waveform_1 = "triangular"
amplitude_1 = 5.0
phase_deg_1 = 90
offset_1 = 0
frequency_1 = 2.0
waveform_2 = "triangular"
amplitude_2 = 5.0
phase_deg_2 = 90
offset_2 = 0
frequency_2 = 2.0
total_duration = 1.0
sampling_rate = 1000.0
t_data_1, y_data_1 = generate_waveform(waveform_1,amplitude_1,phase_deg_1,offset_1,frequency_1,total_duration,sampling_rate)
t_data_2, y_data_2 = generate_waveform(waveform_2,amplitude_2,phase_deg_2,offset_2,frequency_2,total_duration,sampling_rate)
y_data = np.vstack((y_data_1, y_data_2))
""" Measurement with NI DAQ """
ramp_to_voltage("Dev1/ao0", target_voltage=0)
ramp_to_voltage("Dev1/ao1", target_voltage=0)
with nidaqmx.Task() as ao_task, nidaqmx.Task() as ai_task:
    ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0:1", min_val=-10.0, max_val=10.0)
    ao_task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.FINITE, samps_per_chan=len(y_data_1))

    ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai0:1",terminal_config=TerminalConfiguration.DIFF, min_val=-10.0, max_val=10.0)
    ai_task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.FINITE, samps_per_chan=len(y_data_1))

    try:
        ai_task.triggers.start_trigger.cfg_dig_edge_start_trig("/Dev1/ao/StartTrigger")
    except Exception as e:
        print(f"Warning: {e}.")

    ao_task.write(y_data, auto_start=False)
    ai_task.start()
    ao_task.start()
    ai_data = ai_task.read(number_of_samples_per_channel=len(y_data_1),timeout=total_duration+10)
    ao_task.wait_until_done()
ramp_to_voltage("Dev1/ao0", target_voltage=0)
ramp_to_voltage("Dev1/ao1", target_voltage=0)
plt.figure(figsize=(20, 6))
plt.plot(t_data_1, y_data_1, label='Output Sent 1 (Source)', linestyle='--')
plt.plot(t_data_2, y_data_2, label='Output Sent 2 (Source)', linestyle='--')
plt.plot(t_data_1, ai_data[0], label='Input Read 1 (Measured)', alpha=0.7)
plt.plot(t_data_2, ai_data[1], label='Input Read 2 (Measured)', alpha=0.7)
plt.title(f"NI USB-6341: Output-Input ({frequency_1} Hz, {sampling_rate} samples/s)")
plt.xlabel("Time (s)")
plt.ylabel("Voltage (V)")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Program for recording time trace from NI DAQ device at fixed voltage

set_voltage = 1.0  # Voltage to set on ao channel
total_samples = 2**16  # seconds
sampling_rate = 1000.0  # samples per second
record_duration = total_samples / sampling_rate
ramp_to_voltage("Dev1/ao1", target_voltage=set_voltage)

time_data = np.arange(0, record_duration, 1/sampling_rate) # Time vector
output_data = np.full(total_samples, set_voltage)  # Constant waveform at set voltage

with nidaqmx.Task() as ao_task, nidaqmx.Task() as ai_task:
    ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao1", min_val=-10.0, max_val=10.0)
    ao_task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.FINITE, samps_per_chan=total_samples)

    ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai1",terminal_config=TerminalConfiguration.DIFF, min_val=-10.0, max_val=10.0)
    ai_task.timing.cfg_samp_clk_timing(sampling_rate, sample_mode=AcquisitionType.FINITE, samps_per_chan=total_samples)

    try:
        ai_task.triggers.start_trigger.cfg_dig_edge_start_trig("/Dev1/ao/StartTrigger")
    except Exception as e:
        print(f"Warning: {e}.")

    ao_task.write(output_data, auto_start=False)
    ai_task.start()
    ao_task.start()
    input_data = ai_task.read(number_of_samples_per_channel=total_samples,timeout=record_duration+10)
    ao_task.wait_until_done()
ramp_to_voltage("Dev1/ao1", target_voltage=0.0)

plt.figure(figsize=(20, 6))
plt.plot(time_data, input_data, label='Input Read (Measured)')
plt.title(f"NI USB-6341: Time Trace at {set_voltage} V ({sampling_rate} samples/s)")
plt.xlabel("Time (s)")
plt.ylabel("Voltage (V)")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
def compute_psd(time_data,signal_data,window_type,remove_dc=True):
    # find sampling rate
    sampling_rate = (len(time_data)-1) / (time_data[-1] - time_data[0])
    # remove DC component
    if remove_dc: signal_data = signal_data - np.mean(signal_data)
    # apply numpy window function
    if window_type=='rectangular': window_type='boxcar'
    window = signal.get_window(window_type, len(signal_data))
    signal_windowed = signal_data * window
    # FFT
    amplitude_data = np.fft.rfft(signal_windowed)
    frequency_data = np.fft.rfftfreq(len(signal_data), d=1/sampling_rate)
    # Power Spectral Density (PSD)
    psd_data = (1/(sampling_rate*len(signal_data))) * np.abs(amplitude_data)**2
    psd_data[1:-1] = 2*psd_data[1:-1]  # Multiply by 2 for one-sided PSD

    amplitude_data[1:-1] = 2*amplitude_data[1:-1]  # Multiply by 2 for one-sided amplitude spectrum
    return frequency_data, psd_data
    
frequency_data, psd_data = compute_psd(time_data, input_data, window_type='hann', remove_dc=True)

input_data = np.array(input_data)
frequency_welch,psd_welch = signal.welch(input_data, fs=sampling_rate, nperseg=2**12,window='hann', noverlap=None, detrend='constant', return_onesided=True, scaling='density')
frequency_scipy,psd_scipy = signal.periodogram(input_data, fs=sampling_rate, window='hann', detrend='constant', return_onesided=True, scaling='density')
# Plot FFT
plt.figure(figsize=(20, 6))
plt.loglog(frequency_data, psd_data, label='Custom FFT')
plt.loglog(frequency_scipy, psd_scipy, label='SciPy Periodogram')
plt.loglog(frequency_welch, psd_welch, label='SciPy Welch')
plt.title(f"NI USB-6341: FFT at {set_voltage} V ({sampling_rate} samples/s)")
plt.xlabel("Frequency (Hz)")
plt.ylabel("Power Spectral Density (V^2/Hz)")
plt.legend()
plt.grid(True)
plt.show()

In [None]:

plt.figure(figsize=(20, 6))
plt.loglog(frequency_data, psd_data, color='orange')
plt.loglog(ff, pp)
plt.loglog(f, p, color='red')
plt.title(f"NI USB-6341: PSD at {set_voltage} V ({sampling_rate} samples/s)")
plt.xlabel("Frequency (Hz)")
plt.grid(True)
plt.show()