# Monte-Carlo Estimate of $\pi$

We want to estimate the number $\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods) exploiting that the area of a quarter circle of unit radius is $\pi/4$ and that hence the probability of any randomly chosen point in a unit square to lie in a unit circle centerd at a corner of the unit square is $\pi/4$ as well.  So for N randomly chosen pairs $(x, y)$ with $x\in[0, 1)$ and $y\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimage $\pi \approx 4 \cdot N_{circ} / N$.

[<img src="https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif" 
     width="50%" 
     align=top
     alt="PI monte-carlo estimate">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)

## Why Adaptive?

Using [Dask's adaptivity](http://docs.dask.org/en/latest/setup/adaptive.html), we'll show that it is possible to scale the available resources to meet almost any desired wall times irrespective of the actual work load.  This is important, because it allows for focussing on the situation of the human running the analysis:

> _"Am I in an explorative and creative phase of my work where it is important that I can see the next plot within seconds?"_

> _"Am I running routine analyses that can wait until tomorrow or next week?"_

## Actual timings

Aming for a duration of 20 seconds per calculation, this is what we actually get:

- $\pi$ from 50.0 GB of random data in 21.82 s with 13 workers
- $\pi$ from 100.0 GB of random data in 16.73 s with 25 workers
- $\pi$ from 200.0 GB of random data in 15.79 s with 53 workers
- $\pi$ from 400.0 GB of random data in 16.94 s with 122 workers
- $\pi$ from 800.0 GB of random data in 21.84 s with 241 workers
- $\pi$ from 1600.0 GB of random data in 26.89 s with 400 workers
- $\pi$ from 3200.0 GB of random data in 45.50 s with 392 workers

## Tuning adaptivity

The following tunes a Dask cluster to use anywhere between 1 and 400 workers and to scale its size so that any computation is finished within 20 seconds.  On Pangeo, time scales for starting / stopping workers are of the order of a few seconds, so we set a startup cost to 5 seconds (instead of the default value of 1 second) and increase possible scale-down times by setting the relevant interval to 2 seconds and the number of times a worker needs to be considered expendable before it is actually killed to `10`.  We also reduce the default factor that is applied to adapt the cluster to a more modest `1.2`.

To see all available args and defaults for tuning adaptivity, check the [docs of dask.distributed.Adaptive](http://docs.dask.org/en/latest/setup/adaptive.html) or skip to the cell with the [docstring of Adaptive at the end of this notebook](#Docstring-of-Adaptive).

In [1]:
from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=1)

In [2]:
cluster.adapt(minimum=1, maximum=400,
              target_duration="20s",
              interval="2s",
              wait_count=10,
              startup_cost="5s",
              scale_factor=1.2);

In [3]:
from dask.distributed import Client
c = Client(cluster)
c

0,1
Client  Scheduler: tcp://10.48.74.164:40711  Dashboard: /user/willirath-pange-ample-notebooks-yxzvs48p/proxy/8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


(Check the dash board to see the cluster scale up and down!)

## The actual calculations

We loop over volumes of 50 GB, 100 GB, 200 GB, ..., 3200 GB of double-precision random numbers and estimate $\pi$ as described above.

In [4]:
import dask.array as da
import numpy as np
from time import time

def calc_pi_mc(size):
    xy = da.random.uniform(0, 1,
                           size=(int(size / 8 / 2), 2),
                           chunks=(int(500e6 / 8), 2))
    
    in_circle = ((xy ** 2).sum(axis=-1) < 1)
    pi = 4 * in_circle.mean()

    start = time()
    pi = pi.compute()
    end = time()
    
    num_workers = len(cluster.scheduler.workers)
    
    print("Size of data:", xy.nbytes / 1e9, "GB")
    print("Monte-Carlo pi:", pi)
    print("Numpys pi:", np.pi)
    print("Delta:", abs(pi - np.pi))
    print("Duration: {:.2f} seconds with {} workers".format(
        end - start, num_workers))
    print()

In [5]:
from time import sleep

for size in [1e9 * n for n in [50, 100, 200, 400,
                               800, 1600, 3200]]:
    
    calc_pi_mc(size)
    sleep(30)  # allow for some scale-down time

Size of data: 50.0 GB
Monte-Carlo pi: 3.1415611776
Numpys pi: 3.141592653589793
Delta: 3.147598979325039e-05
Duration: 21.82 seconds with 13 workers

Size of data: 100.0 GB
Monte-Carlo pi: 3.14157815872
Numpys pi: 3.141592653589793
Delta: 1.4494869793324483e-05
Duration: 16.73 seconds with 25 workers

Size of data: 200.0 GB
Monte-Carlo pi: 3.14158670528
Numpys pi: 3.141592653589793
Delta: 5.94830979316896e-06
Duration: 15.79 seconds with 53 workers

Size of data: 400.0 GB
Monte-Carlo pi: 3.14159057072
Numpys pi: 3.141592653589793
Delta: 2.0828697930852513e-06
Duration: 16.94 seconds with 122 workers

Size of data: 800.0 GB
Monte-Carlo pi: 3.14159497536
Numpys pi: 3.141592653589793
Delta: 2.321770206759055e-06
Duration: 21.84 seconds with 241 workers

Size of data: 1600.0 GB
Monte-Carlo pi: 3.14159482016
Numpys pi: 3.141592653589793
Delta: 2.1665702067963366e-06
Duration: 26.89 seconds with 400 workers

Size of data: 3200.0 GB
Monte-Carlo pi: 3.14159423066
Numpys pi: 3.141592653589793
D

## Docstring of Adaptive

In [6]:
from dask.distributed import Adaptive

In [7]:
Adaptive?

[0;31mInit signature:[0m [0mAdaptive[0m[0;34m([0m[0mscheduler[0m[0;34m,[0m [0mcluster[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0minterval[0m[0;34m=[0m[0;34m'1s'[0m[0;34m,[0m [0mstartup_cost[0m[0;34m=[0m[0;34m'1s'[0m[0;34m,[0m [0mscale_factor[0m[0;34m=[0m[0;36m2[0m[0;34m,[0m [0mminimum[0m[0;34m=[0m[0;36m0[0m[0;34m,[0m [0mmaximum[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0mwait_count[0m[0;34m=[0m[0;36m3[0m[0;34m,[0m [0mtarget_duration[0m[0;34m=[0m[0;34m'5s'[0m[0;34m,[0m [0mworker_key[0m[0;34m=[0m[0;34m<[0m[0mfunction[0m [0mAdaptive[0m[0;34m.[0m[0;34m<[0m[0;32mlambda[0m[0;34m>[0m [0mat[0m [0;36m0x7fcbc0584e18[0m[0;34m>[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0m
[0;31mDocstring:[0m     
Adaptively allocate workers based on scheduler load.  A superclass.

Contains logic to dynamically resize a Dask cluster based on current use.
This class needs to be paired with a system tha