## Multiple Scan

1


In [11]:
#!/usr/bin/env python
#  -*- coding: utf-8 -*-
"""
    MCC 172 Functions Demonstrated:
        mcc172.trigger_config
        mcc172.iepe_config_write
        mcc172.a_in_clock_config_write
        mcc172.a_in_clock_config_read
        mcc172.a_in_scan_start
        mcc172.a_in_scan_read
        mcc172.a_in_scan_stop

    Purpose:
    Get synchronous data from multiple MCC 172 HAT devices.

    Description:
        This example demonstrates acquiring data synchronously from multiple
        MCC 172 HAT devices.  This is done using the shared clock and
        trigger options.  An external trigger source must be provided to the
        TRIG terminal on the master MCC 172 HAT device.  The EXTTRIGGER scan
        option is set on all devices.
"""
from __future__ import print_function
from sys import stdout, version_info
from math import sqrt
from time import sleep
from daqhats import (hat_list, mcc172, OptionFlags, HatIDs, TriggerModes,
                     HatError, SourceType)
from daqhats_utils import (enum_mask_to_string, chan_list_to_mask,
                           validate_channels)

# Constants
# Number of MCC 172 HAT devices acquired from in this example.
DEVICE_COUNT = 2
# Index of the master device in the list of selected HAT objects.
MASTER = 0
# ANSI terminal escape sequences used to redraw the data display in place.
CURSOR_SAVE = "\x1b[s"  # Save the current cursor position.
CURSOR_RESTORE = "\x1b[u"  # Return to the saved cursor position.
CURSOR_BACK_2 = '\x1b[2D'  # Move the cursor two columns to the left.
ERASE_TO_END_OF_LINE = '\x1b[0K'  # Erase from the cursor to end of line.

def get_iepe():
    """
    Ask the user whether the IEPE supply should be enabled.

    The question is repeated until a valid answer is typed.

    Returns:
        int: 1 when the answer is "y" or "Y", 0 when it is "n" or "N".
    """
    prompt = "IEPE enable [y or n]?  "

    # Python 2 needs raw_input() to read the reply as plain text.
    if version_info.major > 2:
        read_reply = input
    else:
        read_reply = raw_input  # pylint: disable=undefined-variable

    while True:
        reply = read_reply(prompt)
        if reply in ("y", "Y"):
            return 1
        if reply in ("n", "N"):
            return 0
        # Anything else is rejected and the question is asked again.
        print("Invalid response.")

def main(): # pylint: disable=too-many-locals, too-many-statements, too-many-branches
    """
    Run a synchronous, externally triggered finite scan on several MCC 172s.

    The selected HAT devices share one clock and one trigger: the device at
    index MASTER of the selected list is configured as the source and every
    other device as a slave.  After the user confirms, a finite scan is
    started on all devices, the function waits for the trigger on the master
    and then displays the acquired data until the scan completes.

    HatError and ValueError are reported on the console instead of being
    propagated, and the scans are always stopped and cleaned up on exit.
    """
    hats = []
    # Define the channel set for each HAT device
    chans = [
        {0, 1},
        {0, 1}
    ]
    # Define the options for each HAT device
    options = [
        OptionFlags.EXTTRIGGER,
        OptionFlags.EXTTRIGGER
    ]
    samples_per_channel = 10000
    sample_rate = 10240.0  # Samples per second
    trigger_mode = TriggerModes.RISING_EDGE

    try:
        # Get an instance of the selected hat device object.
        hats = select_hat_devices(HatIDs.MCC_172, DEVICE_COUNT)

        # Validate the selected channels.
        for i, hat in enumerate(hats):
            validate_channels(chans[i], hat.info().NUM_AI_CHANNELS)

        # Turn on IEPE supply?
        iepe_enable = get_iepe()

        for i, hat in enumerate(hats):
            for channel in chans[i]:
                # Configure IEPE.
                hat.iepe_config_write(channel, iepe_enable)
            # The master is identified by its position in the list (index
            # MASTER), not by its board address, so every other device is
            # set up as a slave whichever addresses were selected.
            if i != MASTER:
                # Configure the slave clocks.
                hat.a_in_clock_config_write(SourceType.SLAVE, sample_rate)
                # Configure the trigger.
                hat.trigger_config(SourceType.SLAVE, trigger_mode)

        # Configure the master clock and start the sync.
        hats[MASTER].a_in_clock_config_write(SourceType.MASTER, sample_rate)
        synced = False
        while not synced:
            (_source_type, actual_rate, synced) = \
                hats[MASTER].a_in_clock_config_read()
            if not synced:
                sleep(0.005)

        # Configure the master trigger.
        hats[MASTER].trigger_config(SourceType.MASTER, trigger_mode)

        print('MCC 172 multiple HAT example using external clock and',
              'external trigger options')
        print('    Functions demonstrated:')
        print('         mcc172.trigger_config')
        print('         mcc172.iepe_config_write')
        print('         mcc172.a_in_clock_config_write')
        print('         mcc172.a_in_clock_config_read')
        print('         mcc172.a_in_scan_start')
        print('         mcc172.a_in_scan_read')
        print('         mcc172.a_in_scan_stop')
        print('         mcc172.a_in_scan_cleanup')
        print('    IEPE power: ', end='')
        if iepe_enable == 1:
            print('on')
        else:
            print('off')
        print('    Samples per channel:', samples_per_channel)
        print('    Requested Sample Rate: {:.3f} Hz'.format(sample_rate))
        print('    Actual Sample Rate: {:.3f} Hz'.format(actual_rate))
        print('    Trigger type:', trigger_mode.name)

        for i, hat in enumerate(hats):
            print('    HAT {}:'.format(i))
            print('      Address:', hat.address())
            print('      Channels: ', end='')
            print(', '.join([str(chan) for chan in chans[i]]))
            options_str = enum_mask_to_string(OptionFlags, options[i])
            print('      Options:', options_str)

        print('\n*NOTE: Connect a trigger source to the TRIG input terminal on HAT 0.')

        # Under Python 2 input() evaluates the typed text, so an empty line
        # raises; those errors only mean "Enter was pressed".
        try:
            input("\nPress 'Enter' to continue")
        except (NameError, SyntaxError):
            pass

        # Start the scan.
        for i, hat in enumerate(hats):
            chan_mask = chan_list_to_mask(chans[i])
            hat.a_in_scan_start(chan_mask, samples_per_channel, options[i])

        print('\nWaiting for trigger ... Press Ctrl-C to stop scan\n')

        try:
            # Monitor the trigger status on the master device.
            wait_for_trigger(hats[MASTER])
            # Read and display data for all devices until scan completes
            # or overrun is detected.
            read_and_display_data(hats, chans)

        except KeyboardInterrupt:
            # Clear the '^C' from the display.
            print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\nAborted\n')

    except (HatError, ValueError) as error:
        print('\n', error)

    finally:
        # Runs on every exit path; hats is still empty if selection failed.
        for hat in hats:
            hat.a_in_scan_stop()
            hat.a_in_scan_cleanup()


def wait_for_trigger(hat):
    """
    Poll the scan status of a HAT device until the scan has been triggered
    or is no longer running.

    Args:
        hat (mcc172): The mcc172 HAT device object whose scan status is
            polled.

    Returns:
        None

    """
    # The status is read at least once and is used only to detect the
    # trigger (or a scan that stopped before it arrived).
    while True:
        scan_status = hat.a_in_scan_status()
        still_running = scan_status.running
        has_triggered = scan_status.triggered
        if has_triggered or not still_running:
            return

def calc_rms(data, channel, num_channels, num_samples_per_channel):
    """
    Calculate the RMS value of one channel in a block of interleaved samples.

    Args:
        data: Indexable sample values, interleaved by channel.
        channel (int): Index of the first sample belonging to the channel.
        num_channels (int): Distance between successive samples of the
            channel.
        num_samples_per_channel (int): Number of samples to include.

    Returns:
        float: Square root of the mean of the squared samples (0.0 when no
        samples are requested).
    """
    mean_square = 0.0
    for sample_number in range(num_samples_per_channel):
        sample = data[channel + sample_number * num_channels]
        # Divide each term so the running total is already the mean.
        mean_square += (sample * sample) / num_samples_per_channel

    return sqrt(mean_square)

def read_and_display_data(hats, chans):
    """
    Reads data from the specified channels on the specified DAQ HAT devices
    and updates the data on the terminal display.  The reads are executed in a
    loop that continues until either the scan completes or an overrun error
    is detected.  On an overrun the error is printed and the function
    returns immediately.

    Args:
        hats (list[mcc172]): A list of mcc172 HAT device objects.
        chans (list[set[int]]): The channel numbers scanned on each mcc172
            HAT device, in the same order as hats.

    Returns:
        None

    """
    samples_to_read = 1000
    timeout = 5  # Seconds
    device_count = len(hats)
    samples_per_chan_read = [0] * device_count
    total_samples_per_chan = [0] * device_count
    is_running = True

    # NOTE(review): assumes the interleaved scan data is ordered by ascending
    # channel number - confirm against the a_in_scan_read documentation.
    # A channel's offset inside the interleaved data is its position in
    # this ordering, not the channel number itself.
    ordered_chans = [sorted(device_chans) for device_chans in chans]

    # Since the read_request_size is set to a specific value, a_in_scan_read()
    # will block until that many samples are available or the timeout is
    # exceeded.

    # Create blank lines where the data will be displayed
    display_lines = device_count * 4 + 1
    for _ in range(display_lines):
        print('')
    # Move the cursor up to the start of the data display.
    print('\x1b[{0}A'.format(display_lines), end='')
    print(CURSOR_SAVE, end='')

    while True:
        data = [None] * device_count
        # Read the data from each HAT device.
        for i, hat in enumerate(hats):
            read_result = hat.a_in_scan_read(samples_to_read, timeout)

            # Leave the whole read loop on an overrun; continuing would
            # display stale counts and data that was never read.
            if read_result.buffer_overrun:
                print('\nError: Buffer overrun')
                return
            if read_result.hardware_overrun:
                print('\nError: Hardware overrun')
                return

            data[i] = read_result.data
            is_running &= read_result.running
            samples_per_chan_read[i] = len(data[i]) // len(ordered_chans[i])
            total_samples_per_chan[i] += samples_per_chan_read[i]

        print(CURSOR_RESTORE, end='')

        # Display the data for each HAT device
        for i in range(device_count):
            print('HAT {0}:'.format(i))

            # Print the header row for the data table.
            print('  Samples Read    Scan Count', end='')
            for chan in ordered_chans[i]:
                print('     Channel', chan, end='')
            print('')

            # Display the sample count information.
            print('{0:>14}{1:>14}'.format(samples_per_chan_read[i],
                                          total_samples_per_chan[i]), end='')

            # Display the RMS voltage for each channel.
            if samples_per_chan_read[i] > 0:
                num_chans = len(ordered_chans[i])
                for chan_idx in range(num_chans):
                    value = calc_rms(data[i], chan_idx, num_chans,
                                     samples_per_chan_read[i])
                    print('{:10.5f}'.format(value), 'Vrms ',
                          end='')
                stdout.flush()
            print('\n')

        if not is_running:
            break


def select_hat_devices(filter_by_id, number_of_devices):
    """
    This function performs a query of available DAQ HAT devices and determines
    the addresses of the DAQ HAT devices to be used in the example.  If the
    number of HAT devices present matches the requested number of devices,
    a list of all mcc172 objects is returned in the order reported by
    hat_list(), otherwise the user is prompted to select addresses from a
    list of displayed devices.  An entry that is not a number, is not a
    listed address, or was already chosen is rejected and asked for again.

    Args:
        filter_by_id (int): If this is :py:const:`HatIDs.ANY` return all DAQ
            HATs found.  Otherwise, return only DAQ HATs with ID matching this
            value.
        number_of_devices (int): The number of devices to be selected.

    Returns:
        list[mcc172]: A list of mcc172 objects for the selected devices
        (Note: The object at index 0 will be used as the master).

    Raises:
        HatError: Not enough HAT devices are present.

    """
    # Get descriptors for all of the available HAT devices.
    hats = hat_list(filter_by_id=filter_by_id)
    number_of_hats = len(hats)

    # Verify that enough HAT devices were detected.
    if number_of_hats < number_of_devices:
        error_string = ('Error: This example requires {0} MCC 172 HATs - '
                        'found {1}'.format(number_of_devices, number_of_hats))
        raise HatError(0, error_string)

    # Exactly the requested number is present: use all of them.
    if number_of_hats == number_of_devices:
        return [mcc172(hat.address) for hat in hats]

    # Display available HAT devices for selection.
    for hat in hats:
        print('Address ', hat.address, ': ', hat.product_name, sep='')
    print('')

    selected_hats = []
    available_addresses = set(hat.address for hat in hats)
    chosen_addresses = set()

    for device in range(number_of_devices):
        while True:
            input_str = 'Enter address for HAT device {}: '.format(device)
            try:
                address = int(input(input_str))
            except (ValueError, NameError, SyntaxError):
                # Not a number (NameError/SyntaxError only occur with the
                # evaluating input() of Python 2): ask again.
                print('Invalid address - try again')
                continue

            # Verify the selected address exists.
            if address not in available_addresses:
                print('Invalid address - try again')
                continue

            # Verify the address was not previously selected
            if address in chosen_addresses:
                print('Address already selected - try again')
                continue

            chosen_addresses.add(address)
            selected_hats.append(mcc172(address))
            break

    return selected_hats


if __name__ == '__main__':
    # Run the example only when this code is executed directly (as a script
    # or notebook cell), not when it is imported as a module.
    main()


 Addr 0: Error: This example requires 3 MCC 172 HATs - found 1


## Continuous Scan

In [2]:
#!/usr/bin/env python
#  -*- coding: utf-8 -*-

"""
    MCC 172 Functions Demonstrated:
        mcc172.iepe_config_write
        mcc172.a_in_clock_config_write
        mcc172.a_in_clock_config_read
        mcc172.a_in_scan_start
        mcc172.a_in_scan_read
        mcc172.a_in_scan_stop
        mcc172.a_in_scan_cleanup

    Purpose:
        Perform a continuous acquisition on 1 or more channels.

    Description:
        Continuously acquires blocks of analog input data for a
        user-specified group of channels until the acquisition is
        stopped by the user.  The RMS voltage for each channel
        is displayed for each block of data received from the device.
"""
from __future__ import print_function
from sys import stdout, version_info
from time import sleep
from math import sqrt
from daqhats import mcc172, OptionFlags, SourceType, HatIDs, HatError
from daqhats_utils import select_hat_device, enum_mask_to_string, \
    chan_list_to_mask

# Sample count passed to a_in_scan_read() to request whatever samples are
# currently available instead of a fixed number.
READ_ALL_AVAILABLE = -1

# ANSI terminal escape sequences used to tidy the display after Ctrl-C.
CURSOR_BACK_2 = '\x1b[2D'  # Move the cursor two columns to the left.
ERASE_TO_END_OF_LINE = '\x1b[0K'  # Erase from the cursor to end of line.
# IEPE supply setting (1 = on).
# NOTE(review): main() in this cell assigns its own value and does not
# appear to read this constant - confirm whether it is still needed.
IEPE_ENABLE = 1

def get_iepe():
    """
    Get IEPE enable from the user.

    The question is repeated until a valid answer is typed.

    Returns:
        int: 1 when the answer is "y" or "Y", 0 when it is "n" or "N".
    """
    message = "IEPE enable [y or n]?  "

    while True:
        # Wait for the user to enter a response.  Python 2 needs
        # raw_input() to read the reply as plain text.
        if version_info.major > 2:
            response = input(message)
        else:
            response = raw_input(message)  # pylint: disable=undefined-variable

        # Check for valid response
        if response in ("y", "Y"):
            return 1
        if response in ("n", "N"):
            return 0

        # Ask again.
        print("Invalid response.")

def main(): # pylint: disable=too-many-locals, too-many-statements
    """
    Run a continuous scan on one MCC 172 and display per-block RMS values.

    Enables IEPE power on the scanned channels, configures the local sample
    clock, starts a continuous scan and displays data until the user presses
    Ctrl-C or the read loop ends on its own.  Once the scan has been
    started it is always stopped and cleaned up, and IEPE power is switched
    off again, however the read loop ends.

    HatError and ValueError are reported on the console instead of being
    propagated.
    """

    # Store the channels in a list and convert the list to a channel mask that
    # can be passed as a parameter to the MCC 172 functions.
    channels = [0, 1]
    channel_mask = chan_list_to_mask(channels)
    num_channels = len(channels)

    samples_per_channel = 0

    options = OptionFlags.CONTINUOUS

    scan_rate = 10240.0

    try:
        # Select an MCC 172 HAT device to use.
        address = select_hat_device(HatIDs.MCC_172)
        hat = mcc172(address)

        print('\nSelected MCC 172 HAT device at address', address)

        # Define IEPE
        iepe_enable = IEPE_ENABLE

        for channel in channels:
            hat.iepe_config_write(channel, iepe_enable)

        # Configure the clock and wait for sync to complete.
        hat.a_in_clock_config_write(SourceType.LOCAL, scan_rate)

        synced = False
        while not synced:
            (_source_type, actual_scan_rate, synced) = hat.a_in_clock_config_read()
            if not synced:
                sleep(0.005)

        print('\nMCC 172 continuous scan example')
        print('    Functions demonstrated:')
        print('         mcc172.iepe_config_write')
        print('         mcc172.a_in_clock_config_write')
        print('         mcc172.a_in_clock_config_read')
        print('         mcc172.a_in_scan_start')
        print('         mcc172.a_in_scan_read')
        print('         mcc172.a_in_scan_stop')
        print('         mcc172.a_in_scan_cleanup')
        print('    IEPE power: ', end='')
        if iepe_enable == 1:
            print('on')
        else:
            print('off')
        print('    Channels: ', end='')
        print(', '.join([str(chan) for chan in channels]))
        print('    Requested scan rate: ', scan_rate)
        print('    Actual scan rate: ', actual_scan_rate)
        print('    Options: ', enum_mask_to_string(OptionFlags, options))

        # Under Python 2 input() evaluates the typed text, so an empty line
        # raises; those errors only mean "Enter was pressed".
        try:
            input('\nPress ENTER to continue ...')
        except (NameError, SyntaxError):
            pass

        # Configure and start the scan.
        # Since the continuous option is being used, the samples_per_channel
        # parameter is ignored if the value is less than the default internal
        # buffer size (10000 * num_channels in this case). If a larger internal
        # buffer size is desired, set the value of this parameter accordingly.
        hat.a_in_scan_start(channel_mask, samples_per_channel, options)

        try:
            print('Starting scan ... Press Ctrl-C to stop\n')

            # Display the header row for the data table.
            print('Samples Read    Scan Count', end='')
            for item in channels:
                print('       Channel ', item, sep='', end='')
            print('')

            read_and_display_data(hat, num_channels)

        except KeyboardInterrupt:
            # Clear the '^C' from the display.
            print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')
            print('Stopping')

        finally:
            # Release the scan whether it ended by Ctrl-C, an overrun or an
            # error, so the device is not left with a scan allocated.
            hat.a_in_scan_stop()
            hat.a_in_scan_cleanup()

            # Turn off IEPE supply
            for channel in channels:
                hat.iepe_config_write(channel, 0)

    except (HatError, ValueError) as err:
        print('\n', err)

def calc_rms(data, channel, num_channels, num_samples_per_channel):
    """
    Return the RMS value of one channel from interleaved sample data.

    Args:
        data: Indexable sample values, interleaved by channel.
        channel (int): Index of the channel's first sample in data.
        num_channels (int): Step between consecutive samples of the channel.
        num_samples_per_channel (int): How many samples to include.

    Returns:
        float: Square root of the mean of the squared samples (0.0 when no
        samples are requested).
    """
    sample_positions = (channel + step * num_channels
                        for step in range(num_samples_per_channel))

    running_mean_square = 0.0
    for position in sample_positions:
        reading = data[position]
        # Each squared reading is scaled as it is added, so the total is
        # the mean square once the loop ends.
        running_mean_square += (reading * reading) / num_samples_per_channel

    return sqrt(running_mean_square)

def read_and_display_data(hat, num_channels):
    """
    Reads data from the specified channels on the specified DAQ HAT devices
    and updates the data on the terminal display.  The reads are executed in a
    loop that continues until the user stops the scan or an overrun error is
    detected.

    Args:
        hat (mcc172): The mcc172 HAT device object.
        num_channels (int): The number of channels to display.

    Returns:
        None

    """
    total_samples_read = 0
    read_request_size = READ_ALL_AVAILABLE

    # When doing a continuous scan, the timeout value will be ignored in the
    # call to a_in_scan_read because we will be requesting that all available
    # samples (up to the default buffer size) be returned.
    timeout = 5.0

    # Since the read_request_size is set to -1 (READ_ALL_AVAILABLE), each
    # read returns immediately with whatever samples are available.
    while True:
        read_result = hat.a_in_scan_read(read_request_size, timeout)

        # Check for an overrun error
        if read_result.hardware_overrun:
            print('\n\nHardware overrun\n')
            break
        if read_result.buffer_overrun:
            print('\n\nBuffer overrun\n')
            break

        samples_read_per_channel = len(read_result.data) // num_channels
        total_samples_read += samples_read_per_channel

        print('\r{:12}'.format(samples_read_per_channel),
              ' {:12} '.format(total_samples_read), end='')

        # Display the RMS voltage for each channel.
        if samples_read_per_channel > 0:
            for i in range(num_channels):
                value = calc_rms(read_result.data, i, num_channels,
                                 samples_read_per_channel)
                print('{:10.5f}'.format(value), 'Vrms ',
                      end='')
        stdout.flush()

        # Pause on every pass, including reads that returned no samples, so
        # the loop never polls the device without a delay.
        sleep(0.1)

    print('\n')

if __name__ == '__main__':
    # Run the example only when this code is executed directly (as a script
    # or notebook cell), not when it is imported as a module.
    main()


Selected MCC 172 HAT device at address 2

MCC 172 continuous scan example
    Functions demonstrated:
         mcc172.iepe_config_write
         mcc172.a_in_clock_config_write
         mcc172.a_in_clock_config_read
         mcc172.a_in_scan_start
         mcc172.a_in_scan_read
         mcc172.a_in_scan_stop
         mcc172.a_in_scan_cleanup
    IEPE power: on
    Channels: 0, 1
    Requested scan rate:  10240.0
    Actual scan rate:  10240.0
    Options:  CONTINUOUS
Starting scan ... Press Ctrl-C to stop

Samples Read    Scan Count       Channel 0       Channel 1
        1145       1289220    0.01649 Vrms    0.01554 Vrms [2D [0K 

Stopping


# FFT_SCAN

In [None]:
from __future__ import print_function
from time import sleep
from sys import version_info
from math import fabs
import numpy
from daqhats import mcc172, OptionFlags, SourceType, HatIDs, HatError
from daqhats_utils import select_hat_device, enum_mask_to_string, \
chan_list_to_mask

# ANSI terminal escape sequences used to tidy the display after Ctrl-C.
CURSOR_BACK_2 = '\x1b[2D'  # Move the cursor two columns to the left.
ERASE_TO_END_OF_LINE = '\x1b[0K'  # Erase from the cursor to end of line.

In [9]:
# Ad-hoc check: call the device selection helper and let the notebook echo
# its return value (the other cells use that value as the HAT address).
select_hat_device(HatIDs.MCC_172)


15

In [3]:
#!/usr/bin/env python
#  -*- coding: utf-8 -*-
"""
    MCC 172 Functions Demonstrated:
        mcc172.iepe_config_write
        mcc172.a_in_clock_config_write
        mcc172.a_in_clock_config_read
        mcc172.a_in_scan_start
        mcc172.a_in_scan_read_numpy
        mcc172.a_in_scan_stop

    Purpose:
        Perform finite acquisitions on both channels, calculate the FFTs, and
        display peak information.

    Description:
        Acquires blocks of analog input data for both channels then performs
        FFT calculations to determine the frequency content. The highest
        frequency peak is detected and displayed, along with harmonics. The
        time and frequency data are saved to CSV files.

        This example requires the NumPy library.

"""


def get_iepe():
    """
    Prompt until the user gives a valid IEPE enable answer.

    Returns:
        int: 1 for "y" or "Y", 0 for "n" or "N".
    """
    question = "IEPE enable [y or n]?  "
    # Accepted answers and the value each one stands for.
    answers = {"y": 1, "Y": 1, "n": 0, "N": 0}

    while True:
        # Python 2 needs raw_input() to read the reply as plain text.
        if version_info.major > 2:
            typed = input(question)
        else:
            typed = raw_input(question)  # pylint: disable=undefined-variable

        if typed in answers:
            return answers[typed]

        # Unrecognised answer: report it and ask again.
        print("Invalid response.")

def main(): # pylint: disable=too-many-locals, too-many-statements
    """
    Acquire one finite block from both channels of an MCC 172 and analyze it.

    Asks the user for the IEPE setting, configures the local sample clock,
    starts a finite scan and hands the device to read_and_display_data() for
    the FFT analysis.  Once the scan has been started it is always stopped
    and cleaned up, whether the read succeeds, fails or is interrupted with
    Ctrl-C.

    HatError and ValueError are reported on the console instead of being
    propagated.
    """

    channels = [0, 1]
    channel_mask = chan_list_to_mask(channels)

    samples_per_channel = 12800
    scan_rate = 51200.0
    options = OptionFlags.DEFAULT

    try:
        # Select an MCC 172 HAT device to use.
        address = select_hat_device(HatIDs.MCC_172)
        hat = mcc172(address)

        print('\nSelected MCC 172 HAT device at address', address)

        # Turn on IEPE supply?
        iepe_enable = get_iepe()

        for channel in channels:
            hat.iepe_config_write(channel, iepe_enable)

        # Configure the clock and wait for sync to complete.
        hat.a_in_clock_config_write(SourceType.LOCAL, scan_rate)

        synced = False
        while not synced:
            (_source_type, actual_scan_rate, synced) = hat.a_in_clock_config_read()
            if not synced:
                sleep(0.005)

        print('\nMCC 172 Multi channel FFT example')
        print('    Functions demonstrated:')
        print('         mcc172.iepe_config_write')
        print('         mcc172.a_in_clock_config_write')
        print('         mcc172.a_in_clock_config_read')
        print('         mcc172.a_in_scan_start')
        print('         mcc172.a_in_scan_read_numpy')
        print('         mcc172.a_in_scan_stop')
        print('         mcc172.a_in_scan_cleanup')
        print('    IEPE power: ', end='')
        if iepe_enable == 1:
            print('on')
        else:
            print('off')
        print('    Channels: ', end='')
        print(', '.join([str(chan) for chan in channels]))
        print('    Requested scan rate: ', scan_rate)
        print('    Actual scan rate: ', actual_scan_rate)
        print('    Samples per channel', samples_per_channel)
        print('    Options: ', enum_mask_to_string(OptionFlags, options))

        # Under Python 2 input() evaluates the typed text, so an empty line
        # raises; those errors only mean "Enter was pressed".
        try:
            input('\nPress ENTER to continue ...')
        except (NameError, SyntaxError):
            pass

        # Configure and start the scan.
        hat.a_in_scan_start(channel_mask, samples_per_channel, options)

        try:
            print('Starting scan ... Press Ctrl-C to stop\n')
            read_and_display_data(hat, channels, samples_per_channel,
                                  actual_scan_rate)

        except KeyboardInterrupt:
            # Clear the '^C' from the display.
            print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')

        finally:
            # Release the scan on every exit path, including a HatError
            # raised while reading.
            hat.a_in_scan_stop()
            hat.a_in_scan_cleanup()

    except (HatError, ValueError) as err:
        print('\n', err)

def window(index, max_index):
    """
    Evaluate the Hann window function.

    Args:
        index: Sample position at which the window is evaluated.
        max_index: Window length used to scale the position.

    Returns:
        The window weight 0.5 - 0.5*cos(2*pi*index / max_index).
    """
    phase = 2*numpy.pi*index / max_index
    weight = 0.5 - 0.5*numpy.cos(phase)
    return weight

def window_compensation():
    """
    Hann window compensation factor.

    Returns:
        float: The constant 2.0.  NOTE(review): presumably the amplitude
        correction for the Hann window's average weight of 0.5 - confirm.
    """
    return 2.0

def calculate_real_fft(data):
    """
    Calculate a real-only FFT, returning the spectrum in dBFS.

    The samples are weighted with window(), normalized to the full-scale
    input range of the MCC 172 and transformed with numpy.fft.rfft.  The
    magnitudes are scaled by the window compensation factor and by the
    number of samples; every bin except DC is additionally doubled.

    Args:
        data: Sequence of time-domain samples for one channel.

    Returns:
        numpy.ndarray: One float64 value in dBFS per rfft output bin.
    """
    samples = numpy.asarray(data, dtype=numpy.float64)
    n_samples = len(samples)

    # Apply the window and normalize the time data.
    max_v = mcc172.info().AI_MAX_RANGE
    in_data = window(numpy.arange(n_samples), n_samples) * samples / max_v

    # Perform the FFT.
    out = numpy.fft.rfft(in_data)

    # Per-bin amplitude scale: don't multiply the DC value times 2.
    scale = numpy.full(len(out), window_compensation() * 2,
                       dtype=numpy.float64)
    scale[0] = window_compensation()

    # Convert the complex results to magnitudes and then to dBFS.
    spectrum = 20*numpy.log10(scale * numpy.abs(out) / n_samples)

    return spectrum

def quadratic_interpolate(bin0, bin1, bin2):
    """
    Interpolate between the bins of an FFT peak to find a more accurate
    frequency.  bin1 is the FFT value at the detected peak, bin0 and bin2
    are the values from the adjacent bins below and above the peak.

    Args:
        bin0: FFT value of the bin just below the peak.
        bin1: FFT value at the detected peak.
        bin2: FFT value of the bin just above the peak.

    Returns:
        float: The offset, in bins, from the index of bin1.  0.0 is returned
        when the three values give no curvature to interpolate on (for
        example when they are all equal).
    """
    y_1 = fabs(bin0)
    y_2 = fabs(bin1)
    y_3 = fabs(bin2)

    denominator = 2 * (2 * y_2 - y_1 - y_3)
    if denominator == 0:
        # Flat "peak": there is no better estimate than the bin itself.
        return 0.0

    return (y_3 - y_1) / denominator

def order_suffix(index):
    """
    Return the English ordinal suffix for a whole number.

    Args:
        index (int): The number the suffix is wanted for.

    Returns:
        str: "st", "nd", "rd" or "th", so that 1 -> "st", 2 -> "nd",
        3 -> "rd", 4 -> "th", 11 -> "th", 21 -> "st", 112 -> "th".
    """
    magnitude = abs(index)

    # 11, 12 and 13 (and 111, 212, ...) are irregular: always "th".
    if magnitude % 100 in (11, 12, 13):
        return "th"

    return {1: "st", 2: "nd", 3: "rd"}.get(magnitude % 10, "th")

def read_and_display_data(hat, channels, samples_per_channel, scan_rate):
    """
    Wait for all of the scan data, then for each channel perform an FFT,
    find the peak frequency, display the peak and harmonic information and
    save the time and frequency data to a CSV file named
    "fft_scan_<channel>.csv".

    Args:
        hat (mcc172): The mcc172 HAT device object.
        channels (list[int]): The scanned channel numbers, in the order in
            which their samples are interleaved in the scan data.
        samples_per_channel (int): The number of samples to read for each
            channel.
        scan_rate (float): The sample rate in Hz, used to convert FFT bin
            numbers to frequencies.

    Returns:
        None
    """
    # pylint: disable=too-many-locals

    timeout = 5.0

    # Wait for all the data, and read it as a numpy array.
    read_result = hat.a_in_scan_read_numpy(samples_per_channel, timeout)

    # Check for an overrun error.
    if read_result.hardware_overrun:
        print('\n\nHardware overrun\n')
        return
    if read_result.buffer_overrun:
        print('\n\nBuffer overrun\n')
        return

    # Separate the data by channel: row k holds the samples of channels[k].
    read_data = read_result.data.reshape((len(channels), -1), order='F')

    bin_width = scan_rate / samples_per_channel
    nyquist = scan_rate / 2.0

    for chan_index, channel in enumerate(channels):
        print('===== Channel {}:\n'.format(channel))

        time_data = read_data[chan_index]

        # Calculate the FFT.
        spectrum = calculate_real_fft(time_data)

        peak_index = 0
        peak_val = -1000.0

        # Save data to CSV file.  Each row pairs this channel's time sample
        # with the spectrum bin of the same index.
        logname = "fft_scan_{}.csv".format(channel)
        with open(logname, "w") as logfile:
            logfile.write("Time data (V), Frequency (Hz), Spectrum (dBFS)\n")

            for i, spec_val in enumerate(spectrum):
                # Find the peak value and index.
                if spec_val > peak_val:
                    peak_val = spec_val
                    peak_index = i

                logfile.write("{0:.6f},{1:.3f},{2:.6f}\n".format(
                    time_data[i], i * bin_width, spec_val))

        # Interpolate for a more precise peak frequency.  This needs a
        # neighbour on both sides, so a peak in the first or last bin is
        # used as it is.
        if 0 < peak_index < len(spectrum) - 1:
            peak_offset = quadratic_interpolate(
                spectrum[peak_index - 1], spectrum[peak_index],
                spectrum[peak_index + 1])
        else:
            peak_offset = 0.0
        peak_freq = ((peak_index + peak_offset) * scan_rate /
                     samples_per_channel)
        print("Peak: {0:.1f} dBFS at {1:.1f} Hz".format(peak_val, peak_freq))

        # Find and display harmonic levels for the 2nd to 7th harmonic,
        # stopping early when the frequency exceeds the Nyquist rate.
        for harmonic in range(2, 8):
            h_freq = peak_freq * harmonic
            if h_freq > nyquist:
                break
            h_index = int(h_freq * samples_per_channel / scan_rate + 0.5)
            h_val = spectrum[h_index]
            print("{0:d}{1:s} harmonic: {2:.1f} dBFS at {3:.1f} Hz".format(
                harmonic, order_suffix(harmonic), h_val, h_freq))

        print('Data and FFT saved in {}\n'.format(logname))

if __name__ == '__main__':
    # Run the example only when this code is executed directly (as a script
    # or notebook cell), not when it is imported as a module.
    main()


Selected MCC 172 HAT device at address 2

MCC 172 Multi channel FFT example
    Functions demonstrated:
         mcc172.iepe_config_write
         mcc172.a_in_clock_config_write
         mcc172.a_in_clock_config_read
         mcc172.a_in_scan_start
         mcc172.a_in_scan_read_numpy
         mcc172.a_in_scan_stop
         mcc172.a_in_scan_cleanup
    IEPE power: on
    Channels: 0, 1
    Requested scan rate:  51200.0
    Actual scan rate:  51200.0
    Samples per channel 12800
    Options:  DEFAULT
Starting scan ... Press Ctrl-C to stop

===== Channel 0:

Peak: -49.4 dBFS at 1014.1 Hz
2nd harmonic: -56.0 dBFS at 2028.3 Hz
3rd harmonic: -62.9 dBFS at 3042.4 Hz
4th harmonic: -68.6 dBFS at 4056.6 Hz
5th harmonic: -75.3 dBFS at 5070.7 Hz
6th harmonic: -79.4 dBFS at 6084.9 Hz
7th harmonic: -86.1 dBFS at 7099.0 Hz
Data and FFT saved in fft_scan_0.csv

===== Channel 1:

Peak: -49.9 dBFS at 1014.1 Hz
2nd harmonic: -56.4 dBFS at 2028.3 Hz
3rd harmonic: -63.3 dBFS at 3042.4 Hz
4th harmonic: -

# Custom

## Debug: # of hats

In [17]:
# HatIDs is imported here as well so this cell runs on its own, without
# relying on an earlier cell having imported it.
from daqhats import hat_list, HatIDs

# Number of recognized hats (172)
hats = hat_list(filter_by_id=HatIDs.MCC_172)
number_of_hats = len(hats)
print(number_of_hats)

1


## Read to File

### Imports

In [1]:
from __future__ import print_function
from sys import stdout, version_info
import csv
import os
from datetime import datetime
from time import sleep
from daqhats import mcc172, OptionFlags, SourceType, HatIDs, HatError
from daqhats_utils import select_hat_device, enum_mask_to_string, \
    chan_list_to_mask

# Sentinel sample count.  NOTE(review): presumably passed to
# a_in_scan_read() to request all available samples; its use is outside
# this view - confirm.
READ_ALL_AVAILABLE = -1
# IEPE supply setting written to every scanned channel by main() (1 = on).
IEPE_ENABLE = 1

# ANSI terminal escape sequences.
CURSOR_BACK_2 = '\x1b[2D'  # Move the cursor two columns to the left.
ERASE_TO_END_OF_LINE = '\x1b[0K'  # Erase from the cursor to end of line.

### Main

In [9]:
def main(): # pylint: disable=too-many-locals, too-many-statements
    """
    Run a continuous scan on one MCC 172 HAT and log the samples to a CSV file.

    The function selects a device, writes the IEPE setting for every channel,
    configures the local sample clock and waits for it to report sync, prints
    the scan configuration, picks an unused CSV filename and then hands control
    to read_and_display_data until that function returns or the user presses
    Ctrl-C.

    Once the scan has been started it is always stopped and cleaned up, and
    IEPE is always switched off again, regardless of whether the read loop
    ended through Ctrl-C, an overrun, or an error.

    HatError and ValueError are caught and printed rather than propagated.
    """

    # Store the channels in a list and convert the list to a channel mask that
    # can be passed as a parameter to the MCC 172 functions.
    channels = [0, 1]
    channel_mask = chan_list_to_mask(channels)
    num_channels = len(channels)

    samples_per_channel = 0

    options = OptionFlags.CONTINUOUS

    scan_rate = 10240.0

    try:
        # Select an MCC 172 HAT device to use.
        address = select_hat_device(HatIDs.MCC_172)
        hat = mcc172(address)

        print('\nSelected MCC 172 HAT device at address', address)

        # Turn on IEPE supply?
        iepe_enable = IEPE_ENABLE

        for channel in channels:
            hat.iepe_config_write(channel, iepe_enable)

        # Configure the clock and wait for sync to complete.
        hat.a_in_clock_config_write(SourceType.LOCAL, scan_rate)

        synced = False
        while not synced:
            (_source_type, actual_scan_rate, synced) = hat.a_in_clock_config_read()
            if not synced:
                sleep(0.005)

        print('\nMCC 172 continuous scan example')
        print('    IEPE power: ', end='')
        if iepe_enable == 1:
            print('on')
        else:
            print('off')
        print('    Channels: ', end='')
        print(', '.join([str(chan) for chan in channels]))
        print('    Requested scan rate: ', scan_rate)
        print('    Actual scan rate: ', actual_scan_rate)
        print('    Options: ', enum_mask_to_string(OptionFlags, options))

        ## Define saved filename
        # Chosen before the scan starts so that no work sits between starting
        # the scan and entering the protected read loop below.
        # Local Folder; use e.g. '/boot/data/data.csv' to write to the boot
        # partition instead.
        filename = 'data.csv'
        if os.path.exists(filename):
            # Never append to an earlier recording: take the first free
            # "<base>_<n><ext>" name.
            count = 1
            base_filename, ext = os.path.splitext(filename)
            while os.path.exists(f"{base_filename}_{count}{ext}"):
                count += 1
            filename = f"{base_filename}_{count}{ext}"

        # Configure and start the scan.
        # Since the continuous option is being used, the samples_per_channel
        # parameter is ignored if the value is less than the default internal
        # buffer size (10000 * num_channels in this case). If a larger internal
        # buffer size is desired, set the value of this parameter accordingly.
        hat.a_in_scan_start(channel_mask, samples_per_channel, options)

        try:
            print('Starting scan ... Press Ctrl-C to stop\n')

            # Display the header row for the data table.
            print('Samples Read    Scan Count', end='')
            for item in channels:
                print('       Channel ', item, sep='', end='')
            print('')

            read_and_display_data(hat, num_channels, filename)

        except KeyboardInterrupt:
            # Clear the '^C' from the display.
            print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')
            print('Stopping')

        finally:
            # Runs for every exit path (Ctrl-C, overrun, exception) so the
            # scan is not left running and the sensors are not left powered.
            hat.a_in_scan_stop()
            hat.a_in_scan_cleanup()

            # Turn off IEPE supply
            for channel in channels:
                hat.iepe_config_write(channel, 0)

    except (HatError, ValueError) as err:
        print('\n', err)
        

def return_value(data, channel, num_channels, num_samples_per_channel):
    """
    Return the mean of the squared samples of one channel.

    The samples of all channels are interleaved in ``data``; the samples that
    belong to ``channel`` sit at ``channel``, ``channel + num_channels``,
    ``channel + 2 * num_channels`` and so on.

    Note that no square root is taken, so the result is the mean square of
    the samples rather than their RMS.

    Args:
        data: Indexable sequence of interleaved samples.
        channel (int): Position of the channel within one interleaved frame.
        num_channels (int): Number of channels per frame (the stride).
        num_samples_per_channel (int): Number of frames to include.

    Returns:
        float: Sum of ``sample * sample / num_samples_per_channel`` over the
        selected samples; 0.0 when ``num_samples_per_channel`` is 0.
    """
    mean_square = 0.0
    for frame in range(num_samples_per_channel):
        sample = data[channel + frame * num_channels]
        # Divide each term (not the final sum) to match the accumulation order.
        mean_square += (sample * sample) / num_samples_per_channel

    return mean_square

# Updated write_to_csv function with header row
def write_to_csv(filename, data, num_channels, num_samples_per_channel):
    """
    Append one batch of interleaved samples to a CSV file.

    A header row (``time, Channel 0, Channel 1, ...``) is written first when
    the file does not exist yet or exists but is empty. The header labels are
    the positions 0..num_channels-1 within a frame, not hardware channel
    numbers.

    Each following row holds a timestamp and one sample per channel.

    NOTE: the timestamp is ``datetime.now()`` at the moment the row is
    written, not the moment the sample was acquired, so all rows of one batch
    carry nearly identical times.

    Args:
        filename (str): Path of the CSV file; it is opened in append mode.
        data: Indexable sequence of samples, interleaved by channel.
        num_channels (int): Number of samples per row.
        num_samples_per_channel (int): Number of rows to write.

    Returns:
        None
    """
    # A file that is missing or has zero length has no header yet.
    needs_header = (not os.path.exists(filename)
                    or os.path.getsize(filename) == 0)

    with open(filename, mode='a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)

        if needs_header:
            header_row = ['time']
            header_row.extend(
                f'Channel {channel}' for channel in range(num_channels))
            csv_writer.writerow(header_row)

        # Write the data rows with timestamp
        for frame in range(num_samples_per_channel):
            first = frame * num_channels
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
            row = [timestamp]
            # Index element by element so that a too-short buffer raises
            # IndexError instead of silently producing a short row.
            row.extend(data[first + offset] for offset in range(num_channels))
            csv_writer.writerow(row)

def read_and_display_data(hat, num_channels, filename):
    """
    Reads data from the specified channels on the specified DAQ HAT device,
    appends every batch to a CSV file and updates the data on the terminal
    display.  The reads are executed in a loop that continues until the user
    stops the scan or an overrun error is detected.

    Args:
        hat (mcc172): The mcc172 HAT device object.
        num_channels (int): The number of channels to display.
        filename (str): Path of the CSV file that each batch of samples is
            appended to via write_to_csv.

    Returns:
        None

    """
    total_samples_read = 0
    read_request_size = READ_ALL_AVAILABLE

    # When doing a continuous scan, the timeout value will be ignored in the
    # call to a_in_scan_read because we will be requesting that all available
    # samples (up to the default buffer size) be returned.
    timeout = 5.0

    # Read all of the available samples (up to the size of the read_buffer which
    # is specified by the user_buffer_size).  Since the read_request_size is set
    # to -1 (READ_ALL_AVAILABLE), this function returns immediately with
    # whatever samples are available (up to user_buffer_size) and the timeout
    # parameter is ignored.
    while True:
        read_result = hat.a_in_scan_read(read_request_size, timeout)

        # Check for an overrun error
        if read_result.hardware_overrun:
            print('\n\nHardware overrun\n')
            break
        elif read_result.buffer_overrun:
            print('\n\nBuffer overrun\n')
            break

        # The buffer is interleaved by channel, so the number of complete
        # frames is the buffer length divided by the channel count.
        samples_read_per_channel = len(read_result.data) // num_channels
        total_samples_read += samples_read_per_channel

        write_to_csv(filename, read_result.data, num_channels, samples_read_per_channel)

        print('\r{:12}'.format(samples_read_per_channel),
              ' {:12} '.format(total_samples_read), end='')

        # Display the per-channel value computed by return_value.
        if samples_read_per_channel > 0:
            for i in range(num_channels):
                value = return_value(read_result.data, i, num_channels,
                                     samples_read_per_channel)
                print('{:10.5f}'.format(value), 'V ',
                      end='')

        stdout.flush()

        # Pause on every pass, including passes that returned no samples;
        # the read above returns immediately, so without this the loop would
        # spin at full speed whenever no data is available.
        sleep(0.1)

    print('\n')
    
# Start the continuous scan only when this code is executed directly, not
# when it is imported.
if __name__ == '__main__':
    main()


Selected MCC 172 HAT device at address 2

MCC 172 continuous scan example
    IEPE power: on
    Channels: 0, 1
    Requested scan rate:  10240.0
    Actual scan rate:  10240.0
    Options:  CONTINUOUS
Starting scan ... Press Ctrl-C to stop

Samples Read    Scan Count       Channel 0       Channel 1
        1651         33266    0.00032 V    0.00027 V [2D [0K 

Stopping


### Service

### Run