## MicroPython ESP32 Experimentation

### Establishing connection to target board
First, make sure you've got the right serial port. On unix-based systems, you can run `ls /dev/tty.*` to see your available serial devices. Replace as necessary below.

This will allow Jupyter (your host computer) to run commands and send/receive information to/from your target board in real time using the MicroPython REPL.

In [17]:
#%serialconnect to --port="/dev/tty.usbserial-02U1W54L" --baud=115200
%serialconnect to --port="/dev/tty.usbserial-0001" --baud=115200

[34mConnecting to --port=/dev/tty.usbserial-0001 --baud=115200 [0m
[34mReady.
[0m

In [10]:
%sendtofile lib/decoding.py --source lib/decoding.py

[34m

*** Sending Ctrl-C

[0m[31m[Timed out waiting for recognizable response]
[0m

In [44]:
%sendtofile main.py --source main.py

Sent 473 lines (8718 bytes) to main.py.


In [None]:
%sendtofile lib/decodinga.py --source lib/decoding.py

In [None]:
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
ssid = env_vars.get("WIFI_SSID")
password = env_vars.get("WIFI_PASSWORD")
wlan.connect('ssid', 'password')
print(wlan.ifconfig())

In [18]:
f = open("lib/config.py", "r")
print(f.read())

import utime

BASE_CLK_FREQ = 240000000  # 240 MHz for ESP32

ADC_SAMPLE_FREQ = 256  # sample freq in Hz

RECORDING_LEN_SEC = 4

OVERLAP = 0.8

DOWNSAMPLED_FREQ = 64  # 64 Hz downsampled  to ensure nyquist condition

PREPROCESSING = True  # if true, LP filter and downsample

STIM_FREQS = [7, 10, 12]  # stimulus freqs. in Hz

DEFAULT_LOG_SESSION = "test-{0}".format(utime.ticks_ms())

MODE = "log"

HTTP_LOG_URL = "http://james-tev.local:5000/"



## Using a Runner for experimentation and logging
The a `Runner` is encapsulates the core functions in this EEG system, including peripheral setup, sampling, signal processing, logging and memory management. The `OnlineRunner` offers mostly the same functionality as the standard `Runner` class, except it allows for logging and other communication with a remote server - either on the Internet or on your local network.

### Offline functionality
The standard `Runner` is good for testing core functionality without the need for remote logging. See below for initialisation and execution.

In [None]:
from lib.runner import Runner

Nc = 1
Ns = 128
Nt = 3
stim_freqs = [7, 10, 12]

# Here, we select the algorithm. Can be one of ['MsetCCA', 'GCCA', 'CCA']
decoding_algo = 'MsetCCA'

runner = Runner(decoding_algo, buffer_size=Ns) # initialise a base runner
runner.setup() # setup peripherals and memory buffers

### Calibration
If you are using an algorithm that leverages calibration data (MsetCCA, GCCA), you will need to record some calibration data to fit the decoder model. This is usually only done once off before inference starts. You may want to recalibrate at some semi-regular interval too though. 

At the moment, there is not an integrated process to record calibration data in the `Runner` class. You have to record calibration data and provide it to the runner which it will in turn use to fit its internal decoder model. In future, this will hopefully become more integrated and easy. For now, some random calibration data is generated below to illustrate the format which the runner/decoder expects. You need to provide iid calibration data trials for each stimulus frequency.

Note that if you try to run calibration using an incompatible algorithm (such as standard CCA), a warning will be generated and the calibration sequence will be skipped.

In [None]:
gc.collect()

In [None]:
from lib.synthetic import synth_X

calibration_data = {f:synth_X(f, Nc, Ns, Nt=Nt) for f in stim_freqs}
runner.calibrate(calibration_data)

In [None]:
print(gc.mem_free())

### Decoding
When configured with a set of stimulus frequencies $\mathcal{F}=\{f_1, \dots, f_k, \dots, f_K\}$, the `Runner`'s decoder model consists of $K$ independent sub-classifiers $\Phi_k$ that each leverage the decoding algorithm selected. These independent classifiers must be calibrated independently. When the `Runner` is presented a new test observation, each sub-classifier $\Phi_k$ produces an output correlation estimate corresponding to $f_k$. Ultimately, the runner outputs a dictionary of frequency-correlation pairs of the form
```python
{f_1: 0.12, f_2: 0.03, f_3: 0.85}
```
The decoded output frequency is the one corresponding to the largest correlation in this output dictionary. In this example, it would be $f_3$.

In [None]:
test_freq = 7 # 7 Hz test signal
test_data = synth_X(test_freq, Nc, Ns, Nt=1)

print(runner.decoder.classify(test_data))

In [16]:
from ulab import numpy as np
import utime as time
from lib.runner import Runner

decode_period_s = 4 # decode every x seconds
Nc = 1
Ns = 128
Nt = 3
stim_freqs = [7, 10, 12]
decoding_period_s = 4

runner = Runner('CCA', buffer_size=Ns)
runner.setup() 

def preprocess_data(signal):
    
    """Preprocess incoming signal before decoding algorithms.
    This involves applying a bandpass filter to isolate the target SSVEP range
    and then downsampling the signal to the Nyquist boundary.
    
    Returns:
        [np.ndarray]: filtered and downsampled signal
    """
    from lib.signal import sos_filter
    downsample_freq = 64
    ds_factor = 256//downsample_freq
    return sos_filter(signal)[::ds_factor]

def collectData(decode_period_s):
    
    global runner
    
    time.sleep(decode_period_s)
    data = runner.output_buffer
#     print(data)
    gc.collect()
    return np.array(data)

def getData(Nt, decode_period_s):

    global runner
    
    runner.run()
    
    trials = []
    time.sleep(5)
    count=0
    
    if Nt <=1:
        return collectData(decode_period_s)
    for i in range(Nt):
        trials.append(collectData(decode_period_s).flatten())
        print(trials)
    runner.stop()

    print(len(trials))

    gc.collect()
    return np.array(trials)


ADC initialised
SPI initialised
DigiPot set to 100 = gain of 10.62498


In [17]:
#calibration_data = {f:getData(Nt, decoding_period_s) for f in stim_freqs}
calibration_data = {}
calibration_data[7] = getData(Nt, decoding_period_s)
gc.collect()

..[array([0.1022726, 6.393758, 30.04326, ..., -17.88625, 40.25251, 43.99335], dtype=float32)]
[array([0.1022726, 6.393758, 30.04326, ..., -17.88625, 40.25251, 43.99335], dtype=float32), array([0.02554362, 4.924626, 34.40299, ..., 13.66638, -37.46715, -46.48108], dtype=float32)]
.[array([0.1022726, 6.393758, 30.04326, ..., -17.88625, 40.25251, 43.99335], dtype=float32), array([0.02554362, 4.924626, 34.40299, ..., 13.66638, -37.46715, -46.48108], dtype=float32), array([0.04832244, 5.792087, 36.82669, ..., 47.1671, 5.583657, -50.93529], dtype=float32)]
3


In [18]:
calibration_data[10] = getData(Nt, decoding_period_s)
gc.collect()

..[array([0.08628019, 6.923466, 38.24591, ..., -18.19421, 34.08795, 46.6745], dtype=float32)]
[array([0.08628019, 6.923466, 38.24591, ..., -18.19421, 34.08795, 46.6745], dtype=float32), array([0.05372761, 5.886846, 36.22691, ..., 20.57845, -30.37518, -49.47893], dtype=float32)]
.[array([0.08628019, 6.923466, 38.24591, ..., -18.19421, 34.08795, 46.6745], dtype=float32), array([0.05372761, 5.886846, 36.22691, ..., 20.57845, -30.37518, -49.47893], dtype=float32), array([0.1091523, 5.572274, 22.46525, ..., 39.65464, 51.01727, 7.14376], dtype=float32)]
3


In [19]:
calibration_data[12] = getData(Nt, decoding_period_s)
gc.collect()

..[array([-0.07902402, -2.427289, -2.418796, ..., -60.14733, -24.28555, 30.13657], dtype=float32)]
.[array([-0.07902402, -2.427289, -2.418796, ..., -60.14733, -24.28555, 30.13657], dtype=float32), array([-0.05558455, -0.1863209, 11.00656, ..., 39.97823, 51.31582, 2.343739], dtype=float32)]
[array([-0.07902402, -2.427289, -2.418796, ..., -60.14733, -24.28555, 30.13657], dtype=float32), array([-0.05558455, -0.1863209, 11.00656, ..., 39.97823, 51.31582, 2.343739], dtype=float32), array([-0.04097127, 0.7651208, 15.67703, ..., -21.87494, 36.72865, 55.21906], dtype=float32)]
3


In [20]:
print(gc.mem_free())
del runner
print(gc.mem_free())

44400
44400


In [21]:
decode = Runner('MsetCCA', buffer_size=Ns) # initialise a base runner
decode.setup() # setup peripherals and memory buffers
decode.calibrate(calibration_data)
print(gc.mem_free())

ADC initialised
SPI initialised
DigiPot set to 100 = gain of 10.62498
45360


In [22]:
decode.run()

In [26]:
print(decode.decoder.classify(np.array(decode.output_buffer)))

.[34m

*** Sending Ctrl-C

[0m[31m[Timed out waiting for recognizable response]
[0m

In [27]:
decode.stop()

.[34m

*** Sending Ctrl-C

[0m[31m[Timed out waiting for recognizable response]
[0m

In [28]:
print(calibration_data)

.[34m

*** Sending Ctrl-C

[0m[31m[Timed out waiting for recognizable response]
[0m

In [29]:
print(1)

1


### Asynchronous operation
Once the `Runner` has been configured and calibrated (if applicable), its internal `run()` loop can be started in which it will asynchronously sample and decode EEG data at preconfigured frequencies. Timing is handled using hardware timers on the ESP32 and interrupts are used to run asynchronous ISRs that handle sampling, preprocessing, filtering and decoding.

Note that once the async run loop has begun, you can still run commands or view the `Runner`'s attributes although there may be a noticeable delay since ISRs will typically get higher execution priority and there are quite a few interrupt loops running.

In [None]:
# start sampling and recording data (logging not setup in this case)
runner.run()

In [None]:
# see if runner has indeed started smapling
print(runner.is_sampling)

In [None]:
# display the contents of the output buffer - this will be updated internally by the runner
# at a rate determined by the sampling frequency and sample buffer size (typically every 1s)
print(runner.output_buffer)

In [None]:
# decode the contents of the output buffer. There will be a delay here if the runner 
# is currently running (i.e. `is_sampling=True`).
print(runner.decode())

In [None]:
# stop runner
runner.stop()

#### Simple decoding loop
In order to test online decoding, here is a basic synchronous loop-based option. Interrupt the cell to stop the infinite loop.

In [None]:
import utime as time
from lib.runner import Runner

Nc = 1
Ns = 128
Nt = 3
stim_freqs = [7, 10, 12]

# Here, we select the algorithm. Can be one of ['MsetCCA', 'GCCA', 'CCA']
decoding_algo = 'MsetCCA'

decode_period_s = 2 # read decoded output every x seconds

runner = Runner(decoding_algo, buffer_size=Ns) # initialise a base runner
runner.setup()

if decoding_algo in ['MsetCCA', 'GCCA']:
    from lib.synthetic import synth_X

    calibration_data = {f:synth_X(f, Nc, Ns, Nt=Nt) for f in stim_freqs}
    runner.calibrate(calibration_data)

runner.run() # start async run loop

try:
    while True:
        time.sleep(decode_period_s)
        print(runner.decoded_output)
except KeyboardInterrupt:
    runner.stop()
    print('received SIGINT - stopping')

### Testing your WiFi connection
In order to connect to a local WiFi network, you'll need to supply your network SSID and password in a `.env` file on the board. Doing this is easy: 
1. On your computer, create a `.env` file using `touch .env`. Update the `.env` file with the required fields:
    
    ```bash
    #.env 
    WIFI_SSID=<your network name>
    WIFI_PASSWORD=<your network password>
    
    ```
    
2. Send this file to your target device using the following command:
    ```ipython
%sendtofile --source lib/.env lib/.env  --binary
```

You may need to update the local (source) path to your `.env` file depending on where you created/stored it.

In [None]:
%sendtofile --source /Users/rishil/Desktop/.env lib/.env  --binary

In [34]:
from lib.utils import connect_wifi, load_env_vars
from lib.synthetic import synth_X

Nc = 1
Ns = 128
Nt = 3
stim_freqs = [7, 10, 12]

# Here, we select the algorithm. Can be one of ['MsetCCA', 'GCCA', 'CCA']
decoding_algo = 'MsetCCA'
calibration_data = {f:synth_X(f, Nc, Ns, Nt=Nt) for f in stim_freqs}

env_vars = load_env_vars("lib/.env")
# connect WiFI
ssid = env_vars.get("WIFI_SSID")
password = env_vars.get("WIFI_PASSWORD")
connect_wifi(ssid, password)

connecting to network...
network config: ('192.168.0.76', '255.255.255.0', '192.168.0.1', '194.168.4.100')


#### Online Runner
Now that you've established network connectivitiy, you can test out an `OnlineRunner`. In order to test web logging to a remote server, we can use a basic HTTP logger. However, this obviously needs an API/server willing to accept our requests. There is a basic logging API using `Flask` in `/eeg_lib/logging_server.py`. You can run it using `python logging_server.py` which will spin up a development server on the predefined port (5000 or 5001). Then, just configure your `OnlineRunner` with the appropriate logger params and you're set.

In [35]:
from lib.runner import OnlineRunner
from lib.logging import logger_types

api_host = "http://192.168.0.13:5001/" # make sure the port corresponds to your logging server configuration
log_params = dict(server=api_host, log_period=4, logger_type=logger_types.HTTP, send_raw=True, session_id='test')
# runner = OnlineRunner('CCA', buffer_size=256)
runner = OnlineRunner(decoding_algo, buffer_size=Ns)
runner.setup(**log_params)

ADC initialised
SPI initialised
DigiPot set to 100 = gain of 10.62498
network config: ('192.168.0.76', '255.255.255.0', '192.168.0.1', '194.168.4.100')


In [36]:
# start the runner - you should see requests being made to your local server
runner.calibrate(calibration_data)
runner.run()

In [37]:
runner.stop()

## Experimentation

In [None]:
%rebootdevice

In [None]:
%lsmagic