## Energy-Based Reward Function Demo

This demo uses JouleTrace to compare two algorithms that solve the same problem, then assigns each a reward based on its measured energy consumption and execution time.

**Scenario**: Computing Fibonacci numbers
- **Fast candidate**: Iterative O(n) algorithm
- **Slow candidate**: Naive recursive algorithm (exponential complexity)

Both produce correct results, but we'll see how the reward function prefers the more efficient implementation.

In [1]:
import os
import time
import math
import requests
from pprint import pprint

# JouleTrace API configuration
JOULETRACE_BASE_URL = os.getenv("JOULETRACE_BASE_URL", "http://127.0.0.1:8000")
MEASURE_URL = f"{JOULETRACE_BASE_URL}/api/v1/measure"

def poll_from_poll_url(poll_url: str, timeout_s: int = 600, interval_s: float = 0.5):
    """Poll a JouleTrace task until completion."""
    url = poll_url if poll_url.startswith("http") else f"{JOULETRACE_BASE_URL}{poll_url}"
    start = time.time()
    last_status = None
    
    while True:
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        data = r.json()
        status = data.get("status")
        
        if status != last_status:
            print(f"Status: {status}")
            last_status = status
            
        if status in {"completed", "failed"}:
            return data
            
        if time.time() - start > timeout_s:
            raise TimeoutError(f"Polling timed out after {timeout_s}s")
            
        time.sleep(interval_s)

def queue_job(candidate_code, function_name, tests, timeout_seconds=15,
              memory_limit_mb=2048, trials=3, warmup=1):
    """Submit a measurement job and return the parsed JSON response."""
    payload = {
        "candidate_code": candidate_code,
        "function_name": function_name,
        "test_cases": tests,
        "timeout_seconds": timeout_seconds,
        "memory_limit_mb": memory_limit_mb,
        "energy_measurement_trials": trials,
        "warmup_trials": warmup,
    }
    # Keep the response object so HTTP errors surface before the body is parsed
    r = requests.post(MEASURE_URL, json=payload, timeout=60)
    r.raise_for_status()
    return r.json()

print("Setup complete")

Setup complete
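
The polling loop can be exercised without a running JouleTrace server by injecting the fetch step. The following is a minimal sketch under that assumption: `poll_until_done` and `fake_fetch` are hypothetical names introduced here for illustration, not part of the JouleTrace API, and the loop uses `time.monotonic()` so the deadline is unaffected by wall-clock adjustments.

```python
import time
from typing import Callable


def poll_until_done(fetch: Callable[[], dict],
                    timeout_s: float = 5.0,
                    interval_s: float = 0.01) -> dict:
    """Call fetch() until it reports a terminal status or the deadline passes.

    `fetch` is a hypothetical stand-in for the HTTP GET in poll_from_poll_url.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        data = fetch()
        if data.get("status") in {"completed", "failed"}:
            return data
        if time.monotonic() > deadline:
            raise TimeoutError(f"Polling timed out after {timeout_s}s")
        time.sleep(interval_s)


# Fake task that reports "running" twice and then "completed".
_statuses = iter(["running", "running", "completed"])


def fake_fetch() -> dict:
    return {"status": next(_statuses)}


final = poll_until_done(fake_fetch)
print(final)
```

Separating the transport from the loop makes the timeout path easy to test with a fetcher that never finishes.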


## Define Candidate Algorithms

We'll compare two implementations:
1. **Fast (Iterative)**: O(n) time complexity, minimal memory usage
2. **Slow (Naive Recursive)**: O(2^n) time complexity, with heavy function-call overhead (the recursion depth itself is only O(n))

Both are functionally correct, but dramatically different in efficiency.

In [2]:
# Fast candidate: Iterative approach (O(n))
candidate_fast = """\
def solve(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a
"""

# Slow candidate: Naive recursion (O(2^n))
candidate_slow = """\
def solve(n):
    if n < 2:
        return n
    return solve(n-1) + solve(n-2)
"""

print("Fast candidate (iterative):")
print(candidate_fast)
print("\nSlow candidate (recursive):")
print(candidate_slow)

Fast candidate (iterative):
def solve(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


Slow candidate (recursive):
def solve(n):
    if n < 2:
        return n
    return solve(n-1) + solve(n-2)
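
To make the gap between the two candidates concrete, the sketch below counts how many times the naive recursion calls itself for the indices used later in this demo. `count_calls` is a hypothetical helper written for this illustration; it mirrors the recursive structure of the slow candidate rather than instrumenting it.

```python
def count_calls(n: int) -> int:
    """Number of solve() invocations the naive recursion makes for input n."""
    if n < 2:
        return 1
    # One call for this frame plus everything the two sub-calls trigger.
    return 1 + count_calls(n - 1) + count_calls(n - 2)


for n in (22, 24):
    print(f"n={n}: naive recursion makes {count_calls(n)} calls; "
          f"the iterative loop runs {n} iterations")
```

The call count grows by roughly a factor of 2.6 when n increases by 2, while the loop only gains two iterations.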



## Generate Test Cases

We'll use modest Fibonacci indices (22, 24) so that even the slow recursive version completes within a reasonable timeout.

In [3]:
# Local reference implementation to generate expected outputs
def fib_ref(n: int) -> int:
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

# Generate test cases
test_inputs = [22, 24]
tests = [
    {
        "test_id": f"fib-{n}",
        "inputs": [n],
        "expected_output": fib_ref(n)
    }
    for n in test_inputs
]

print("Test cases:")
pprint(tests)

Test cases:
[{'expected_output': 17711, 'inputs': [22], 'test_id': 'fib-22'},
 {'expected_output': 46368, 'inputs': [24], 'test_id': 'fib-24'}]
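
Before spending a measurement run, it can be useful to confirm locally that a candidate passes the test cases. This is a minimal sketch, assuming the candidate source is trusted (it is executed with `exec`); `check_locally` is a hypothetical helper and not something JouleTrace provides.

```python
candidate_src = """\
def solve(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a
"""

local_tests = [
    {"test_id": "fib-22", "inputs": [22], "expected_output": 17711},
    {"test_id": "fib-24", "inputs": [24], "expected_output": 46368},
]


def check_locally(source: str, function_name: str, cases: list) -> dict:
    """Run candidate source in a scratch namespace and compare with expected outputs."""
    namespace: dict = {}
    exec(source, namespace)  # only run code you trust this way
    func = namespace[function_name]
    return {c["test_id"]: func(*c["inputs"]) == c["expected_output"] for c in cases}


print(check_locally(candidate_src, "solve", local_tests))
```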


## Energy Reward Function

This reward function produces a score in **[0,1]**, where higher is better. It integrates **correctness**, **energy efficiency**, and **execution speed** in a balanced way.

### Key Principles

1. **Correctness Gate**  
   - If the result is not completed or fails validation, reward = **0.0**.  
   - Only correct results are eligible for scoring.

2. **Energy and Time Scaling**  
   - Define ratios relative to reference baselines:
     - $r_E = \frac{E}{E_{\text{ref}}}$  
     - $r_T = \frac{T}{T_{\text{ref}}}$
   - Transform with a bounded monotone function:
   
   $$s(r; k) = \frac{1}{1 + r^k}$$
   
   where $k$ controls steepness.
     - $r = 1 \Rightarrow s = 0.5$  
     - $r \ll 1 \Rightarrow s \to 1$ (very efficient)  
     - $r \gg 1 \Rightarrow s \to 0$ (very inefficient)  

3. **Blending ($F_\beta$-style harmonic mean)**  
   - Combine energy score $S_E$ and time score $S_T$ as:
   
   $$R = \frac{(1+\beta^2) \cdot (S_E \cdot S_T)}{\beta^2 \cdot S_E + S_T}$$
   
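   - $\beta > 1$: weights the **time** score $S_T$ more heavily ($R \to S_T$ as $\beta \to \infty$).  
   - $\beta < 1$: weights the **energy** score $S_E$ more heavily ($R \to S_E$ as $\beta \to 0$).  
   - Because the blend is a harmonic mean, the weaker dimension pulls the overall score down.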

4. **Hard Cap Safeguard**  
   - If energy or time exceeds a multiple of the reference (e.g., 50×), reward collapses to **0.0**.  
   - Prevents absurdly bad results from skewing comparisons.
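
The transform in principle 2 can be tabulated directly to see how the steepness parameter behaves. This is a small sketch of the formula $s(r; k) = 1/(1 + r^k)$ only; the function name `score` is chosen here for illustration.

```python
def score(r: float, k: float) -> float:
    """Bounded monotone transform s(r; k) = 1 / (1 + r**k) for a ratio r >= 0."""
    return 1.0 / (1.0 + r ** k)


for r in (0.1, 0.5, 1.0, 2.0, 10.0):
    print(f"r={r:>5}: s(k=1.0)={score(r, 1.0):.3f}  s(k=1.2)={score(r, 1.2):.3f}")
```

A larger $k$ leaves the midpoint at $r = 1$ unchanged but raises scores below the reference and lowers scores above it.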

### Properties

- **Range:** Strictly bounded to [0,1].  
- **Smoothness:** Diminishing returns when already efficient.  
- **Fairness:** Penalizes whichever metric (energy or time) is worse.  
- **Robustness:** NaN- and outlier-safe.
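
A quick numerical check shows which score the blend approaches as $\beta$ moves. The sketch evaluates the blending formula exactly as written above for fixed example scores (the values 0.25 and 0.50 are arbitrary illustrations); with this form, $R$ tends toward $S_T$ as $\beta$ grows and toward $S_E$ as $\beta$ shrinks, which is worth confirming against the weighting you intend.

```python
def blend(s_e: float, s_t: float, beta: float) -> float:
    """R = (1 + beta^2) * S_E * S_T / (beta^2 * S_E + S_T)."""
    b2 = beta * beta
    return (1.0 + b2) * s_e * s_t / (b2 * s_e + s_t)


s_e, s_t = 0.25, 0.50  # arbitrary example scores
for beta in (0.5, 1.0, 1.5, 4.0):
    print(f"beta={beta}: R={blend(s_e, s_t, beta):.4f}")
```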

In [4]:
def energy_reward(result_json: dict,
                  ref_energy_j: float = 1.0,
                  ref_time_s: float = 0.01,
                  beta_energy: float = 1.5,   # blend weight: with the formula below, >1 favors St (time), <1 favors Se (energy)
                  k_energy: float = 1.2,      # steepness for energy penalty
                  k_time: float = 1.0,        # steepness for time penalty
                  hard_cap_multiple: float = 50.0):
    """
    Reward in [0,1], higher is better.
    - Monotonically decreases with energy and time.
    - Diminishing returns when already below the reference.
    - Harmonic blending emphasizes the worse dimension.

    Transform:
      s(r; k) = 1 / (1 + r^k), where r is ratio to reference (>=0).
      r = value / reference. If r=1 => s=0.5. If r<<1 => s→1. If r>>1 => s→0.
    Blend:
      F_beta-like harmonic mean: R = (1+β^2) * (Se * St) / (β^2 * Se + St)
    """

    # Basic validity checks
    if result_json.get("status") != "completed":
        return 0.0
    validation = result_json.get("validation") or {}
    if not validation.get("is_correct", False):
        return 0.0

    em = result_json.get("energy_metrics") or {}
    E = em.get("median_total_energy_joules")
    T = em.get("median_execution_time_seconds")
    if E is None or T is None or E < 0 or T < 0:
        return 0.0

    # Optional hard cap: if absurdly worse than reference, collapse reward
    if E > hard_cap_multiple * ref_energy_j or T > hard_cap_multiple * ref_time_s:
        return 0.0

    # Dimensionless ratios
    rE = E / max(ref_energy_j, 1e-12)
    rT = T / max(ref_time_s, 1e-12)

    # Smooth, bounded, monotone scores (1 is best, 0 is worst)
    Se = 1.0 / (1.0 + (rE ** k_energy))
    St = 1.0 / (1.0 + (rT ** k_time))

    # F_beta-style harmonic mean emphasizing the weaker side
    beta2 = beta_energy * beta_energy
    denom = (beta2 * Se + St)
    if denom <= 0:
        return 0.0
    reward = (1.0 + beta2) * (Se * St) / denom

    # Numerical safety & clamp
    if math.isnan(reward):
        return 0.0
    return float(max(0.0, min(1.0, reward)))


## Run Energy Measurements

Now we'll measure both candidates with JouleTrace and calculate their rewards.

In [5]:
print("Queueing slow candidate...")
q_slow = queue_job(candidate_slow, "solve", tests, timeout_seconds=15, trials=3, warmup=1)

print("Queueing fast candidate...")
q_fast = queue_job(candidate_fast, "solve", tests, timeout_seconds=15, trials=3, warmup=1)

print("\nPolling slow candidate...")
r_slow = poll_from_poll_url(q_slow["poll_url"])

print("\nPolling fast candidate...")
r_fast = poll_from_poll_url(q_fast["poll_url"])

print("\nBoth measurements complete")

Queueing slow candidate...
Queueing fast candidate...

Polling slow candidate...
Status: running
Status: completed

Polling fast candidate...
Status: completed

Both measurements complete


## Results Comparison

Let's calculate rewards and compare the two implementations.

In [6]:
# Calculate rewards
slow_reward = energy_reward(r_slow)
fast_reward = energy_reward(r_fast)

slow_em = r_slow.get("energy_metrics") or {}
fast_em = r_fast.get("energy_metrics") or {}


def format_energy(value):
    if value is None:
        return "n/a"
    abs_value = abs(value)
    if abs_value >= 1:
        return f"{value:.3f} J"
    if abs_value >= 1e-3:
        return f"{value * 1e3:.3f} mJ"
    if abs_value >= 1e-6:
        return f"{value * 1e6:.3f} uJ"
    return f"{value * 1e9:.3f} nJ"


def format_time(value):
    if value is None:
        return "n/a"
    abs_value = abs(value)
    if abs_value >= 1:
        return f"{value:.3f} s"
    if abs_value >= 1e-3:
        return f"{value * 1e3:.3f} ms"
    if abs_value >= 1e-6:
        return f"{value * 1e6:.3f} us"
    return f"{value * 1e9:.3f} ns"


def format_power(value):
    if value is None:
        return "n/a"
    abs_value = abs(value)
    if abs_value >= 1:
        return f"{value:.3f} W"
    if abs_value >= 1e-3:
        return f"{value * 1e3:.3f} mW"
    if abs_value >= 1e-6:
        return f"{value * 1e6:.3f} uW"
    return f"{value * 1e9:.3f} nW"


def pad(text):
    return f"{text:<20}"


print("=" * 70)
print("ENERGY EFFICIENCY COMPARISON")
print("=" * 70)

print(f"{'Metric':<30} {pad('Slow (Recursive)')} {pad('Fast (Iterative)')}")
print("-" * 70)
print(f"{'Status':<30} {pad(r_slow.get('status', 'n/a'))} {pad(r_fast.get('status', 'n/a'))}")
print(f"{'Correctness':<30} {pad(str(r_slow.get('validation', {}).get('is_correct')))} {pad(str(r_fast.get('validation', {}).get('is_correct')))}")
print(f"{'Energy':<30} {pad(format_energy(slow_em.get('median_total_energy_joules')))} {pad(format_energy(fast_em.get('median_total_energy_joules')))}")
print(f"{'Time':<30} {pad(format_time(slow_em.get('median_execution_time_seconds')))} {pad(format_time(fast_em.get('median_execution_time_seconds')))}")
print("-" * 70)
print(f"{'REWARD SCORE':<30} {slow_reward:<20.4f} {fast_reward:<20.4f}")
print("=" * 70)

winner = "Fast" if fast_reward > slow_reward else ("Slow" if slow_reward > fast_reward else "Tie")
print(f"Winner: {winner} (reward difference: {abs(fast_reward - slow_reward):.4f})")


ENERGY EFFICIENCY COMPARISON
Metric                         Slow (Recursive)     Fast (Iterative)    
----------------------------------------------------------------------
Status                         completed            completed           
Correctness                    True                 True                
Energy                         2.460 J              1.730 J             
Time                           9.988 ms             6.283 us            
----------------------------------------------------------------------
REWARD SCORE                   0.3850               0.6272              
Winner: Fast (reward difference: 0.2422)
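
The reward scores in the table can be reproduced by hand from the displayed energy and time. The sketch below recomputes them with the default parameters of `energy_reward`; `reward_from_metrics` is a hypothetical condensed version that omits the validity checks and the hard cap (neither triggers for these values), and the inputs are the rounded figures from the table, so the last digit may differ slightly.

```python
def reward_from_metrics(energy_j: float, time_s: float,
                        ref_energy_j: float = 1.0, ref_time_s: float = 0.01,
                        beta: float = 1.5, k_energy: float = 1.2,
                        k_time: float = 1.0) -> float:
    """Recompute the reward from median energy and time using the demo defaults."""
    s_e = 1.0 / (1.0 + (energy_j / ref_energy_j) ** k_energy)
    s_t = 1.0 / (1.0 + (time_s / ref_time_s) ** k_time)
    b2 = beta * beta
    return (1.0 + b2) * s_e * s_t / (b2 * s_e + s_t)


# Values copied from the comparison table (rounded as displayed).
slow = reward_from_metrics(2.460, 9.988e-3)
fast = reward_from_metrics(1.730, 6.283e-6)
print(f"slow={slow:.4f} fast={fast:.4f}")
```

Note that the fast candidate's time score is already close to 1, so its reward is limited almost entirely by the energy score.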


## Statistical Validation

Run multiple trials to verify the reward function consistently identifies the more efficient algorithm.

In [7]:
def run_single_comparison():
    """Run one complete comparison of both candidates."""
    q1 = queue_job(candidate_slow, "solve", tests, timeout_seconds=15, trials=3, warmup=1)
    q2 = queue_job(candidate_fast, "solve", tests, timeout_seconds=15, trials=3, warmup=1)
    r1 = poll_from_poll_url(q1["poll_url"])
    r2 = poll_from_poll_url(q2["poll_url"])
    return energy_reward(r1), energy_reward(r2)

# Run multiple comparisons
num_runs = 3
slow_rewards, fast_rewards = [], []

print(f"Running {num_runs} comparison trials...\n")
for i in range(num_runs):
    print(f"Trial {i+1}/{num_runs}:")
    sr, fr = run_single_comparison()
    slow_rewards.append(sr)
    fast_rewards.append(fr)
    print(f"  Slow: {sr:.4f}, Fast: {fr:.4f}\n")

# Calculate averages
avg_slow = sum(slow_rewards) / len(slow_rewards)
avg_fast = sum(fast_rewards) / len(fast_rewards)

print("=" * 50)
print(f"Average rewards over {num_runs} runs:")
print(f"  Slow (recursive): {avg_slow:.4f}")
print(f"  Fast (iterative): {avg_fast:.4f}")
print(f"  Difference: {avg_fast - avg_slow:.4f}")
print("=" * 50)

Running 3 comparison trials...

Trial 1/3:
Status: running
Status: completed
Status: completed
  Slow: 0.4461, Fast: 0.6734

Trial 2/3:
Status: running
Status: completed
Status: completed
  Slow: 0.3862, Fast: 0.6321

Trial 3/3:
Status: running
Status: completed
Status: running
Status: completed
  Slow: 0.4464, Fast: 0.7350

Average rewards over 3 runs:
  Slow (recursive): 0.4262
  Fast (iterative): 0.6802
  Difference: 0.2540
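
With only three runs, the averages are easier to interpret alongside the spread and the per-trial gap. This sketch uses the standard `statistics` module on the reward values printed above; it is a descriptive summary, not a significance test.

```python
from statistics import mean, stdev

# Rewards copied from the three trials above.
slow_rewards = [0.4461, 0.3862, 0.4464]
fast_rewards = [0.6734, 0.6321, 0.7350]

fast_wins = sum(f > s for s, f in zip(slow_rewards, fast_rewards))
min_gap = min(f - s for s, f in zip(slow_rewards, fast_rewards))

print(f"slow: mean={mean(slow_rewards):.4f} stdev={stdev(slow_rewards):.4f}")
print(f"fast: mean={mean(fast_rewards):.4f} stdev={stdev(fast_rewards):.4f}")
print(f"fast wins {fast_wins}/{len(fast_rewards)} trials")
print(f"smallest per-trial gap: {min_gap:.4f}")
```

If the smallest per-trial gap is several times larger than either standard deviation, the ranking is unlikely to flip from run-to-run noise alone.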


## Second Scenario: Longest Common Subsequence (LCS)

In [None]:
def summarize_energy(result_json: dict) -> dict:
    em = result_json.get("energy_metrics") or {}
    return {
        "package_J": em.get("median_package_energy_joules"),
        "ram_J": em.get("median_ram_energy_joules"),
        "total_J": em.get("median_total_energy_joules"),
        "time_s": em.get("median_execution_time_seconds"),
        "power_W": em.get("power_consumption_watts"),
    }


def _format_with_units(value, thresholds_units, scale):
    if value is None:
        return "n/a"
    abs_value = abs(value)
    for threshold, unit in thresholds_units:
        if abs_value >= threshold:
            return f"{value / scale[unit]:.3f} {unit}"
    unit = thresholds_units[-1][1]
    return f"{value / scale[unit]:.3f} {unit}"


def format_energy(value):
    thresholds = ((1.0, "J"), (1e-3, "mJ"), (1e-6, "uJ"), (1e-9, "nJ"), (0.0, "pJ"))
    scale = {"J": 1.0, "mJ": 1e-3, "uJ": 1e-6, "nJ": 1e-9, "pJ": 1e-12}
    return _format_with_units(value, thresholds, scale)


def format_time(value):
    thresholds = ((1.0, "s"), (1e-3, "ms"), (1e-6, "us"), (1e-9, "ns"), (0.0, "ps"))
    scale = {"s": 1.0, "ms": 1e-3, "us": 1e-6, "ns": 1e-9, "ps": 1e-12}
    return _format_with_units(value, thresholds, scale)


def format_power(value):
    thresholds = ((1.0, "W"), (1e-3, "mW"), (1e-6, "uW"), (1e-9, "nW"), (0.0, "pW"))
    scale = {"W": 1.0, "mW": 1e-3, "uW": 1e-6, "nW": 1e-9, "pW": 1e-12}
    return _format_with_units(value, thresholds, scale)


def pad(text: str, width: int = 22) -> str:
    return f"{text:<{width}}"


In [9]:
import random


def lcs_reference(a: str, b: str) -> int:
    m, n = len(a), len(b)
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(m - 1, -1, -1):
        ai = a[i]
        row = dp[i]
        row_next = dp[i + 1]
        for j in range(n - 1, -1, -1):
            if ai == b[j]:
                row[j] = 1 + row_next[j + 1]
            else:
                row[j] = row[j + 1] if row[j + 1] >= row_next[j] else row_next[j]
    return dp[0][0]


rng = random.Random(1234)
alphabet = "abcd"
sizes = [
    (20, 24, "small"),
    (60, 65, "medium"),
    (120, 130, "large"),
]

tests_lcs = []
for m, n, label in sizes:
    a = "".join(rng.choice(alphabet) for _ in range(m))
    b = "".join(rng.choice(alphabet) for _ in range(n))
    expected = lcs_reference(a, b)
    tests_lcs.append({
        "test_id": f"{label}_{m}x{n}",
        "inputs": [a, b],
        "expected_output": expected,
    })

pprint([(t["test_id"], len(t["inputs"][0]), len(t["inputs"][1])) for t in tests_lcs])


[('small_20x24', 20, 24), ('medium_60x65', 60, 65), ('large_120x130', 120, 130)]
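
Because the expected outputs come from a single reference implementation, it is worth cross-checking that implementation against an independent formulation. The sketch below compares a bottom-up DP with the same recurrence as `lcs_reference` against a memoized top-down version on a few small inputs; both function names are introduced here for illustration.

```python
from functools import lru_cache


def lcs_bottom_up(a: str, b: str) -> int:
    """Suffix-based DP: dp[i][j] is the LCS length of a[i:] and b[j:]."""
    m, n = len(a), len(b)
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(m - 1, -1, -1):
        for j in range(n - 1, -1, -1):
            if a[i] == b[j]:
                dp[i][j] = 1 + dp[i + 1][j + 1]
            else:
                dp[i][j] = max(dp[i][j + 1], dp[i + 1][j])
    return dp[0][0]


def lcs_memo(a: str, b: str) -> int:
    """Independent top-down formulation, used only as a cross-check."""
    @lru_cache(maxsize=None)
    def go(i: int, j: int) -> int:
        if i == len(a) or j == len(b):
            return 0
        if a[i] == b[j]:
            return 1 + go(i + 1, j + 1)
        return max(go(i + 1, j), go(i, j + 1))
    return go(0, 0)


pairs = [("abcbdab", "bdcaba"), ("", "abc"), ("abcd", "abcd"), ("aaaa", "bbbb")]
for a, b in pairs:
    assert lcs_bottom_up(a, b) == lcs_memo(a, b)
    print(repr(a), repr(b), lcs_bottom_up(a, b))
```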


In [11]:
candidates_lcs = {
    "fast_dp": """\
def solve(a, b):
    m, n = len(a), len(b)
    dp = [[0]*(n+1) for _ in range(m+1)]
    for i in range(m-1, -1, -1):
        ai = a[i]
        row = dp[i]
        row_next = dp[i+1]
        for j in range(n-1, -1, -1):
            if ai == b[j]:
                row[j] = 1 + row_next[j+1]
            else:
                row[j] = row[j+1] if row[j+1] >= row_next[j] else row_next[j]
    return dp[0][0]
""",
    "slow_dp": """\
def solve(a, b):
    m, n = len(a), len(b)
    res = 0
    for _ in range(12):   # recompute DP 12 times
        dp = [[0]*(n+1) for _ in range(m+1)]
        for i in range(m-1, -1, -1):
            ai = a[i]
            row = dp[i]
            row_next = dp[i+1]
            for j in range(n-1, -1, -1):
                if ai == b[j]:
                    row[j] = 1 + row_next[j+1]
                else:
                    row[j] = row[j+1] if row[j+1] >= row_next[j] else row_next[j]
        res = dp[0][0]
    return res
""",
    "glacial_dp": """\
def solve(a, b):
    m, n = len(a), len(b)
    res = 0
    for repeat in range(45):   # heavy recomputation with extra work
        dp = [[0]*(n+1) for _ in range(m+1)]
        for i in range(m-1, -1, -1):
            ai = a[i]
            row = dp[i]
            row_next = dp[i+1]
            for j in range(n-1, -1, -1):
                if ai == b[j]:
                    val = 1 + row_next[j+1]
                else:
                    val = row[j+1] if row[j+1] >= row_next[j] else row_next[j]
                tmp = val
                for _ in range(12):
                    tmp = tmp if tmp >= val else val
                row[j] = tmp
        res = dp[0][0]
    return res
"""
}

print("Candidates:", list(candidates_lcs.keys()))


Candidates: ['fast_dp', 'slow_dp', 'glacial_dp']
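
The three candidates differ only in how much redundant work they do, so their relative cost can be estimated before measuring. This rough sketch counts DP cell updates across the three test sizes; it ignores allocation and interpreter overhead, so treat the ratios as an approximation rather than a prediction of measured time.

```python
sizes = [(20, 24), (60, 65), (120, 130)]  # (len(a), len(b)) for the three LCS tests

cells = sum(m * n for m, n in sizes)  # DP cells filled by one full pass over all tests
work = {
    "fast_dp": cells,           # one pass
    "slow_dp": 12 * cells,      # 12 identical passes
    "glacial_dp": 45 * cells,   # 45 passes; each cell also runs a 12-step inner loop
}
inner_iterations = 45 * cells * 12  # extra inner-loop iterations in glacial_dp

for name, updates in work.items():
    print(f"{name:<11} cell updates={updates:>8}  x{updates // cells} vs fast_dp")
print("glacial_dp inner-loop iterations:", inner_iterations)
```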


In [12]:
def measure_candidate(code: str,
                      function_name: str,
                      tests: list,
                      timeout_seconds: int = 40,
                      trials: int = 3,
                      warmup: int = 1) -> dict:
    queued = queue_job(code, function_name, tests,
                       timeout_seconds=timeout_seconds,
                       memory_limit_mb=2048,
                       trials=trials,
                       warmup=warmup)
    if isinstance(queued, dict) and "poll_url" in queued:
        return poll_from_poll_url(queued["poll_url"])
    return queued


results_lcs = {}
for name, code in candidates_lcs.items():
    print(f"Submitting {name}...")
    result = measure_candidate(code, "solve", tests_lcs)
    reward = energy_reward(result)
    energy_summary = summarize_energy(result)
    results_lcs[name] = {
        "status": result.get("status"),
        "validation": result.get("validation"),
        "reward": reward,
        "energy": energy_summary,
        "raw_result": result,
    }
    print(f"{name}: status={results_lcs[name]['status']} reward={reward:.4f}")

print("Done.")


Submitting fast_dp...
Status: running
Status: completed
fast_dp: status=completed reward=0.6138
Submitting slow_dp...
Status: running
Status: completed
slow_dp: status=completed reward=0.3137
Submitting glacial_dp...
Status: running
Status: completed
glacial_dp: status=completed reward=0.0226
Done.


In [14]:
if not results_lcs:
    print("No LCS results available.")
else:
    rows = sorted(
        results_lcs.items(),
        key=lambda item: item[1].get("reward", 0.0),
        reverse=True,
    )

    print("=" * 80)
    print("LCS CANDIDATE EFFICIENCY".center(74))
    print("=" * 80)
    # Use pad() for the header so it lines up with the 22-character data columns
    print(f"{'Candidate':<14}{'Reward':<10}{pad('Energy')}{pad('Time')}{'Power'}")
    print("-" * 80)

    for name, info in rows:
        energy = info.get("energy") or {}
        print(f"{name:<14}"
              f"{info.get('reward', 0.0):<10.4f}"
              f"{pad(format_energy(energy.get('total_J')))}"
              f"{pad(format_time(energy.get('time_s')))}"
              f"{format_power(energy.get('power_W'))}")

    print("-" * 80)
    best_name, best_info = rows[0]
    print(f"Best reward: {best_name} ({best_info.get('reward', 0.0):.4f})")

    print("Energy details:")
    for name, info in results_lcs.items():
        energy = info.get("energy") or {}
        print(f"{name:>12} | "
              f"package={format_energy(energy.get('package_J'))}, "
              f"ram={format_energy(energy.get('ram_J'))}, "
              f"total={format_energy(energy.get('total_J'))}, "
              f"time={format_time(energy.get('time_s'))}, "
              f"power={format_power(energy.get('power_W'))}")


                         LCS CANDIDATE EFFICIENCY                         
Candidate     Reward    Energy                Time                  Power
--------------------------------------------------------------------------------
fast_dp       0.6138    1.520 J               1.743 ms              871.891 W
slow_dp       0.3137    2.130 J               20.595 ms             103.423 W
glacial_dp    0.0226    26.950 J              394.148 ms            68.375 W
--------------------------------------------------------------------------------
Best reward: fast_dp (0.6138)
Energy details:
     fast_dp | package=1.350 J, ram=180.000 mJ, total=1.520 J, time=1.743 ms, power=871.891 W
     slow_dp | package=1.980 J, ram=150.000 mJ, total=2.130 J, time=20.595 ms, power=103.423 W
  glacial_dp | package=23.950 J, ram=3.000 J, total=26.950 J, time=394.148 ms, power=68.375 W
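
The power column can be sanity-checked against the energy and time columns. The sketch below assumes that `power_consumption_watts` is simply total energy divided by execution time; the source does not define the field, so this is an inference that the numbers happen to support.

```python
rows = {
    # name: (total energy in J, time in s, reported power in W), copied from the table
    "fast_dp": (1.520, 1.743e-3, 871.891),
    "slow_dp": (2.130, 20.595e-3, 103.423),
    "glacial_dp": (26.950, 394.148e-3, 68.375),
}

rel_errors = {}
for name, (energy_j, time_s, reported_w) in rows.items():
    derived_w = energy_j / time_s  # assumed definition: average power = energy / time
    rel_errors[name] = abs(derived_w - reported_w) / reported_w
    print(f"{name:<11} derived={derived_w:8.3f} W  reported={reported_w:8.3f} W  "
          f"rel_err={rel_errors[name]:.2%}")
```

A figure near 872 W is far above typical CPU package power, which may hint that the energy and time medians do not cover the same interval; the source does not say, so this is worth checking before relying on the power values.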
