# Scope mimicry, Python version

A Python re-implementation of the C++ `ScopeMimicry` class, written to diagnose a potential trigger and buffer-storage issue.

Pierre, March 2025

ScopeMimicry.h header:
```C++
enum e_dump_state {
	initialized,
	names,
	final_idx,
	datas,
	finished
};

class ScopeMimicry {
public:
	/**
	 * @brief create a scope
	 *
	 * @param length : number of samples to record for each variable
	 * @param nb_channel : maximum number of variables to record
	 */
	ScopeMimicry(uint16_t length, uint16_t nb_channel);
	~ScopeMimicry();
	/*
	 * The scope owns heap buffers which are released by its destructor: a
	 * member-wise copy would release them twice, so copying is disabled.
	 */
	ScopeMimicry(const ScopeMimicry &) = delete;
	ScopeMimicry &operator=(const ScopeMimicry &) = delete;
	/**
	 * @brief define a variable to record
	 * Care!! only the address of the variable is kept, so it must be a
	 * static float variable.
	 * Calls made once nb_channel variables are connected are ignored.
	 *
	 * @param channel : static float variable
	 * @param name : to define a name for the variable ex: "myvar"
	 *               (the pointer is stored, the string is not copied)
	 */
	void connectChannel(float &channel, const char name[]);
	/**
	 * @brief evaluate the trigger and, while the acquisition is not finished,
	 * add one sample of every connected channel to the circular memory.
	 *
	 * @return 0 : sample stored, still waiting for the trigger
	 *         1 : sample stored, the trigger has fired
	 *         2 : acquisition finished, nothing stored
	 */
	uint16_t acquire(); // or trace ?
	// void dump();
	/**
	 * @brief reset acquisition trigger
	 */
	void start();

	/**
	 * @brief define the trigger
	 *
	 * @param func must be a function without arguments which returns a
	 * boolean. The trigger fires the first time `acquire()` sees this value
	 * go from false to true.
	 */
	void set_trigger(bool (*func)());
	/**
	 * @brief define which part of the memory is kept for the samples recorded
	 * before the trigger instant.
	 * The delay is a relative value in [0, 1] (values outside are clamped).
	 * delay of 0 : the recording goes on for the longest time after trigger
	 *
	 * @param delay (float value)
	 */
	void set_delay(float32_t delay);
	/**
	 * @brief as there's a circular buffer, the last point recorded can be
	 * anywhere in this buffer. This function gives the index of this last
	 * value. The index is between 0 and length-1
	 *
	 * @return index of the last recorded sample
	 */
	uint16_t get_final_idx();

	/**
	 * @brief get pointer to the memory of data recorded
	 *
	 * @return the float memory seen as raw bytes
	 */
	uint8_t *get_buffer();

	/**
	 * @brief return number of bytes used by the memory
	 *
	 * @return size in bytes (16 bits value)
	 */
	uint16_t get_buffer_size();
	/**
	 * @brief return the length of one channel recording
	 *
	 * @return number of samples per channel
	 */
	uint16_t get_length();
	/**
	 * @brief number of channels which can be recorded
	 *
	 * @return the nb_channel value given to the constructor
	 */
	uint16_t get_nb_channel();
	/**
	 * @brief return the status of the trigger
	 *
	 * @return true once the trigger has fired (until `start()` is called)
	 */
	bool has_trigged();

	/**
	 * @brief return percentage of filling memory
	 *
	 * @return percentage of filling memory in % (integer)
	 */
	uint16_t status();
	/**
	 * @brief return name of the channel "idx"
	 *
	 * @param idx : number of the channel (begin with 0)
	 * @return a c-string (const char*), "BadIdx" when idx is out of range
	 */
	const char *get_channel_name(uint16_t idx);
	/**
	 * @brief to retrieve one data saved
	 *
	 * @param index : number of the data to get [0, length-1]
	 * @param channel_idx : number of the channel [0, nb_channel-1]
	 * @return a float number
	 */
	float32_t get_channel_value(uint32_t index, uint32_t channel_idx);
	/**
	 * @brief help to get all the data recorded in a csv format.
	 * you must call scope.reset_dump() before, then
	 * it must be called in a while loop until
	 * scope.get_dump_state() == finished
	 *
	 * @return the next piece of text to output (c-string owned by the scope).
	 */
	char *dump_datas();
	/**
	 * @brief to call one time before using dump_datas()
	 */
	void reset_dump();
	/**
	 * @brief dump_state can be: initialized, names, final_idx, datas, and finished.
	 *
	 * @return current dump_state value
	 */
	e_dump_state get_dump_state();

private:
	/* number of samples per channel */
	uint16_t _length;
	/* number of channel slots */
	uint16_t _nb_channel;
	/* index of the last sample written in the circular memory */
	uint16_t _acq_counter;
	/* number of channels really connected with connectChannel() */
	uint16_t _effective_chan_number;
	/* trigger value seen at the previous acquire(), to detect the rising edge */
	bool _old_trigg_value;
	bool _trigged;
	/* delay given to set_delay(), converted to a number of samples */
	uint16_t _delay;
	/* progress of dump_datas() in the names and in the data */
	uint16_t _idx_name;
	uint16_t _idx_datas;


	/* addresses of the recorded variables and their names */
	float32_t **_channels;
	float32_t *_channel_numbers;
	const char **_names;
	/* recorded samples, channels interleaved: [sample][channel] */
	float32_t *_memory;

	uint16_t _decimation;
	/* _length - _delay */
	uint16_t _delay_complement;
	/* index at which the acquisition stops once the trigger has fired */
	uint16_t _trigged_counter;
	uint16_t _final_idx;
	bool (*_triggFunc)();
	/* variable for dump_datas method */
	e_dump_state dump_state;
	char hash[2] = "#";
	char nullchar[2] = " ";
	char char_name[256];
	/* each data is a float written in hexa so eight chars with \n and \0 chars at end */
	char char_data[10];
	char *data_dumped;


};
```

ScopeMimicry.cpp code:

```C++
#include "ScopeMimicry.h"
#include <stdio.h>
#include <string.h>

/* Trigger installed until set_trigger() is called: it never fires. */
static bool scope_mimicry_never_trigger() {
    return false;
}

/**
 * @brief allocate the memory for `nb_channel` channels of `length` samples each and put every
 * member in a known state (no trigger, no delay, dump ready to start).
 *
 * @param length : number of samples recorded per channel
 * @param nb_channel : number of channel slots
 */
ScopeMimicry::ScopeMimicry(uint16_t length, uint16_t nb_channel):
    _length(length),
    _nb_channel(nb_channel),
    _acq_counter(0),
    _effective_chan_number(0),
    _old_trigg_value(false),
    _trigged(false),
    _delay(0),
    _idx_name(0),
    _idx_datas(0),
    _channels(nullptr),
    _channel_numbers(nullptr),
    _names(nullptr),
    _memory(nullptr),
    _decimation(0),
    _delay_complement(length),
    _trigged_counter(length),
    _final_idx(length-1),
    _triggFunc(scope_mimicry_never_trigger),
    dump_state(initialized),
    data_dumped(nullchar)
{
    /* empty c-strings, so that they can be read before the first dump */
    char_name[0] = '\0';
    char_data[0] = '\0';
    /* the trailing () zero-fills the samples and sets every table entry to a null pointer */
    _memory = new float[static_cast<uint32_t>(_length) * _nb_channel]();
    _names = new const char*[nb_channel]();
    _channels = new float32_t *[nb_channel]();
}

/**
 * @brief release the three tables allocated by the constructor.
 * They were all obtained with new[], so they must be released with delete[].
 */
ScopeMimicry::~ScopeMimicry() {
    delete[] _memory;
    delete[] _names;
    delete[] _channels;
}

/**
 * @brief register a variable to record: its address and its name are kept in the next free
 * channel slot. Nothing happens when all the slots are already used.
 *
 * @param channel : reference to a static variable (only its address is stored)
 * @param name: name given to the variable in the dump (the pointer is stored, not a copy).
 */
void ScopeMimicry::connectChannel(float &channel, const char name[]) {
    // no free slot left: the request is dropped
    // TODO: return errno code ?
    if (_effective_chan_number >= _nb_channel) {
        return;
    }
    const uint16_t slot = _effective_chan_number;
    _names[slot] = name;
    _channels[slot] = &channel;
    _effective_chan_number = slot + 1;
}

/**
 * @brief evaluate the trigger then, unless the acquisition is finished, store the current value
 * of each connected channel in the next slot of the circular memory.
 *
 * The trigger fires on the first false -> true transition of the trigger function; from there
 * the recording goes on until the write index reaches the stop index computed at that instant.
 *
 * @return 0 : sample stored, trigger not fired yet
 *         1 : sample stored, trigger fired
 *         2 : acquisition finished (or empty memory), nothing stored
 */
uint16_t ScopeMimicry::acquire() {
    /* stop index used while the trigger has not fired: _acq_counter is always lower than
     * _length, hence lower than this value, so the recording never stops on it. */
    const uint16_t no_stop_idx = 0xFFFF;

    if (_length == 0) {
        /* no memory to write in, and the modulo operations below would divide by zero */
        return 2;
    }

    const bool trigg_value = (*_triggFunc)();
    // compute trigger
    if (!_old_trigg_value && trigg_value && !_trigged) {
        _trigged = true;
        /* _length is added before removing 1: without it the sum is negative when both
         * _acq_counter and _delay_complement are 0, and the stop index would wrap to a value
         * that the write index never reaches. */
        _trigged_counter = static_cast<uint16_t>(
            (static_cast<uint32_t>(_acq_counter) + _delay_complement + _length - 1u) % _length);
    }
    _old_trigg_value = trigg_value;

    if (!_trigged) {
        _trigged_counter = no_stop_idx;
    }

    if (_acq_counter == _trigged_counter) {
        /* the write index has reached the stop index: keep where the record ends */
        _final_idx = _acq_counter;
        return 2;
    }

    /* the write index moves first, then the sample is stored at the new position */
    _acq_counter = (_acq_counter + 1) % _length;
    const uint32_t first_slot = static_cast<uint32_t>(_acq_counter) * _nb_channel;
    for (uint16_t k_ch = 0; k_ch < _effective_chan_number; k_ch++) {
        _memory[first_slot + k_ch] = *(_channels[k_ch]);
    }
    return _trigged ? 1 : 0;
}

/**
 * @brief define a delay to record data before the trigg. delay is in [0, 1]: it is the part of
 * the memory length stored in _delay, the remaining part being stored in _delay_complement.
 *
 * @param d delay. Values below 0 (and NaN) give 0, values above 1 give 1.
 */
void ScopeMimicry::set_delay(float32_t d) {
    if (!(d > 0.0f)) {
        /* written this way so that NaN also ends here: converting NaN to an integer is
         * undefined behaviour */
        _delay = 0;
    } else if (d >= 1.0f) {
        _delay = _length;
    } else {
        /* truncated toward zero, always lower than _length */
        _delay = static_cast<uint16_t>(d * _length);
    }
    _delay_complement = _length - _delay;
}

/**
 * @brief reset the trigger and enable to record new data_dumped
 */
void ScopeMimicry::start() {
    _trigged = false;
    _old_trigg_value = false;
}

uint16_t ScopeMimicry::get_final_idx() {
    return _final_idx;
}

/**
 * @brief get a function without arguments which should return a boolean to enable
 * recording
 *
 * @param func: handle to a function which shall return a boolean.
 */
void ScopeMimicry::set_trigger(bool (*func)()) {
    _triggFunc = func;
}

/**
 * @brief give access to the memory of recorded samples as raw bytes.
 */
uint8_t * ScopeMimicry::get_buffer() {
    uint8_t *raw_bytes = reinterpret_cast<uint8_t *>(_memory);
    return raw_bytes;
}

/**
 * @brief size of the memory of recorded samples, in bytes.
 * NOTE(review): the result is converted to 16 bits, so it is truncated when the memory is
 * bigger than 65535 bytes.
 */
uint16_t ScopeMimicry::get_buffer_size() {
    return static_cast<uint16_t>(_length * _nb_channel * sizeof(float32_t));
}

uint16_t ScopeMimicry::get_length() {
    return _length;
}

uint16_t ScopeMimicry::get_nb_channel() {
    return _nb_channel;
}

bool ScopeMimicry::has_trigged() {
    return _trigged;
}

uint16_t ScopeMimicry::status() {
    return (uint16_t)((_trigged_counter*100) / _delay_complement);

}

const char *ScopeMimicry::get_channel_name(uint16_t idx) {
    if (idx < _nb_channel) {
        return this->_names[idx];
    }
    else {
        return "BadIdx";
    }
}

/**
 * @brief get one value of one channel
 *
 * @param index : value in the table between [0, length-1] (bigger values wrap around)
 * @param channel_idx :channel number, between [0, nb_channel-1].
 * @return float value, 0 when the channel number is out of range or the memory is empty.
 */
float32_t ScopeMimicry::get_channel_value(uint32_t index, uint32_t channel_idx) {
    /* a bad channel number would read another sample, or outside the buffer */
    if (_length == 0 || channel_idx >= _nb_channel) {
        return 0.0f;
    }
    /* to be sure we are not outside the buffer */
    index = (index) % _length;
    return _memory[(index * _nb_channel) + channel_idx];
}

void ScopeMimicry::reset_dump() {
	dump_state = initialized;
}

enum e_dump_state ScopeMimicry::get_dump_state() {
	return dump_state;
}

/**
 * @brief give, call after call, the pieces of a csv-like text describing the record:
 * "#", each channel name followed by ",", "\n", "## <final index>\n", then every recorded value
 * as 8 hexadecimal digits followed by "\n".
 *
 * The state becomes `finished` in the same call that returns the last value, so a loop running
 * while get_dump_state() != finished outputs each value exactly once.
 *
 * @return c-string owned by the scope, valid until the next call.
 */
char* ScopeMimicry::dump_datas()
{
	/* computed on 32 bits to avoid a wrap-around, then limited to what the 16 bits
	 * counter _idx_datas is able to reach */
	uint32_t n_datas = (uint32_t) _length * _nb_channel;
	if (n_datas > 0xFFFFu)
	{
		n_datas = 0xFFFFu;
	}
	/* reset char_name */
	char_name[0] = '\0';
	switch (dump_state) {
		case initialized:
			data_dumped = hash;
			_idx_name = 0;
			_idx_datas = 0;
			dump_state = names;
		break;
		case names:
			if (_idx_name < this->get_nb_channel())
			{
				/* snprintf cuts a too long name instead of writing after char_name */
				snprintf(char_name, sizeof(char_name), "%s,", get_channel_name(_idx_name));
				data_dumped = char_name;
				_idx_name +=1;
			}
			else
			{
				data_dumped = const_cast<char *>("\n");
				dump_state = final_idx;
			}
		break;
		case final_idx:
			snprintf(char_name, sizeof(char_name), "## %d\n", get_final_idx());
			data_dumped = char_name;
			dump_state = datas;
		break;
		case datas:
			if (_idx_datas < n_datas)
			{
				/* copy the bits of the float in an integer to print them in hexa */
				uint32_t raw_bits = 0;
				memcpy(&raw_bits, &_memory[_idx_datas], sizeof(raw_bits));
				snprintf(char_data, sizeof(char_data), "%08lx\n", (unsigned long) raw_bits);
				data_dumped = char_data;
				_idx_datas += 1;
				if (_idx_datas >= n_datas)
				{
					/* the last value is in this string */
					dump_state = finished;
				}
			}
			else
			{
				/* empty memory: nothing to give */
				data_dumped = nullchar;
				dump_state = finished;
			}
		break;
		case finished:
			data_dumped = nullchar;
		break;
		default:
			/* unknown state: behave as if the dump was finished */
			data_dumped = nullchar;
			dump_state = finished;
		break;
		}
	return data_dumped;
}

```

In [64]:
import numpy as np

In [280]:
class ScopeMimicry:
    """Python mock-up of the C++ `ScopeMimicry` class, with prints to follow its state.

    Only the acquisition part is reproduced (channels, trigger, delay, circular memory).
    As Python has no pointers, a channel is identified by a key of the `global_memory`
    dict, which plays the role of the program memory the C++ pointers refer to.
    """

    def __init__(self, length: int, nb_channel: int):
        """Create a scope recording `nb_channel` variables, `length` samples each.

        C++ members which are not reproduced here: `_idx_name`, `_idx_datas`,
        `_channel_numbers`, `_decimation` and the variables of the `dump_datas` method
        (`dump_state`, `hash`, `nullchar`, `char_name`, `char_data`, `data_dumped`).
        """
        print(f"Init ScopeMimicry(length={length}, nb_channel={nb_channel})")
        self._length = length
        self._nb_channel = nb_channel
        self._acq_counter = 0
        self._effective_chan_number = 0
        self._old_trigg_value = False
        self._trigged = False
        self._delay = 0
        self._delay_complement = length
        self._trigged_counter = length
        self._final_idx = length-1

        # Samples of all channels, interleaved: index = sample*nb_channel + channel
        self._memory = np.zeros(self._length*self._nb_channel)
        self._names = [f"ch{i}" for i in range(nb_channel)] # default name
        # Pointers to the global memory.
        # Python-specific adaptation: values should be keys of the `global_memory` dict
        self._channels = [None for i in range(nb_channel)]

        # Default trigger which never fires
        self._triggFunc = lambda : False
        # Python specific: mockup program global memory where the _channels pointer should point to
        self.global_memory = dict()
    # end init

    def connectChannel(self, channel_addr, name: str) -> None:
        """Store the channel address (Python-specific: in C++ it stores the pointer)
        along with the channel name.

        The request is ignored (but still printed) when all channels are already connected.
        """
        print(f'Connect channel named "{name}" to variable at addr "{channel_addr}"')
        if self._effective_chan_number < self._nb_channel:
            # capture the reference of the variable.
            self._channels[self._effective_chan_number] = channel_addr
            self._names[self._effective_chan_number] = name
            self._effective_chan_number += 1
    # end connectChannel

    def acquire(self) -> int:
        """Evaluate the trigger then, unless the acquisition is finished, add one value
        of each connected channel into the internal memory.

        Returns:
        - 0: values stored, trigger not fired yet
        - 1: values stored, trigger fired
        - 2: acquisition finished, nothing stored
        """
        # Call trigger function
        trigg_value = self._triggFunc()
        # compute trigger status: fires on the first False -> True transition
        if not self._old_trigg_value and trigg_value and not self._trigged:
            self._trigged = True
            # CHANGED from C++ (_acq_counter + _delay_complement - 1): the -1 is removed
            # here as an experiment. It DOESN'T WORK: see the test with delay = 0.0.
            stop_raw = self._acq_counter + self._delay_complement - 0
            self._trigged_counter = stop_raw % self._length
            if stop_raw < self._length:
                compute_detail = f'{self._acq_counter}+{self._delay_complement}-0'
            else:
                compute_detail = f'({self._acq_counter}+{self._delay_complement}-0)%{self._length}'
            print(f'  !trigger started: _trigged_counter = {self._trigged_counter} [i.e. {compute_detail}]')

        self._old_trigg_value = trigg_value

        if not self._trigged:
            # value that _acq_counter never takes: the recording goes on
            self._trigged_counter = -1

        print(f'  _acq_counter={self._acq_counter}', end=', ')
        if trigg_value:
            print('trigg_value', end=', ')
        if self._trigged:
            print('_trigged', end=', ')
        print()
        if self._acq_counter != self._trigged_counter:
            # Same order as in C++: _acq_counter is incremented *before* the record,
            # so the very first sample goes to slot 1, not to slot 0
            self._acq_counter = (self._acq_counter + 1) % self._length
            for k_ch in range(self._effective_chan_number):
                # Python-specific: search self._channels[k_ch] value within self.global_memory:
                self._memory[(self._acq_counter * self._nb_channel) + k_ch] = self.global_memory[self._channels[k_ch]]
            if self._trigged:
                return 1
            else:
                return 0
        else:
            # the value printed is the one from *before* the update made just after
            print(f'  acquisition already done: no record (_final_idx={self._final_idx})')
            self._final_idx = self._acq_counter
            return 2
    # end acquire

    def set_delay(self, d: float) -> None:
        """Define a delay to record data before the trigg. delay is in [0, 1]

        `_delay` is the delay as a number of samples and `_delay_complement` the rest of
        the memory length. Both are kept as integers, like the C++ `uint16_t` members.
        """
        if d < 0.0:
            self._delay = 0
        elif d >= 1.0:
            self._delay = self._length
        else:
            self._delay = int(d *self._length)

        self._delay_complement = self._length - self._delay
        print(f'Set delay d={d}:')
        print(f'- _delay={self._delay}')
        print(f'- _delay_complement={self._delay_complement}')

    def start(self) -> None:
        """Reset the trigger, so that a new acquisition can be triggered"""
        self._trigged = False
        self._old_trigg_value = False

    def set_trigger(self, func) -> None:
        """Set trigger function. `func` takes no argument and should return True at some
        instant to fire the trigger
        """
        self._triggFunc = func

In [281]:
def trig_true() -> bool:
    """Trigger function which returns True at every call."""
    return True
trig_true()

True

In [282]:
trig_seq = [False, True]
k = 1

In [283]:
def trig_global_seq():
    """Trigger function driven by the notebook globals: returns `trig_seq[k]`,
    both names being looked up at call time.
    """
    current_value = trig_seq[k]
    return current_value
trig_global_seq()

True

## Test

### Init scope

In [284]:
length = 4
nb_channel = 2
scope = ScopeMimicry(length, nb_channel)
scope._memory # size length*nb_channel

Init ScopeMimicry(length=4, nb_channel=2)


array([0., 0., 0., 0., 0., 0., 0., 0.])

In [285]:
delay = 0.5
scope.set_delay(delay)

Set delay d=0.5:
- _delay=2
- _delay_complement=2


Connect channels:

In [286]:
print('_effective_chan_number: ', scope._effective_chan_number)
scope.connectChannel('ptr1', 'ch1')
print('_effective_chan_number: ', scope._effective_chan_number)
scope.connectChannel('ptr2', 'ch2')
print('_effective_chan_number: ', scope._effective_chan_number)

_effective_chan_number:  0
Connect channel named "ch1" to variable at addr "ptr1"
_effective_chan_number:  1
Connect channel named "ch2" to variable at addr "ptr2"
_effective_chan_number:  2


Connect trigger

In [287]:
scope.set_trigger(trig_global_seq)
scope._triggFunc()

True

### Acquire a sequence of instants: first instant

In [288]:
# Values taken by the two recorded variables, one entry per instant
ch1_seq = [11, 12, 13, 14, 15]
ch2_seq = [21, 22, 23, 24, 25]
# `trig_global_seq` (the trigger connected above) reads the globals `trig_seq` and `k`:
# rebinding `trig_seq` here and looping on `k` below is what drives the trigger.
trig_seq = [False, False, False, True, True]
K = len(ch1_seq)

for k in range(K):
    print(f'k={k}: ch1={ch1_seq[k]}, ch2={ch2_seq[k]}')
    # write the new values at the "addresses" the channels are connected to
    scope.global_memory['ptr1'] = ch1_seq[k]
    scope.global_memory['ptr2'] = ch2_seq[k]
    status = scope.acquire()
    print(f'→ return {status},', 'memory=', scope._memory)

k=0: ch1=11, ch2=21
  _acq_counter=0, 
→ return 0, memory= [ 0.  0. 11. 21.  0.  0.  0.  0.]
k=1: ch1=12, ch2=22
  _acq_counter=1, 
→ return 0, memory= [ 0.  0. 11. 21. 12. 22.  0.  0.]
k=2: ch1=13, ch2=23
  _acq_counter=2, 
→ return 0, memory= [ 0.  0. 11. 21. 12. 22. 13. 23.]
k=3: ch1=14, ch2=24
  !trigger started: _trigged_counter = 1 [i.e. (3+2-0)%4]
  _acq_counter=3, trigg_value, _trigged, 
→ return 1, memory= [14. 24. 11. 21. 12. 22. 13. 23.]
k=4: ch1=15, ch2=25
  _acq_counter=0, trigg_value, _trigged, 
→ return 1, memory= [14. 24. 15. 25. 12. 22. 13. 23.]


### All in one function (for multiple tests)

In [289]:
def make_trig_fun(trig_seq):
    """Build a trigger function returning the values of `trig_seq` one after the other.

    The returned function takes no argument. Once all the values have been returned,
    any further call raises RuntimeError.
    """
    K = len(trig_seq)
    print(f'creating trig function with sequence of length {K}')
    n_calls = 0

    def trig_fun():
        nonlocal n_calls
        if n_calls >= K:
            raise RuntimeError(f'trig_fun() called more than {K} times')
        value = trig_seq[n_calls]
        n_calls += 1
        return value

    return trig_fun

f = make_trig_fun([False, True, True])
print(f(), f(), f())
f() # should raise RuntimeError

creating trig function with sequence of length 3
False True True


RuntimeError: trig_fun() called more than 3 times

In [290]:
def test_2ch_record(length, delay, ch1_seq, ch2_seq, trig_seq):
    """test scope recording with `length` and `delay` of two channels
    with list data:
    - ch1_seq, ch2_seq: values for ch1, ch2 along instants
    - trig_seq: trigger values along instants
    """
    K = len(ch1_seq)
    assert len(ch1_seq) == len(ch2_seq)
    assert len(ch1_seq) == len(trig_seq)
    print(f'Scope simulating along {K} instants...')
    nb_channel = 2
    scope = ScopeMimicry(length, nb_channel)
    
    scope.set_delay(delay)
    
    #Connect channels:
    scope.connectChannel('ptr1', 'ch1')
    scope.connectChannel('ptr2', 'ch2')
    
    #Connect trigger
    scope.set_trigger(make_trig_fun(trig_seq))
    
    ### Acquire a sequence of instants: first instant
    
    for k in range(K):
        print(f'k={k}: ch1={ch1_seq[k]}, ch2={ch2_seq[k]}')
        scope.global_memory['ptr1'] = ch1_seq[k]
        scope.global_memory['ptr2'] = ch2_seq[k]
        status = scope.acquire()
        print(f'  → return {status},', 'memory=', scope._memory)
        print()

In [291]:
length = 3
delay = 0.0
ch1_seq = [11, 12, 13, 14, 15]
ch2_seq = [21, 22, 23, 24, 25]
trig_seq = [True, True, True, True, True]

test_2ch_record(length, delay, ch1_seq, ch2_seq, trig_seq)

Scope simulating along 5 instants...
Init ScopeMimicry(length=3, nb_channel=2)
Set delay d=0.0:
- _delay=0
- _delay_complement=3
Connect channel named "ch1" to variable at addr "ptr1"
Connect channel named "ch2" to variable at addr "ptr2"
creating trig function with sequence of length 5
k=0: ch1=11, ch2=21
  !trigger started: _trigged_counter = 0 [i.e. (0+3-0)%3]
  _acq_counter=0, trigg_value, _trigged, 
  acquisition already done: no record (_final_idx=2)
  → return 2, memory= [0. 0. 0. 0. 0. 0.]

k=1: ch1=12, ch2=22
  _acq_counter=0, trigg_value, _trigged, 
  acquisition already done: no record (_final_idx=0)
  → return 2, memory= [0. 0. 0. 0. 0. 0.]

k=2: ch1=13, ch2=23
  _acq_counter=0, trigg_value, _trigged, 
  acquisition already done: no record (_final_idx=0)
  → return 2, memory= [0. 0. 0. 0. 0. 0.]

k=3: ch1=14, ch2=24
  _acq_counter=0, trigg_value, _trigged, 
  acquisition already done: no record (_final_idx=0)
  → return 2, memory= [0. 0. 0. 0. 0. 0.]

k=4: ch1=15, ch2=25
  