In [11]:
import json
import os
import random
import re
from datetime import datetime, timedelta
from typing import Optional

from llama_cpp import Llama

In [12]:
# Module-level model handle; generate_message() calls `llm(...)` directly,
# so this cell must run before any generation cell.
llm = Llama(
    model_path="models/mistral-7b-instruct-v0.2.Q4_K_M.gguf",
    n_ctx=1024,  # context size requested here; the loader log reports the model's own llama.context_length as 32768
    n_threads=6  # presumably the number of CPU threads used for inference - TODO confirm against llama-cpp-python docs
)

llama_model_loader: loaded meta data with 24 key-value pairs and 291 tensors from models/mistral-7b-instruct-v0.2.Q4_K_M.gguf (version GGUF V3 (latest))
llama_model_loader: Dumping metadata keys/values. Note: KV overrides do not apply in this output.
llama_model_loader: - kv   0:                       general.architecture str              = llama
llama_model_loader: - kv   1:                               general.name str              = mistralai_mistral-7b-instruct-v0.2
llama_model_loader: - kv   2:                       llama.context_length u32              = 32768
llama_model_loader: - kv   3:                     llama.embedding_length u32              = 4096
llama_model_loader: - kv   4:                          llama.block_count u32              = 32
llama_model_loader: - kv   5:                  llama.feed_forward_length u32              = 14336
llama_model_loader: - kv   6:                 llama.rope.dimension_count u32              = 128
llama_model_loader: - kv   7:           

In [13]:
def build_prompt(log_type: str, time_str: str) -> str:
    """
    Assembles the three-line role-play prompt sent to the LLM.

    Args:
        log_type: Kind of update the simulated patient wants to log.
        time_str: Time of day inserted into the prompt text.

    Returns:
        The prompt as a single newline-separated string.
    """
    prompt_lines = [
        "You are a diabetic patient using a chatbot to log health data.",
        f"It is {time_str}, and you want to log a {log_type} update.",
        "Generate only your message in natural language.",
    ]
    return "\n".join(prompt_lines)

def generate_message(prompt: str, max_tokens: int = 100) -> str:
    """
    Sends `prompt` to the module-level `llm` and returns the generated text.

    Args:
        prompt: Text passed to the model.
        max_tokens: Forwarded to the model call as its `max_tokens` argument.

    Returns:
        The "text" field of the first entry in the response's "choices",
        with leading and trailing whitespace removed.
    """
    completion = llm(prompt, max_tokens=max_tokens)
    first_choice = completion["choices"][0]
    return first_choice["text"].strip()

def extract_glucose_mgdl(text: str) -> Optional[int]:
    """
    Extracts a glucose reading from free text and returns it in mg/dL.

    Matching is attempted in this order:
      1. A number followed by "mg/dL" (unit matched case-insensitively).
      2. A number followed by "mmol/L" (case-insensitive), converted to
         mg/dL by multiplying by 18.0.
      3. If the text mentions "blood sugar" or "glucose", the first number
         found anywhere in the text. Note that this can pick up an unrelated
         number (for example the hour of a clock time) when it appears
         before the actual reading.

    Args:
        text: Message to scan.

    Returns:
        The reading rounded to the nearest integer, or None when no reading
        could be extracted.
    """
    # Digits with an optional decimal part. The earlier pattern `[\d.]+` also
    # matched bare punctuation such as "." or "...", which made float() raise
    # ValueError on ordinary sentences that merely mention glucose.
    number = r"(\d+(?:\.\d+)?)"

    # Try to match mg/dL format first
    match_mgdl = re.search(number + r"\s*mg/dL", text, re.IGNORECASE)
    if match_mgdl:
        return round(float(match_mgdl.group(1)))

    # Try mmol/L format if mg/dL not found
    match_mmol = re.search(number + r"\s*mmol/L", text, re.IGNORECASE)
    if match_mmol:
        return round(float(match_mmol.group(1)) * 18.0)

    # Finally fall back to the first number if blood sugar/glucose is mentioned
    if re.search(r"blood sugar|glucose", text, re.IGNORECASE):
        match_num = re.search(number, text)
        if match_num:
            return round(float(match_num.group(1)))

    return None

def simulate_log(user_id: str, timestamp: datetime, log_type: str) -> dict:
    """
    Produces one synthetic log entry for `user_id` at `timestamp`.

    The prompt is built from `log_type` and the timestamp's HH:MM time, the
    LLM generates the message, and a glucose value is extracted from it.

    Returns:
        A dict with keys "user_id", "timestamp" (ISO-8601 string), "log_type",
        "message" and "glucose_mgdl" (whatever extract_glucose_mgdl returns
        for the message).
    """
    prompt = build_prompt(log_type, timestamp.strftime("%H:%M"))

    entry = {
        "user_id": user_id,
        "timestamp": timestamp.isoformat(),
        "log_type": log_type,
    }
    entry["message"] = generate_message(prompt)
    entry["glucose_mgdl"] = extract_glucose_mgdl(entry["message"])
    return entry


In [14]:
# Log types and their relative weights (how often they are generated).
# generate_logs repeats each type `weight` times before picking at random,
# so a weight-4 type is chosen four times as often as a weight-1 type.
LOG_TYPE_WEIGHTS = {
    # High frequency logs
    "glucose": 4,
    # Medium frequency logs
    **dict.fromkeys(("diet", "mood", "activity", "insulin", "medication"), 2),
    # Low frequency logs
    **dict.fromkeys(("sleep", "weight", "notes", "other"), 1),
}

In [15]:
def generate_logs(user_id="sim_user_01", days=1, logs_per_day=3, output_path="synthetic_logs.json"):
    """
    Generates synthetic logs over a given number of days and saves them as JSON.

    Day 0 starts today at 06:00 (naive local time from datetime.now()); each
    log gets a random offset of 0-840 minutes from its day's start, so
    timestamps fall between 06:00 and 20:00. Log types are drawn at random,
    weighted according to LOG_TYPE_WEIGHTS.

    Args:
        user_id: Identifier stored in every log entry.
        days: Number of consecutive days to simulate.
        logs_per_day: Number of log entries generated per day.
        output_path: File the JSON list of logs is written to (overwritten).

    Returns:
        The list of generated log dictionaries, in generation order (entries
        within a day are not sorted by time).
    """
    # random.choices takes the weights directly, so no expanded list is needed
    # and non-integer weights would work as well.
    log_types = list(LOG_TYPE_WEIGHTS)
    weights = list(LOG_TYPE_WEIGHTS.values())

    logs = []
    base_date = datetime.now().replace(hour=6, minute=0, second=0, microsecond=0)

    for day in range(days):
        day_start = base_date + timedelta(days=day)
        for _ in range(logs_per_day):
            log_type = random.choices(log_types, weights=weights, k=1)[0]
            time_offset = timedelta(minutes=random.randint(0, 840))  # 14 hours spread
            logs.append(simulate_log(user_id, day_start + time_offset, log_type))

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(logs, f, indent=2)

    print(f"Successfully generated {len(logs)} logs for {user_id}. Output saved to {output_path}")

    # The docstring always promised the list; previously nothing was returned.
    return logs


In [None]:
def _persona_log_times(persona_type, day_start, day, logs_per_day):
    """
    Returns the (unsorted) log timestamps for one persona on one day.

    Args:
        persona_type: One of the persona "type" strings handled below.
        day_start: Datetime marking 06:00 of the simulated day.
        day: Zero-based index of the day within the simulation.
        logs_per_day: Baseline number of logs for personas without fixed slots.

    Raises:
        ValueError: If `persona_type` is not a known type.
    """
    if persona_type == "consistent_adherent":
        # Fixed slots 2, 8 and 14 hours after day_start, each jittered by +/-30 min
        return [
            day_start + timedelta(hours=h, minutes=random.randint(-30, 30))
            for h in (2, 8, 14)  # Morning, afternoon, evening
        ]

    if persona_type == "inconsistent_adherent":
        # Random times anywhere within the 14-hour window
        return [
            day_start + timedelta(minutes=random.randint(0, 840))
            for _ in range(logs_per_day)
        ]

    if persona_type == "non_adherent_deceptive":
        # Jitter shrinks by 2 minutes per day, from +/-60 down to a floor of
        # +/-5, so the timing becomes suspiciously regular over time.
        variation_minutes = max(5, 60 - (day * 2))
        return [
            day_start + timedelta(
                hours=h,
                minutes=random.randint(-variation_minutes, variation_minutes)
            )
            for h in (2, 8, 14)
        ]

    if persona_type == "non_adherent_deteriorating":
        # One fewer log every 10 days, but never below one log per day
        current_logs = max(1, logs_per_day - day // 10)
        return [
            day_start + timedelta(minutes=random.randint(0, 840))
            for _ in range(current_logs)
        ]

    # Fail loudly instead of silently reusing the previous day's times.
    raise ValueError(f"Unknown persona type: {persona_type!r}")


def generate_persona_logs(days=30, output_dir="synthetic_logs", logs_per_day=3):
    """
    Generates synthetic logs for different user personas with distinct adherence patterns.
    Saves logs to separate JSON files for each persona.

    Args:
        days: Number of consecutive days to simulate, starting today at 06:00.
        output_dir: Directory for the output files; created if missing. One
            file named "<persona>_logs.json" is written per persona.
        logs_per_day: Baseline daily log count for the personas that do not
            use the three fixed time slots (the inconsistent and the
            deteriorating persona).

    Raises:
        ValueError: If a persona has a type with no timing rule.
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Log types are drawn with the same weighting that generate_logs uses.
    log_types = list(LOG_TYPE_WEIGHTS)
    weights = list(LOG_TYPE_WEIGHTS.values())

    # Define personas and their characteristics
    personas = {
        "alice": {
            "id": "sim_user_alice",
            "type": "consistent_adherent",
            "description": "Consistent and adherent user with regular logging patterns"
        },
        "bob": {
            "id": "sim_user_bob",
            "type": "inconsistent_adherent",
            "description": "Inconsistent but adherent user with variable logging times"
        },
        "charlie": {
            "id": "sim_user_charlie",
            "type": "non_adherent_deceptive",
            "description": "Non-adherent user showing suspicious consistency over time"
        },
        "dan": {
            "id": "sim_user_dan",
            "type": "non_adherent_deteriorating",
            "description": "Initially adherent user showing deteriorating compliance"
        }
    }

    # Shared by all personas so their day indices line up on the same dates.
    base_date = datetime.now().replace(hour=6, minute=0, second=0, microsecond=0)

    for name, persona in personas.items():
        logs = []

        for day in range(days):
            day_start = base_date + timedelta(days=day)
            log_times = _persona_log_times(persona["type"], day_start, day, logs_per_day)

            # Generate logs for each time point, in chronological order
            for log_time in sorted(log_times):
                log_type = random.choices(log_types, weights=weights, k=1)[0]
                log = simulate_log(persona["id"], log_time, log_type)

                # Add metadata about adherence pattern
                log["adherence_pattern"] = {
                    "persona_type": persona["type"],
                    "description": persona["description"],
                    "day_in_sequence": day
                }

                logs.append(log)

        # Save logs for this persona
        output_file = os.path.join(output_dir, f"{name}_logs.json")
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(logs, f, indent=2)

        print(f"Generated {len(logs)} logs for {name} ({persona['type']})")

In [None]:
# Generate multiple personas' logs for analysis
def generate_multiple_personas():
    """
    Generates 30 days of synthetic logs for every simulated persona.

    The persona definitions live inside generate_persona_logs, which loops
    over all of them and writes one JSON file per persona. A single call is
    therefore enough; the function takes no per-persona arguments such as
    `name` or `persona_type`, so passing them raises TypeError.
    """
    generate_persona_logs(days=30)

# Run the multi-persona generation
generate_multiple_personas()