### Packages

In [None]:
import numpy as np
from serial import Serial

### Setup constants

In [None]:
SYS_CLK = np.float32(500)                                              # [MHz] AD9959 system clock frequency
DDS_BIT = 32                                                           #       AD9959 bit precision
DDS_TSTEPS = np.array([0.008, 0.016, 0.032, 0.064,
                       0.128, 0.256, 0.512, 1.024], dtype=np.float32)  # [μs] AD9959 chirp time steps
T_PP = np.float32(0.016)                                               # [μs] AD9959 profile pins transient time

### Input to AD9959 DDS

In [None]:
# AD9959 channels programming (example)
channel_0 = [ {'f0': 60, 't': 10, 'chirp': 1}, 70, None, None, {'f0': 70, 't': 10, 'chirp': -1}, 0]
channel_1 = [ 60, {'f0': 60, 't': 10, 'chirp': 1}, None, None, 80, 0]
channel_2 = [ None, {'f0': 70, 't': 10, 'chirp': 1}, None, 85, None, 0]
channel_3 = [ None, {'f0': 80, 't': 10, 'chirp': -2}, None, {'f0': 80, 't': 10, 'chirp': 1}, 75, 0]

# number of (complete) channel configurations
num_cfg = np.min([len(channel_0), len(channel_1), len(channel_2), len(channel_3)])

### Chirp variables configuration

In [None]:
def decode_aod_freq(ftw):
    """
    Decode AD9959 frequency value (in MHz)
    from input frequency tuning word (FTW).

    Parameters
    ----------
    ftw: uint32
        frequency tuning word

    Returns
    -------
    freq: float32
        frequency value [MHz]
    """
    freq = np.float32(ftw * SYS_CLK / pow(2, DDS_BIT))

    return freq


def encode_dds_ftw(freq):
    """
    Compute AD9959 frequency tuning word (FTW)
    from AOD frequency value (in MHz).

    Parameters
    ----------
    freq: float32
        frequency value [MHz]

    Returns
    -------
    ftw: uint32
        frequency tuning word
    """
    ftw = np.uint32(round(pow(2, DDS_BIT) * freq / SYS_CLK))

    return ftw


def compute_chirp_steps(chirp, t_step_max=DDS_TSTEPS[3]):
    """
    Get optimal chirp step time and step frequency values for the AD9959 DDS
    minimizing the chirp encode error.

    Parameters
    ----------
    chirp: float32
        chirp value [MHz/μs]

    t_step_max: float32
        maximum time step allowed [μs]

    Returns
    -------
    f_step: float32
        ramp frequency step [MHz]

    t_step: float32
        ramp time step [μs]
    """
    # handle 0 chirp values
    if chirp == 0:
        f_step = np.float32(0.0)
        t_step = DDS_TSTEPS[0]

    else:
        # f_step boundaries [MHz]
        f_step_min = np.float32(SYS_CLK / pow(2, DDS_BIT))

        # t_step boundaries [μs]
        t_step_array = DDS_TSTEPS
        t_step_min = DDS_TSTEPS[0]
        f_step_tmp = np.abs(chirp * t_step_min)

        # increase t_step_min until the corresponding f_step_min exceeds the
        # lowest programmable value (i.e. 0.116 Hz)
        while f_step_tmp <= f_step_min:
            if t_step_min <= t_step_max:
                t_step_min = 2 * t_step_min
                f_step_tmp = np.abs(chirp * t_step_min)

        # mask original array
        valid = np.logical_and((t_step_array >= t_step_min), (t_step_array <= t_step_max))
        t_step_array = t_step_array[valid]

        # initialize frequency error array
        df_err = np.zeros(shape=(t_step_array.size,), dtype=np.float32)
        for i in range(t_step_array.size):
            f_step_tmp = chirp * t_step_array[i]
            f_step_enc = decode_aod_freq(encode_dds_ftw(f_step_tmp))
            df_err[i] = np.abs(f_step_enc - f_step_tmp)

        # get best frequency step (minimizing the chirp encode error)
        best_idx = np.argmin(df_err)
        t_step = t_step_array[best_idx]
        f_step = abs(chirp * t_step)

    return f_step, t_step


def compute_chirp_params(chirp_dict):
    """
    Compute frequency ramp parameters for AD9959 programming.

    Parameters
    ----------
    chirp_dict: dict
        dictionary of chirp parameters ('f0', 't', 'chirp')
    freq_start: float32
        ramp start frequency [MHz]

    duration: float32
        ramp duration [μs]

    chirp: float32
        chirp value [MHz/μs]

    Returns
    -------
    freq_start: float32
        ramp start frequency [MHz]

    freq_end: float32
        ramp end frequency [MHz]

    df_chirp: float32
        chirp frequency steps [MHz]

    df_trans: float32
        transient frequency steps [MHz]

    dt_chirp: float32
        chirp time steps [μs]

    dt_trans: float32
        transient time steps [μs]
    """
    # get chirp dictionary keys (convert to single precision)
    freq_start = np.float32(chirp_dict['f0'])
    duration = np.float32(chirp_dict['t'])
    chirp = np.float32(chirp_dict['chirp'])

    # actual chirping phase
    df_chirp, dt_chirp = compute_chirp_steps(chirp)

    # transient phase
    f_sweep = np.abs(chirp * duration)
    chirp_trans = f_sweep / T_PP
    df_trans, dt_trans = compute_chirp_steps(chirp_trans, t_step_max=T_PP)

    # end frequency
    freq_end = freq_start + f_sweep

    return freq_start, freq_end, df_chirp, df_trans, dt_chirp, dt_trans
    

### Serial communication

In [None]:
def is_not_none(data):
    return not isinstance(data, type(None))


def is_dict(data):
    return type(data) is dict


def encode_channel_header(channels, data_string=''):
    """
    Generate channels configuration header.

    Parameters
    ----------
    channels: tuple
        channels configuration (single-tone value, linear sweep dict, or None)

    data_string: str
        data string to be extended

    Returns
    -------
    data_string: str
        extended data string
    """
    # unpack channels configuration tuple
    channel_3, channel_2, channel_1, channel_0 = channels

    # detect updated channels
    active = [is_not_none(channel_3), is_not_none(channel_2),
              is_not_none(channel_1), is_not_none(channel_0)]

    # detect linear sweep channels
    sweep = [is_dict(channel_3), is_dict(channel_2),
             is_dict(channel_1), is_dict(channel_0)]

    # generate header
    header = str(np.packbits(active + sweep)[0])
    data_string = data_string + header

    return data_string


def encode_single_tone(freq, data_string='', sep=','):
    """
    Encode single-tone frequency value into data string
    to be transferred to MCU board via serial port.

    Parameters
    ----------
    freq: float32
        frequency in [MHz]

    data_string: str
        input data string (to be extended)

    sep: str
        separator (default: ',')

    Returns
    ------
    data_string: str
        extended data string
    """
    # convert to single precision
    freq = np.float32(freq)

    # generate data string
    data_string = data_string + str(freq) + sep 

    return data_string


def encode_linear_sweep(freq_start, freq_end, df_chirp, df_trans, dt_chirp, dt_trans, data_string='', sep=','):
    """
    Encode single-tone frequency value into data string
    to be transferred to MCU board via serial port.

    Parameters
    ----------
    freq_start: float32
        ramp start frequency [MHz]

    freq_end: float32
        ramp end frequency [MHz]

    df_chirp: float32
        chirp frequency steps [MHz]

    df_trans: float32
        transient frequency steps [MHz]

    dt_chirp: float32
        chirp time steps [μs]

    dt_trans: float32
        transient time steps [μs]

    data_string: str
        input data string (to be extended)

    sep: str
        separator (default: ',')

    Returns
    ------
    data_string: str
        extended data string
    """
    data_string = data_string + \
                  str(freq_start) + sep + str(freq_end) + sep + \
                  str(df_chirp) + sep + str(df_trans) + sep + \
                  str(dt_chirp) + sep + str(dt_trans) + sep

    return data_string


def send_to_serial(data_string, port=8, baud_rate=2000000):
    """
    Send data string to MCU via serial port.

    Parameters
    ----------
    data_string: string
        encoded data string (transferred to the MCU board)

    port: int
        serial port number

    baud_rate: int
        serial port baud rate

    Returns
    ------
    None
    """
    # configure serial port
    ser = Serial()
    ser.port = 'COM' + str(port)
    ser.baudrate = baud_rate
    ser.open()

    # read instruction strings, send them via serial USB
    for i in range(len(data_string)):
        in_string = data_string[i]
        ser.write(in_string.encode())

    # send terminating string
    term = "END\n"
    ser.write(term.encode())

    # close connection
    ser.flush()
    ser.close()

### Compile and transfer data strings to MCU board

In [None]:
# declare empty list of data strings
serial_string = []

# loop over channels programming lists
for c in range(num_cfg):

    # channels programming tuple
    channels = (channel_3[c], channel_2[c], channel_1[c], channel_0[c])

    # generate channels configuration header
    data_string = encode_channel_header(channels)

    # loop over AD9959 channels
    for cfg in channels:

        # channel to be updated
        if cfg:

            # linear sweep mode
            if is_dict(cfg):
                freq_start, freq_end, df_chirp, df_trans, dt_chirp, dt_trans = \
                    compute_chirp_params(cfg)
                data_string = \
                    encode_linear_sweep(freq_start, freq_end, df_chirp, df_trans, dt_chirp, dt_trans,
                                        data_string=data_string)
            # single-tone mode
            else:
                data_string = encode_single_tone(cfg, data_string='')

    # add terminating char
    data_string = data_string + '\n'

    # append data string to global string to be transferred
    serial_string.append(data_string)

# transfer string to MCU board via serial port
send_to_serial(serial_string)