# Entire Notebook Should be Executed for Rasberry Pi to Function Properly

# Import libraries and dependencies

In [1]:
!pip install adafruit-gpio

Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
You should consider upgrading via the '/home/feliperivas/project/env/bin/python -m pip install --upgrade pip' command.[0m


In [2]:
import Adafruit_GPIO as GPIO

In [3]:
!pip install Adafruit-MCP3008 

Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
You should consider upgrading via the '/home/feliperivas/project/env/bin/python -m pip install --upgrade pip' command.[0m


In [4]:
pip list

Package                          Version
-------------------------------- -------------------
absl-py                          0.15.0
Adafruit-Blinka                  8.16.1
adafruit-circuitpython-busdevice 5.2.4
adafruit-circuitpython-requests  1.13.1
adafruit-circuitpython-typing    1.9.0
Adafruit-GPIO                    1.0.3
Adafruit-MCP3008                 1.0.2
Adafruit-PlatformDetect          3.42.0
Adafruit-PureIO                  1.1.10
anyio                            3.6.2
argon2-cffi                      21.3.0
argon2-cffi-bindings             21.2.0
astunparse                       1.6.3
attrs                            22.2.0
backcall                         0.2.0
beautifulsoup4                   4.12.0
bleach                           6.0.0
cached-property                  1.5.2
cachetools                       5.3.0
certifi                          2022.12.7
cffi                             1.15.1
charset-normalizer               3.1.0
cycler                           0

# TestCode

In [5]:
import time
import os
import datetime
import numpy as np

import Adafruit_GPIO.SPI as SPI
import Adafruit_MCP3008
import tensorflow as tf
import spidev
import time

SPI_PORT = 0
SPI_DEVICE = 0
ncp = Adafruit_MCP3008.MCP3008(spi=SPI.SpiDev(SPI_PORT,SPI_DEVICE))

# Open SPI bus
spi = spidev.SpiDev()
spi.open(0,0)


# Initialize the model
model_path = '/home/feliperivas/project/best_model.tflite'
interpreter = tf.lite.Interpreter(model_path=model_path)
interpreter.allocate_tensors()

# Get input and output tensors
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
# print(input_details)

# Function to read SPI data from MCP3008 ADC
def ReadChannel(channel):
    adc = spi.xfer2([1,(8+channel)<<4,0])
    data = ((adc[1]&3) << 8) + adc[2]
    return data

# Function to convert data to voltage level
def ConvertVolts(data,places):
    volts = (data * 3.3) / float(1023)
    volts = round(volts,places)
    return volts

# ECG channel
ecg_channel = 0


# Set the timer for 20 seconds
duration = 20

print(f"Starting ECG monitoring for {duration} seconds...")
start_time = time.monotonic()
i =0
while (time.monotonic() - start_time) < duration:
    i+=1 /2
    if i.is_integer():
        print(int(i))
    ecg_level = ReadChannel(ecg_channel)
    ecg_volts = ConvertVolts(ecg_level, 2)

    # Prepare input data for the model
    input_data = np.array([ecg_level]*186).astype(np.float32)
    input_data = np.expand_dims(input_data, axis=0)
    input_data = np.expand_dims(input_data, axis=2)

    # Run the model
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    output_data = interpreter.get_tensor(output_details[0]['index'])
    time.sleep(0.5)
    # Process the output of the model
if output_data[0][0] > 0.5:
    print("ECG level: {} ({}V), abnormal heart rate detected".format(ecg_level, ecg_volts))
else:
    print("ECG level: {} ({}V), heart rate normal".format(ecg_level, ecg_volts))
    
print(f"Finished ECG monitoring for {duration} seconds.")    

Starting ECG monitoring for 20 seconds...


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ECG level: 65 (0.21V), heart rate normal
Finished ECG monitoring for 20 seconds.


In [6]:
!pip install --upgrade numpy

Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
Requirement already up-to-date: numpy in ./env/lib/python3.7/site-packages (1.21.6)
You should consider upgrading via the '/home/feliperivas/project/env/bin/python -m pip install --upgrade pip' command.[0m


In [7]:
!pip install --upgrade matplotlib

Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
Requirement already up-to-date: matplotlib in ./env/lib/python3.7/site-packages (3.5.3)
You should consider upgrading via the '/home/feliperivas/project/env/bin/python -m pip install --upgrade pip' command.[0m


In [8]:
!pip install scipy

Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
You should consider upgrading via the '/home/feliperivas/project/env/bin/python -m pip install --upgrade pip' command.[0m


# Working Code

In [42]:
import spidev
import time
import sys
import os
import datetime
import Adafruit_GPIO.SPI as SPI
import Adafruit_MCP3008
import tkinter as tk
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from scipy.signal import butter, sosfilt
import numpy as np

class MainWindow:
    def __init__(self, master):
        self.master = master
        master.title("ECG Reading")

        self.new_window_button = tk.Button(master, text="Open New ECG Reading", command=self.open_new_window)
        self.new_window_button.pack()

    def open_new_window(self):
        self.new_window = tk.Toplevel(self.master)
        self.new_window_label = tk.Label(self.new_window, text="ECG Results")
        self.new_window_label.pack()

        self.start_button = tk.Button(self.new_window, text="Start Monitoring", command=self.start_ecg_monitoring)
        self.start_button.pack()
    
    def open_new_window(self):
        self.new_window = tk.Toplevel(self.master)
        self.new_window_label = tk.Label(self.new_window, text="Live ECG Reading")
        self.new_window_label.pack()

        self.start_button = tk.Button(self.new_window, text="Start", command=self.start_function)
        self.start_button.pack()

        self.stop_button = tk.Button(self.new_window, text="Stop", command=self.stop_function, state=tk.DISABLED)
        self.stop_button.pack()

        self.ecg_monitor_button = tk.Button(self.new_window, text="Classify ECG Reading", command=self.start_ecg_monitoring, state=tk.DISABLED)
        self.ecg_monitor_button.pack()

        self.fig = Figure(figsize=(5, 4), dpi=100)
        self.ax = self.fig.add_subplot(111)
        self.ax.set_xlabel('Time')
        self.ax.set_ylabel('ECG Signal')
        self.line, = self.ax.plot([], [])
        self.canvas = FigureCanvasTkAgg(self.fig, self.new_window)
        self.canvas.draw()
        self.canvas.get_tk_widget().pack()

        self.running = False
        
        import sys

    def start_function(self):
        self.start_button.config(state=tk.DISABLED)
        self.stop_button.config(state=tk.NORMAL)
        self.ecg_monitor_button.config(state=tk.DISABLED)
        self.running = True

    # Open a txt file to save the print statements
        now = datetime.datetime.now()
        filename = "ecg_results.txt"
        file = open(filename, "a")
        sys.stdout = file
        self.update_graph()
        


    def stop_function(self):
        self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        self.ecg_monitor_button.config(state=tk.NORMAL)
        self.running = False

    def update_graph(self):
        SPI_PORT = 0
        SPI_DEVICE = 0
        ncp = Adafruit_MCP3008.MCP3008(spi=SPI.SpiDev(SPI_PORT, SPI_DEVICE))

        # ECG channel
        ecg_channel = 0

        # Butterworth filter parameters
        fs = 1000  # Sampling frequency (Hz)
        nyquist_freq = 0.5 * fs
        fc = 40  # Cutoff frequency (Hz)
        order = 4  # Filter order
        sos = butter(order, fc/nyquist_freq, btype='low', output='sos')
        padlen = 5 * (order - 1) #padding length

        ecg_data = []
        time_data = []

        start_time = time.time()


        # Calculate filter coefficients
        b, a = butter(order, fc / (fs / 2), btype='lowpass')

        while self.running:
            ecg_level = ncp.read_adc(ecg_channel)
            ecg_volts = round((ecg_level * 3.3) / float(1023), 2)

            # Append raw ECG data to buffer
            ecg_data.append(ecg_volts)
            time_data.append(time.time() - start_time)

            # Apply Butterworth filter to ECG data
            # Apply the filter to the ECG data
            if len(ecg_data) >= padlen:
                ecg_data_padded = np.pad(ecg_data, (0, padlen), 'constant', constant_values=(0, 0))
                filtered_ecg_data = sosfilt(sos, ecg_data_padded)
                filtered_ecg_data = filtered_ecg_data[padlen:]
                self.line.set_ydata(filtered_ecg_data)
            else:
                self.line.set_ydata(ecg_data)


            self.line.set_xdata(time_data)
            self.ax.relim()
            self.ax.autoscale_view()
            self.canvas.draw()

            self.master.update()
            time.sleep(0.01)

    def start_ecg_monitoring(self):
        ecg_data_filtered = self.update_graph()

        # Create a new window to display ECG monitoring results
        ecg_window = tk.Toplevel(self.master)
        ecg_window.title("ECG Classification Results")

        ecg_label = tk.Label(ecg_window, text="ECG Results\nMonitoring in Progress...")
        ecg_label.pack()

       

        # Initialize the model
        model_path = '/home/feliperivas/project/best_model.tflite'
        interpreter = tf.lite.Interpreter(model_path=model_path)
        interpreter.allocate_tensors()
        now = datetime.datetime.now()

        # Get input and output tensors
        input_details = interpreter.get_input_details()
        output_details = interpreter.get_output_details()

        # Set the timer for 20 seconds
        duration = 5

        print(f"Starting ECG classification for {duration} seconds...")
        start_time = time.monotonic()

        # Get the filtered ECG data
        ecg_data_filtered = self.line.get_ydata()

        while (time.monotonic() - start_time) < duration:
            
            ecg_level = ReadChannel(ecg_channel)
            ecg_volts = ConvertVolts(ecg_level, 2)

            # Prepare input data for the model
            input_data = np.array([ecg_level]*186).astype(np.float32)
            input_data = np.expand_dims(input_data, axis=0)
            input_data = np.expand_dims(input_data, axis=2)

            # Run the model
            interpreter.set_tensor(input_details[0]['index'], input_data)
            if input_data.shape != tuple(input_details[0]['shape']):
                raise ValueError("Input data shape doesn't match expected shape")
            interpreter.set_tensor(input_details[0]['index'], input_data)
            interpreter.invoke()
            output_data = interpreter.get_tensor(output_details[0]['index'])
            time.sleep(0.5)

            # Process the output of the model
            ecg_data_filtered = self.update_graph()
           

        now = datetime.datetime.now()
        print(now)
        print("ECG monitoring complete.")
        if output_data[0][0] > 0.5:
#             now = datetime.datetime.now()
#             print(now)
#             print("ECG monitoring complete.")
            if output_data[0][1] > 0.5:
                ecg_status = "atrial fibrillation detected"
                
                print("ECG level: {} ({}V), atrial fibrillation detected".format(ecg_level, ecg_volts))
                ecg_label.configure(fg="red",text="ECG Results\n{}\nECG level: {} ({}V)".format(ecg_status, ecg_level, round((ecg_level * 3.3) / float(1023), 2)))
            elif output_data[0][2] > 0.5:
                ecg_status = "premature ventricular contraction detected"
                prompt = "please see doctor "
                print("ECG level: {} ({}V), premature ventricular contraction detected".format(ecg_level, ecg_volts))
                ecg_label.configure(fg="red",text="ECG Results\n{}\nECG level: {} ({}V)".format(ecg_status, ecg_level, round((ecg_level * 3.3) / float(1023), 2)))
            elif output_data[0][3] > 0.5:
                ecg_status = "ventricular tachycardia detected"
                prompt = "please see doctor "
                print("ECG level: {} ({}V), ventricular tachycardia detected".format(ecg_level, ecg_volts))
                ecg_label.configure(fg="red",text="ECG Results\n{}\nECG level: {} ({}V)".format(ecg_status, ecg_level, round((ecg_level * 3.3) / float(1023), 2)))
            elif output_data[0][4] > 0.5:
                ecg_status = "ventricular fibrillation detected"
                prompt = "please see doctor "
                print("ECG level: {} ({}V), ventricular fibrillation detected".format(ecg_level, ecg_volts))
                ecg_label.configure(fg="red",text="ECG Results\n{}\nECG level: {} ({}V)".format(ecg_status, ecg_level, round((ecg_level * 3.3) / float(1023), 2)))
            else:
                ecg_status = "abnormal heart rate detected"
                prompt = "please see doctor "
                print("ECG level: {} ({}V), abnormal heart rate detected".format(ecg_level, ecg_volts))
                print("please see doctor".format(ecg_level, ecg_volts))
                ecg_label.configure(fg="red",text="ECG Results\n{}\nECG level: {} ({}V)".format(ecg_status, ecg_level, round((ecg_level * 3.3) / float(1023), 2)))
        else:
            ecg_status = "normal heart rate"
            prompt = "You Are Good!"
            print(prompt,"\nECG level: {} ({}V), heart rate normal".format(ecg_level, ecg_volts, ))
            ecg_label.configure(fg="green",text="ECG Results\n{}\nECG level: {} ({}V)".format(prompt,ecg_status, ecg_level, round((ecg_level * 3.3) / float(1023), 2)))
        ecg_window.update()
       
        
        ecg_label.configure(text="ECG Results\n{}\nECG level: {} ({}V)".format(prompt,ecg_status, ecg_level, round((ecg_level * 3.3) / float(1023), 2)))
        
        ecg_window.update()
        time.sleep(0.5)

root = tk.Tk()
gui = MainWindow(root)
root.mainloop()
