In [1]:
import numpy as np
from scipy.optimize import fminbound
import matplotlib.pyplot as plt

from optimalgrowth_loglinear import LogLinearOG

# Solve over a Grid

In [2]:
#-LogLinear Optimal Growth Problem-#
lg = LogLinearOG()
# == Unpack parameters / functions for convenience == #
alpha, beta, mu, s = lg.alpha, lg.beta, lg.mu, lg.s
v_star = lg.v_star

In [3]:
grid_max = 4         # Largest grid point
grid_size = 200      # Number of grid points
shock_size = 250     # Number of shock draws in Monte Carlo integral

grid = np.linspace(1e-5, grid_max, grid_size)

#-Compute Required Info-#
w = v_star(grid)
u = np.log
f = lambda k: k**alpha
shocks = np.exp(mu + s * np.random.randn(shock_size))
w_func = lambda x: np.interp(x, grid, w)
Tw = np.empty_like(w)

In [4]:
%%timeit 
#-Iteration-#
for i, y in enumerate(grid):
    # == set Tw[i] = max_c { u(c) + beta E w(f(y  - c) z)} == #
    def objective(c):
        return - u(c) - beta * np.mean(w_func(f(y - c) * shocks))
    c_star = fminbound(objective, 1e-10, y)
    Tw[i] = - objective(c_star)

126 ms ± 721 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [5]:
Tw[0]

-51.494684747586383

# Using Dask to Compute Grid

In [6]:
import dask
from dask.distributed import Client
client = Client()
print(client)

<Client: scheduler='tcp://127.0.0.1:40272' processes=8 cores=8>


In [7]:
def generate_objective(y):
    return lambda c: - u(c) - beta * np.mean(w_func(f(y - c) * shocks))

In [8]:
#Delayed Computation using same parameters as above#
Tw_dask_delayed = []
#-Iteration-#
for i, y in enumerate(grid):
    objfunc = dask.delayed(generate_objective)(y)
    c_star = dask.delayed(fminbound)(objfunc, 1e-10, y)
    Tw_dask_delayed.append(-1 * dask.delayed(objfunc)(c_star))

In [9]:
%%timeit
Tw_dask = np.array(dask.compute(*Tw_dask_delayed))

337 ms ± 7.14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
Tw_dask = np.array(dask.compute(*Tw_dask_delayed))

In [11]:
Tw_dask[0]

-51.494684747586383

In [12]:
Tw[0]

-51.494684747586383

In [13]:
np.allclose(Tw, Tw_dask)

True

While the **correct** data is computed this is much slower due to the interprocess overhead that dask introduces. 

Currently dask introduces around **100ms** of overhead for the scheduler. 

http://dask.pydata.org/en/latest/shared.html

Dask is doing a lot of coordination work that is inefficient for this computation. We get better performance on a single core.
