# Libraries

In [1]:
!pip install gspread
!pip install google-auth
!pip install pyserial gspread oauth2client


Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


# Sample test code to check if readings are appended to a given spreadsheet

In [None]:
import gspread
from google.oauth2.service_account import Credentials

# Path to your JSON key file
SERVICE_ACCOUNT_FILE = './.json' # Replace with your actual path to the JSON key file
SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]

credentials = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
client = gspread.authorize(credentials)

# Access your Google Sheet by ID
sheet_id = ""  # Replace with your actual Sheet ID
sheet = client.open_by_key(sheet_id).sheet1  # First sheet

# Try appending a row as a test
sheet.append_row(["PersonId","Test Timestamp", "Test BPM", "Test SpO2"])
print("Test data appended successfully.")


Test data appended successfully.


# Code that take continous readings and append final one

In [None]:
import serial
import time
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime
import re

# Path to your Service Account JSON file
SERVICE_ACCOUNT_FILE = './your-project.json'#Replace with your actual json file path

# Google Sheets setup
SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]
credentials = ServiceAccountCredentials.from_json_keyfile_name(SERVICE_ACCOUNT_FILE, SCOPES)
client = gspread.authorize(credentials)

# Replace with your Google Sheet ID and worksheet name
sheet_id = "sheet_id"
sheet = client.open_by_key(sheet_id).sheet1  # Access the first sheet

# Serial setup
SERIAL_PORT = ''  # Update with your serial port for ESP32
BAUD_RATE = 115200

# Initialize the serial number (check the last serial number in the spreadsheet)
def get_last_serial_number():
    try:
        last_row = len(sheet.get_all_values())  # Get the number of rows with data
        if last_row > 1:  # Assuming the first row is headers
            last_serial = int(sheet.cell(last_row, 1).value)  # Serial number in the first column
        else:
            last_serial = 0  # Start with 0 if there are no entries yet
        return last_serial
    except Exception as e:
        print(f"[ERROR] Could not retrieve last serial number: {e}")
        return 0

# Start the serial number from the last row
serial_number = get_last_serial_number()

def append_to_sheet(serial, bpm, spo2):
    """Function to append serial, BPM, and SpO2 values to Google Sheets."""
    try:
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        sheet.append_row([serial, now, bpm, spo2])
        print(f"[INFO] Appended to sheet: Serial = {serial}, BPM = {bpm}, SpO2 = {spo2}")
    except Exception as e:
        print(f"[ERROR] Failed to append to sheet: {e}")

def calculate_average(readings):
    """Calculate the average of the readings."""
    if readings:
        avg_bpm = sum(r[0] for r in readings) / len(readings)
        avg_spo2 = sum(r[1] for r in readings) / len(readings)
        return avg_bpm, avg_spo2
    return 0, 0

def main():
    global serial_number
    try:
        # Initialize serial connection
        print("[INFO] Attempting to connect to serial port...")
        ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
        print("[INFO] Connected to serial port")

        readings = []  # List to store readings (bpm, spo2)
        start_time = time.time()  # Track when the measurement period starts

        while True:
            # Read data from serial
            if ser.in_waiting > 0:
                try:
                    line = ser.readline().decode('utf-8').strip()
                    print(f"[DEBUG] Received raw data: {line}")

                    # Match the format "Sent: BPM: <value> SpO2: <value>%"
                    match = re.match(r"Sent: BPM:\s*(\d+\.\d+)\s*SpO2:\s*(\d+\.\d+)%", line)
                    if match:
                        bpm = float(match.group(1))
                        spo2 = float(match.group(2))
                        readings.append((bpm, spo2))  # Store the readings
                        print(f"[DEBUG] Current Readings: BPM = {bpm}, SpO2 = {spo2}")
                    else:
                        print("[WARN] Data format does not match expected pattern")

                except UnicodeDecodeError as e:
                    print(f"[ERROR] Failed to decode line: {e}")

            # After 20 seconds, calculate the average and append to the sheet
            if time.time() - start_time >= 20:  # Check if 20 seconds have passed
                print("[INFO] 20 seconds elapsed. Calculating average...")
                if readings:  # Ensure that there are readings to process
                    avg_bpm, avg_spo2 = calculate_average(readings)
                    serial_number += 1  # Increment serial number for each new entry
                    print(f"[INFO] Average BPM: {avg_bpm}, Average SpO2: {avg_spo2}")
                    append_to_sheet(serial_number, avg_bpm, avg_spo2)  # Append the average reading to the sheet
                else:
                    print("[WARN] No readings taken in the 20-second window")
                
                readings.clear()  # Reset readings for the next person
                start_time = time.time()  # Reset the timer for the next person

            # Add a delay to prevent high CPU usage; adjust as needed
            time.sleep(1)
            
    except serial.SerialException as e:
        print(f"[ERROR] Could not open serial port: {e}")
    except KeyboardInterrupt:
        print("[INFO] Program terminated by user")
    finally:
        if 'ser' in locals():
            ser.close()
            print("[INFO] Serial port closed")

if __name__ == "__main__":
    main()


[INFO] Attempting to connect to serial port...
[INFO] Connected to serial port
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Person detected. Starting measurement...
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Sent: BPM: 56.70 SpO2: 93.00%
[DEBUG] Current Readings: BPM = 56.7, SpO2 = 93.0
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Sent: BPM: 43.49 SpO2: 93.00%
[DEBUG] Current Readings: BPM = 43.49, SpO2 = 93.0
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match ex

# Code that takes readings of a person, append final one and stops after that

In [1]:
import serial
import time
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime
import re

# Path to your Service Account JSON file
SERVICE_ACCOUNT_FILE = './your-project.json'#Replace with your actual json file path

# Google Sheets setup
SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]
credentials = ServiceAccountCredentials.from_json_keyfile_name(SERVICE_ACCOUNT_FILE, SCOPES)
client = gspread.authorize(credentials)

# Replace with your Google Sheet ID and worksheet name
sheet_id = "sheet_id"
sheet = client.open_by_key(sheet_id).sheet1  # Access the first sheet

# Serial setup
SERIAL_PORT = '/dev/cu.usbserial-0001'  # Update with your serial port for ESP32
BAUD_RATE = 115200

# Initialize the serial number (check the last serial number in the spreadsheet)
def get_last_serial_number():
    try:
        last_row = len(sheet.get_all_values())  # Get the number of rows with data
        if last_row > 1:  # Assuming the first row is headers
            last_serial = int(sheet.cell(last_row, 1).value)  # Serial number in the first column
        else:
            last_serial = 0  # Start with 0 if there are no entries yet
        return last_serial
    except Exception as e:
        print(f"[ERROR] Could not retrieve last serial number: {e}")
        return 0

# Start the serial number from the last row
serial_number = get_last_serial_number()

def append_to_sheet(serial, bpm, spo2):
    """Function to append serial, BPM, and SpO2 values to Google Sheets."""
    try:
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        sheet.append_row([serial, now, bpm, spo2])
        print(f"[INFO] Appended to sheet: Serial = {serial}, BPM = {bpm}, SpO2 = {spo2}")
    except Exception as e:
        print(f"[ERROR] Failed to append to sheet: {e}")

def calculate_average(readings):
    """Calculate the average of the readings."""
    if readings:
        avg_bpm = sum(r[0] for r in readings) / len(readings)
        avg_spo2 = sum(r[1] for r in readings) / len(readings)
        return avg_bpm, avg_spo2
    return 0, 0

def main():
    global serial_number
    try:
        # Initialize serial connection
        print("[INFO] Attempting to connect to serial port...")
        ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
        print("[INFO] Connected to serial port")

        readings = []  # List to store readings (bpm, spo2)
        start_time = time.time()  # Track when the measurement period starts

        while time.time() - start_time < 20:  # 20-second window for one person's readings
            # Read data from serial
            if ser.in_waiting > 0:
                try:
                    line = ser.readline().decode('utf-8').strip()
                    print(f"[DEBUG] Received raw data: {line}")

                    # Match the format "Sent: BPM: <value> SpO2: <value>%"
                    match = re.match(r"Sent: BPM:\s*(\d+\.\d+)\s*SpO2:\s*(\d+\.\d+)%", line)
                    if match:
                        bpm = float(match.group(1))
                        spo2 = float(match.group(2))
                        readings.append((bpm, spo2))  # Store the readings
                        print(f"[DEBUG] Current Readings: BPM = {bpm}, SpO2 = {spo2}")
                    else:
                        print("[WARN] Data format does not match expected pattern")

                except UnicodeDecodeError as e:
                    print(f"[ERROR] Failed to decode line: {e}")

            # Add a small delay to prevent high CPU usage; adjust as needed
            time.sleep(1)

        # After 20 seconds, calculate the average and append to the sheet
        print("[INFO] 20 seconds elapsed. Calculating average...")
        if readings:  # Ensure that there are readings to process
            avg_bpm, avg_spo2 = calculate_average(readings)
            serial_number += 1  # Increment serial number for each new entry
            print(f"[INFO] Average BPM: {avg_bpm}, Average SpO2: {avg_spo2}")
            append_to_sheet(serial_number, avg_bpm, avg_spo2)  # Append the average reading to the sheet
        else:
            print("[WARN] No readings taken in the 20-second window")

    except serial.SerialException as e:
        print(f"[ERROR] Could not open serial port: {e}")
    except KeyboardInterrupt:
        print("[INFO] Program terminated by user")
    finally:
        if 'ser' in locals():
            ser.close()
            print("[INFO] Serial port closed")

if __name__ == "__main__":
    main()


[INFO] Attempting to connect to serial port...
[INFO] Connected to serial port
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Person detected. Starting measurement...
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Sent: BPM: 45.60 SpO2: 94.00%
[DEBUG] Current Readings: BPM = 45.6, SpO2 = 94.0
[DEBUG] Received raw data: Beat!!!
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: Sent: BPM: 40.63 SpO2: 94.00%
[DEBUG] Current Readings: BPM = 40.6

# Code that take readings of a person, appends all and then stops

In [18]:
import serial
import time
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime

# Path to your Service Account JSON file
SERVICE_ACCOUNT_FILE = './your-project.json'#Replace with your actual json file path

# Google Sheets setup
SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]
credentials = ServiceAccountCredentials.from_json_keyfile_name(SERVICE_ACCOUNT_FILE, SCOPES)
client = gspread.authorize(credentials)

# Replace with your Google Sheet ID and worksheet name
sheet_id = "sheet_id"
sheet = client.open_by_key(sheet_id).sheet1  # Access the first sheet

# Serial setup
SERIAL_PORT = '/dev/cu.usbserial-0001'  # Updated serial port for ESP32
BAUD_RATE = 115200

def get_next_person_id():
    """Retrieve the last person ID from the Google Sheet and return the next one."""
    try:
        last_row = sheet.get_all_values()[-1]  # Get the last row in the sheet
        last_person_id = int(last_row[0])  # Assume person_id is in the first column
        return last_person_id + 1
    except IndexError:
        # If the sheet is empty, start with person ID 1
        return 1
    except ValueError as e:
        print(f"[ERROR] Could not retrieve last person ID: {e}")
        return 1

def append_to_sheet(person_id, bpm, spo2):
    """Function to append Person ID, BPM, and SpO2 values to Google Sheets."""
    try:
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        sheet.append_row([person_id, now, bpm, spo2])
        print(f"[INFO] Appended to sheet: Person ID = {person_id}, BPM = {bpm}, SpO2 = {spo2}")
    except Exception as e:
        print(f"[ERROR] Failed to append to sheet: {e}")

def main():
    try:
        # Initialize serial connection
        print("[INFO] Attempting to connect to serial port...")
        ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
        print("[INFO] Connected to serial port")

        person_id = get_next_person_id()  # Get next person ID from the sheet

        # Track readings for a person for a fixed time window (20 seconds)
        start_time = time.time()
        print(f"[INFO] Starting new session for Person ID: {person_id}")

        while time.time() - start_time <= 20:  # 20-second window for one person's readings
            if ser.in_waiting > 0:
                try:
                    line = ser.readline().decode('utf-8').strip()
                    print(f"[DEBUG] Received raw data: '{line}'")

                    # Parse data if it follows the format "Sent: BPM: <value> SpO2: <value>%"
                    if line.startswith("Sent: BPM:") and "SpO2:" in line:
                        parts = line.replace("Sent: ", "").split(" ")
                        print(f"[DEBUG] Parsed data parts: {parts}")

                        try:
                            bpm = float(parts[1])
                            spo2 = float(parts[3].replace("%", ""))
                            print(f"[INFO] Parsed BPM = {bpm}, SpO2 = {spo2}")
                            append_to_sheet(person_id, bpm, spo2)  # Append each reading to the sheet with person ID
                        except ValueError as e:
                            print(f"[ERROR] Failed to convert data to float: {e}")
                    else:
                        print("[WARN] Data format does not match expected pattern")
                
                except UnicodeDecodeError as e:
                    print(f"[ERROR] Failed to decode line: {e}")

            # Add a small delay to prevent high CPU usage
            time.sleep(1)

        # End the session after 20 seconds
        print(f"[INFO] 20-second session completed for Person ID: {person_id}")

    except serial.SerialException as e:
        print(f"[ERROR] Could not open serial port: {e}")
    except KeyboardInterrupt:
        print("[INFO] Program terminated by user")
    finally:
        if 'ser' in locals():
            ser.close()
            print("[INFO] Serial port closed")

if __name__ == "__main__":
    main()


[INFO] Attempting to connect to serial port...
[INFO] Connected to serial port
[INFO] Starting new session for Person ID: 5
[DEBUG] Received raw data: 'Beat!!!'
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: 'Beat!!!'
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: 'Beat!!!'
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: 'Person detected. Starting measurement...'
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: 'Beat!!!'
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: 'Sent: BPM: 78.26 SpO2: 97.00%'
[DEBUG] Parsed data parts: ['BPM:', '78.26', 'SpO2:', '97.00%']
[INFO] Parsed BPM = 78.26, SpO2 = 97.0
[INFO] Appended to sheet: Person ID = 5, BPM = 78.26, SpO2 = 97.0
[DEBUG] Received raw data: 'Beat!!!'
[WARN] Data format does not match expected pattern
[DEBUG] Received raw data: 'Sent: BPM: 82.43 SpO2: 97.00%'
[DEBUG] Parsed data parts: [