# Recording data in real time

In this notebook, we will look at how to stream data from our sensors in real time. We will learn how to store that data in a CSV file and how to process it using our own functions and pandas. Let's load the tools we need first:

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from src.serialdevice import *
from src.formulas import *
from threading import Thread
import itertools  
import os
# Bokeh plotting tools
from bokeh.palettes import Dark2_5 as palette
from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure
from bokeh.io import output_notebook, show, push_notebook
from bokeh.models import Panel, Tabs
from bokeh.layouts import column, gridplot

output_notebook()

## Streaming fundamentals

## Exporting data
We can export data directly by redirecting standard output (stdout) to a file. For instance, with PlatformIO's device monitor (or a similar tool such as screen), we could run:

```
pio device monitor -b 115200 > output.txt
```

The ```>``` is redirecting the output of the ```pio device monitor``` command towards a text file, which is basically a log of the sensor. However, we can do better, and use the power of pandas to do some extra calculations and add a timestamp to the data we receive.
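
As a rough sketch of that idea, the snippet below parses a few logged lines into a pandas DataFrame, stamps each row with a time, and adds one derived column. The line format (comma-separated `x,y`), the column names, and the helper `lines_to_frame` are assumptions made for illustration, not the format of any particular sensor.

```python
from datetime import datetime, timedelta

import pandas as pd


def lines_to_frame(lines, start, period_s):
    """Turn raw 'x,y' text lines into a timestamp-indexed DataFrame.

    `start` and `period_s` stand in for the real arrival times, which a live
    logger would take from the clock (e.g. datetime.now()) as each line arrives.
    """
    rows = [line.strip().split(',') for line in lines if line.strip()]
    frame = pd.DataFrame(rows, columns=['x', 'y'])
    # Non-numeric readings become NaN instead of raising an error
    frame = frame.apply(pd.to_numeric, errors='coerce')
    frame.index = pd.DatetimeIndex(
        [start + timedelta(seconds=i * period_s) for i in range(len(frame))],
        name='Time',
    )
    return frame


log = ['1,20.5\n', '2,20.7\n', '3,bad\n']
df = lines_to_frame(log, start=datetime(2020, 1, 1), period_s=0.05)
# An example of an "extra calculation": running mean of the readings so far
df['y_mean'] = df['y'].expanding().mean()
print(df)
```

The malformed reading in the last line ends up as `NaN`, which is the kind of value a cleaning step can later fill or drop.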

In [None]:
%matplotlib inline
import pandas as pd
import numpy as np
dS = pd.Series(np.arange(0,100,0.1))
# dS;
dS.plot()

In [None]:
# Make sure the target folder exists before writing
os.makedirs('data', exist_ok=True)
dS.to_csv('data/dummy_series.csv')

## Interacting with the sensors

To log the data coming from the sensors, we will use a serial interface with the [pySerial](https://github.com/pyserial/pyserial) package. 

Other ways to interface with sensors and more scientific instrumentation options can be found in the [pyMeasure](https://github.com/ralph-group/pymeasure) package.

In [None]:
import serial
import serial.tools.list_ports

devices = list(serial.tools.list_ports.comports())
# List every serial port found, numbered from 1
for i, d in enumerate(devices, start=1):
    print(f'{i} --- {d.device} --- {d.description}')
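
Once a port is known, reading from it could look roughly like the sketch below. `serial.Serial` and `readline` are pySerial calls; the port name, the baud rate (taken from the `pio device monitor` example above), the comma-separated line format, and the helpers `parse_line` and `read_samples` are assumptions for illustration.

```python
def parse_line(raw):
    """Decode one raw serial line into a list of floats (None if malformed)."""
    try:
        text = raw.decode('utf-8').strip()
        return [float(field) for field in text.split(',')]
    except (UnicodeDecodeError, ValueError):
        return None


def read_samples(port, n_samples, baudrate=115200, timeout=1.0):
    """Read `n_samples` lines from `port` and keep the valid ones.

    Needs a connected device, so it is only defined here, not called.
    """
    import serial  # pySerial; imported here so parse_line works without it

    samples = []
    with serial.Serial(port, baudrate=baudrate, timeout=timeout) as connection:
        for _ in range(n_samples):
            values = parse_line(connection.readline())
            if values is not None:
                samples.append(values)
    return samples


print(parse_line(b'1.0,20.5\r\n'))
# With hardware attached, something like: read_samples('/dev/ttyUSB0', 10)
```

Keeping the parsing separate from the port handling means the parser can be checked without any hardware connected.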

## Streaming data

Three important frequencies to take into account:
- The frequency at which the sensor gives us data
- The frequency at which we sample the sensor
- The natural frequencies that we want to capture

<p><a href="https://commons.wikimedia.org/wiki/File:Signal_Sampling.png#/media/File:Signal_Sampling.png"><img src="https://upload.wikimedia.org/wikipedia/commons/5/50/Signal_Sampling.png" width="600px" alt="Signal Sampling.png"></a><br>By <a href="//commons.wikimedia.org/wiki/User:Email4mobile" title="User:Email4mobile">Email4mobile</a> (<a href="//commons.wikimedia.org/wiki/User_talk:Email4mobile" title="User talk:Email4mobile">talk</a>) - <a class="external free" href="https://en.wikipedia.org/wiki/File:Signal_Sampling.png">http://en.wikipedia.org/wiki/File:Signal_Sampling.png</a>, Public Domain, <a href="https://commons.wikimedia.org/w/index.php?curid=8693098">Link</a></p>

**Key takeaways:**

1. There is no benefit in sampling faster than the sensor gives us data
2. We need to sample more than twice as fast as the highest frequency we want to capture; half the sampling rate is known as the Nyquist frequency ([Nyquist-Shannon theorem](https://en.wikipedia.org/wiki/Nyquist%E2%80%93Shannon_sampling_theorem))
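
The effect of sampling too slowly can be checked numerically. The sketch below, with arbitrarily chosen frequencies, samples a 10 Hz sine wave at 50 Hz and at 12 Hz and reports the strongest frequency found in each set of samples. The helper `dominant_frequency` is hypothetical and only serves this illustration.

```python
import numpy as np


def dominant_frequency(signal_hz, sample_rate_hz, duration_s=5.0):
    """Sample a sine wave and return the strongest frequency seen in the samples."""
    n_samples = int(round(sample_rate_hz * duration_s))
    t = np.arange(n_samples) / sample_rate_hz
    samples = np.sin(2 * np.pi * signal_hz * t)
    spectrum = np.abs(np.fft.rfft(samples))
    freqs = np.fft.rfftfreq(n_samples, d=1 / sample_rate_hz)
    return freqs[np.argmax(spectrum)]


fast = dominant_frequency(signal_hz=10, sample_rate_hz=50)  # above 2 x 10 Hz
slow = dominant_frequency(signal_hz=10, sample_rate_hz=12)  # below 2 x 10 Hz
print(f'Sampled at 50 Hz: {fast:.1f} Hz, sampled at 12 Hz: {slow:.1f} Hz')
```

At 50 Hz the 10 Hz signal is recovered, while at 12 Hz it shows up as a 2 Hz alias (12 - 10 = 2), which is indistinguishable from a genuine 2 Hz signal once the data has been recorded.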

In [None]:
esp = serialdevice()
if esp.set_serial(): esp.update_serial()
print (f'Device serial number: {esp.serialNumber}')

In [None]:
# Whether to save the recorded stream to a csv file
store_to_csv = True
# Frequency at which we will record the sensor data
raster = 0.05
# Default to 5 - increase if you need more smoothing
buffer_length = 5
# Number of points to show on the plot
n_show = 500

''' 
Set channels to monitor and calculate 

Each channel can have several processes, applied in sequential order, e.g.:
'y': {...
     '8': {'clean_na': ['drop', 'other']}, ...
Each entry maps the name of a function to a list holding one parameter for it,
followed by where the result should go:
- inplace: replace the channel with the result, in the same column (inplace implies same + replacement)
- same: plot the result on the same plot as the original channel
- other: plot the result on a separate plot
''' 

channels_to_monitor = {'y': {'1': {'clean_na': ['fill', 'inplace']},
                             '2': {'smooth_convolution': [3, 'same', 'inplace']},
                             '3': {'exponential_smoothing': [0.2, 'same']},
                             '4': {'exponential_smoothing': [0.1, 'same']},
                             '5': {'exponential_smoothing': [0.05, 'same']},
                             '6': {'exponential_smoothing': [0.03, 'same']},
                             '7': {'exponential_smoothing': [0.02, 'same']},
                             '8': {'exponential_smoothing': [0.01, 'same']},
                             # '9': {'time_derivative': [1, 'same']},
                             # '10': {'time_diff': [1, 'other']}
                            }}

if store_to_csv:
    path_to_store = os.path.join(os.getcwd(), 'data', 'csv_export.csv')
    print(f'Saving stream to: {path_to_store}')
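
To make the structure of `channels_to_monitor` more concrete, the sketch below splits one entry into the function name, its parameter, and the column the result would be written to, following the naming convention used by `process_data` in this notebook. The helper `describe_process` is hypothetical and exists only for this illustration.

```python
def describe_process(channel, spec):
    """Split one entry such as {'exponential_smoothing': [0.2, 'same']} into its parts."""
    (function_name, options), = spec.items()  # each entry holds exactly one function
    parameter, flags = options[0], options[1:]
    if 'inplace' in flags:
        column = channel  # the result overwrites the original column
    else:
        column = f'{channel}_{function_name}_{parameter}'
    return function_name, parameter, column


example = {'1': {'clean_na': ['fill', 'inplace']},
           '3': {'exponential_smoothing': [0.2, 'same']}}
for number, spec in example.items():
    print(number, describe_process('y', spec))
```

An `inplace` process keeps the channel name, so later processes on the same channel operate on its output, while the others each add a new, uniquely named column.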

We will use a separate [thread](https://docs.python.org/3/library/threading.html) for recording the data and processing it. This thread will also be in charge of streaming the data to the plot.
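
Stripped of the sensor and plotting details, the pattern is a worker thread that drains a queue until it is told to stop. The sketch below uses only the standard library; the names `consume`, `samples`, and `received` are hypothetical, and it signals the stop with a `threading.Event` rather than the lambda-based flag used in this notebook.

```python
import queue
import threading
import time


def consume(source, sink, stop_event, poll_s=0.01):
    """Move items from `source` to `sink` until `stop_event` is set."""
    while not stop_event.is_set():
        try:
            item = source.get(timeout=poll_s)
        except queue.Empty:
            continue  # nothing new yet; loop around and re-check the stop flag
        sink.append(item)


samples = queue.Queue()
received = []
stop_event = threading.Event()
consumer = threading.Thread(target=consume, args=(samples, received, stop_event))
consumer.start()

for value in range(5):  # stands in for readings arriving from the sensor
    samples.put(value)

# Give the consumer a moment to drain the queue, then ask it to stop
deadline = time.time() + 2
while len(received) < 5 and time.time() < deadline:
    time.sleep(0.01)
stop_event.set()
consumer.join(timeout=2)
print(received)
```

Because `get` uses a timeout, the loop checks the stop flag regularly even when no data is arriving, so the thread can always be shut down cleanly.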

In [None]:
def process_data(data):
    data = data.apply(pd.to_numeric, errors='coerce')
    for channel in channels_to_monitor.keys():
        for process_number in channels_to_monitor[channel].keys():
            # Process and formula
            process = list(channels_to_monitor[channel][process_number])[0]
            formula = process + f"(data['{channel}'], channels_to_monitor['{channel}']['{process_number}']['{process}'][0])" 
            # Name for new channel depending on inplace or not
            if 'inplace' in channels_to_monitor[channel][process_number][process]: channel_new_name = channel
            else: channel_new_name = channel + '_' + process + '_' + str(channels_to_monitor[channel][process_number][process][0])
            # Calculate
            if data.empty: data[channel_new_name] = []
            else: data[channel_new_name] = eval(formula)
    return data

# Start the stream
esp.start_streaming(buffer_length = buffer_length, raster = raster)
# Create plot columnar data
plot_data = ColumnDataSource(data = process_data(esp.worker.example))
# Number of tabs
n_tabs = len(list(channels_to_monitor.keys()))
tabs = Tabs(tabs = [])
colors = itertools.cycle(palette)

for channel in channels_to_monitor.keys():
    gridplots = list()
    p = figure(background_fill_color="#fafafa", x_axis_type='datetime')
    gridplots.append([p])
    p.line(y = channel, x="index", source = plot_data, legend_label = channel)
    p.title.text = f'Streaming {channel}'
    p.yaxis.axis_label = f'{channel}'
    p.xaxis.axis_label = 'Timestamp'

    for process_number in channels_to_monitor[channel].keys():
        process = list(channels_to_monitor[channel][process_number])[0]
        # We have already plotted it if it was inplace
        if 'inplace' in channels_to_monitor[channel][process_number][process]: continue

        channel_name = channel + '_' + process + '_' + str(channels_to_monitor[channel][process_number][process][0])
        if 'same' in channels_to_monitor[channel][process_number][process]:
            p.line(y=channel_name, x="index", legend_label = channel_name, source = plot_data, color = next(colors))
        elif 'other' in channels_to_monitor[channel][process_number][process]:
            p = figure(background_fill_color="#fafafa", x_axis_type='datetime')
            p.line(y=channel_name, x="index", legend_label = channel_name, source = plot_data, color = next(colors))
            p.yaxis.axis_label = f'{channel_name}'
            p.xaxis.axis_label = 'Timestamp'
            gridplots.append([p])

    p.legend.location='top_left'
    p.legend.click_policy="hide"

    grid = gridplot(gridplots,  plot_width=1000, plot_height=500)
    tab = Panel(child=grid, title=channel)
    tabs.tabs.append(tab)

handle = show(tabs, notebook_handle=True)
stop_threads = False

def worker_call(id, device, stop):
    df_data = pd.DataFrame()
    
    while True:
        if not device.worker.output.empty():
            new_data = device.worker.output.get()
            if 'Time' in new_data.columns: new_data.rename(columns={'Time': 'index'}, inplace=True)
            new_data = new_data.set_index('index')

            if df_data.empty: df_data = new_data
            else: df_data = pd.concat([df_data, new_data], sort = False)
            
            # We process everything
            # processed_data = process_data(new_data)    
            # if df_data.empty: df_data = processed_data
            #else: df_data = pd.concat([df_data, processed_data], sort = False)
            
            # We only process what we show
            processed_data = process_data(df_data.tail(n_show))
            # Stream and processing
            plot_data.stream(processed_data, n_show)
            
            # Store to csv
            if store_to_csv: df_data.to_csv(path_to_store, sep = ",")
            
            # Update plot
            push_notebook(handle = handle)

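        # Check the stop flag on every iteration, even when no new data has arrived,
        # so the thread can still finish once the device worker has been terminated
        if stop(): print("Finished thread"); break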

thread = Thread(target=worker_call, args=(id, esp, lambda: stop_threads))
thread.start()
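
`process_data` builds a string and passes it to `eval` to call the configured function. A possible alternative is to look the function up by name in a dictionary, as sketched below. The `FORMULAS` registry and `apply_process` are hypothetical, and the `exponential_smoothing` shown here is the textbook definition, assumed for illustration; the version in `src.formulas` may differ.

```python
def exponential_smoothing(values, alpha):
    """Simple exponential smoothing: s[t] = alpha * x[t] + (1 - alpha) * s[t-1]."""
    smoothed = []
    for value in values:
        previous = smoothed[-1] if smoothed else value  # seed with the first sample
        smoothed.append(alpha * value + (1 - alpha) * previous)
    return smoothed


# Hypothetical registry mapping the names used in the config to callables
FORMULAS = {'exponential_smoothing': exponential_smoothing}


def apply_process(values, function_name, parameter):
    """Look the function up by name and call it, without building a string for eval."""
    return FORMULAS[function_name](values, parameter)


step = [0, 0, 1, 1, 1]
print(apply_process(step, 'exponential_smoothing', 0.5))
```

With this definition, a smaller `alpha` reacts more slowly to the step and therefore smooths more, which matches the range of factors (0.2 down to 0.01) plotted side by side in the configuration. An unknown function name raises a `KeyError` instead of executing arbitrary text.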

In [None]:
stop_threads = True
if esp.worker.is_alive():
    print ('Terminating device worker')
    esp.worker.terminate()
    esp.worker.join()

## References and more examples

- [Sampling](https://en.wikipedia.org/wiki/Sampling_(signal_processing))
- [PySerial](https://pyserial.readthedocs.io/en/latest/index.html)
- [Bokeh Streaming](https://docs.bokeh.org/en/latest/docs/user_guide/data.html?highlight=streaming%20data)
- [PyLab](https://scipy.github.io/old-wiki/pages/PyLab)
- Other packages for streaming data: [Streamz](https://streamz.readthedocs.io/en/latest/index.html)
- [Create applications for DAQs using python](https://www.dashdaq.io/)