In [None]:
import os
import csv
import asyncio
import numpy as np
import time
import socket
from bleak import BleakScanner, BleakClient
from datetime import datetime
from scipy.signal import butter, filtfilt
from scipy.stats import skew, kurtosis, variation, moment
from scipy.signal import find_peaks, welch
from scipy.stats import hmean, gmean, iqr
from sklearn.preprocessing import StandardScaler
import neurokit2 as nk
from scipy.integrate import trapezoid
import joblib
import pandas as pd


# Apply nest_asyncio to allow asyncio event loop in a synchronous environment
import nest_asyncio
nest_asyncio.apply()

#----------------------------------- Setting up parameters -------------------------------
# Advertised BLE names used to recognise the RHYTHM24 and the Moodmetric ring while scanning
RHYTHM24_NAME = "Rhythm 24"
MOODMETRIC_NAME = "Moodmetric"

# GATT characteristic UUIDs passed to start_notify/stop_notify:
# heart-rate measurement (RHYTHM24) and the stream / raw-data characteristics (Moodmetric ring)
HR_MEASUREMENT_UUID = "00002a37-0000-1000-8000-00805f9b34fb"
GATT_STREAM_CHAR_UUID = "a0956420-9bd2-11e4-bd06-0800200c9a66"
GATT_RAW_DATA_CHAR_UUID = "f1b41cde-dbf5-4acf-8679-ecb8b4dca6ff"

# TCP Server settings
TCP_IP = '127.0.0.1'  # Localhost
TCP_PORT = 5050        # Port to listen on

# Load the pre-trained models and the feature scaler from the working directory.
# joblib files are pickle-based: only load artifacts that come from a trusted source.
# NOTE(review): only normalized_model and scaler are used by predict_state; `model` appears unused here.
model = joblib.load('model.joblib')
normalized_model = joblib.load('normalized_model.joblib')
scaler = joblib.load("scaler.joblib")

sampling_rate = 3  # Hz; rate assumed for the EDA stream when building segments and features
SEGMENT_DURATION = int(5 * 60)  # 5 minutes in seconds
UPDATE_DURATION = int(0.5 * 60)  # 30 seconds (0.5 minutes)

# Globals holding the latest predicted state and the most recent sensor readings
predicted_stress_state= None
latest_eda = None
latest_mm_stress_score = None
latest_rr_interval = None
latest_hr = None

# Buffers filled by the notification handlers and consumed by predict_state
eda_data = []
rr_interval_data = []

# Maps the model's integer class label to a human-readable state name
label_mapping = {
    0: "neutral",
    1: "stressed",
    2: "amused",
    3: "relaxed"
}

In [None]:
# Create the output folder if it doesn't exist
current_directory = os.getcwd()  # Current working directory of the running process
output_folder = os.path.join(current_directory, 'Readings')
os.makedirs(output_folder, exist_ok=True)

# File path for storing data; the name has minute resolution, so two runs
# started within the same minute share (and append to) the same file.
timestamp_init = datetime.now().strftime("%Y%m%d_%H%M")  # Format: YYYYMMDD_HHMM
combined_csv = os.path.join(output_folder, f'sensor_readings_{timestamp_init}.csv')

# Write the header row only when the combined CSV file does not exist yet
if not os.path.exists(combined_csv):
    with open(combined_csv, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Timestamp', 'EDA data', 'MM Score','RR Interval','Heart Rate','State'])

#---------------------------------------------- Function for Saving to csv ----------------------------------------------        
def save_to_csv():
    """Append one row to the combined CSV: current wall-clock time plus the latest readings.

    Reads the module-level ``latest_*`` values and ``predicted_stress_state``;
    the column order matches the header written when the file was created.
    """
    row = [
        datetime.now().strftime("%H:%M:%S"),
        latest_eda,
        latest_mm_stress_score,
        latest_rr_interval,
        latest_hr,
        predicted_stress_state,
    ]
    with open(combined_csv, 'a', newline='') as csv_file:
        csv.writer(csv_file).writerow(row)
        

In [None]:
#---------------------------------------------- Functions to scan and connect to devices ----------------------------------------------
# Function to scan for a device by name (the connection itself is opened later, in main)
async def connect_to_device(device_name, retries=3, scan_duration=10):
    """Scan for a BLE device whose advertised name contains ``device_name``.

    Args:
        device_name: Substring to look for in each discovered device's name.
        retries: Number of scan rounds to attempt.
        scan_duration: Timeout, in seconds, passed to each scan.

    Returns:
        The first matching discovered device, or ``None`` if no round finds one.
    """
    for attempt in range(retries):
        print(f"Attempt {attempt + 1} of {retries}: Scanning for {device_name}...")
        discovered = await BleakScanner.discover(timeout=scan_duration)
        # str() guards against devices that advertise no name (name is None).
        matches = [candidate for candidate in discovered if device_name in str(candidate.name)]
        if matches:
            print(f"Found {device_name}")
            return matches[0]
        print(f"{device_name} not found. Retrying...")

    print(f"{device_name} not found after multiple attempts.")
    return None

# Function to read data from RHYTHM24
async def read_rhythm24_data(client):
    """Subscribe to RHYTHM24 heart-rate notifications and keep the subscription alive.

    Args:
        client: Connected BLE client exposing ``start_notify`` / ``stop_notify``.

    Incoming packets are delivered to ``rhythm24_notification_handler``; this
    coroutine itself only idles. Timeouts and other exceptions are printed
    rather than raised.
    """
    try:
        await client.start_notify(HR_MEASUREMENT_UUID, rhythm24_notification_handler)
        try:
            # Idle loop: yields to the event loop so notification callbacks can run.
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            print("Data collection stopped by user.")
        finally:
            # Runs on any exit from the idle loop so the subscription is released.
            await client.stop_notify(HR_MEASUREMENT_UUID)
            print("Notifications stopped.")
    except asyncio.TimeoutError:
        print(f"Connection to {RHYTHM24_NAME} timed out.")
    except Exception as e:
        # Catch-all so a failure on this device is reported instead of propagating.
        print(f"An error occurred: {e}")

# Function to read data from Moodmetric ring
async def read_moodmetric_data(client):
    """Subscribe to both Moodmetric characteristics and keep the subscriptions alive.

    Args:
        client: Connected BLE client exposing ``start_notify`` / ``stop_notify``.

    Incoming packets are delivered to ``moodmetric_notification_handler``; this
    coroutine itself only idles. Every subscription that was actually started
    is stopped on exit, even if a later ``start_notify`` or an earlier
    ``stop_notify`` fails. Timeouts and other exceptions are printed rather
    than raised.
    """
    subscribed = []  # characteristics whose start_notify succeeded
    try:
        try:
            for char_uuid in (GATT_STREAM_CHAR_UUID, GATT_RAW_DATA_CHAR_UUID):
                await client.start_notify(char_uuid, moodmetric_notification_handler)
                subscribed.append(char_uuid)
            try:
                # Idle loop: yields to the event loop so notification callbacks can run.
                while True:
                    await asyncio.sleep(1)
            except KeyboardInterrupt:
                print("Data collection stopped by user.")
        finally:
            # Stop each started subscription independently so one failure
            # does not leave the other characteristic subscribed.
            for char_uuid in subscribed:
                try:
                    await client.stop_notify(char_uuid)
                except Exception as stop_error:
                    print(f"Failed to stop notifications for {char_uuid}: {stop_error}")
            if subscribed:
                print("Notifications stopped.")
    except asyncio.TimeoutError:
        print(f"Connection to {MOODMETRIC_NAME} timed out.")
    except Exception as e:
        # Catch-all so a failure on this device is reported instead of propagating.
        print(f"An error occurred: {e}")

In [None]:
#---------------------------------------------- Function to send data to Unity ----------------------------------------------
# Function to send the latest stress score to Unity over TCP
def send_stress_score_to_unity():
    """Serve the latest predicted state to a single TCP client (e.g. Unity).

    Binds to ``TCP_IP:TCP_PORT``, blocks until one client connects, then sends
    the current ``predicted_stress_state`` as a newline-terminated string about
    every 10 ms until the client disconnects. Nothing is sent while no
    prediction is available yet. This function blocks, so it is meant to run
    in its own thread.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.bind((TCP_IP, TCP_PORT))
        server_socket.listen(1)  # Listen for a single client connection
        print(f"Waiting for connection on {TCP_IP}:{TCP_PORT}...")
        print("Do not close the program! It might take some time...")

        client_socket, client_address = server_socket.accept()
        with client_socket:
            print(f"Client connected: {client_address}")

            try:
                while True:
                    # Read the module-level prediction written by predict_state.
                    # (The original read `predicted_stress_score`, a name that is
                    # never defined, and raised NameError here.)
                    current_state = predicted_stress_state
                    if current_state is not None:
                        client_socket.sendall(f"{current_state}\n".encode())

                    time.sleep(0.01)

            except ConnectionError:
                # Covers reset, broken pipe and aborted connections.
                print("Client disconnected.")

In [None]:
#---------------------------------------------- Functions to extract features ----------------------------------------------
# Basic cleaning for RR intervals
def clean_rr_intervals(rr_intervals, threshold_low=0.3, threshold_high=2.0):
    """Replace out-of-range RR intervals with the mean of the in-range ones.

    Args:
        rr_intervals: Sequence or array of RR intervals (a plain list is accepted).
        threshold_low: Smallest value considered valid (inclusive).
        threshold_high: Largest value considered valid (inclusive).

    Returns:
        A float ndarray of the same length. Values outside
        ``[threshold_low, threshold_high]`` are replaced by the mean of the
        valid values; if no value is valid, every element is NaN.
    """
    rr = np.asarray(rr_intervals, dtype=float)
    in_range = (rr >= threshold_low) & (rr <= threshold_high)
    # Avoid the empty-slice warning when nothing is valid; the result is NaN either way.
    replacement = rr[in_range].mean() if in_range.any() else np.nan
    return np.where(in_range, rr, replacement)

# Resample to a new sampling rate with interpolation
def resample(values, target_timestamps, original_rate):
    """Linearly interpolate ``values`` onto ``target_timestamps``.

    The input samples are treated as evenly spaced at ``original_rate`` samples
    per time unit, starting at time 0.

    Args:
        values: Source samples.
        target_timestamps: Times at which interpolated values are wanted.
        original_rate: Assumed sampling rate of ``values``.

    Returns:
        ndarray with one value per target timestamp, or an empty array when
        either input is empty.
    """
    if not len(values) or not len(target_timestamps):
        return np.array([])

    step = 1 / original_rate
    source_times = np.arange(0, len(values) / original_rate, step)
    # A float-step arange can come out one element long; trim both to a common length.
    usable = min(len(values), len(source_times))
    return np.interp(target_timestamps, source_times[:usable], values[:usable])

# Calculate SDNN for HRV
def calculate_hrv(rr_intervals):
    """Return the standard deviation of the RR intervals after scaling them by 1000.

    Args:
        rr_intervals: Sequence or array of RR intervals. A plain list is
            accepted; it is converted to an array first, because multiplying a
            list by 1000 would repeat it instead of scaling it.

    Returns:
        Population standard deviation (``np.std``) of ``rr_intervals * 1000``.
    """
    return np.std(np.asarray(rr_intervals, dtype=float) * 1000)

# Lowpass filter for denoising EDA signal
def denoise_eda(eda_signal, fs, cutoff=1):
    """Low-pass filter an EDA signal with a 4th-order Butterworth filter.

    Args:
        eda_signal: Samples to filter.
        fs: Sampling rate of ``eda_signal``.
        cutoff: Cut-off frequency, in the same unit as ``fs``.

    Returns:
        The filtered signal; ``filtfilt`` runs the filter forwards and backwards.
    """
    # butter() expects the cut-off as a fraction of the Nyquist frequency.
    normalized_cutoff = cutoff / (0.5 * fs)
    numerator, denominator = butter(4, normalized_cutoff, btype='low')
    return filtfilt(numerator, denominator, eda_signal)

# Artifact correction for EDA signal
def preprocess_eda(eda_signal, fs):
    """Denoise an EDA signal, then clean it with NeuroKit's "biosppy" method.

    Args:
        eda_signal: Raw EDA samples.
        fs: Sampling rate, used for the low-pass denoising step.

    Returns:
        The output of ``nk.eda_clean`` applied to the denoised signal.
    """
    smoothed = denoise_eda(eda_signal, fs)
    # NOTE(review): `fs` is not forwarded to nk.eda_clean, so it runs with its own
    # default sampling rate. Confirm this matches how the model's training features
    # were produced before changing it.
    return nk.eda_clean(smoothed, method="biosppy")

# Statistical Features
def statistical_features(signal, type=''):
    """Compute descriptive statistics of ``signal`` as a dict of named features.

    Args:
        signal: Array of samples.
        type: Prefix put in front of every feature name (e.g. ``"EDA"``).

    Returns:
        Dict mapping ``"<type>_<statistic>"`` to its value. Insertion order is
        fixed and is relied on when the features are turned into a DataFrame row.
    """
    has_samples = len(signal) > 0
    average = np.mean(signal)
    highest = np.max(signal)
    lowest = np.min(signal)

    def if_samples(stat):
        # The scipy statistics are only evaluated when there is data.
        return stat(signal) if has_samples else np.nan

    features = {
        f'{type}_mean': average,
        f'{type}_geometric_mean': if_samples(gmean),
        f'{type}_harmonic_mean': if_samples(hmean),
        f'{type}_standard_deviation': np.std(signal),
        f'{type}_mean_absolute_deviation': np.mean(np.abs(signal - average)),
        f'{type}_variance': np.var(signal),
        f'{type}_median': np.median(signal),
        f'{type}_maximum': highest,
        f'{type}_minimum': lowest,
        f'{type}_25th_percentile': np.percentile(signal, 25),
        f'{type}_75th_percentile': np.percentile(signal, 75),
        f'{type}_range': highest - lowest,
        f'{type}_interquartile_range': iqr(signal),
        f'{type}_variation_coefficient': if_samples(variation),
        f'{type}_central_moment': moment(signal, moment=2),
        f'{type}_skewness': if_samples(skew),
        f'{type}_kurtosis': if_samples(kurtosis)
    }
    return features

# Nonlinear Features Function with safe mode calculation
def nonlinear_features(signal, type=''):
    """Compute entropy and Poincaré-style features of ``signal``.

    Args:
        signal: Array of samples.
        type: Prefix put in front of every feature name.

    Returns:
        Dict with ``<type>_approx_entropy``, ``<type>_sample_entropy``,
        ``<type>_SD1`` and ``<type>_SD2``, in that order.
    """
    # NeuroKit returns (value, info); only the value is kept.
    apen, _ = nk.entropy_approximate(signal, delay=1, dimension=2, tolerance='sd')
    sampen, _ = nk.entropy_sample(signal, delay=1, dimension=2, tolerance='sd')

    # SD1 / SD2 derived from the variance of the signal and of its successive differences.
    successive_var = np.var(np.diff(signal))
    sd1 = np.sqrt(successive_var / 2)
    sd2 = np.sqrt(2 * np.var(signal) - (successive_var / 2))

    return {
        f'{type}_approx_entropy': float(apen),
        f'{type}_sample_entropy': float(sampen),
        f'{type}_SD1': sd1,
        f'{type}_SD2': sd2,
    }

# EDA Features
def eda_features(eda_signal, fs=4):
    """Extract tonic/phasic EDA features from a cleaned EDA signal.

    Args:
        eda_signal: EDA samples.
        fs: Sampling rate passed to ``nk.eda_phasic``.

    Returns:
        Dict with:
          - ``'SCL'``: mean of the tonic component.
          - ``'SCR Amplitude'``: mean phasic value at the detected peaks, or 0
            when no peak is found (previously NaN with a runtime warning).
          - ``'SCR Duration'``: mean spacing between consecutive peaks divided
            by ``fs``, or 0 with fewer than two peaks.
          - ``'SCR Frequency'``: number of peaks divided by the number of samples.
    """
    components = nk.eda_phasic(eda_signal, sampling_rate=fs)
    # Work on plain arrays so the peak positions index by position, not by label.
    tonic = np.asarray(components['EDA_Tonic'])
    phasic = np.asarray(components['EDA_Phasic'])

    peaks, _ = find_peaks(phasic, height=0)

    scl = np.mean(tonic)
    # No peaks: report 0, consistent with the SCR Duration fallback below.
    scr_amplitude = np.mean(phasic[peaks]) if len(peaks) > 0 else 0
    scr_duration = np.mean(np.diff(peaks)) / fs if len(peaks) > 1 else 0
    # NOTE(review): this is peaks per sample, not per second — kept as-is because the
    # fitted scaler/model presumably expect this definition; confirm against training code.
    scr_frequency = len(peaks) / len(eda_signal)
    return {
        'SCL': scl,
        'SCR Amplitude': scr_amplitude,
        'SCR Duration': scr_duration,
        'SCR Frequency': scr_frequency
    }

# HRV Time-Domain Features
def hrv_time_domain(rr_intervals, fs=4):
    """Compute time-domain HRV features from an array of RR values.

    Args:
        rr_intervals: Array of RR values.
        fs: Used only to size the SDANN windows (``5 * fs`` samples each).

    Returns:
        Dict with SDNN, SDANN, AVNN, RMSSD, PNN50, RSA Index,
        Deep Breathing Difference (DBD) and Exhalation/Inspiration Ratio (EI),
        in that order.
    """
    successive = np.diff(rr_intervals)
    longest = np.max(rr_intervals)
    shortest = np.min(rr_intervals)
    avnn = np.mean(rr_intervals)

    # Count of successive differences larger than 0.05, as a percentage of all differences.
    nn50 = np.sum(np.abs(successive) > 0.05)

    # SDANN: spread of the per-window standard deviations.
    window = 5 * fs
    window_sds = [
        np.std(rr_intervals[start:start + window])
        for start in range(0, len(rr_intervals), window)
    ]

    return {
        'SDNN': np.std(rr_intervals),
        'SDANN': np.std(window_sds),
        'AVNN': avnn,
        'RMSSD': np.sqrt(np.mean(successive ** 2)),
        'PNN50': nn50 / len(successive) * 100,
        'RSA Index': (longest - shortest) / avnn if avnn != 0 else np.nan,
        'Deep Breathing Difference (DBD)': longest - shortest,
        'Exhalation/Inspiration Ratio (EI)': longest / shortest if shortest != 0 else np.nan
    }

# HRV Frequency-Domain Features
def hrv_frequency_domain(rr_intervals, fs=4):
    """Compute band-power HRV features from a Welch power spectrum.

    Args:
        rr_intervals: Evenly sampled RR series.
        fs: Sampling rate of ``rr_intervals``, passed to ``welch``.

    Returns:
        Dict with Total Power, VLF Power, LF Power, HF Power and LF/HF Ratio
        (NaN when HF power is zero), in that order.
    """
    freqs, psd = welch(rr_intervals, fs, nperseg=256)

    def band_power(low, high):
        # Integrates the PSD bins in [low, high) with unit spacing between bins.
        # NOTE(review): the frequency axis is not passed to trapezoid, so powers are not
        # scaled by the bin width; kept because the fitted scaler presumably expects it.
        in_band = (freqs >= low) & (freqs < high)
        return trapezoid(psd[in_band])

    vlf_power = band_power(0.003, 0.04)
    lf_power = band_power(0.04, 0.15)
    hf_power = band_power(0.15, 0.4)

    return {
        'Total Power': vlf_power + lf_power + hf_power,
        'VLF Power': vlf_power,
        'LF Power': lf_power,
        'HF Power': hf_power,
        'LF/HF Ratio': lf_power / hf_power if hf_power != 0 else np.nan
    }

In [None]:
#---------------------------------------------- Functions to handle sensors' data ----------------------------------------------
# Function to handle Moodmetric notifications
def moodmetric_notification_handler(sender, data):
    """Handle one notification from the Moodmetric ring.

    Args:
        sender: Notification source; its string form is matched against the
            characteristic UUIDs.
        data: Raw notification payload (bytes-like).

    Side effects:
        - Stream characteristic: stores ``data[1]`` as ``latest_mm_stress_score``.
        - Raw-data characteristic: decodes the first two bytes (big-endian) divided
          by 1000 into ``latest_eda`` and appends it to ``eda_data``.
        - Appends a row to the CSV on every notification.
        - Once ``SEGMENT_DURATION`` seconds have elapsed since ``start_time``, and the
          elapsed whole second is a multiple of ``UPDATE_DURATION``, runs
          ``predict_state`` on the buffers and then empties them.
    """
    global latest_eda, latest_mm_stress_score, eda_data, rr_interval_data

    elapsed_time = time.time() - start_time
    sender_id = str(sender)

    if GATT_STREAM_CHAR_UUID in sender_id:
        latest_mm_stress_score = data[1]
    elif GATT_RAW_DATA_CHAR_UUID in sender_id:
        latest_eda = int.from_bytes(data[0:2], byteorder='big') / 1000
        eda_data.append(latest_eda)

    save_to_csv()

    segment_elapsed = elapsed_time >= SEGMENT_DURATION
    on_update_tick = int(elapsed_time) % UPDATE_DURATION == 0
    enough_samples = len(eda_data) > sampling_rate
    if segment_elapsed and on_update_tick and enough_samples:
        predict_state(eda_data, rr_interval_data, sampling_rate)
        # NOTE(review): both buffers are emptied after each prediction, so predictions
        # after the first one only see data gathered since the previous tick, not a
        # sliding SEGMENT_DURATION window — confirm this is intended.
        eda_data = []
        rr_interval_data = []
        

# Function to handle RHYTHM24 heart rate notifications
def rhythm24_notification_handler(sender, data):
    """Handle one Heart Rate Measurement notification from the RHYTHM24.

    Args:
        sender: Notification source; its string form is matched against
            ``HR_MEASUREMENT_UUID``.
        data: Raw notification payload (bytes-like).

    The payload is decoded following the Bluetooth Heart Rate Measurement layout:
    byte 0 holds flags; bit 0 selects an 8-bit or 16-bit heart-rate value;
    bit 3 marks a 2-byte energy-expended field; bit 4 marks trailing RR intervals
    (little-endian 16-bit, in units of 1/1024 s).

    Side effects:
        Updates ``latest_hr``; when RR intervals are present, sets
        ``latest_rr_interval`` to the last one in the packet and appends all of
        them (in seconds) to ``rr_interval_data``. Packets too short to contain a
        heart-rate value are ignored.
    """
    global latest_hr, latest_rr_interval, rr_interval_data

    if HR_MEASUREMENT_UUID not in str(sender) or len(data) < 1:
        return

    flags = data[0]
    hr_size = 2 if flags & 0x01 else 1  # bit 0: heart rate is uint16 instead of uint8
    if len(data) < 1 + hr_size:
        return

    latest_hr = int.from_bytes(data[1:1 + hr_size], byteorder="little")
    offset = 1 + hr_size

    if flags & 0x08:  # bit 3: energy-expended field precedes the RR intervals
        offset += 2

    if flags & 0x10:  # bit 4: RR intervals present
        # Only complete 2-byte values are decoded; a trailing odd byte is ignored.
        rr_intervals = [
            int.from_bytes(data[pos:pos + 2], byteorder="little") / 1024
            for pos in range(offset, len(data) - 1, 2)
        ]
        if rr_intervals:
            latest_rr_interval = rr_intervals[-1]
            rr_interval_data.extend(rr_intervals)


In [None]:
#---------------------------------------------- Function to predict state ----------------------------------------------
def predict_state(eda_signal, rr_intervals, sampling_rate):
    """Extract EDA/HRV features from one segment and update ``predicted_stress_state``.

    Args:
        eda_signal: Buffered EDA samples for the segment.
        rr_intervals: Buffered RR intervals for the segment.
        sampling_rate: Sampling rate assumed for ``eda_signal``; the RR series is
            resampled onto the same time base.

    Side effects:
        Sets the module-level ``predicted_stress_state`` to the label name from
        ``label_mapping`` and prints the result. If there is not enough data
        (fewer than two EDA samples or no RR intervals) the segment is skipped and
        the previous state is left unchanged.
    """
    global predicted_stress_state
    print("Starting to predict...")

    # With one EDA sample the segment has zero duration, and with no RR data the
    # feature functions fail on empty arrays; skip instead of raising inside the
    # BLE notification callback.
    if len(eda_signal) < 2 or len(rr_intervals) == 0:
        print("Not enough EDA or RR data for a prediction; skipping this segment.")
        return

    cleaned_eda = preprocess_eda(eda_signal, fs=sampling_rate)

    # Put the RR series on the EDA time base, treating the RR values as evenly
    # spread over the segment.
    eda_timestamps = np.arange(0, len(eda_signal) / sampling_rate, 1 / sampling_rate)
    org_rr_rate = len(rr_intervals) / eda_timestamps[-1]
    resampled_rr = resample(rr_intervals, eda_timestamps, org_rr_rate)

    eda_stats = statistical_features(cleaned_eda, "EDA")
    eda_nonlinear = nonlinear_features(cleaned_eda, "EDA")
    eda_other = eda_features(cleaned_eda, fs=sampling_rate)

    rr_stats = statistical_features(resampled_rr, "HRV")
    rr_nonlinear = nonlinear_features(resampled_rr, "HRV")
    # NOTE(review): hrv_time_domain runs with its default fs while the frequency-domain
    # call receives sampling_rate — confirm this matches the training pipeline.
    rr_hrv = {**hrv_time_domain(resampled_rr), **hrv_frequency_domain(resampled_rr, fs = sampling_rate)}

    # Single-row DataFrame; column order follows the dict insertion order above.
    combined_features = {**eda_stats, **eda_nonlinear, **eda_other, **rr_stats, **rr_nonlinear, **rr_hrv}
    combined_features_df = pd.DataFrame([combined_features])
    combined_features_df = scaler.transform(combined_features_df)

    # Predict using the model trained on scaled features
    predicted_stress_label = normalized_model.predict(combined_features_df)[0]
    predicted_stress_state = label_mapping[predicted_stress_label]
    print("MM Score:", latest_mm_stress_score, "State:", predicted_stress_state, "label:", predicted_stress_label)


In [None]:
#-----------------------------------Main function-------------------------------
# Main function to run both RHYTHM24 and Moodmetric data collection concurrently if both are connected
async def main():
    """Find both sensors, connect to them, and collect data from both concurrently.

    Sets the module-level ``start_time`` just before connecting; the notification
    handlers use it to measure elapsed time. If either device is not found,
    prints a message and returns without collecting anything.
    """
    global start_time
    rhythm24_device = await connect_to_device(RHYTHM24_NAME)
    moodmetric_device = await connect_to_device(MOODMETRIC_NAME)

    if not (rhythm24_device and moodmetric_device):
        print("Data collection aborted because both devices are not connected.")
        return

    print("Both RHYTHM24 and Moodmetric ring are connected. Starting data collection...")

    # Optional: serve the predicted state to Unity from a background thread.
    # threading.Thread(target=send_stress_score_to_unity, daemon=True).start()

    start_time = time.time()
    async with BleakClient(rhythm24_device) as rhythm24_client:
        async with BleakClient(moodmetric_device) as moodmetric_client:
            # Both readers idle indefinitely while their handlers receive notifications.
            await asyncio.gather(
                read_rhythm24_data(rhythm24_client),
                read_moodmetric_data(moodmetric_client)
            )


# Entry point: runs the collection loop until interrupted.
# nest_asyncio (applied at the top) lets asyncio.run work inside an already-running notebook loop.
if __name__ == "__main__":
    # Only needed if the commented-out TCP server thread in main() is enabled.
    import threading
    asyncio.run(main())