In [1]:
# Basic
from threading import Thread
import os
import time
import datetime
import socket
import json

# Plotting and Data
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd

# IPYthon
from IPython.display import clear_output
from IPython.display import IFrame
import ipywidgets as iw

#Debugging
import logging
logging.basicConfig(level=logging.WARNING)

In [50]:
### EEG Class ###

class EEG:
    '''The following variables are shared between a streaming and a plotting instance
    of EEG's subclasses. They run in different threads and need to access a shared variable space.'''
    
    # Data Structure
    n_channels = 8
    time = {"time":pd.Series([],dtype="int64")}
    channels = {str(i):pd.Series([],dtype="float64") for i in range(1,n_channels+1)}
    rate = {"sampling_rate":pd.Series([],dtype="float64")}
    data = pd.DataFrame({**time, **channels, **rate})
    plot_buffer = data
    
    # Preview of Data columns              #
    # time   | 0  | 1 | .. | sampling_rate # 
    
    # GUI editable parameters
    connected = False
    connecting = False
    recording = False
    plotting = False
    active_channels = [1,2,3,4,5,6,7,8]
    
    #Plot Parameters
    time_window = 2
    sampling_rate = 222
    xwindow = time_window*sampling_rate
    mV_window = 2000
    fps = 25
    
    def download():
            if 'sessions' not in os.listdir():
                os.mkdir('sessions')
            time = datetime.datetime.now().strftime("%Y-%m-%d-%H%M")
            filepath = f"sessions/{time}.csv"
            EEG.data.to_csv(filepath)
            print(f"Data saved to {filepath}")
            return
        
    class Streamer: 
        
        def __init__(self, timeout=10):
            self.timeout = timeout
            self.socket = None
            self.connection = None 

        def build_connection(self,host="",port=65432):
            '''
            Creates a UDP Socket and waits for a connection with the specified parameters.
            returns
            ss : streaming socket
            connection 
            '''
            ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            try:
                ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                ss.bind((host, port))
            except socket.error as e:
                print(str(e))

            print("Waiting for a Connection..")
            ss.settimeout(self.timeout)
            try:
                EEG.connecting = True
                ss.listen()  
                connection, address = ss.accept()
                EEG.connected = True
                
                ip = address[0]
                port = address[1]
                print(f"Connected to {ip}:{port}")

                self.socket = ss
                self.connection = connection
                self.ipconn = ip
                
                # If the user has aborted the connection while the socket was listening
                if EEG.connecting == False: 
                    self.socket.close()
                    EEG.connected = False
                
            except socket.timeout:
                print("Socket Time Out")
                
            EEG.connecting = False
            return

        def record(self, buffersize=1):
            '''
            Builds a connection, reads data from it and creates packages in the form of dictionaries.
            Expects incoming data to be stringified json files. 
            ''' 
            EEG.recording = True
            connection = self.connection
            socket = self.socket
            pkg = ""        # 1 pkg: 1 json object
            pkg_count = 0
            empty_msgs = 0  # counter

            start_time = time.time()
            sampling_rate = 0 

            while EEG.recording:  
                # Receive data (buffer length=1byte)
                response = connection.recv(buffersize).decode("utf-8")   

                # Count Empty Responses
                if not response: 
                    empty_msgs += 1  
                    if empty_msgs > 1000: break

                # Beginning of Package
                if "{" in response:  
                    pkg += response[response.index("{"):]  # add everything after first curly
                    empty_msgs = 0   # reset counter

                # Content
                elif "}" not in response: 
                    pkg += response

                # End of Package 
                else:

                    pkg_count += 1
                    pkg += response

                    elapsed = (time.time() - start_time)
                    json_string = pkg.rstrip("\n").strip() # remove ending break lines and white spaces
                    current = json.loads(json_string)  # parse stringified JSON as dict

                    sampling_rate = int(pkg_count / elapsed) if pkg_count > 1 else 0

                    self.store(elapsed,current,sampling_rate)  # add current read package for plotting 
                    empty_msgs = 0
                    pkg = pkg[pkg.index("}")+1:]  # only keep data after the end of the first package

                    if pkg_count > EEG.xwindow+20:
                        recording=True

            socket.close()  # disconnect listening socket
            logging.info("Ended Streaming Session")
            EEG.recording = False
            
        def store(self, elapsed, current, sampling_rate):
            '''
            Rearranges incoming data into a dictionary of the following form
            '''
            xwindow = EEG.xwindow

            original_time = current["time"]
            del current["time"]
            datadic = {**{'time':elapsed},
                       **current,
                       **{'sampling_rate':sampling_rate}
                      }

            EEG.data = EEG.data.append(datadic, ignore_index=True)

            if EEG.data.shape[0] > xwindow+20:
                EEG.plot_buffer = EEG.data[-xwindow]
            return 
    class Plotter:
        
        def __init__(self):
            '''Feel free to change the time_window,fps and pkg_limit. Other changes probably break the programm'''
            self.xwindow = int(EEG.sampling_rate*EEG.time_window) # Expected number of simultaneously plotted data
            # Plotting Instance
            self.fig = None
            
        def live_plot(self):
            '''
            Starts a thread that records data from the socket and shows a live plot
            '''

            logging.info("Waiting for incoming data..")
            while not EEG.recording:
                time.sleep(0.5)

            self.build_plot()
            self.animate_plot()

            if input("Stream Again? | 'y' for 'yes'") == 'y':
                self.live_plot()
                clear_output()
            else:
                return

            recording_thread.join() # Wait for the thread to end properly

                                    #=== PLOT RELATED FUNCTIONS ===#

        def build_plot(self):
            # Preparing the Plot and extracting the lines
            plot = EEG.data.plot(x="time", y= [str(channel+1) for channel in range(EEG.n_channels)])
            self.fig = plott.gcf()
            self.lines = [line for line in self.fig.gca().lines]

            # Setting Some Plot properties
            self.fig.gca().set_title("EEG Streaming @ 0 Hz")
            self.fig.gca().set_ylim(-2500,2500)
            self.fig.gca().set_xlabel("time [s]")
            self.fig.gca().set_ylabel("signal [mV]")


        def animate_plot(self):
            global recording

            interval = 1/self.fps
            while recording:
                time.sleep(interval)
                success = self.update_lines()
                self.fig.canvas.draw()

            return 

        def update_lines(self):
            # Update Plot Lines
            for i,line in enumerate(self.lines):
                line.set_data(data.time, data[f"{i+1}"])
            #Update x-axis
            right = data.time.iloc[-1] # last timestamp
            left = right-self.time_window
            self.fig.gca().set_xlim(left,right)
            #Update y-axis
            self.fig.gca().set_ylim(-mV_window,mV_window)
            #Update other Plot elements
            self.fig.gca().set_title(f"Traumschreiber Streaming @ {data.sampling_rate.iloc[-1]}Hz")
            self.fig.gca().legend(loc="upper right")

            return 



In [55]:
# Initiating GUI:
# Message to user

class GUI:
    def __init__(self):
        self.streamer = EEG.Streamer()
        self.plotter = EEG.Plotter()
        self.gui_elements = []
        self.build_widgets()

    def build_widgets(self):
        instruction = iw.HTML(value= "<h1>Build connection and start streaming from the app. ")
        self.gui_elements.append(instruction)
        
        self.add_button("Build Connection", self.build_connection, toggle=True)
        self.add_html(value="<p style='color:red'>Not connected</p>")
        
        self.add_button("Start Recording", self.start_recording, toggle=True)
        self.add_html(value='<p style="color:red">Not recording, no data yet</p>')
        
        self.add_button("Download Data", self.download, toggle="False")
        self.add_html(value='<p style="color:red"></p>')
        
        self.add_button("Reload GUI", self.reload, toggle=False)
        
    def display(self):
        for i in self.gui_elements:
            display(i)

    def reload(self, b="v"):
        clear_output()
        self.display()
    
    def detect_connection_change(self):
        current = EEG.connected
        while EEG.connected == current:
            time.sleep(0.4)
        self.update_connection_status(delay=0.1)
        return
    
    def update_connection_status(self, delay=0.5):
        time.sleep(delay)
        if EEG.connecting:
            status = "<div style='color:green'>Waiting for Connection</p>"
        elif EEG.connected:
            status = f"<div style='color:green'> Connected to {self.streamer.ipconn}</p>"
        else:
            status = "<div style='color:red'> Not connected</p>"
            
        self.gui_elements[2] = iw.HTML(value=status)
        self.reload()
        return

    def update_recording_status(self, interval=5):
        while EEG.recording:
            time.sleep(interval)
            status = f"<div style='color:green'>Recording, package size: {EEG.data.size} </p>"
            self.gui_elements[4] = iw.HTML(value=status)
            self.reload()
            
        status = f"<div style='color:red'> Not recording, package size: {EEG.data.size} </p>"
        self.gui_elements[4] = iw.HTML(value=status)
        self.reload()
        return 
    
    
    def download(self, obj):
        EEG.download()
        time = datetime.datetime.now().strftime("%Y-%m-%d-%H%M")
        filepath = f"sessions/{time}.csv"
        status = f"<div style='color:green'> File saved to {filepath}. </p>"
        self.gui_elements[6] = iw.HTML(value=status)
        self.reload()
        return
    
    def add_html(self, value):
        html = iw.HTML(value=value)
        self.gui_elements.append(html)
        
    def add_button(self, description, target, toggle=False):
        self.gui_elements.append(self.make_button(description, target, toggle))
        
    def make_button(self, description, target, toggle=False, height="50px", **kwargs): 
        if not toggle:
            button = iw.Button(description=description)
            button.on_click(target)
        else:
            button = iw.ToggleButton(description=description)
            button.observe(target,"value")
        
        button.layout = iw.Layout(height=height)
        return button
    
        
    def build_connection(self,obj):
        if obj["new"]:  
            
            if not EEG.connecting:
                t = Thread(target=self.streamer.build_connection, daemon=True)
                t.start()
            else:
                self.update_connection_status()
            
        else:
            EEG.connected=False
            EEG.connecting=False
            try:
                self.streamer.socket.close()
            except:
                pass
            
        Thread(target=self.detect_connection_change).start()
        self.update_connection_status()
        return

    def start_recording(self,obj):
        if obj["new"]:
            recording_thread = Thread(target=self.streamer.record, daemon=True)
            recording_thread.start()
            gui_update = Thread(target=self.update_recording_status)
            gui_update.start()
        else:
            EEG.recording=False
            self.update_recording_status()
            return

    def start_plotting(self,obj):
        if obj["new"]:
            EEG.plotting = True
            print(f"Value of EEGStream.plotting is now {EEG.recording}")
        else:
            EEG.plotting=False
            print(f"Value of EEGStream.plotting is now {EEG.recording}")
            return

    def toggle_value(self, value):
        if obj["new"]:
            value = True
            print("Value is now True")
        else:
            value = False
            print("Value is now False")
    

In [56]:
gui = GUI()
gui.display()

HTML(value='<h1>Build connection and start streaming from the app. ')

ToggleButton(value=False, description='Build Connection', layout=Layout(height='50px'))

HTML(value="<p style='color:red'>Not connected</p>")

ToggleButton(value=False, description='Start Recording', layout=Layout(height='50px'))

HTML(value='<p style="color:red">Not recording, no data yet</p>')

ToggleButton(value=False, description='Download Data', layout=Layout(height='50px'))

HTML(value="<div style='color:green'> File saved to sessions/2020-07-13-0216.csv. </p>")

Button(description='Reload GUI', layout=Layout(height='50px'), style=ButtonStyle())