# Monte-Carlo Estimate of $\pi$

We want to estimate the number $\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods). The method exploits that the area of a quarter circle of unit radius is $\pi/4$, and hence that a point chosen uniformly at random in the unit square lies inside the unit circle centered at a corner of the square with probability $\pi/4$.  So for $N$ randomly chosen pairs $(x, y)$ with $x\in[0, 1)$ and $y\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimate $\pi \approx 4 \cdot N_{circ} / N$.

[<img src="https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif" 
     width="50%" 
     align=top
     alt="PI monte-carlo estimate">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)
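
As a minimal single-machine sketch of the estimator described above (plain NumPy, no Dask; the function name `estimate_pi` and the fixed seed are illustrative assumptions, not part of this notebook's cluster workflow):

```python
import numpy as np


def estimate_pi(n_points, seed=0):
    """Estimate pi from n_points uniform samples in the unit square."""
    rng = np.random.default_rng(seed)  # seeded only to make the sketch reproducible
    xy = rng.uniform(0.0, 1.0, size=(n_points, 2))
    # Count the points that fall inside the quarter circle x^2 + y^2 < 1.
    n_circ = np.count_nonzero((xy ** 2).sum(axis=1) < 1.0)
    return 4.0 * n_circ / n_points


estimate = estimate_pi(1_000_000)
print(f"pi is approximately {estimate:.4f}")
```

The Dask version used further down follows the same logic, but splits the array of points into chunks that are processed on the workers.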

## Core Lessons

- Resilience against dying workers

## Set up a Slurm cluster

In [None]:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import os

cluster = SLURMCluster(
    cores=24,
    processes=2,
    memory="100GB",
    shebang='#!/usr/bin/env bash',
    queue="batch",
    walltime="00:30:00",
    local_directory='/tmp',
    death_timeout="15s",
    interface="ib0",
    log_directory=f'{os.environ["SCRATCH_cecam"]}/{os.environ["USER"]}/dask_jobqueue_logs/',
    project="ecam",
    name="resilient_clusters")

client = Client(cluster)
client

## The job scripts

In [None]:
print(cluster.job_script())

## Scale the cluster to six nodes

In [None]:
cluster.adapt(minimum=12, maximum=12)  # will lead to six jobs
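
The "six jobs" in the comment above follow from the cluster definition: each Slurm job starts `processes=2` workers, so twelve workers need six jobs. A small sketch of that arithmetic (the helper `jobs_needed` is hypothetical, and the threads-per-worker figure assumes that the job's cores are split evenly across its worker processes):

```python
import math


def jobs_needed(n_workers, processes_per_job):
    """Number of batch jobs required to host n_workers worker processes."""
    return math.ceil(n_workers / processes_per_job)


n_jobs = jobs_needed(n_workers=12, processes_per_job=2)
threads_per_worker = 24 // 2  # assumption: cores divided evenly among processes

print(f"{n_jobs} jobs, {threads_per_worker} threads per worker")
```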

## The Monte Carlo Method

In [None]:
import dask.array as da
import numpy as np


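def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):
    """Calculate PI using a Monte Carlo estimate."""
    
    # Each double-precision number occupies 8 bytes.
    size = int(size_in_bytes / 8)
    chunksize = int(chunksize_in_bytes / 8)
    
    # One row per (x, y) pair; shape and chunk sizes must be integers.
    xy = da.random.uniform(0, 1,
                           size=(size // 2, 2),
                           chunks=(chunksize // 2, 2))
    
    # Fraction of points inside the quarter circle, times four.
    in_circle = ((xy ** 2).sum(axis=-1) < 1)
    pi = 4 * in_circle.mean()

    return pi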


def print_pi_stats(size, pi, time_delta, num_workers):
    """Print pi, calculate offset from true value, and print some stats."""
    print(f"{size / 1e9} GB\n"
          f"\tMC pi: {pi : 13.11f}"
          f"\tErr: {abs(pi - np.pi) : 10.3e}\n"
          f"\tWorkers: {num_workers}"
          f"\t\tTime: {time_delta : 7.3f}s")

## The actual calculations

We loop over different volumes of double-precision random numbers and estimate $\pi$ as described above.

In [None]:
from time import time, sleep

In [None]:
for size in (1e9 * n for n in (1, 10, 100)):
    
    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(size, pi, time_delta=elaps,
                   num_workers=len(cluster.scheduler.workers))
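
To put the printed `Err` values into perspective, the statistical error we should expect can be sketched as follows. Each point is a Bernoulli trial with success probability $p = \pi/4$, so the standard error of the estimate is $4\sqrt{p(1-p)/N}$, where $N$ is the number of points (two 8-byte doubles each). The helper name `expected_std_error` is an illustrative assumption:

```python
import math


def expected_std_error(size_in_bytes):
    """Expected standard error of the pi estimate for a given data volume."""
    n_points = size_in_bytes / 16  # two 8-byte doubles per (x, y) point
    p = math.pi / 4                # probability of landing inside the quarter circle
    return 4 * math.sqrt(p * (1 - p) / n_points)


for size in (1e9, 10e9, 100e9):
    print(f"{size / 1e9:6.0f} GB\texpected error ~ {expected_std_error(size):.2e}")
```

The error shrinks only with the square root of the data volume: a hundred times more random numbers buy one additional decimal digit.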

## What happens if a worker dies?

We'll look up the IDs of all "our" running jobs, mark a few of them as non-preemptible, filter for the remaining preemptible jobs, and define a function that cancels one randomly selected preemptible job.

In [None]:
def get_current_jobs():
    current_jobs = !squeue | grep R | grep $USER | grep resil | awk '{print $1}'
    return current_jobs

In [None]:
non_preemptible_jobs = get_current_jobs()[:2]
non_preemptible_jobs

In [None]:
def get_preemptible_jobs():
    return list(filter(lambda j: j not in non_preemptible_jobs, get_current_jobs()))

In [None]:
import random

def kill_random_preemptible_job():
    preemptible_jobs = get_preemptible_jobs()
    if preemptible_jobs:
        worker_to_kill = random.choice(preemptible_jobs)
        print(f"will cancel job {worker_to_kill}")
        !scancel {worker_to_kill}

## Let's start a computation with disappearing workers

In [None]:
pi = calc_pi_mc(1e12, 500e6)

In [None]:
pi = client.compute(pi)
print(pi)

In [None]:
sleep(5)

while not pi.done():
    kill_random_preemptible_job()
    sleep(10)

In [None]:
print(pi)

## And get the result

In [None]:
print(pi.result())

## What happened?

The Dask scheduler keeps a suspiciousness counter for each task it manages.  Whenever a worker dies, every task that was assigned to that worker at the time of its death has its suspiciousness increased by one. The scheduler has no way of telling which task, if any, was responsible for the death of the worker, so it simply flags all of them as suspicious.

All tasks whose suspiciousness exceeds the allowed number of failures (3 by default) are considered bad and won't be rescheduled.
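
The bookkeeping can be illustrated with a toy model. This is not the Dask scheduler's code: the class `ToyScheduler`, its method names, and the exact comparison against the threshold are assumptions made for this sketch.

```python
from collections import defaultdict


class ToyScheduler:
    """Toy model of per-task suspiciousness counting (not Dask's implementation)."""

    def __init__(self, allowed_failures=3):
        self.allowed_failures = allowed_failures
        self.suspicious = defaultdict(int)  # task name -> number of worker deaths seen
        self.bad = set()                    # tasks that will not be rescheduled

    def worker_died(self, tasks_on_worker):
        # Every task on the dead worker is blamed, guilty or not.
        for task in tasks_on_worker:
            self.suspicious[task] += 1
            if self.suspicious[task] > self.allowed_failures:
                self.bad.add(task)


sched = ToyScheduler()
for _ in range(4):
    sched.worker_died(["chunk-0", "chunk-1"])  # unlucky tasks, hit four times
sched.worker_died(["chunk-2"])                 # hit only once

print(sorted(sched.bad))
```

In the experiment above no task is actually at fault. Tasks are blamed simply because they happened to sit on a worker whose job we cancelled, and with enough cancellations some of them cross the threshold.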

## Make Dask more resilient

We can increase the number of allowed failures.  Let's practically disable the threshold and re-do the calculation.

In [None]:
cluster.scheduler.allowed_failures = 1000

_(Note that the above is internal API, which we need to use for now to increase the number of allowed failures.  The current Dask.distributed release, which we cannot yet use with Dask jobqueue, allows changing this value through the Dask configuration at runtime.)_
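
With a sufficiently recent Dask.distributed, the configuration route might look roughly like the following sketch. It assumes that the scheduler reads the `distributed.scheduler.allowed-failures` key when it is created, so the setting would have to be applied before the cluster is started; it is not what this notebook does.

```python
import dask

# Assumption: this must run before the scheduler (i.e. the cluster) is created,
# because the value is read once at scheduler start-up.
dask.config.set({"distributed.scheduler.allowed-failures": 1000})
```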

In [None]:
pi = calc_pi_mc(1e12, 500e6)

In [None]:
pi = client.compute(pi)
print(pi)

In [None]:
sleep(5)

while not pi.done():
    kill_random_preemptible_job()
    sleep(10)

In [None]:
print(pi)

In [None]:
print(pi.result())

## Complete listing of software used here

In [None]:
%pip list

In [None]:
%conda list --explicit