In [5]:
from functools import partial, reduce
from itertools import accumulate, repeat, chain, takewhile, tee, islice
import more_itertools

In [6]:
def take(n):
    """Build a stage that keeps only the first ``n`` items of an iterable.

    Returns a one-argument callable; applying it to an iterable gives a lazy
    ``islice`` over that iterable's first ``n`` elements.
    """
    def first_n(iterable):
        return islice(iterable, 0, n)
    return first_n

def head(iterable, n=10):
    """Return the first ``n`` items of ``iterable`` as a list (default 10)."""
    first_n = take(n)
    return list(first_n(iterable))
    
# Short alias so `last` can be passed directly as the final stage of a pipeline.
last = more_itertools.last
        
def pipeline(*func_list):
    """Compose functions left to right into a single callable.

    ``pipeline(f, g, h)(x)`` evaluates ``h(g(f(x)))``.  With no functions the
    result is the identity function, so an empty pipeline is a valid no-op
    stage instead of raising ``TypeError`` from ``reduce``.
    """
    if not func_list:
        return lambda x: x

    def compose_two(f, g):
        # f runs first, g second: matches the left-to-right reading order.
        return lambda x: g(f(x))
    return reduce(compose_two, func_list)

def pipeline_eval(x, *func_list):
    """Feed ``x`` through each function in ``func_list``, left to right.

    ``pipeline_eval(x, f, g)`` returns ``g(f(x))``; with no functions it
    returns ``x`` unchanged.
    """
    result = x
    for stage in func_list:
        result = stage(result)
    return result

# def filter_printer(iterable):
#     for element in iterable:
#         print(element)
#         yield element
        
def print_and_return(x):
    """Print ``x`` and hand it back unchanged, so it can be mapped over a stream."""
    print(x)
    return x

# Pipeline stage: lazily maps print_and_return over an iterable, so each element
# is printed when it is consumed and then passed through unchanged.
filter_printer = partial(map, print_and_return)

In [7]:
def iterate_seq(x0, update):
    """Infinite generator yielding x0, update(x0), update(update(x0)), ..."""
    current = x0
    while True:
        yield current
        current = update(current)

def iterate_seq(x0, update):
    """Infinite iterator x0, update(x0), update(update(x0)), ... built from itertools.

    The seed is followed by an endless stream of the ``update`` function, and
    ``accumulate`` folds that stream by applying each function to the running value.
    """
    def call_on(value, func):
        return func(value)

    seed_then_updates = chain((x0,), repeat(update))
    return accumulate(seed_then_updates, call_on)

# def newton_sequence(x0, f, df):
#     x = x0
#     while True:
#         yield x
#         x -= f(x)/df(x)

def newton_sequence(x0, f, df):
    """Infinite sequence of Newton iterates for ``f`` with derivative ``df``.

    Starts at ``x0`` and repeatedly applies ``x -> x - f(x)/df(x)``.
    """
    return iterate_seq(x0, lambda x: x - f(x)/df(x))

# def filter_cauchy_tol(difference_function = lambda x1, x2: abs(x1 - x2), tol=1e-10):
#     def my_filter(iterable):
#         x1 = next(iterable)
#         yield x1
#         x2 = next(iterable)
#         while( difference_function(x1, x2) > tol ):
#             x1, x2 = x2, next(iterable)
#             yield x2
#     return my_filter

# def filter_cauchy_tol(difference_function = lambda x1, x2: abs(x1 - x2), tol=1e-10):
#     predicate = lambda tup: difference_function(*tup) > tol
#     def my_filter(iterable):
#         copy1, copy2 = tee(iterable)
#         x0 = next(copy2)
#         filtered_pairs = takewhile(predicate, zip(copy1, copy2))
#         filtered_vals = map(lambda tup: tup[1], filtered_pairs)
#         return chain((x0,), filtered_vals)
#     return my_filter

def collect_pairs(iterable):
    """Yield overlapping consecutive pairs: (a, b), (b, c), (c, d), ...

    Accepts any iterable.  The input is converted to a single iterator first,
    so a re-iterable container such as a list is not restarted from the
    beginning after the first pair has been read (which previously produced
    wrong pairs).

    If the input has fewer than two elements, the one short tuple that could
    be collected (``(a,)`` or ``()``) is yielded as-is.
    """
    iterator = iter(iterable)
    first_pair = tuple(islice(iterator, 2))

    def next_pair(pair, next_element):
        # Slide the window: drop the older element, append the new one.
        return (pair[1], next_element)

    # accumulate emits first_pair unchanged, then folds in each remaining element.
    yield from accumulate(chain((first_pair,), iterator), next_pair)
    
def pairs_to_element(iterable):
    """Flatten overlapping pairs back into the element sequence.

    Given (a, b), (b, c), (c, d) this yields a, b, c, d: every element of the
    first pair, then the second element of each later pair.

    Accepts any iterable of pairs.  An empty input yields nothing; previously
    the bare ``next()`` leaked ``StopIteration`` out of the generator, which
    Python turns into ``RuntimeError`` (PEP 479).  That case arises when an
    upstream ``takewhile`` rejects the very first pair.
    """
    pairs = iter(iterable)
    try:
        first_pair = next(pairs)
    except StopIteration:
        return
    yield from first_pair
    yield from (pair[1] for pair in pairs)
    
def prepend_first(mapping, iterable):
    """Yield the first element unchanged, then ``mapping`` applied to the whole stream.

    ``mapping`` receives an iterator over all elements, including the first
    one again, and whatever it produces is yielded after the first element.

    Accepts any iterable.  An empty input yields nothing; previously the bare
    ``next()`` leaked ``StopIteration`` out of the generator, which Python
    turns into ``RuntimeError`` (PEP 479).
    """
    iterator = iter(iterable)
    try:
        first = next(iterator)
    except StopIteration:
        return
    yield first
    # Put the consumed element back in front so `mapping` sees the full stream.
    yield from mapping(chain((first,), iterator))

def filter_cauchy_tol(difference_function = lambda x1, x2: abs(x1 - x2), tol=1e-10):
    """Build a stage that cuts a sequence off once consecutive elements get close.

    The stage pairs up consecutive elements, keeps pairs while
    ``difference_function(previous, current) > tol``, and flattens the kept
    pairs back into elements.
    """
    def still_moving(pair):
        return difference_function(*pair) > tol

    keep_while_moving = partial(takewhile, still_moving)
    return pipeline(collect_pairs, keep_while_moving, pairs_to_element)
    
def filter_f_tol(f, tol=1e-10):
    """Build a stage that passes elements through until ``abs(f(x)) <= tol``.

    The first element whose residual is within ``tol`` is still yielded
    (inclusive stop); nothing after it is consumed.

    Changes from the earlier form of this generator:
    * iterates with ``for`` instead of bare ``next()``, so running out of input
      ends the stage cleanly rather than raising ``RuntimeError`` (PEP 479), and
      any iterable, not only an iterator, is accepted;
    * compares ``abs(f(x))`` with ``tol``, consistent with the ``takewhile``
      based variant defined right after it, so negative residuals do not stop
      the sequence prematurely.
    """
    def my_filter(iterable):
        for x in iterable:
            yield x
            if not abs(f(x)) > tol:
                return
    return my_filter

def filter_f_tol(f, tol=1e-10):
    """Build a stage that keeps elements while ``abs(f(x)) > tol``.

    The returned callable is ``takewhile`` with the residual test pre-bound, so
    the first element within tolerance, and everything after it, is dropped.
    """
    def residual_too_large(x):
        return abs(f(x)) > tol

    return partial(takewhile, residual_too_large)
        
# def filter_max_iter(max_iter=10**3):
#     def my_filter(iterable):
#         for _, element in zip(range(max_iter), iterable):
#             yield element
#     return my_filter

# def filter_max_iter(max_iter=10**3):
#     def my_filter(iterable):
#         take_first = lambda tup: tup[0]
#         return map(take_first, zip(iterable, range(max_iter)))
#     return my_filter

# def take_10(iterable):
#     for _ in range(10):
#         yield next(iterable)
        
# def take_n(iterable, n=10):
#     for _ in range(n):
#         yield next(iterable)

# def filter_max_iter(max_iter=10**3):
#     def my_filter(iterable):
#         for _ in range(max_iter):
#             yield next(iterable)
#     return my_filter

def filter_max_iter(max_iter=10**3):
    """Build a stage that lets at most ``max_iter`` elements through (default 1000)."""
    return lambda seq: islice(seq, 0, max_iter)

In [8]:
def example_generator():
    """Tiny generator used to demonstrate iteration: yields 1, 2, 3."""
    yield from (1, 2, 3)
    
# Materialising the generator in a list consumes it completely.
print(list(example_generator()))
# A fresh generator can also be advanced one element at a time with next().
my_gen = example_generator()
a = next(my_gen)
print(a)
# Only two elements remain, so the third next() call in this loop raises
# StopIteration; that exception is the point of the demonstration.
for _ in range(4):
    print(next(my_gen))

[1, 2, 3]
1
2
3


StopIteration: 

In [9]:
# Newton iteration for f(x) = (x-1)(x-5) from x0 = 8, written as nested calls:
# build the infinite iterate sequence, cap it at 4 elements, then list them.
x0 = 8
f = lambda x: (x-1)*(x-5)
df = lambda x: 2*x - 6
list(
    filter_max_iter(4)(
        newton_sequence(x0, f, df)))

[8, 5.9, 5.139655172413793, 5.004557642613021]

In [10]:
def pipeline_eval(x, *func_list):
    """Apply each function in ``func_list`` to the running value, left to right.

    ``pipeline_eval(x, f, g)`` returns ``g(f(x))``; with no functions ``x`` is
    returned unchanged.
    """
    value = x
    for func in func_list:
        value = func(value)
    return value
# Functions are applied left to right: g(f(2)) = 9, whereas f(g(2)) = 5.
f = lambda x: x+1
g = lambda x: x**2
print(pipeline_eval(2, f, g))
print(pipeline_eval(2, g, f))

9
5


In [11]:
# Newton iteration for f(x) = (x-1)(x-5) from x0 = 8, written as a
# left-to-right pipeline: sequence -> cap at 4 iterates -> list.
x0 = 8
f = lambda x: (x-1)*(x-5)
df = lambda x: 2*x - 6
pipeline_eval(newton_sequence(x0, f, df),
              filter_max_iter(4),
              list)

[8, 5.9, 5.139655172413793, 5.004557642613021]

In [12]:
def filter_f_tol(f, tol=1e-10):
    """Build a stage that keeps elements while ``abs(f(x)) > tol``.

    The returned callable is ``takewhile`` with the residual test pre-bound, so
    the first element within tolerance, and everything after it, is dropped.
    """
    def predicate(x):
        # abs() must wrap the residual only.  Wrapping the comparison
        # (abs(f(x) > tol)) takes the absolute value of a bool, which makes any
        # negative residual end the sequence immediately.
        return abs(f(x)) > tol
    return partial(takewhile, predicate)

# Newton iteration for f(x) = (x-1)(x-5) from x0 = 8: cap at 20 iterates, then
# let filter_f_tol cut the sequence off using the residual of f with tol=1e-4.
x0 = 8
f = lambda x: (x-1)*(x-5)
df = lambda x: 2*x - 6
pipeline_eval(newton_sequence(x0, f, df),
              filter_max_iter(20),
              filter_f_tol(f, tol=1e-4),
              list)

[8, 5.9, 5.139655172413793, 5.004557642613021]

In [13]:
def distance(x1, x2):
    """Absolute value of the difference ``x1 - x2``."""
    gap = x1 - x2
    return abs(gap)

def filter_cauchy_tol(distance=distance, tol=1e-10):
    """Build a stage that cuts a sequence off once consecutive elements get close.

    The stage pairs up consecutive elements, keeps pairs while
    ``distance(previous, current) > tol``, and flattens the kept pairs back
    into elements.
    """
    def still_moving(pair):
        return distance(*pair) > tol

    keep_while_moving = partial(takewhile, still_moving)
    return pipeline(collect_pairs, keep_while_moving, pairs_to_element)

# Newton iteration for f(x) = (x-1)(x-5) from x0 = 8: cap at 20 iterates, then
# stop once consecutive iterates differ by no more than 1e-2.
x0 = 8
f = lambda x: (x-1)*(x-5)
df = lambda x: 2*x - 6
pipeline_eval(newton_sequence(x0, f, df),
              filter_max_iter(20),
              filter_cauchy_tol(tol=1e-2),
              list)

[8, 5.9, 5.139655172413793, 5.004557642613021]

$f(x) = (x-1)(x-5) = x^2 - 6x + 5$

In [19]:
# Nested-call form (innermost first): Newton sequence from 8 -> Cauchy stop with
# tol=1e-10 -> cap at 20 iterates -> list.  Uses f and df from the kernel state.
list(
    filter_max_iter(max_iter=20)(
#         filter_f_tol(f=lambda x: abs(f(x)), tol=1e-10)(
        filter_cauchy_tol(tol=1e-10)(
            newton_sequence(8, f, df)
        )
    )
)

[8,
 5.9,
 5.139655172413793,
 5.004557642613021,
 5.000005181219474,
 5.000000000006711]

In [38]:
# Pipeline ending in `last`, so only the final element that the filters let
# through is returned instead of the whole list of iterates.
pipeline_eval(
    newton_sequence(8, f, df),
    filter_max_iter(max_iter=20),
    filter_f_tol(f = lambda x: abs(f(x))),
    last)

5.000005181219474

In [39]:
def newton_root_find(x0, f, df, df_inv=None,
                     max_iter=20, tol=1e-15):
    """Run Newton's method on ``f`` and return the last iterate let through.

    Parameters
    ----------
    x0 : starting guess.
    f : function whose root is sought.
    df : derivative of ``f``.  The signature previously read ``df=,``, which is
        a syntax error; ``df`` is a required argument.
    df_inv : accepted for interface compatibility; not used by this function.
    max_iter : cap on the number of iterates considered.
    tol : tolerance handed to ``filter_f_tol`` together with ``abs(f(x))``.

    Returns
    -------
    The final element produced by the filtered Newton sequence, as selected by
    ``more_itertools.last``.
    """
    return pipeline_eval(
        newton_sequence(x0, f, df),
        filter_f_tol(f=lambda x: abs(f(x)), tol=tol),
        filter_max_iter(max_iter),
        more_itertools.last)

# Start from x0 = 8 with the f and df currently defined in the kernel.
newton_root_find(8, f, df)

5.000000000006711

In [46]:
import numpy as np
import numpy.linalg as la
# def muli_newton_sequence(x0, jac, hes):
#     x = x0
#     yield x
#     while True:
#         x -= la.solve(hes(*x), jac(*x))
#         yield x

def multi_newton_sequence(x0, jac, hes):
    """Infinite sequence of multivariate Newton iterates starting at ``x0``.

    Each step subtracts the solution of the linear system
    ``hes(*x) @ step = jac(*x)`` from the current point ``x``; ``jac`` and
    ``hes`` are called with the components of ``x`` unpacked as arguments.
    """
    def newton_step(point):
        correction = la.solve(hes(*point), jac(*point))
        return point - correction

    return iterate_seq(x0, newton_step)

def vec_dist(x, y):
    """Norm of ``x - y`` as computed by ``numpy.linalg.norm`` with default settings."""
    difference = x - y
    return la.norm(difference)

def multi_newton_root_find(x0, jac, hes):
    """Run multivariate Newton iteration and return the last iterate let through.

    Builds the iterate sequence from ``x0`` using ``jac`` and ``hes``, caps it
    at 10 iterates, stops once consecutive iterates are within the default
    tolerance of ``filter_cauchy_tol`` under ``vec_dist``, and returns the
    final element.

    The sequence is built with ``multi_newton_sequence``; the earlier spelling
    ``muli_newton_sequence`` named a function that exists only in commented-out
    code, so the call failed with ``NameError`` on a fresh kernel.
    """
    return pipeline_eval(multi_newton_sequence(x0, jac, hes),
                         filter_max_iter(max_iter=10),
                         filter_cauchy_tol(distance = vec_dist),
                         last)

# symbolic stuff - this is just because I don't want to manually define the Jacobian and Hessian functions
import sympy as sym
# Symbols and the scalar function to analyse.
# NOTE: this rebinds the notebook-level names x, y and f to sympy objects.
x, y = sym.symbols('x y')
f = sym.sin(x*y)
# First partial derivatives of f with respect to x and y.
jac = [f.diff(var) for var in (x,y)]
# Second partial derivatives: each entry of jac differentiated by x and y.
# Inside the comprehension, `f` is an entry of jac, not the function above.
hes = sym.Matrix([[f.diff(var) for var in (x,y)] for f in jac])

# numeric stuff
# lambdify turns each symbolic expression into a callable taking (x, y).
f_numeric = sym.lambdify((x,y), f)
jac_numeric = sym.lambdify((x,y), jac, 'numpy')
hes_numeric = sym.lambdify((x,y), hes, 'numpy')



# Run the multivariate Newton iteration from (1, 1), then report the point found
# together with the value of f and of its first derivatives there.
root = multi_newton_root_find(np.array([1, 1]), jac_numeric, hes_numeric)
print(f'The critical point is at \nx = {root}')
print(f'f(x) = {f_numeric(*root)}')
print(f'Jac(f)(x) = {jac_numeric(*root)}')

The critical point is at 
x = [1.25331414 1.25331414]
f(x) = 1.0
Jac(f)(x) = [-1.7218578332986044e-13, -1.7218578332986039e-13]


# Reuse with Secant Method

In [48]:
def secant_sequence(x0, x1, f):
    """Infinite generator of secant-method iterates for ``f``.

    Yields the two starting points ``x0`` and ``x1`` first, then each new
    iterate computed from the two most recent ones.
    """
    yield x0
    yield x1
    older, newer = x0, x1
    while True:
        correction = f(newer)*(newer - older)/(f(newer) - f(older))
        older, newer = newer, newer - correction
        yield newer
        
def secant_sequence(x0, x1, f):
    """Secant-method iterates for ``f``, built on ``iterate_seq``.

    The iteration state is the pair of the two most recent points; each update
    shifts the pair forward by one secant step, and the first component of
    every state is emitted as the sequence element.
    """
    def advance(state):
        older, newer = state
        return newer, newer - f(newer)*(newer - older)/(f(newer) - f(older))

    def first_component(state):
        return state[0]

    states = iterate_seq((x0, x1), advance)
    return map(first_component, states)

In [49]:
# Secant method on f(x) = (x-1)(x-5) starting from the points 7 and 8.
f = lambda x: (x-1)*(x-5)
df = lambda x: 2*x - 6

# filter_max_iter() uses its default cap of 10**3 elements; take(15) then keeps
# at most the first 15 of whatever is left.
pipeline_eval(secant_sequence(7, 8, f),
              filter_f_tol(f = lambda x: abs(f(x)), tol=1e-15),
              filter_max_iter(),
              take(15),
              list)

[7,
 8,
 5.666666666666666,
 5.260869565217391,
 5.035294117647059,
 5.002143112275271,
 5.000018734472505,
 5.000000010032098,
 5.000000000000047]

In [15]:
# %matplotlib inline
# import time
# from IPython import display
# from random import random
# import matplotlib.pyplot as plt
# for i in range(10):
#     plt.clf()
#     plt.plot([10*(random()-.5) for _ in range(100)], 'b.')
#     plt.ylim(-5, 5)
#     plt.xlim(-1, 101)
#     display.display(plt.gcf())
#     display.clear_output(wait=True)
#     time.sleep(1.0)

Leibniz series for pi

In [16]:
# Partial sum of the Leibniz series pi/4 = 1 - 1/3 + 1/5 - 1/7 + ... with
# consecutive terms paired: 1/i - 1/(i+2) = 2/(i*(i+2)) for i = 1, 5, 9, ...
# Hence pi = 8 * sum of 1/(i*(i+2)) over those i.
a, b = 1, 20000


v = 0
for i in range(a, b, 4):
    v += 1/(i * (i + 2))
print(v*8)

3.1414926535900367


In [17]:
def i_generator(i, step):
    """Infinite arithmetic progression: i, i + step, i + 2*step, ..."""
    current = i
    while True:
        yield current
        current += step
        
def print_running(iterable):
    """Pass elements through while showing the current one on a single console line.

    Each element is printed followed by 100 spaces and a carriage return, so
    the next print overwrites the same line.
    """
    padding = ' '*100
    for item in iterable:
        print(str(item) + padding, end='\r')
        yield item

In [18]:
# The same Leibniz sum as a lazy pipeline: indices 1, 5, 9, ... -> terms
# 1/(i*(i+2)) -> running sums -> scaled by 8 -> first 20_000 values -> last one.
# The stages are passed as separate positional arguments because pipeline_eval
# takes *func_list; wrapping them in a list makes it try to call the list itself.
pipeline_eval(
    i_generator(1, 4),
    partial(map, lambda x: 1/(x*(x+2))),
    accumulate,
    partial(map, lambda x: 8*x),
    take(20_000),
    print_running,
    last,
)

3.1415676535897927                                                                                                    

3.1415676535897927