
# HW 1 - Nigel Li (nl2992)
Implement Newton's method to compute the implied yield, using Brent's method from the SciPy package as a benchmark.

The code below uses Brent's method on the `[0, 20]` bracket as the baseline. The following methods were compared on 1000 random cases:

**Baseline**
- `brentq`: bracketed root-finder on `[0, 20]`. Very reliable.

**SciPy Newton-family (for comparison)**
- `spop.newton(..., x0, x1, fprime=None)`: secant method.
- `spop.newton(..., x0, fprime=root_prime)`: Newton’s method with derivatives. 

Both were slower than `brentq` on this test set (roughly 88 to 95 ms versus 46 ms wall time) and are not optimised for this problem.


### Three optimisation ideas

Three general ideas are explored:

1) **Manual Newton Method**
**Problem:** SciPy's Newton computes the discount factors separately for `f` and `f'`. The manual version computes `disc` once per iteration and reuses it for both, which reduces the work per iteration.
**Fix:** `fused_newton()`

2) **Bracketed Newton, using bisection**
**Problem:** Plain Newton can sometimes take a step that is too large (or head toward the boundary `r -> -1`), which can cause slow convergence or divergence. This is especially risky because `(1+r)` must remain positive when `tk` is non-integer. The update rule therefore keeps a bracket `[lo, hi]` and falls back to the midpoint `(lo + hi)/2` whenever the Newton step leaves it, which is effectively a bisection (binary search) step.
**Fix:** `fused_hybrid_newton()`

3) **Better starting values for SciPy secant (multiple `(x0, x1)` attempts)**
**Problem:** Secant/Newton-family methods are sensitive to initial guesses. A poor choice of `(x0, x1)` can increase iterations or cause failure. We try several "scale-aware" pairs in turn and return the first one that converges.
**Fix:** `scipy_secant_better_starts()`
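
The domain concern in idea 2 can be seen directly in NumPy. The snippet below is a minimal sketch; the rate `r_bad` and the time `t` are hypothetical values chosen for illustration and do not come from the test cases.

```python
import numpy as np

# Hypothetical values for illustration only (not taken from the test cases).
r_bad = -1.5   # gives 1 + r = -0.5, outside the domain
t = 0.3        # a non-integer cash-flow time

with np.errstate(invalid="ignore"):        # silence the "invalid value" warning
    disc_bad = np.power(1.0 + r_bad, -t)   # negative base, fractional exponent

disc_ok = np.power(1.0 + 0.05, -t)         # a valid rate gives a finite discount factor

print(disc_bad)  # nan
print(disc_ok)
```

Once a `nan` enters the discount factors, both `f` and `f'` become `nan`, so an unguarded Newton iteration cannot recover.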


| Method | user (ms) | sys (ms) | total (ms) | wall (ms) |
|---|---:|---:|---:|---:|
| brentq (benchmark) | 44.3 | 1.88 | 46.1 | 46.2 |
| scipy newton+fprime (benchmark) | 94.3 | 1.13 | 95.4 | 95.0 |
| scipy secant default (benchmark) | 87.4 | 1.03 | 88.5 | 88.0 |
| fused_newton (#1) | 20.3 | 1.76 | 22.1 | 20.9 |
| fused_hybrid_newton (#2) | 45.2 | 4.52 | 49.8 | 46.0 |
| scipy_secant_better_starts (#3) | 91.7 | 1.12 | 92.8 | 92.4 |




## Conclusion
- The main performance bottleneck in Newton-style methods is repeatedly computing `(1+r)^(-tk)`.
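- The best improvement was the **manual fused Newton**, which computes the discount factors once per iteration and reuses them for both `f` and `f'`. In the table above it was about 4.5x faster than SciPy's Newton with `fprime` (20.9 ms versus 95.0 ms wall time) and about 2.2x faster than `brentq` (46.2 ms).
- The hybrid (#2) roughly matched `brentq` (46.0 ms versus 46.2 ms), and the multi-start secant (#3) gave no speed-up over the default secant (92.4 ms versus 88.0 ms).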
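
The reuse follows from the form of the derivative. Writing $d_k = (1+r)^{-t_k}$ for the discount factors (a symbol introduced here for illustration; it corresponds to `disc` in the code):

$$
f(r) = \sum_k c_k\, d_k - p,
\qquad
f'(r) = -\sum_k t_k\, c_k\, (1+r)^{-t_k-1} = -\frac{1}{1+r}\sum_k (t_k c_k)\, d_k .
$$

Both sums use the same $d_k$, so one `np.power` call per iteration is enough, and the products $t_k c_k$ do not depend on $r$ and can be precomputed.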


In [68]:
import time

import numpy as np
import scipy.optimize as spop


In [69]:
# NPV function. tk and ck are the arrays of cash-flow times and cash-flow amounts.
# Sanity check: tk and ck must have the same length, so that each cash flow is matched to a time.
# NPV is returned as the sum of discounted cash flows: sum(ck[k] / (1 + r) ** tk[k] for k in range(len(tk)))
def npv(r, tk, ck):
  assert(len(tk) == len(ck))
  return np.sum(np.power(1.0 + r, -tk) * ck)

In [70]:
# Root function: the implied yield r solves root(r, p, tk, ck) = 0, which is equivalent to npv(r, tk, ck) = p
def root(r, p, tk, ck):
  return npv(r, tk, ck) - p

In [71]:
# Derivative of the root function with respect to r:
# f'(r) = sum(-tk[k] * ck[k] * (1 + r) ** (-tk[k] - 1) for k in range(len(tk)))
def root_prime(r, p, tk, ck):
  return np.sum(-tk * np.power(1.0 + r, -tk - 1) * ck)
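
One way to sanity-check `root_prime` is to compare it with a central finite difference of `root`. The sketch below repeats the three functions so that it runs on its own; the three-payment stream, the price, the rate and the step size `h` are hypothetical values for illustration.

```python
import numpy as np

def npv(r, tk, ck):
    return np.sum(np.power(1.0 + r, -tk) * ck)

def root(r, p, tk, ck):
    return npv(r, tk, ck) - p

def root_prime(r, p, tk, ck):
    return np.sum(-tk * np.power(1.0 + r, -tk - 1) * ck)

# Hypothetical three-payment stream, for illustration only.
tk = np.array([0.5, 1.0, 1.5])
ck = np.array([5.0, 5.0, 105.0])
p, r, h = 100.0, 0.08, 1e-6

# Central difference approximation of f'(r).
central = (root(r + h, p, tk, ck) - root(r - h, p, tk, ck)) / (2.0 * h)
analytic = root_prime(r, p, tk, ck)

print(analytic, central)  # the two values should agree to several digits
```

The derivative is negative for positive cash flows, which is why the NPV has at most one root in `r` on the domain `1 + r > 0`.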

In [72]:
# Initial guess: collapse the whole stream into a single payment of PV0 = sum(ck) at the
# cash-flow-weighted average time Tbar (similar to Macaulay duration), then solve PV0 / (1 + r0) ** Tbar = p for r0.
def initial_guess(p, tk, ck):
    PV0 = np.sum(ck)
    Tbar = np.sum(tk * ck) / PV0
    r0 = (PV0 / p) ** (1.0 / Tbar) - 1.0
    # keep 1+r positive (domain of (1+r)^(-t))
    return max(r0, -0.95)
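
For a stream with a single payment the guess is exact, which gives a quick check of the formula. The sketch below uses a hypothetical helper, `initial_guess_single`, and hypothetical numbers (a payment of 121 at time 2 priced at 100).

```python
def initial_guess_single(p, t, c):
    # Same formula as initial_guess() when the stream has one payment:
    # PV0 = c and Tbar = t, so r0 = (c / p) ** (1 / t) - 1.
    return (c / p) ** (1.0 / t) - 1.0

# Hypothetical single payment, for illustration only.
p, t, c = 100.0, 2.0, 121.0
r0 = initial_guess_single(p, t, c)

print(r0)                    # approximately 0.10
print(c / (1.0 + r0) ** t)   # approximately 100, i.e. the price is reproduced
```

With many cash flows the guess is only approximate, because discounting at a single average time ignores the spread of the payment dates.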

In [73]:
rng = np.random.default_rng(12345)
cases = []
for _ in range(1000):
    ck = rng.uniform(0, 10, 100)
    ck[-1] += 100
    tk = np.cumsum(rng.uniform(0.1, 0.3, 100))
    p  = rng.uniform(10.0, 100)
    cases.append((float(p), tk.copy(), ck.copy()))

# Optimisations of the existing Newton's method:
1) Do Newton manually and compute f and f' together (reuse the same powers once per step).
- Implemented as `fused_newton()`: each iteration computes `disc` once, evaluates both `f` and `f'` from it, and then applies the Newton update. SciPy's Newton with derivatives calls `root()`, which computes all the powers, and then `root_prime()`, which computes them again, so it calls `np.power()` twice per iteration.

2) Keep a bracket and only take a Newton step if it stays inside; otherwise do a bisection step (effectively a binary search).
- We start with a bracket, compute `f` and `f'`, and apply the same update rule. If `r_new` leaves the bracket, it is replaced with the bisection midpoint `r_new = (lo + hi)/2`. The bracket is then shrunk using the sign of `f_new = f(r_new)`.

3) Use better starting values: clamp r0 away from -1 and pick x1 based on scale (not a fixed bump).
- Uses SciPy's secant method again, but tries multiple `(x0, x1)` pairs and returns the first one that converges.
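
The fallback rule in idea 2 can be isolated into a few lines. This is a minimal sketch with a hypothetical helper, `safeguarded_step`, and hypothetical inputs; it illustrates the rule only and is not the full solver.

```python
import math

def safeguarded_step(r, f, fp, lo, hi):
    """Return the Newton iterate if it lies strictly inside (lo, hi), else the midpoint."""
    if fp != 0.0 and math.isfinite(fp):
        r_new = r - f / fp
        if math.isfinite(r_new) and lo < r_new < hi:
            return r_new
    return 0.5 * (lo + hi)  # bisection fallback

# Hypothetical inputs, for illustration only.
inside = safeguarded_step(r=0.10, f=2.0, fp=-100.0, lo=0.0, hi=20.0)    # Newton step is kept
outside = safeguarded_step(r=0.10, f=-50.0, fp=-100.0, lo=0.0, hi=20.0) # step goes below lo
flat = safeguarded_step(r=0.10, f=2.0, fp=0.0, lo=0.0, hi=20.0)         # zero derivative

print(inside, outside, flat)
```

Because the midpoint always lies inside the bracket, the iterate can never reach `r <= -1` as long as `lo > -1`.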

In [74]:
def fused_newton(p, tk, ck, r0, tol=1e-8, maxiter=50):
    """Newton's method for npv(r) = p, evaluating f and f' from one np.power call per iteration."""
    r = float(r0)
    w1 = ck * tk  # precompute once

    for _ in range(maxiter):
        if r <= -0.999999999:
            raise RuntimeError("Out of domain (1+r<=0).")

        onepr = 1.0 + r
        # (1+r)^(-tk)
        disc = np.power(onepr, -tk)       

        f = np.dot(ck, disc) - p
        if abs(f) < tol:
            return r

        fp = -np.dot(w1, disc) / onepr   # f'(r)
        if fp == 0.0 or (not np.isfinite(fp)):
            raise RuntimeError("Bad derivative.")

        r = r - f / fp

    raise RuntimeError("No convergence.")

In [75]:
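def fused_hybrid_newton(p, tk, ck, lo=0.0, hi=20.0, r0=None, tol=1e-8, maxiter=80):
    """Newton's method safeguarded by the bracket [lo, hi]: steps that leave it are replaced by bisection."""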
    f_lo = root(lo, p, tk, ck)
    f_hi = root(hi, p, tk, ck)
    if f_lo == 0.0:
        return float(lo)
    if f_hi == 0.0:
        return float(hi)
    if f_lo * f_hi > 0.0:
        raise RuntimeError("No sign change in bracket.")

    r = 0.5 * (lo + hi) if r0 is None else float(np.clip(r0, lo, hi))
    w1 = ck * tk  # precompute once

    for _ in range(maxiter):
        onepr = 1.0 + r
        if onepr <= 0.0:
            r = 0.5 * (lo + hi)
            onepr = 1.0 + r

        disc = np.power(onepr, -tk)
        f = np.dot(ck, disc) - p
        if abs(f) < tol:
            return float(r)

        fp = -np.dot(w1, disc) / onepr
        use_newton = np.isfinite(fp) and fp != 0.0
        r_new = (r - f / fp) if use_newton else np.nan

        # keep within bracket; otherwise bisection
        if (not np.isfinite(r_new)) or (r_new <= lo) or (r_new >= hi):
            r_new = 0.5 * (lo + hi)

        f_new = root(r_new, p, tk, ck)

        if f_lo * f_new <= 0.0:
            hi, f_hi = r_new, f_new
        else:
            lo, f_lo = r_new, f_new

        r = r_new

    raise RuntimeError("No convergence.")

In [76]:
def scipy_secant_better_starts(p, tk, ck, tol=1e-8, maxiter=80):
    """Try SciPy's secant method from several (x0, x1) pairs and return the first converged root."""
    r0 = initial_guess(p, tk, ck)
    pairs = [
        (r0, r0 + 0.1 * (1.0 + abs(r0))),
        (max(r0, 0.0), max(r0, 0.0) + 0.25),
        (0.02, 0.25),
        (0.10, 0.60),
    ]
    last = None
    for x0, x1 in pairs:
        try:
            r = spop.newton(root, x0=x0, x1=x1, args=(p, tk, ck), tol=tol, maxiter=maxiter)
            if abs(root(r, p, tk, ck)) < tol:
                return float(r)
        except RuntimeError as e:
            last = e
    raise RuntimeError(f"Secant failed. Last: {last}")

# Wrapper functions, so that all methods run on the same cases with the same call signature.

In [77]:
def solve_brentq(p, tk, ck):
    return spop.brentq(root, 0.0, 20.0, args=(p, tk, ck))

def solve_scipy_secant_default(p, tk, ck):
    r0 = initial_guess(p, tk, ck)
    r1 = r0 + 0.1 * (1.0 + abs(r0))  
    return spop.newton(root, x0=r0, x1=r1, fprime=None, args=(p, tk, ck), tol=1e-8, maxiter=80)

def solve_scipy_newton(p, tk, ck):
    r0 = initial_guess(p, tk, ck)
    return spop.newton(root, x0=r0, fprime=root_prime, args=(p, tk, ck), tol=1e-8, maxiter=50)

def solve_fused_newton(p, tk, ck):
    r0 = initial_guess(p, tk, ck)
    return fused_newton(p, tk, ck, r0=r0, tol=1e-8, maxiter=50)

def solve_fused_hybrid_newton(p, tk, ck):
    r0 = initial_guess(p, tk, ck)
    return fused_hybrid_newton(p, tk, ck, lo=0.0, hi=20.0, r0=r0, tol=1e-8, maxiter=80)

def solve_scipy_secant_better_starts(p, tk, ck):
    return scipy_secant_better_starts(p, tk, ck, tol=1e-8, maxiter=80)

RESULTS = []


In [78]:
def failures(solve_fn):
    """Count the cases where solve_fn raises or returns a root with residual >= 1e-8."""
    fails = 0
    for (p, tk, ck) in cases:
        try:
            r = solve_fn(p, tk, ck)
            if abs(root(r, p, tk, ck)) >= 1e-8:
                fails += 1
        except Exception:
            fails += 1
    return fails

import resource
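
The benchmark cells below all repeat the same timing pattern. As a sketch of how it could be factored out, the hypothetical helper `time_solver` wraps that pattern and returns one row in the same shape as the entries of `RESULTS`. It assumes a Unix-like system, since the `resource` module is not available on Windows, and it leaves out the residual assertion used in the cells. The toy solver and toy cases are stand-ins so that the snippet runs on its own.

```python
import resource
import time

def time_solver(name, solve_fn, cases):
    """Run solve_fn over every case and return one RESULTS-style row (times in ms)."""
    ru0 = resource.getrusage(resource.RUSAGE_SELF)
    t0 = time.perf_counter()
    for p, tk, ck in cases:
        solve_fn(p, tk, ck)
    ru1 = resource.getrusage(resource.RUSAGE_SELF)
    t1 = time.perf_counter()

    user_ms = 1000 * (ru1.ru_utime - ru0.ru_utime)
    sys_ms = 1000 * (ru1.ru_stime - ru0.ru_stime)
    return {"Method": name, "user_ms": user_ms, "sys_ms": sys_ms,
            "total_ms": user_ms + sys_ms, "wall_ms": 1000 * (t1 - t0)}

# Toy stand-ins for illustration: one payment of 110 at t = 1, priced at 100.
toy_cases = [(100.0, [1.0], [110.0])] * 3
row = time_solver("toy", lambda p, tk, ck: ck[0] / p - 1.0, toy_cases)
print(row["Method"], row["wall_ms"] >= 0.0)
```

With a helper of this kind, each benchmark cell would reduce to a single `RESULTS.append(time_solver(...))` call.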


#  Benchmark 1 — brentq


In [79]:
%%time
ru0 = resource.getrusage(resource.RUSAGE_SELF); t0 = time.perf_counter()
for (p, tk, ck) in cases:
    r = solve_brentq(p, tk, ck)
    assert abs(root(r, p, tk, ck)) < 1e-8
ru1 = resource.getrusage(resource.RUSAGE_SELF); t1 = time.perf_counter()

user_ms = 1000*(ru1.ru_utime-ru0.ru_utime)
sys_ms  = 1000*(ru1.ru_stime-ru0.ru_stime)
wall_ms = 1000*(t1-t0)
RESULTS.append({"Method":"brentq (benchmark)", "user_ms":user_ms, "sys_ms":sys_ms, "total_ms":user_ms+sys_ms, "wall_ms":wall_ms})
print(f"[recorded] user {user_ms:.1f} ms | sys {sys_ms:.2f} ms | total {user_ms+sys_ms:.1f} ms | wall {wall_ms:.1f} ms")


[recorded] user 44.3 ms | sys 1.88 ms | total 46.1 ms | wall 46.2 ms
CPU times: user 44.4 ms, sys: 1.89 ms, total: 46.3 ms
Wall time: 46.3 ms


#  Benchmark 2 — scipy newton (with fprime) 


In [80]:
%%time
ru0 = resource.getrusage(resource.RUSAGE_SELF); t0 = time.perf_counter()
for (p, tk, ck) in cases:
    r = solve_scipy_newton(p, tk, ck)
    assert abs(root(r, p, tk, ck)) < 1e-8
ru1 = resource.getrusage(resource.RUSAGE_SELF); t1 = time.perf_counter()

user_ms = 1000*(ru1.ru_utime-ru0.ru_utime)
sys_ms  = 1000*(ru1.ru_stime-ru0.ru_stime)
wall_ms = 1000*(t1-t0)
RESULTS.append({"Method":"scipy newton+fprime (benchmark)", "user_ms":user_ms, "sys_ms":sys_ms, "total_ms":user_ms+sys_ms, "wall_ms":wall_ms})
print(f"[recorded] user {user_ms:.1f} ms | sys {sys_ms:.2f} ms | total {user_ms+sys_ms:.1f} ms | wall {wall_ms:.1f} ms")


[recorded] user 94.3 ms | sys 1.13 ms | total 95.4 ms | wall 95.0 ms
CPU times: user 94.4 ms, sys: 1.15 ms, total: 95.6 ms
Wall time: 95.1 ms


# Benchmark 3 — scipy secant method


In [81]:
%%time
ru0 = resource.getrusage(resource.RUSAGE_SELF); t0 = time.perf_counter()
for (p, tk, ck) in cases:
    r = solve_scipy_secant_default(p, tk, ck)
    assert abs(root(r, p, tk, ck)) < 1e-8
ru1 = resource.getrusage(resource.RUSAGE_SELF); t1 = time.perf_counter()

user_ms = 1000*(ru1.ru_utime-ru0.ru_utime)
sys_ms  = 1000*(ru1.ru_stime-ru0.ru_stime)
wall_ms = 1000*(t1-t0)
RESULTS.append({"Method":"scipy secant default (benchmark)", "user_ms":user_ms, "sys_ms":sys_ms, "total_ms":user_ms+sys_ms, "wall_ms":wall_ms})
print(f"[recorded] user {user_ms:.1f} ms | sys {sys_ms:.2f} ms | total {user_ms+sys_ms:.1f} ms | wall {wall_ms:.1f} ms")


[recorded] user 87.4 ms | sys 1.03 ms | total 88.5 ms | wall 88.0 ms
CPU times: user 87.6 ms, sys: 1.05 ms, total: 88.6 ms
Wall time: 88.2 ms


#  Improvement #1 — fused_newton


In [82]:
%%time
ru0 = resource.getrusage(resource.RUSAGE_SELF); t0 = time.perf_counter()
for (p, tk, ck) in cases:
    r = solve_fused_newton(p, tk, ck)
    assert abs(root(r, p, tk, ck)) < 1e-8
ru1 = resource.getrusage(resource.RUSAGE_SELF); t1 = time.perf_counter()

user_ms = 1000*(ru1.ru_utime-ru0.ru_utime)
sys_ms  = 1000*(ru1.ru_stime-ru0.ru_stime)
wall_ms = 1000*(t1-t0)
RESULTS.append({"Method":"fused_newton (#1)", "user_ms":user_ms, "sys_ms":sys_ms, "total_ms":user_ms+sys_ms, "wall_ms":wall_ms})
print(f"[recorded] user {user_ms:.1f} ms | sys {sys_ms:.2f} ms | total {user_ms+sys_ms:.1f} ms | wall {wall_ms:.1f} ms")


[recorded] user 20.3 ms | sys 1.76 ms | total 22.1 ms | wall 20.9 ms
CPU times: user 20.4 ms, sys: 1.77 ms, total: 22.2 ms
Wall time: 21 ms


#  Improvement #2 — fused_hybrid_newton 


In [83]:
%%time
ru0 = resource.getrusage(resource.RUSAGE_SELF); t0 = time.perf_counter()
for (p, tk, ck) in cases:
    r = solve_fused_hybrid_newton(p, tk, ck)
    assert abs(root(r, p, tk, ck)) < 1e-8
ru1 = resource.getrusage(resource.RUSAGE_SELF); t1 = time.perf_counter()

user_ms = 1000*(ru1.ru_utime-ru0.ru_utime)
sys_ms  = 1000*(ru1.ru_stime-ru0.ru_stime)
wall_ms = 1000*(t1-t0)
RESULTS.append({"Method":"fused_hybrid_newton (#2)", "user_ms":user_ms, "sys_ms":sys_ms, "total_ms":user_ms+sys_ms, "wall_ms":wall_ms})
print(f"[recorded] user {user_ms:.1f} ms | sys {sys_ms:.2f} ms | total {user_ms+sys_ms:.1f} ms | wall {wall_ms:.1f} ms")


[recorded] user 45.2 ms | sys 4.52 ms | total 49.8 ms | wall 46.0 ms
CPU times: user 45.3 ms, sys: 4.53 ms, total: 49.9 ms
Wall time: 46.1 ms


#  Improvement #3 — scipy_secant_better_starts 


In [84]:
%%time
ru0 = resource.getrusage(resource.RUSAGE_SELF); t0 = time.perf_counter()
for (p, tk, ck) in cases:
    r = solve_scipy_secant_better_starts(p, tk, ck)
    assert abs(root(r, p, tk, ck)) < 1e-8
ru1 = resource.getrusage(resource.RUSAGE_SELF); t1 = time.perf_counter()

user_ms = 1000*(ru1.ru_utime-ru0.ru_utime)
sys_ms  = 1000*(ru1.ru_stime-ru0.ru_stime)
wall_ms = 1000*(t1-t0)
RESULTS.append({"Method":"scipy_secant_better_starts (#3)", "user_ms":user_ms, "sys_ms":sys_ms, "total_ms":user_ms+sys_ms, "wall_ms":wall_ms})
print(f"[recorded] user {user_ms:.1f} ms | sys {sys_ms:.2f} ms | total {user_ms+sys_ms:.1f} ms | wall {wall_ms:.1f} ms")


[recorded] user 91.7 ms | sys 1.12 ms | total 92.8 ms | wall 92.4 ms
CPU times: user 91.9 ms, sys: 1.15 ms, total: 93 ms
Wall time: 92.6 ms


In [85]:

print("| Method | user (ms) | sys (ms) | total (ms) | wall (ms) |")
print("|---|---:|---:|---:|---:|")
for r in RESULTS:
    print(f"| {r['Method']} | {r['user_ms']:.1f} | {r['sys_ms']:.2f} | {r['total_ms']:.1f} | {r['wall_ms']:.1f} |")


| Method | user (ms) | sys (ms) | total (ms) | wall (ms) |
|---|---:|---:|---:|---:|
| brentq (benchmark) | 44.3 | 1.88 | 46.1 | 46.2 |
| scipy newton+fprime (benchmark) | 94.3 | 1.13 | 95.4 | 95.0 |
| scipy secant default (benchmark) | 87.4 | 1.03 | 88.5 | 88.0 |
| fused_newton (#1) | 20.3 | 1.76 | 22.1 | 20.9 |
| fused_hybrid_newton (#2) | 45.2 | 4.52 | 49.8 | 46.0 |
| scipy_secant_better_starts (#3) | 91.7 | 1.12 | 92.8 | 92.4 |
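
To read the table as relative speed, the wall times can be divided into the `brentq` baseline. The sketch below copies the wall-time column from the table above. These are single-run timings, so the ratios are indicative rather than precise.

```python
# Wall times in ms, copied from the results table.
wall_ms = {
    "brentq (benchmark)": 46.2,
    "scipy newton+fprime (benchmark)": 95.0,
    "scipy secant default (benchmark)": 88.0,
    "fused_newton (#1)": 20.9,
    "fused_hybrid_newton (#2)": 46.0,
    "scipy_secant_better_starts (#3)": 92.4,
}

baseline = wall_ms["brentq (benchmark)"]
# Speed-up relative to brentq: values above 1 are faster than the baseline.
speedup = {name: baseline / ms for name, ms in wall_ms.items()}

for name, s in speedup.items():
    print(f"{name}: {s:.2f}x")
```

Only `fused_newton` is clearly faster than the baseline on these numbers; the hybrid is about level with `brentq`, and the SciPy Newton-family variants take roughly twice as long.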
