In [29]:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import os

In [30]:
# PSJ: Largely following Noel's script here for the configuration

n_processes = 18  # number of processes to run on each node
memory = 90000  # to fit on a standard node; ask for 184,000 for a bigmem node

cluster = SLURMCluster(
    project='rlmolecule',
    walltime='30',  # 30 minutes to fit in the debug queue; 180 to fit in short
    job_mem=str(memory),
    job_cpu=36,
    interface='ib0',
    local_directory='/tmp/scratch/dask-worker-space',
    cores=18,
    processes=n_processes,
    memory='{}MB'.format(memory),
    queue='debug'  # Obviously this is limited to only a single job -- comment this out for larger runs
)

print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p debug
#SBATCH -A rlmolecule
#SBATCH -n 1
#SBATCH --cpus-per-task=36
#SBATCH --mem=90000
#SBATCH -t 30

/projects/rlmolecule/pstjohn/envs/tf2_gpu/bin/python -m distributed.cli.dask_worker tcp://10.148.8.97:33499 --nthreads 1 --nprocs 18 --memory-limit 5.00GB --name name --nanny --death-timeout 60 --local-directory /tmp/scratch/dask-worker-space --interface ib0



Perhaps you already have a cluster running?
Hosting the HTTP server on port 37203 instead
  http_address["port"], self.http_server.port


In [31]:
# Create the client
dask_client = Client(cluster)

In [32]:
n_nodes = 1 # set this to the number of nodes you would like to start as workers
cluster.scale(n_processes * n_nodes)

In [93]:
!squeue -u $USER

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           3937554       gpu crystal_  pstjohn  R 1-10:10:15      1 r103u01
           3953949     debug dask-wor  pstjohn  R      18:47      1 r3i7n35


In [36]:
import psycopg2
import pandas as pd

In [37]:
dbparams = {
    'dbname': 'bde',
    'port': 5432,
    'host': 'yuma.hpc.nrel.gov',
    'user': 'bdeops',
    'password': '*****!',  # deleting the password from the repo
    'options': f'-c search_path=bde',
}

with psycopg2.connect(**dbparams) as conn:

    redf = pd.read_sql_query("""
    SELECT * from redoxcompound where estate='radical' and status='finished' limit 500
    """, conn)    

In [97]:
import rdkit.Chem
import numpy as np
import time

In [98]:
def do_calc(logfile, atom_index):
    """ Write your expensive calculation here """
    time.sleep(1)
    return np.random.random()

def apply_func(row):
    mol = rdkit.Chem.MolFromSmiles(row.smiles)
    results = np.asarray([(atom.GetIdx(), do_calc(row.logfile, atom.GetIdx())) for atom in mol.GetAtoms()])
    df = pd.DataFrame(results, columns=['atom_index', 'buried_vol'])
    df['atom_index'] = df['atom_index'].astype(int)
    df['smiles'] = row.smiles
    
    return df

In [99]:
pd.concat(redf.head(1).apply(apply_func, 1).values)

Unnamed: 0,atom_index,buried_vol,smiles
0,0,0.87832,[CH2]C1=COCC1
1,1,0.252866,[CH2]C1=COCC1
2,2,0.167195,[CH2]C1=COCC1
3,3,0.977913,[CH2]C1=COCC1
4,4,0.362401,[CH2]C1=COCC1
5,5,0.923857,[CH2]C1=COCC1


In [100]:
import dask.dataframe as dd
redf_dask = dd.from_pandas(redf, npartitions=10*n_processes)

In [101]:
meta = pd.DataFrame([[1, .1, 'test']], columns=['atom_index', 'buried_vol', 'smiles'])
results = redf_dask.map_partitions(lambda x: pd.concat(x.apply(apply_func, 1).values), meta=meta)

This is the big computation. you can monitor it by going to http://localhost:1235/proxy/37203/status (replace 1235 with whatever your local jupyterlab tunnel is routed to, and 37203 with the port you get in the second cell). More info here: https://jobqueue.dask.org/en/latest/interactive.html

In [102]:
finished = results.compute()

Honestly even ~8 millions floats isn't a ton of data. There's no real need for the SQL database if we're not using it's transactional capabilities, so we can just save this to a zipped CSV file.

In [None]:
finished.to_csv('buried_volumes.csv.gz', compression='gzip')

In [104]:
cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
