In [1]:
from dask_kubernetes import KubeCluster
import dask.bag as db
from joblib import Memory
from dask.distributed import Client
import functools
from aflow_support_file import slab_nuclearity, select_bimetallic,get_initial_aflow_results, aflow_object_to_atoms
from surface_nuclearity_calculator import slab_enumeration,surface_nuclearity_calculator
from dask.distributed import progress
from pymatgen.io.ase import AseAtomsAdaptor
import dask

# Set up a Kubernetes-backed Dask cluster from the worker specification in worker-spec.yml
cluster = KubeCluster.from_yaml('worker-spec.yml')

# Adaptive scaling ("adapt") seems to be having problems, so a fixed scale of 2 is requested instead
cluster.scale(2)
# Client bound to the cluster above; later cells submit work through it
client = Client(cluster)

def _worker_upload(dask_worker, *, data, fname):
    dask_worker.loop.add_callback(
    callback=dask_worker.upload_file,
    comm=None,  # not used
    filename=fname,
    data=data,
    load=True)

### Register a setup callback per helper module so each file is uploaded to workers as they start/restart
# https://stackoverflow.com/questions/57118226/how-to-properly-use-dasks-upload-file-to-pass-local-code-to-workers
fname_ = ['surface_nuclearity_calculator.py','aflow_support_file.py']
for fname in fname_:
    # Read the module source as raw bytes from the notebook's working directory
    with open(fname, 'rb') as f:
        data = f.read()
    # partial() freezes this iteration's bytes and file name into the callback
    setup_hook = functools.partial(_worker_upload, data=data, fname=fname)
    client.register_worker_callbacks(setup=setup_hook)


# joblib cache directory - per the original note, this relative path is also the
# local directory on each worker where cached results are stored
location = './cachedir'
memory = Memory(location, verbose=1)

# Active and inactive (host) elements to consider
actives = {'Pd', 'Pt', 'Rh', 'Ru', 'Ag', 'Ir'}
hosts = {'Zn', 'Cd', 'Ga', 'Al', 'In'}

# Fetch the aflowlib bulk structures through the joblib cache
get_initial_aflow_results = memory.cache(get_initial_aflow_results)
all_aflow_binaries = get_initial_aflow_results(enthalpy_formation_atom=-0.1)
print("Total aflow binaries found = ",len(all_aflow_binaries))

# Keep only the active/inactive combinations, then take entries 1000-1499 of the matches.
# NOTE(review): the original remark said a problematic AlPd structure stops this from
# going over 700, which does not match the [1000:1500] slice - confirm the intended range.
active_inactive_aflow_binaries = [
    entry
    for entry in all_aflow_binaries
    if select_bimetallic(entry, actives, hosts)
][1000:1500]
print("Number of active/inactive bimetallic structures found = ",len(active_inactive_aflow_binaries))

# One bag partition per bulk; convert each aflow entry to an atoms object (cached)
n_bulks = len(active_inactive_aflow_binaries)
active_inactive_aflow_binaries_bag = db.from_sequence(
    active_inactive_aflow_binaries,
    npartitions=n_bulks,
)
cached_to_atoms = memory.cache(aflow_object_to_atoms)
all_structures = active_inactive_aflow_binaries_bag.map(cached_to_atoms)

# Enumerate the slabs for every bulk (the aflow entry is passed along with each
# structure), flatten the per-bulk lists and repartition into 10000 partitions
cached_slab_enumeration = memory.cache(slab_enumeration)
all_slabs_list = (
    all_structures
    .map(cached_slab_enumeration, active_inactive_aflow_binaries_bag)
    .flatten()
    .repartition(npartitions=10000)
)

# Nuclearity calculation for each slab, with the active element set as a fixed argument
cached_slab_nuclearity = memory.cache(slab_nuclearity)
nuclearity_results = all_slabs_list.map(cached_slab_nuclearity, actives)

# Compute! persist() starts the work on the cluster; progress() displays a progress widget
nuclearity_results = nuclearity_results.persist()
progress(nuclearity_results)

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:  tcp://10.42.14.116:33111
distributed.scheduler - INFO -   dashboard at:                     :8787
distributed.scheduler - INFO - Receive client connection: Client-5164e634-4a0a-11eb-883e-eea2898f98fb
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.42.14.166:43159', name: 1, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.42.14.166:43159
distributed.core - INFO - Starting established connection


Total aflow binaries found =  80945
Number of active/inactive bimetallic structures found =  500


distributed.scheduler - INFO - Register worker <Worker 'tcp://10.42.14.165:39499', name: 0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.42.14.165:39499
distributed.core - INFO - Starting established connection


VBox()

In [None]:
# Display the selected active/inactive entries. This shows the whole list;
# slice it (e.g. [:5]) if only a short preview is wanted.
active_inactive_aflow_binaries

In [None]:
    # Container environment: OMP_NUM_THREADS set to 2.
    # Kubernetes env var values must be strings, so the number is quoted;
    # an unquoted 2 is parsed by YAML as an integer.
    env:
    - name: OMP_NUM_THREADS
      value: "2"