# Config

In [1]:
# --- Run configuration -------------------------------------------------------
# Value substituted for the PARAM placeholder in the Argo workflow template.
DRY_RUN: bool = False
# Number of dask workers, i.e. the maximum number of Argo jobs running at once.
N_SIMULTANEOUS_ARGO_JOBS: int = 15
# Total number of simulation runs to launch (one BigQuery dataset each).
N_SIMULATIONS: int = 100

# Setup

Build the container image from the current directory and push it to `gcr.io/moz-fx-data-experiments/dberry-jetstream`

In [2]:
%%bash
# Abort on the first failing command so a broken build never pushes a stale image.
set -euo pipefail
IMAGE=gcr.io/moz-fx-data-experiments/dberry-jetstream
docker build --platform linux/amd64 -t dberry-jetstream .
docker tag dberry-jetstream "$IMAGE"
docker push "$IMAGE"

Using default tag: latest
The push refers to repository [gcr.io/moz-fx-data-experiments/dberry-jetstream]
b566b48f6c9a: Preparing
d8834038e9fd: Preparing
5f70bf18a086: Preparing
514a73f2b801: Preparing
769d8974a06f: Preparing
08b87a80ce3a: Preparing
82b375d6a9af: Preparing
ba00c52d3702: Preparing
04e8f209746d: Preparing
7ed1d87a050a: Preparing
3ffc178e6d86: Preparing
327e42081bbe: Preparing
6e632f416458: Preparing
e019be289189: Preparing
c9a63110150b: Preparing
08b87a80ce3a: Waiting
04e8f209746d: Waiting
7ed1d87a050a: Waiting
3ffc178e6d86: Waiting
327e42081bbe: Waiting
6e632f416458: Waiting
e019be289189: Waiting
c9a63110150b: Waiting
82b375d6a9af: Waiting
ba00c52d3702: Waiting
769d8974a06f: Layer already exists
514a73f2b801: Layer already exists
5f70bf18a086: Layer already exists
82b375d6a9af: Layer already exists
ba00c52d3702: Layer already exists
08b87a80ce3a: Layer already exists
7ed1d87a050a: Layer already exists
3ffc178e6d86: Layer already exists
04e8f209746d: Layer already exists

#1 [internal] load build definition from Dockerfile
#1 sha256:445696662ac32c88c154ceb4a74e934d9f5e083e1925a118359606403fe1a0fa
#1 transferring dockerfile: 37B done
#1 DONE 0.0s

#2 [internal] load .dockerignore
#2 sha256:ac536abdba951d9c6f3ea278a85b89fc231851097dc3b71936e2a32a25a70292
#2 transferring context: 34B done
#2 DONE 0.0s

#3 [internal] load metadata for docker.io/library/python:3.8
#3 sha256:edad251955f644c6004999f0af04035912392fa02db26821676452becbc715fb
#3 DONE 0.7s

#4 [1/7] FROM docker.io/library/python:3.8@sha256:02991ad89f1fc4e37205832fe7b4886d7035bb0cd151d5dc4ebc25e6965b0f0d
#4 sha256:42783365d070be6211be16cccc88d7e522c73242f8c7d7b53ebb249e7475a014
#4 DONE 0.0s

#5 [internal] load build context
#5 sha256:4e4d2586cee80e769ec8a9730caaa1d2be00156af98ba0d6bbab0febf6409eef
#5 transferring context: 111.93kB 0.0s done
#5 DONE 0.1s

#6 [2/7] COPY requirements.txt requirements.txt
#6 sha256:7440dff19ab32eb2e03da4e1404b78c80c9c922f08e859af0bdf99f518990530
#6 CACHED

#7 [3/7] RUN

Regenerate the Argo workflow (`run.yaml`) from its template using the current `DRY_RUN` setting, in case `0_create_cache.sh` was run immediately before this notebook.

In [3]:
# sed expression replacing every PARAM placeholder with the DRY_RUN value.
sed_str = f"s/PARAM/{DRY_RUN}/g"

In [4]:
%%bash -s "$sed_str"
# Regenerate run.yaml from the template by applying the sed expression passed in as $1.
# Writing through a redirect avoids `sed -i`, whose argument syntax differs between
# BSD/macOS sed (`sed -i ''`) and GNU sed; the resulting file content is the same.
set -euo pipefail
sed -e "$1" jetstream/workflows/run_template.yaml > jetstream/workflows/run.yaml

Imports

In [5]:
import subprocess
from typing import List, Tuple

import dask
import dask.array as da
from dask.distributed import Client

Set up dask client
- Set `threads_per_worker` to 1 to ensure that each worker only launches 1 job at a time
- Then set `n_workers` to the number of parallel Argo jobs you want to run

In [6]:
# One thread per worker means each worker runs at most one task at a time, so the
# worker count caps how many simulations (Argo jobs) are in flight at once.
dask_client = Client(
    threads_per_worker=1,
    n_workers=N_SIMULTANEOUS_ARGO_JOBS,
)

Define a function that launches one simulation run using Argo

In [7]:
def _run_step(command: List[str], failure_message: str) -> bool:
    """Run one CLI command, capturing its output as text.

    Returns True if the command exited with status 0. Otherwise prints
    ``failure_message`` followed by the command's stdout and stderr, then
    returns False.
    """
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        # NOTE(review): inside a dask worker this print goes to the worker's
        # stdout, which may not be shown in the notebook — confirm.
        print(f'{failure_message} {result.stdout} {result.stderr}')
        return False
    return True


@dask.delayed
def run_simulation(simulation_iteration: int) -> Tuple[int, bool]:
    """Launch one jetstream simulation run on Argo into its own BigQuery dataset.

    Steps, in order, stopping at the first failure:
      1. delete ``dberry_simulated_AA_tests_<NN>`` (``bq rm -d -r -f``),
      2. recreate it empty (``bq mk``),
      3. run ``jetstream run-argo`` writing into that dataset.

    Decorated with ``dask.delayed``: calling it returns a lazy task, and the
    work happens only when the task is computed.

    Args:
        simulation_iteration: Index of this simulation; used to name the dataset.

    Returns:
        ``(simulation_iteration, succeeded)``. ``succeeded`` is False if any of
        the three commands exited non-zero.
    """
    dataset = 'dberry_simulated_AA_tests_{:02d}'.format(simulation_iteration)

    steps = [
        (['bq', 'rm', '-d', '-r', '-f', dataset],
         f'Unable to delete dataset {dataset}!'),
        (['bq', 'mk', dataset],
         f'Unable to create dataset {dataset}!'),
        (['jetstream',
          'run-argo',
          '--date', '2022-02-22',
          '--project-id', 'moz-fx-data-experiments',
          '--dataset-id', dataset,
          '--bucket', 'dberry-simulated-aa-tests-temporary',
          '--experiment-slug', 'more-from-mozilla-96',
          '--cluster-id', 'jetstream',
          '--zone', 'us-central1-a'],
         f'Simulation {simulation_iteration} failed!'),
    ]
    for command, failure_message in steps:
        if not _run_step(command, failure_message):
            return (simulation_iteration, False)

    return (simulation_iteration, True)
        
    

Create one lazy `dask.delayed` task per simulation; nothing runs until the tasks are computed below.

In [8]:
# Lazy dask.delayed tasks, one per simulation index; nothing executes until
# they are passed to dask.compute.
sim_futures = list(map(run_simulation, range(N_SIMULATIONS)))

# Run Simulations
Note that the cell below takes a while: approximately $\left\lceil \frac{\texttt{N\_SIMULATIONS}}{\texttt{N\_SIMULTANEOUS\_ARGO\_JOBS}} \right\rceil \cdot 30$ minutes when not in dry-run mode.

In [None]:
%%time
# Execute every delayed simulation and block until all have finished.
# NOTE(review): the distributed Client created above presumably acts as the
# default scheduler, capping concurrency at N_SIMULTANEOUS_ARGO_JOBS — confirm.
# `res` is a tuple of (simulation_iteration, succeeded) pairs, one per task.
res = dask.compute(*sim_futures)

In [None]:
# One (simulation_iteration, succeeded) pair per simulation; succeeded is False
# when that iteration's bq or jetstream command exited non-zero.
res