In [None]:
#default_exp radar.config_v1

# radar.config_v1

The TI 1443/1843 radar firmware accepts a some commands over the serial port to configure the radar waveform.
This module parses these commands so that we can interperet and process the raw ADC readings correctly.

Futher details on this commands can be found in the [mmWave SDK user guide](https://www.ti.com/tool/MMWAVE-SDK).

*Note:* We do not process all commands, but only command that affect the ADC data. The 1843 supports addtional commands for onboard processing. All these are ignored.

In [None]:
#export
import logging

logger = logging.getLogger()

In [None]:
#export

def read_radar_params(filename):
    """Reads a text file containing serial commands and returns parsed config as a dictionary"""
    with open(filename) as cfg:
        iwr_cmds = cfg.readlines()
        iwr_cmds = [x.strip() for x in iwr_cmds]
        radar_cfg = parse_commands(iwr_cmds)

    logger.debug(radar_cfg)
    return radar_cfg

def parse_commands(commands):
    """Calls the corresponding parser for each command in commands list"""
    cfg = None
    for line in commands:
        try:
            cmd = line.split()[0]
            args = line.split()[1:]
            cfg = command_handlers[cmd](args, cfg)
        except KeyError:
            logger.debug(f'{cmd} is not handled')
        except IndexError:
            logger.debug(f'line is empty "{line}"')
    return cfg

def dict_to_list(cfg):
    """Generates commands from config dictionary"""
    cfg_list = ['flushCfg','dfeDataOutputMode 1']

    # rx antennas/lanes for channel config
    rx_bool = [cfg['rx4'], cfg['rx3'], cfg['rx2'], cfg['rx1']]
    rx_mask = sum(2 ** i for i, v in enumerate(reversed(rx_bool)) if v)
    # number of tx antennas for channel config
    tx_bool = [cfg['tx3'], cfg['tx2'], cfg['tx1']]
    tx_mask = sum(2 ** i for i, v in enumerate(reversed(tx_bool)) if v)
    #print('[NOTE] Azimuth angle can be determined from channel config.') if cfg['tx2'] is True and (cfg['tx1'] or cfg['tx3']) is False else 0
    #print('[NOTE] Azimuth angle can be determined from channel config.') if cfg['tx2'] is False and (cfg['tx1'] or cfg['tx3']) is True else 0
    #print('[NOTE] Elevation and Azimuth angle can be determined from channel config.') if cfg['tx2'] is True and (cfg['tx1'] or cfg['tx3']) else 0
    cfg_list.append('channelCfg %s %s 0' % (rx_mask, tx_mask))  # rx and tx mask

    # adc config
    if cfg['isComplex'] and cfg['image_band']:
        outputFmt = 2
        #print('[NOTE] Complex 2x mode, both Imaginary and Real IF spectrum is filtered and sent to ADC, so\n'
        #      '       if Sampling rate is X, ADC data would include frequency spectrum from -X/2 to X/2.')
    elif cfg['isComplex'] and not cfg['image_band'] == True:
        outputFmt = 1
        #print('[NOTE] Complex 1x mode, Only Real IF Spectrum is filtered and sent to ADC, so if Sampling rate\n'
        #      '       is X, ADC data would include frequency spectrum from 0 to X.')
    else: raise ValueError("Real Data Type Not Supported")
    cfg_list.append('adcCfg 2 %s' % outputFmt)  # 16 bits (mandatory), complex 1x or 2x

    # adc power
    if cfg['adcPower'] =='low':
        power_mode = 1
        #print('[NOTE] The Low power ADC mode limits the sampling rate to half the max value.')
    elif cfg['adcPower'] =='regular': power_mode = 0
    else: raise ValueError("ADC power level Not Supported")
    cfg_list.append('lowPower 0 %s' % power_mode)  # power mode

    # profile configs
    for profile_ii in cfg['profiles']:
        cfg_list.append('profileCfg %s %s %s %s %s %s %s %s %s %s %s %s %s %s'
                % (profile_ii['id'],
                float(profile_ii['start_frequency']/1e9),
                float(profile_ii['idle']/1e-6),
                float(profile_ii['adcStartTime']/1e-6),
                float(profile_ii['rampEndTime']/1e-6),
                int(profile_ii['txPower']),
                int(profile_ii['txPhaseShift']),
                float(profile_ii['freqSlopeConst']/1e12),
                float(profile_ii['txStartTime']/1e-6),
                int(profile_ii['adcSamples']),
                int(profile_ii['adcSampleRate']/1e3),
                int(profile_ii['hpfCornerFreq1']),
                int(profile_ii['hpfCornerFreq2']),
                int(profile_ii['rxGain'])))

    # chirp configs
    for chirp_ii in cfg['chirps']:

        # Check if chirp is referring to valid profile config
        profile_valid = False
        for profile_ii in cfg['profiles']:
            if chirp_ii['profileID'] == profile_ii['id']: profile_valid = True
        if profile_valid is False: raise ValueError("The following profile id used in chirp "
                                                    "is invalid: %i" % chirp_ii['profileID'])
        ###############################################################################################################
        '''
        # check if tx values are valid
        if hamming([chirp_ii['chirptx3'],chirp_ii['chirptx2'],chirp_ii['chirptx1']],
            [cfg['tx3'], cfg['tx2'], cfg['tx1']])*3 > 1:
            raise ValueError("Chirp should have at most one different Tx than channel cfg")
        '''
        ###############################################################################################################
        if chirp_ii['chirpStartIndex'] > chirp_ii['chirpStopIndex']: raise ValueError("Particular chirp start index after chirp stop index")
        tx_bool = [chirp_ii['chirptx3'],chirp_ii['chirptx2'],chirp_ii['chirptx1']]
        tx_mask = sum(2 ** i for i, v in enumerate(reversed(tx_bool)) if v)
        cfg_list.append('chirpCfg %s %s %s %s %s %s %s %s'
                % (chirp_ii['chirpStartIndex'],
                   chirp_ii['chirpStopIndex'],
                   chirp_ii['profileID'],
                   chirp_ii['startFreqVariation'],
                   chirp_ii['slopeVariation'],
                   chirp_ii['idleVariation'],
                   chirp_ii['adcStartVariation'],
                   tx_mask))

    # frame config
    chirpStop = 0
    chirpStart = 511  # max value for chirp start index
    for chirp_ii in cfg['chirps']:
        chirpStop = max(chirpStop, chirp_ii['chirpStopIndex'])
        chirpStart = min(chirpStart,chirp_ii['chirpStartIndex'])
    chirps_len  = chirpStop + 1

    numLoops = cfg['numChirps']/chirps_len
    if chirpStart > chirpStop: raise ValueError("Chirp(s) start index is after chirp stop index")
    if numLoops % 1 != 0: raise ValueError("Number of loops is not integer")
    if numLoops > 255 or numLoops < 1: raise ValueError("Number of loops must be int in [1,255]")

    numFrames = cfg['numFrames'] if 'numFrames' in cfg.keys() else 0  # if zero => inf

    cfg_list.append('frameCfg %s %s %s %s %s 1 0'
            % (chirpStart, chirpStop, int(numLoops), numFrames, 1000/cfg['fps']))

    cfg_list.append('testFmkCfg 0 0 0 1')
    cfg_list.append('setProfileCfg disable ADC disable')
    return cfg_list


def channelStr_to_dict(args, curr_cfg=None):
    """Handler for `channelcfg`"""

    if curr_cfg:
        cfg = curr_cfg
    else:
        cfg = {}

    # This is the number of receivers which is equivalent to the number of lanes in the source code
    # Later, may include the result from the number of transmitters
    rx_bin = bin(int(args[0]))[2:].zfill(4)
    cfg['numLanes'] = len([ones for ones in rx_bin if ones == '1'])
    (cfg['rx4'],cfg['rx3'],cfg['rx2'],cfg['rx1']) = [bool(int(ones)) for ones in rx_bin]

    # This is the number of transmitters
    tx_bin = bin(int(args[1]))[2:].zfill(3)
    cfg['numTx'] = len([ones for ones in tx_bin if ones == '1'])
    (cfg['tx3'], cfg['tx2'], cfg['tx1']) = [bool(int(ones)) for ones in tx_bin]
    #print('[NOTE] Azimuth angle can be determined from channel config.') if cfg['tx2'] is True and (cfg['tx1'] or cfg['tx3']) is False else 0
    #print('[NOTE] Azimuth angle can be determined from channel config.') if cfg['tx2'] is False and (cfg['tx1'] or cfg['tx3']) is True else 0
    #print('[NOTE] Elevation and Azimuth angle can be determined from channel config.') if cfg['tx2'] is True and (cfg['tx1'] or cfg['tx3']) else 0


    return cfg


def profileStr_to_dict(args, curr_cfg=None):
    """Handler for `profileCfg`"""
    normalizer = [None, 1e9, 1e-6, 1e-6, 1e-6, None, None, 1e12, 1e-6, None, 1e3, None, None, None]
    dtype = [int, float, float, float, float, float, float, float, float, int, float, int, int, float]
    keys = ['id',
            'start_frequency',
            'idle',
            'adcStartTime',
            'rampEndTime',
            'txPower',
            'txPhaseShift',
            'freqSlopeConst',
            'txStartTime',
            'adcSamples',
            'adcSampleRate',
            'hpfCornerFreq1',
            'hpfCornerFreq2',
            'rxGain',
            ]
    # Check if the main dictionary exists
    if curr_cfg:
        cfg = curr_cfg
        if 'profiles' not in cfg.keys():
            cfg['profiles']=[]
    else:
        cfg = {'profiles': []}

    profile_dict = {}
    for k, v, n, d in zip(keys, args, normalizer, dtype):
        profile_dict[k] = d(float(v) * n if n else v)

    cfg['profiles'].append(profile_dict)
    return cfg


def chirp_to_dict(args,curr_cfg=None):
    """Handler for `chirpCfg`"""
    if curr_cfg:
        cfg = curr_cfg
        if 'chirps' not in cfg.keys():
            cfg['chirps'] = []
    else:
        cfg = {'chirps': []}

    chirp_dict = {}
    chirp_dict['chirpStartIndex'] = int(args[0])
    chirp_dict['chirpStopIndex'] = int(args[1])
    chirp_dict['profileID'] = int(args[2])
    chirp_dict['startFreqVariation'] = float(args[3])
    chirp_dict['slopeVariation'] = float(args[4])
    chirp_dict['idleVariation'] = float(args[5])
    chirp_dict['adcStartVariation'] = float(args[6])

    tx_bin = bin(int(args[7]))[2:].zfill(3)
    (chirp_dict['chirptx3'], chirp_dict['chirptx2'], chirp_dict['chirptx1']) = [bool(int(ones)) for ones in tx_bin]

    cfg['chirps'].append(chirp_dict)
    return cfg


def power_to_dict(args,curr_cfg=None):
    """handler for `lowPower`"""
    if curr_cfg:
        cfg = curr_cfg
    else:
        cfg = {}
    if int(args[1]) ==1:
        cfg['adcPower'] = 'low'
        #print('[NOTE] The Low power ADC mode limits the sampling rate to half the max value.')
    elif int(args[1]) ==0:
        cfg['adcPower'] = 'regular'
    else:
        raise ValueError ("Invalid Power Level")
    return cfg


def frameStr_to_dict(args, cfg):
    """Handler for `frameCfg`"""

    # Number of chirps
    if 'chirps' not in cfg.keys():
        raise ValueError("Need to define chirps before frame")

    chirpStop =0
    for ii in range(len(cfg['chirps'])):
        chirpStop = max(chirpStop,cfg['chirps'][ii]['chirpStopIndex'])
    chirps_len = chirpStop + 1

    cfg['numChirps'] = int(args[2]) * chirps_len  # num loops * len(chirps)
    if int(args[3]) != 0: cfg['numFrames'] = int(args[3])

    # args[4] is the time in milliseconds of each frame
    cfg['fps'] = 1000/float(args[4])


    return cfg


def adcStr_to_dict(args, curr_cfg=None):
    """Handler for `adcCfg`"""
    if curr_cfg:
        cfg = curr_cfg
    else:
        cfg = {}

    if int(args[1]) == 1:
        cfg['isComplex'] = True
        cfg['image_band'] = False
        #print('[NOTE] Complex 1x mode, Only Real IF Spectrum is filtered and sent to ADC, so if Sampling rate\n'
        #      '       is X, ADC data would include frequency spectrum from 0 to X.')
    elif int(args[1]) == 2:
        cfg['isComplex'] = True
        cfg['image_band'] = True
        #print('[NOTE] Complex 2x mode, both Imaginary and Real IF spectrum is filtered and sent to ADC, so\n'
        #      '       if Sampling rate is X, ADC data would include frequency spectrum from -X/2 to X/2.')
    else:
        raise ValueError("Real Data Type Not Supported")

    return cfg

#Mapping of serial command to command handler
command_handlers = {
    'channelCfg': channelStr_to_dict,
    'profileCfg': profileStr_to_dict,
    'chirpCfg': chirp_to_dict,
    'frameCfg': frameStr_to_dict,
    'adcCfg': adcStr_to_dict,
    'lowPower': power_to_dict,
}

# Example Usage

For a text file with the following commands:
```
flushCfg
dfeDataOutputMode 1
channelCfg 15 5 0
adcCfg 2 1
lowPower 0 0
profileCfg 0 77.0 58.0 7.0 40.0 0 0 100.0 1.0 304 9499 0 0 30
chirpCfg 0 0 0 0.0 0.0 0.0 0.0 1
chirpCfg 1 1 0 0.0 0.0 0.0 0.0 4
frameCfg 0 1 32 0 33.333 1 0
testFmkCfg 0 0 0 1
setProfileCfg disable ADC disable
sensorStart
```

In [None]:
#hide
radar_config_filename = '../samples/indoor_human_rcs.cfg'

In [None]:
read_radar_params(radar_config_filename)

{'numLanes': 4,
 'rx4': True,
 'rx3': True,
 'rx2': True,
 'rx1': True,
 'numTx': 2,
 'tx3': True,
 'tx2': False,
 'tx1': True,
 'isComplex': True,
 'image_band': False,
 'profiles': [{'id': 0,
   'start_frequency': 77000000000.0,
   'idle': 5.8e-05,
   'adcStartTime': 7e-06,
   'rampEndTime': 3.9999999999999996e-05,
   'txPower': 0.0,
   'txPhaseShift': 0.0,
   'freqSlopeConst': 100000000000000.0,
   'txStartTime': 1e-06,
   'adcSamples': 304,
   'adcSampleRate': 9499000.0,
   'hpfCornerFreq1': 0,
   'hpfCornerFreq2': 0,
   'rxGain': 30.0}],
 'chirps': [{'chirpStartIndex': 0,
   'chirpStopIndex': 0,
   'profileID': 0,
   'startFreqVariation': 0.0,
   'slopeVariation': 0.0,
   'idleVariation': 0.0,
   'adcStartVariation': 0.0,
   'chirptx3': False,
   'chirptx2': False,
   'chirptx1': True},
  {'chirpStartIndex': 1,
   'chirpStopIndex': 1,
   'profileID': 0,
   'startFreqVariation': 0.0,
   'slopeVariation': 0.0,
   'idleVariation': 0.0,
   'adcStartVariation': 0.0,
   'chirptx3': True