In [22]:
import cloudknot as ck
import numpy as np
import pandas as pd # for pd.to_datetime and the timedelta type
import pickle # to save results
import pywren

from functools import partial
from time import sleep

`pywren`'s `get_logs()` method prints the logs and also returns them. We wanted to silence the printing so we use the following `nostdout` context.

In [2]:
import contextlib
import sys

class DummyFile(object):
    def write(self, x): pass

@contextlib.contextmanager
def nostdout():
    save_stdout = sys.stdout
    sys.stdout = DummyFile()
    yield
    sys.stdout = save_stdout

In [25]:
def solve_heateq_2d(t_top=100.0, t_bottom=0.0,
                    t_left=0.0, t_right=0.0, side_len=10,
                    max_iter=10000, rtol=1e-4, atol=1e-7):
    """Solve steady-state 2D heat equation by Gauss-Seidel Method
    
    This is a pedagogical or benchmarking tool only. There are
    better ways to solve the 2D heat equation if that's all you
    want. There are even better ways to implement Gauss-Seidel.
    
    Parameters
    ----------
    max_iter : int, default=10000
        Maximum iteration number for Gauss-Seidel Method
        
    t_top : float, default=100.0
        Dirichlet boundary condition for the top of the plate

    t_bottom : float, default=0.0
        Dirichlet boundary condition for the bottom of the plate
    
    t_left : float, default=0.0
        Dirichlet boundary condition for the left of the plate
    
    t_right : float, default=0.0
        Dirichlet boundary condition for the right of the plate

    side_len : int, default=10
        Number of points on one side of the 2D grid
        
    rtol : float, default=1e-4
        Relative convergence tolerence for early exit of Gauss-Seidel loop

    atol : float, default=1e-7
        Absolute convergence tolerence for early exit of Gauss-Seidel loop
        
    Returns
    -------
    collections.namedtuple
        namedtuple with elements
        temp - final temperature
        iteration - convergence iteration number
            (or max_iter if convergence not reached)
    """
    import numpy as np
    from collections import namedtuple
    
    # Initial guess of interior grid
    t_init = np.mean([t_top, t_bottom, t_left, t_right])

    # Create grid of temps
    t = np.ones((side_len, side_len), dtype=np.float64) * t_init

    # Set Boundary condition
    t[-1, :] = t_top
    t[0, :] = t_bottom
    t[:, -1] = t_right
    t[:, 0] = t_left
    
    # Gauss-Seidel Loop
    t_old = np.copy(t)
    for iteration in range(0, max_iter):
        for i in range(1, side_len-1):
            for j in range(1, side_len-1):
                t[i, j] = 0.25 * (t[i+1][j] + t[i-1][j] + t[i][j+1] + t[i][j-1])

        if np.allclose(t, t_old):
            break
        else:
            t_old = np.copy(t)
    
    return {'temperature': t, 'iteration': iteration}

In [26]:
def solve_heateq_2d_ttop_arg(t_top):
    return solve_heateq_2d(t_top=t_top)

heateq_solver = {}
side_lens = np.array([10, 25, 50, 100, 125, 150, 175])
for side_len in side_lens:
    heateq_solver[side_len] = partial(solve_heateq_2d, side_len=side_len)

In [7]:
def get_max_execution_time(logs):
    start_times = pd.to_datetime([
        list(filter(
            lambda s: 'invocation started' in s[1],
            log
        ))[0][1].split('\t')[1]
        for log in logs
    ])
    
    end_times = pd.to_datetime([
        list(filter(
            lambda s: 'command execution finished' in s[1], 
            log
        ))[0][1].split('\t')[1]
        for log in logs
    ])
    
    return end_times.max() - start_times.min()

In [8]:
def logs_ready(logs):
    return all([any(['command execution finished' in tup[1] for tup in log]) for log in logs])

In [10]:
execution_times = {}
for npoints in np.power(2, np.arange(1, 13)):
    wrenexec = pywren.default_executor()
    print('npoints = {npoints:d}'.format(npoints=int(npoints)))
    args = np.linspace(0, 100, int(npoints))
    futures = wrenexec.map(solve_heateq_2d_ttop_arg, args)
    pywren.get_all_results(futures)
    with nostdout():
        logs = [wrenexec.get_logs(f) for f in futures]
    while not logs_ready(logs):
        sleep(5)
        print('Waiting for logs to catch up...')
        with nostdout():
            logs = [wrenexec.get_logs(f) for f in futures]

    execution_times[npoints] = get_max_execution_time(logs)

npoints = 2
Waiting for logs to catch up...
Waiting for logs to catch up...
npoints = 4
npoints = 8
npoints = 16
Waiting for logs to catch up...
npoints = 32
Waiting for logs to catch up...
npoints = 64
Waiting for logs to catch up...
npoints = 128
npoints = 256
npoints = 512
npoints = 1024
npoints = 2048
npoints = 4096


In [14]:
pywren_nargs_scaling = []
for npoints in execution_times.keys():
    pywren_nargs_scaling.append({
        'npoints': npoints,
        'max_job_time': execution_times[npoints]
    })
pywren_nargs_scaling

[{'max_job_time': Timedelta('0 days 00:00:02.291000'), 'npoints': 2},
 {'max_job_time': Timedelta('0 days 00:00:16.520000'), 'npoints': 4},
 {'max_job_time': Timedelta('0 days 00:00:17.443000'), 'npoints': 8},
 {'max_job_time': Timedelta('0 days 00:00:22.162000'), 'npoints': 16},
 {'max_job_time': Timedelta('0 days 00:00:19.345000'), 'npoints': 32},
 {'max_job_time': Timedelta('0 days 00:00:20.482000'), 'npoints': 64},
 {'max_job_time': Timedelta('0 days 00:00:22.035000'), 'npoints': 128},
 {'max_job_time': Timedelta('0 days 00:00:22.102000'), 'npoints': 256},
 {'max_job_time': Timedelta('0 days 00:00:14.131000'), 'npoints': 512},
 {'max_job_time': Timedelta('0 days 00:00:26.163000'), 'npoints': 1024},
 {'max_job_time': Timedelta('0 days 00:00:38.583000'), 'npoints': 2048},
 {'max_job_time': Timedelta('0 days 00:01:27.600000'), 'npoints': 4096}]

In [15]:
with open('pywren_nargs_scaling.pkl', 'wb') as fp:
    pickle.dump(pywren_nargs_scaling, fp)

In [27]:
execution_times = {}
for side_len in side_lens:
    wrenexec = pywren.default_executor()
    print('side_len = {sl:d}'.format(sl=int(side_len)))
    args = np.linspace(0, 100, 5)
    futures = wrenexec.map(heateq_solver[side_len], args)
    pywren.get_all_results(futures)
    with nostdout():
        logs = [wrenexec.get_logs(f) for f in futures]
    while not logs_ready(logs):
        sleep(5)
        print('Waiting for logs to catch up...')
        with nostdout():
            logs = [wrenexec.get_logs(f) for f in futures]

    execution_times[side_len] = get_max_execution_time(logs)

npoints = 10
Waiting for logs to catch up...
Waiting for logs to catch up...
npoints = 25
Waiting for logs to catch up...
Waiting for logs to catch up...
npoints = 50
Waiting for logs to catch up...
npoints = 100
npoints = 125
npoints = 150


Exception: process ran out of time

In [29]:
pywren_syssize_scaling = []
for side_len in execution_times.keys():
    pywren_syssize_scaling.append({
        'side_len': side_len,
        'max_job_time': execution_times[side_len]
    })
pywren_syssize_scaling

[{'max_job_time': Timedelta('0 days 00:00:02.337000'), 'side_len': 10},
 {'max_job_time': Timedelta('0 days 00:00:02.545000'), 'side_len': 25},
 {'max_job_time': Timedelta('0 days 00:00:12.680000'), 'side_len': 50},
 {'max_job_time': Timedelta('0 days 00:02:06.445000'), 'side_len': 100},
 {'max_job_time': Timedelta('0 days 00:04:30.665000'), 'side_len': 125}]

In [30]:
with open('pywren_syssize_scaling.pkl', 'wb') as fp:
    pickle.dump(pywren_syssize_scaling, fp)