In [1]:
import os
import pandas as pd
import requests
import datetime
from dateutil import parser
import time

In [2]:
API_TOKEN = os.getenv('UH_API_TOKEN')
SHARING_CODE = os.getenv('UH_PARTNER_SHARING_CODE')

In [3]:
PARTICIPANT_EMAILS = [
    "magdalena.h.fuchs@gmail.com",
    "victoria-brugger@hotmail.com",
    "gtrskh7yjz@privaterelay.appleid.com"
]

In [None]:
import datetime
import requests

BASE_URL = "https://partner.ultrahuman.com/api/v1/metrics"


def fetch_metrics(email, _date_str_ignored):
    # Always fetch yesterday
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    date_str = yesterday.strftime("%d/%m/%Y")   # UH expects DD/MM/YYYY

    params = {"email": email, "date": date_str}
    headers = {"Authorization": API_TOKEN}

    resp = requests.get(BASE_URL, params=params, headers=headers, timeout=20)
    resp.raise_for_status()
    return resp.json()


def has_real_data(metric_entry):
    if not isinstance(metric_entry, dict):
        return False

    obj = metric_entry.get("object")
    if not isinstance(obj, dict):
        return False

    if obj.get("value") not in (None, "", []):
        return True

    values = obj.get("values")
    if isinstance(values, list) and len(values) > 0:
        return True

    return False


def filter_non_empty(metric_data):
    if not isinstance(metric_data, list):
        return []

    out = []
    for m in metric_data:
        if not isinstance(m, dict):
            continue
        if m.get("type") == "vo2_max":
            continue
        if has_real_data(m):
            out.append(m)
    return out


def extract_metric_data(api_response):
    if not isinstance(api_response, dict):
        return []
    data = api_response.get("data")
    if not isinstance(data, dict):
        return []
    metric_data = data.get("metric_data")
    if not isinstance(metric_data, list):
        return []
    return metric_data


def within_last_25h(metric_entry, cutoff_ts):
    """Keep only metrics whose object timestamps fall within the last 25 hours."""
    if not isinstance(metric_entry, dict):
        return False

    obj = metric_entry.get("object", {})
    if not isinstance(obj, dict):
        return False

    ts = obj.get("day_start_timestamp")
    if not isinstance(ts, (int, float)):
        return False

    # Ultrahuman timestamps appear to be in seconds
    return ts >= cutoff_ts


def fetch_last_day_all():
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    date_str = yesterday.strftime("%d/%m/%Y")

    result = {}

    for email in PARTICIPANT_EMAILS:
        records = []
        any_data = False

        try:
            resp = fetch_metrics(email, date_str)
            raw = extract_metric_data(resp)

            # Only apply your metric-level filtering (non-empty + skip vo2_max)
            non_empty = filter_non_empty(raw)

            if non_empty:
                any_data = True

            records.append({
                "date": date_str,
                "metric_data": non_empty
            })

        except Exception as e:
            records.append({
                "date": date_str,
                "error": str(e),
                "metric_data": []
            })

        result[email] = {"records": records, "has_data": any_data}

    return result



In [5]:
res = fetch_last_day_all()
res

{'magdalena.h.fuchs@gmail.com': {'records': [{'date': '01/12/2025',
    'metric_data': []}],
  'has_data': False},
 'victoria-brugger@hotmail.com': {'records': [{'date': '01/12/2025',
    'metric_data': []}],
  'has_data': False},
 'gtrskh7yjz@privaterelay.appleid.com': {'records': [{'date': '01/12/2025',
    'metric_data': [{'type': 'hr',
      'object': {'day_start_timestamp': 1764543600,
       'title': 'Heart Rate',
       'values': [{'value': 124, 'timestamp': 1764577699},
        {'value': 112, 'timestamp': 1764577999}],
       'last_reading': 112,
       'unit': 'BPM'}},
     {'type': 'temp',
      'object': {'day_start_timestamp': 1764543600,
       'title': 'Skin Temperature',
       'values': [{'value': 29.91410064697266, 'timestamp': 1764577844},
        {'value': 32.45837020874023, 'timestamp': 1764578144}],
       'last_reading': 32,
       'unit': '°C'}},
     {'type': 'steps',
      'object': {'day_start_timestamp': 1764543600,
       'values': [{'value': 0.0, 'timestamp

In [9]:
import csv
import json
import io
import os


def slugify_email(email: str) -> str:
    return (
        email.replace("@", "_at_")
        .replace("+", "_plus_")
        .replace("/", "_")
        .replace("\\", "_")
    )


def expand_metric_to_rows(email: str, date_str: str, metric: dict):
    """Convert one metric object into 0..N CSV rows."""
    rows = []
    metric_type = metric.get("type", "")
    obj = metric.get("object", {}) or {}

    # Skip vo2_max globally
    if metric_type == "vo2_max":
        return rows

    # Case 1: Time-series data (most important)
    values = obj.get("values")
    if isinstance(values, list) and len(values) > 0:
        for item in values:
            if not isinstance(item, dict):
                continue
            ts = item.get("timestamp")
            val = item.get("value")

            rows.append(
                {
                    "email": email,
                    "date": date_str,
                    "metric_type": metric_type,
                    "timestamp": ts,
                    "value": val,
                }
            )
        return rows

    # Case 2: Single summary value
    if obj.get("value") not in (None, "", []):
        rows.append(
            {
                "email": email,
                "date": date_str,
                "metric_type": metric_type,
                "timestamp": obj.get("day_start_timestamp", ""),
                "value": obj.get("value"),
            }
        )
        return rows

    # Case 3: No datapoints → no rows
    return rows


def metrics_to_rows_longformat(email: str, date_str: str, metrics: list[dict]):
    """Flatten all metric objects into a long-format CSV-ready list."""
    all_rows = []
    for metric in metrics:
        if not isinstance(metric, dict):
            continue
        expanded = expand_metric_to_rows(email, date_str, metric)
        all_rows.extend(expanded)
    return all_rows


def save_results_as_csv(results, output_dir="./uh_output"):
    os.makedirs(output_dir, exist_ok=True)

    for email, content in results.items():
        email_slug = slugify_email(email)

        for entry in content["records"]:
            date_str = entry["date"]  # DD/MM/YYYY

            # Convert DD/MM/YYYY to usable components
            d, m, y = date_str.split("/")  # day, month, year

            # Folder name: YYYY_MM
            month_folder = f"{y}_{m}"
            folder_path = os.path.join(output_dir, month_folder)
            os.makedirs(folder_path, exist_ok=True)

            # Filename-safe version of the date: YYYY-MM-DD
            safe_date = f"{y}-{m}-{d}"

            metrics = entry.get("metric_data", [])
            rows = metrics_to_rows_longformat(email, date_str, metrics)

            filename = f"{safe_date}_{email_slug}.csv"
            filepath = os.path.join(folder_path, filename)

            with open(filepath, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(
                    f,
                    fieldnames=["email", "date", "metric_type", "timestamp", "value"],
                )
                writer.writeheader()
                for r in rows:
                    writer.writerow(r)


In [10]:
results = fetch_last_day_all()
results

{'magdalena.h.fuchs@gmail.com': {'records': [{'date': '01/12/2025',
    'metric_data': []}],
  'has_data': False},
 'victoria-brugger@hotmail.com': {'records': [{'date': '01/12/2025',
    'metric_data': []}],
  'has_data': False},
 'gtrskh7yjz@privaterelay.appleid.com': {'records': [{'date': '01/12/2025',
    'metric_data': [{'type': 'hr',
      'object': {'day_start_timestamp': 1764543600,
       'title': 'Heart Rate',
       'values': [{'value': 124, 'timestamp': 1764577699},
        {'value': 112, 'timestamp': 1764577999}],
       'last_reading': 112,
       'unit': 'BPM'}},
     {'type': 'temp',
      'object': {'day_start_timestamp': 1764543600,
       'title': 'Skin Temperature',
       'values': [{'value': 29.91410064697266, 'timestamp': 1764577844},
        {'value': 32.45837020874023, 'timestamp': 1764578144}],
       'last_reading': 32,
       'unit': '°C'}},
     {'type': 'steps',
      'object': {'day_start_timestamp': 1764543600,
       'values': [{'value': 0.0, 'timestamp

In [11]:
save_results_as_csv(results, output_dir="./uh_output")