In [1]:
import nest_asyncio
nest_asyncio.apply()

import asyncio
from bleak import BleakClient

address = "0D0F7D44-48A6-71E5-4523-A5427505D4C1"  # Replace with your WT901BLE's address

async def show_characteristics():
    async with BleakClient(address) as client:
        await client.get_services()  # still needed to populate `client.services`
        for service in client.services:
            print(f"[Service] {service.uuid}")
            for char in service.characteristics:
                print(f"  [Characteristic] {char.uuid} — {char.properties}")

await show_characteristics()


[Service] 0000ffe5-0000-1000-8000-00805f9a34fb
  [Characteristic] 0000ffe9-0000-1000-8000-00805f9a34fb — ['write-without-response', 'write']
  [Characteristic] 0000ffe4-0000-1000-8000-00805f9a34fb — ['notify']


  await client.get_services()  # still needed to populate `client.services`


In [2]:
from bleak import BleakScanner
import asyncio

async def scan():
    devices = await BleakScanner.discover()
    for d in devices:
        print(f"{d.name} — {d.address}")

asyncio.run(scan())


None — 67E02B93-F5AA-D846-D726-5C7110D02261
[TV]Astro-Sam — E2E5B321-7F0C-037C-6F32-DFAF1E0A49B7
None — D75101F4-5274-8FAC-8A2E-46A8268AC41D
None — 1079159B-85D1-F711-87C2-78CB31430783
None — 465A607B-0F00-4D74-5333-10C87E5B7628
None — F179B175-CD27-C123-845D-B01AA3E1E60F
None — D25A5DFD-0133-FFB5-CDF2-7AF00EFF7F2B
None — 2ED9AA08-3951-21FF-0A81-4275AFFA9351
None — 9DE06D07-2D1A-152B-3928-FED1E1F73EEC
None — 0E66903C-6A99-092C-16B6-BB6D91BF0D0D
WT901BLE68 — 0CE86DC8-97A4-3492-9C5B-D06CE921BC65
None — 05B1E168-D4B1-1E0E-D7A5-F13AFA926202
None — 75141F11-8655-FD70-50C4-9D24469D8525
None — C7D19385-5BB1-9B80-1F21-911D6985F460
None — 2C9719A4-4D72-0B6F-E894-E2B69D9851E2
None — C5D23717-B46A-6DF0-D2F2-893C485ED3CF
None — FB9937BB-D01D-0CD7-7514-689495870DDC
None — CC37ADA4-FA5F-97BD-F33C-2DFCD98F3CD2
None — CA8044EE-FBFE-1887-3FD5-BF78A4318123
None — A9FE46EE-B657-84F6-E3EF-82BFAEA94A6A
Pokemon GO Plus + — E0E02C0A-5928-BF06-3522-D5C40EF2BA10
None — E2FBC420-2248-6C8F-595F-C66B3EADAF78
None

## Plotting


In [3]:
%matplotlib qt

In [None]:
import asyncio
from bleak import BleakClient
from datetime import datetime
import pandas as pd
import os
import math
import matplotlib.pyplot as plt
from collections import deque

# BLE UUIDs
NOTIFY_UUID = "0000ffe4-0000-1000-8000-00805f9a34fb"
WRITE_UUID = "0000ffe9-0000-1000-8000-00805f9a34fb"

# Addresses (replace with your actual device IDs)
NECK_ADDRESS = "0CE86DC8-97A4-3492-9C5B-D06CE921BC65"
PELVIS_ADDRESS = "0D0F7D44-48A6-71E5-4523-A5427505D4C1"


sample = 2000
mag_calibragtion = False

# Real-time plotting
plt.ion()
lean_history = deque(maxlen=sample)
time_history = deque(maxlen=sample)
fig, plot_ax = plt.subplots()
line, = plot_ax.plot([], [], lw=2)
plot_ax.set_ylim(-120, 180)
plot_ax.set_xlim(0, sample)
plot_ax.set_xlabel("Time (frames)")
plot_ax.set_ylabel("Lean Angle (°)")
plot_ax.set_title("Real-Time Forward Lean (Neck - Pelvis)")
plt.show(block=False)

# Data storage
raw_neck = []
raw_pelvis = []
data_neck = []
data_pelvis = []
angle_diff_data = []
latest_pitch = {"neck": None, "pelvis": None}

# Universal decoder
def decode_packet(data, label):
    if data[0] != 0x55:
        return
    flag = data[1]
    if flag not in [0x61, 0x71]:
        return

    decoded = [int.from_bytes(data[i:i+2], byteorder='little', signed=True)
               for i in range(2, len(data), 2)]
    if len(decoded) < 9:
        return

    acc_x = decoded[0] / 32768.0 * 16 * 9.8
    acc_y = decoded[1] / 32768.0 * 16 * 9.8
    acc_z = decoded[2] / 32768.0 * 16 * 9.8
    wx = decoded[3] / 32768.0 * 2000
    wy = decoded[4] / 32768.0 * 2000
    wz = decoded[5] / 32768.0 * 2000
    roll = decoded[6] / 32768.0 * 180
    pitch = decoded[7] / 32768.0 * 180
    yaw = decoded[8] / 32768.0 * 180

    tilt_rad = math.acos(acc_y / math.sqrt(acc_x**2 + acc_y**2 + acc_z**2))
    tilt_deg = math.degrees(tilt_rad)

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    row = {
        "timestamp": timestamp,
        "ax (m/s^2)": acc_x,
        "ay (m/s^2)": acc_y,
        "az (m/s^2)": acc_z,
        "wx (deg/s)": wx,
        "wy (deg/s)": wy,
        "wz (deg/s)": wz,
        "roll (deg)": roll,
        "pitch (deg)": pitch,
        "yaw (deg)": yaw,
        "tilt angle (deg)": tilt_deg
    }

    latest_pitch[label] = pitch

    if label == "neck":
        raw_neck.append(data.hex())
        data_neck.append(row)
    else:
        raw_pelvis.append(data.hex())
        data_pelvis.append(row)

    if latest_pitch["neck"] is not None and latest_pitch["pelvis"] is not None:
        angle_diff = abs(latest_pitch["neck"] - latest_pitch["pelvis"])
        angle_diff_data.append({"timestamp": timestamp, "lean angle (neck - pelvis)": angle_diff})
        print(f"🧍 timestamp: {timestamp}, Forward Lean Angle (Neck - Pelvis Pitch): {angle_diff:.2f}°")

        # === Plot update ===
        lean_history.append(angle_diff)
        time_history.append(len(time_history))
        line.set_data(time_history, lean_history)
        plot_ax.set_xlim(max(0, len(time_history)-2000), len(time_history))
        plot_ax.figure.canvas.draw()
        plot_ax.figure.canvas.flush_events()

# Notification handlers
def handler_neck(sender, data):
    decode_packet(data, "neck")

def handler_pelvis(sender, data):
    decode_packet(data, "pelvis")

# BLE Connection Task
async def run_dual_ble():
    async with BleakClient(NECK_ADDRESS) as neck, BleakClient(PELVIS_ADDRESS) as pelvis:
        print("✅ Connected to NECK sensor")
        print("✅ Connected to PELVIS sensor")
        
        if mag_calibragtion:
            async def perform_magnetic_calibration(client, label):
                print(f"🧭 Starting magnetic calibration for {label.upper()} in 5 seconds...")
                await asyncio.sleep(5)
                await client.write_gatt_char(WRITE_UUID, b'\xFF\xAA\x01\x07\x04') # 50 Hz
                print(f"🔄 Rotate {label.upper()} sensor around all 3 axes...")
                await asyncio.sleep(10)
                await client.write_gatt_char(WRITE_UUID, b'\xFF\xAA\x01\x00\x04') # 50 Hz
                print(f"✅ Magnetic calibration complete for {label.upper()}.")

            await perform_magnetic_calibration(neck, "neck")
            await perform_magnetic_calibration(pelvis, "pelvis")
            
        print("••Place the Sensor")
        await asyncio.sleep(10)
        await neck.start_notify(NOTIFY_UUID, handler_neck)
        await pelvis.start_notify(NOTIFY_UUID, handler_pelvis)
        print("📡 Receiving data from both sensors...")
        await asyncio.sleep(30)
        await neck.stop_notify(NOTIFY_UUID)
        await pelvis.stop_notify(NOTIFY_UUID)
        print("📴 Stopped both sensors.")

# Run the program
asyncio.run(run_dual_ble())





In [None]:

# Save results
output_dir = "/Users/moon/Documents/Posture/Bluetooth_dual_data"
os.makedirs(output_dir, exist_ok=True)

pd.DataFrame(data_neck).to_csv(os.path.join(output_dir, "neck_data.csv"), index=False)
pd.DataFrame(data_pelvis).to_csv(os.path.join(output_dir, "pelvis_data.csv"), index=False)

pd.DataFrame({"raw": raw_neck}).to_csv(os.path.join(output_dir, "neck_raw.csv"), index=False)
pd.DataFrame({"raw": raw_pelvis}).to_csv(os.path.join(output_dir, "pelvis_raw.csv"), index=False)

pd.DataFrame(angle_diff_data).to_csv(os.path.join(output_dir, "lean_angle_diff.csv"), index=False)

print("✅ CSV files saved for both sensors.")

## Data Anlaysis


In [None]:
import pandas as pd
import os

# === Load Data ===
output_dir = "/Users/moon/Documents/Posture/Bluetooth_dual_data"
neck_df = pd.read_csv(os.path.join(output_dir, "neck_data.csv"))
pelvis_df = pd.read_csv(os.path.join(output_dir, "pelvis_data.csv"))

# Convert timestamp to datetime
neck_df['timestamp'] = pd.to_datetime(neck_df['timestamp'])
pelvis_df['timestamp'] = pd.to_datetime(pelvis_df['timestamp'])

# Sort and rename for merging
neck_pitch = neck_df[['timestamp', 'pitch (deg)']].rename(columns={'pitch (deg)': 'neck_pitch'}).sort_values('timestamp')
pelvis_pitch = pelvis_df[['timestamp', 'pitch (deg)']].rename(columns={'pitch (deg)': 'pelvis_pitch'}).sort_values('timestamp')

# Merge with tolerance of 50ms
angles_df = pd.merge_asof(
    neck_pitch, pelvis_pitch,
    on='timestamp', direction='nearest',
    tolerance=pd.Timedelta("50ms")
).dropna(subset=['pelvis_pitch'])

# Compute lean angle and seconds
angles_df['lean_angle'] = angles_df['neck_pitch'] - angles_df['pelvis_pitch']
angles_df['seconds'] = (angles_df['timestamp'] - angles_df['timestamp'].min()).dt.total_seconds()
angles_df['is_slouched'] = angles_df['lean_angle'].abs() > 10

# Divide into 4 quarters
total_time = angles_df['seconds'].max()
quarter_length = total_time / 4
angles_df['quarter'] = pd.cut(
    angles_df['seconds'],
    bins=[-1, quarter_length, 2*quarter_length, 3*quarter_length, total_time + 1],
    labels=['Q1', 'Q2', 'Q3', 'Q4']
)

# === Helper: Microbreaks Count Function ===
def count_microbreaks(series, rate_hz=1):
    slouch_duration = 0
    breaks = 0
    for slouched in series:
        if slouched:
            slouch_duration += 1
        else:
            if slouch_duration >= 60 * rate_hz:
                breaks += 1
            slouch_duration = 0
    if slouch_duration >= 60 * rate_hz:
        breaks += 1
    return breaks

# === Summary Calculations ===
summary = angles_df.groupby('quarter').agg(
    slouched_pct=('is_slouched', lambda x: 100 * x.sum() / len(x)),
    peak_flexion=('lean_angle', lambda x: x.abs().max()),
    avg_deviation=('lean_angle', lambda x: x.abs().mean())
).reset_index()

summary['suggested_breaks'] = angles_df.groupby('quarter')['is_slouched'].apply(count_microbreaks).values

# === Score Scaling Function ===
def scale_score(value, ideal_max, worst_max):
    if value <= ideal_max:
        return 100
    elif value >= worst_max:
        return 0
    else:
        return 100 * (worst_max - value) / (worst_max - ideal_max)

# === Compute Scores ===
scores = []
for _, row in summary.iterrows():
    score_slouch = scale_score(row['slouched_pct'], 10, 50)
    score_peak = scale_score(row['peak_flexion'], 20, 60)
    score_dev = scale_score(row['avg_deviation'], 10, 40)
    score_break = min(100, 100 * (row['suggested_breaks'] / 2))  # 2 breaks expected per quarter
    total_score = (
        0.4 * score_slouch +
        0.2 * score_peak +
        0.2 * score_dev +
        0.2 * score_break
    )
    scores.append(total_score)

summary['quarter_score'] = scores

# === Add Total Row ===
overall = summary[['slouched_pct', 'peak_flexion', 'avg_deviation', 'suggested_breaks']].mean()
score_slouch = scale_score(overall['slouched_pct'], 10, 50)
score_peak = scale_score(overall['peak_flexion'], 20, 60)
score_dev = scale_score(overall['avg_deviation'], 10, 40)
score_break = min(100, 100 * (overall['suggested_breaks'] / 2))
total_score = (
    0.4 * score_slouch +
    0.2 * score_peak +
    0.2 * score_dev +
    0.2 * score_break
)

summary.loc[len(summary)] = ['Total', *overall.values, total_score]

# === Save Summary ===
summary.to_csv(os.path.join(output_dir, "todays_ergonomic_summary.csv"), index=False)

summary


## **\*\*\***


## Old


In [None]:
import asyncio
from bleak import BleakClient
from datetime import datetime
import pandas as pd
import os
import math


# BLE UUIDs
NOTIFY_UUID = "0000ffe4-0000-1000-8000-00805f9a34fb"
WRITE_UUID = "0000ffe9-0000-1000-8000-00805f9a34fb"

# Addresses (replace with your actual device IDs)
NECK_ADDRESS = "0CE86DC8-97A4-3492-9C5B-D06CE921BC65" # 
PELVIS_ADDRESS = "0D0F7D44-48A6-71E5-4523-A5427505D4C1" 



# Universal decoder
def decode_packet(data, label):
    if data[0] != 0x55:
        return
    flag = data[1]
    if flag not in [0x61, 0x71]:
        return

    decoded = [int.from_bytes(data[i:i+2], byteorder='little', signed=True)
               for i in range(2, len(data), 2)]
    if len(decoded) < 9:
        return

    ax = decoded[0] / 32768.0 * 16 * 9.8
    ay = decoded[1] / 32768.0 * 16 * 9.8
    az = decoded[2] / 32768.0 * 16 * 9.8
    wx = decoded[3] / 32768.0 * 2000
    wy = decoded[4] / 32768.0 * 2000
    wz = decoded[5] / 32768.0 * 2000
    roll = decoded[6] / 32768.0 * 180
    pitch = decoded[7] / 32768.0 * 180
    yaw = decoded[8] / 32768.0 * 180

    tilt_rad = math.acos(ay / math.sqrt(ax**2 + ay**2 + az**2))
    tilt_deg = math.degrees(tilt_rad)

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    row = {
        "timestamp": timestamp,
        "ax (m/s^2)": ax,
        "ay (m/s^2)": ay,
        "az (m/s^2)": az,
        "wx (deg/s)": wx,
        "wy (deg/s)": wy,
        "wz (deg/s)": wz,
        "roll (deg)": roll,
        "pitch (deg)": pitch,
        "yaw (deg)": yaw,
        "tilt angle (deg)": tilt_deg
    }

    latest_pitch[label] = pitch

    if label == "neck":
        raw_neck.append(data.hex())
        data_neck.append(row)
    else:
        raw_pelvis.append(data.hex())
        data_pelvis.append(row)

    #print(f"[{label.upper()}] --> Tilt angle: {tilt_deg:.2f}°")
    #print(f"[{timestamp}] {label.upper()} RPY=({roll:.1f}, {pitch:.1f}, {yaw:.1f}) | Acc=({ax:.1f}, {ay:.1f}, {az:.1f}) | Gyro=({wx:.1f}, {wy:.1f}, {wz:.1f})")

    if latest_pitch["neck"] is not None and latest_pitch["pelvis"] is not None:
        angle_diff = latest_pitch["neck"] - latest_pitch["pelvis"]
        angle_diff_data.append({"timestamp": timestamp, "lean angle (neck - pelvis)": angle_diff})
        print(f"🧍 Forward Lean Angle (Neck - Pelvis Pitch): {angle_diff:.2f}°")


# Notification handlers
def handler_neck(sender, data):
    decode_packet(data, "neck")

def handler_pelvis(sender, data):
    decode_packet(data, "pelvis")

# Data storage
raw_neck = []
raw_pelvis = []
data_neck = []
data_pelvis = []
angle_diff_data = []
latest_pitch = {"neck": None, "pelvis": None}

# BLE Connection Task
async def run_dual_ble(recording_time=20):
    async with BleakClient(NECK_ADDRESS) as neck, BleakClient(PELVIS_ADDRESS) as pelvis:
        print("✅ Connected to NECK sensor")
        print("✅ Connected to PELVIS sensor")

        async def perform_magnetic_calibration(client, label):
            print(f"🧭 Starting magnetic calibration for {label.upper()} in 5 seconds...")
            await asyncio.sleep(5)
            await client.write_gatt_char(WRITE_UUID, b'\xFF\xAA\x01\x07\x00')
            print(f"🔄 Rotate {label.upper()} sensor around all 3 axes...")
            await asyncio.sleep(15)
            await client.write_gatt_char(WRITE_UUID, b'\xFF\xAA\x01\x00\x00')
            print(f"✅ Magnetic calibration complete for {label.upper()}.\n")
        await perform_magnetic_calibration(neck, "neck")
        await perform_magnetic_calibration(pelvis, "pelvis")

        print("••Place the Sensor")
        await asyncio.sleep(10)
        await neck.start_notify(NOTIFY_UUID, handler_neck)
        await pelvis.start_notify(NOTIFY_UUID, handler_pelvis)
        print("📡 Receiving data from both sensors...")   
        await asyncio.sleep(recording_time)
        await neck.stop_notify(NOTIFY_UUID)
        await pelvis.stop_notify(NOTIFY_UUID)
        print("📴 Stopped both sensors.")

# Run the program
asyncio.run(run_dual_ble())

# Save results
output_dir = "/Users/moon/Documents/Posture/Bluetooth_dual_data"
os.makedirs(output_dir, exist_ok=True)

pd.DataFrame(data_neck).to_csv(os.path.join(output_dir, "neck_data.csv"), index=False)
pd.DataFrame(data_pelvis).to_csv(os.path.join(output_dir, "pelvis_data.csv"), index=False)

pd.DataFrame({"raw": raw_neck}).to_csv(os.path.join(output_dir, "neck_raw.csv"), index=False)
pd.DataFrame({"raw": raw_pelvis}).to_csv(os.path.join(output_dir, "pelvis_raw.csv"), index=False)

pd.DataFrame(angle_diff_data).to_csv(os.path.join(output_dir, "lean_angle_diff.csv"), index=False)

print("✅ CSV files saved for both sensors.")