In [1]:
import subprocess
import tempfile
import os
import pickle
from dask.distributed import as_completed
import coiled 

def run_vsflow(filename_and_content, n_processes=4):
    """Build a VSFlow database with ``vsflow preparedb`` from one SDF file's contents.

    Meant to be mapped over a Dask/Coiled cluster. The SDF text is shipped to the
    worker and written to a temporary file, ``vsflow`` is run on it, and the bytes
    of the database it wrote are returned.

    Parameters
    ----------
    filename_and_content : tuple[str, str]
        ``(input_file_name, file_content)``. The name is only used for logging and is
        passed back unchanged so the caller can match results to inputs. The content
        is the SDF text to process.
    n_processes : int, optional
        Value passed to vsflow's ``-np`` option (default 4).

    Returns
    -------
    tuple[str, bytes | None]
        ``(input_file_name, pickle.dumps(vsdb_bytes))`` on success, where
        ``vsdb_bytes`` is the raw content of the file vsflow wrote. Use
        ``pickle.loads`` to recover it. Returns ``(input_file_name, None)`` if vsflow
        exits non-zero, leaves the output empty, or any exception occurs; the details
        are printed in that case.
    """
    input_file_name, file_content = filename_and_content

    print("Processing:", input_file_name)

    # Track paths separately from the file objects so the cleanup in ``finally``
    # works even when creating one of the temporary files fails.
    input_path = None
    output_path = None
    try:
        # Closing the input file guarantees its content is on disk before vsflow reads it.
        with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".sdf") as tmp_input_file:
            input_path = tmp_input_file.name
            tmp_input_file.write(file_content)

        # Reserve a unique output path, then close our handle so vsflow is the only writer.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".vsdb") as tmp_output_file:
            output_path = tmp_output_file.name

        command = [
            "vsflow", "preparedb",
            "-i", input_path,
            "-o", output_path,
            "-s", "-can",
            "-np", str(n_processes),
        ]
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        if result.returncode == 0 and os.path.getsize(output_path) > 0:
            print(f"vsflow executed successfully for {input_file_name}.")
            with open(output_path, 'rb') as f:
                return input_file_name, pickle.dumps(f.read())

        print(f"vsflow failed for {input_file_name}. Return code: {result.returncode}")
        # errors="replace": undecodable tool output must not mask the actual failure.
        if result.stdout:
            print("Output:", result.stdout.decode(errors="replace"))
        if result.stderr:
            print("Error:", result.stderr.decode(errors="replace"))

    except Exception as e:
        # Best effort: report and fall through to the ``None`` result, so that one bad
        # input does not fail the whole mapped batch.
        print(f"An error occurred: {e}")

    finally:
        # Remove each temporary file independently, so a failure on one does not leak the other.
        for path in (input_path, output_path):
            if path is None:
                continue
            try:
                os.remove(path)
            except OSError as e:
                print(f"Error cleaning up temporary files: {e}")

    return input_file_name, None

In [2]:
import coiled

# Register a Coiled software environment named "vsflow" from the published container
# image; the cluster is later created with software="vsflow".
coiled.create_software_environment(
    container="ghcr.io/nathanballou/vsflow:latest",
    name="vsflow",
)

In [3]:
# 10 workers running the "vsflow" software environment. Each worker has a single
# thread, so it executes one task at a time.
# NOTE(review): presumably nthreads=1 is required because every run_vsflow task starts
# vsflow with its own "-np" worker processes; confirm before raising it.
cluster = coiled.Cluster(
    name="testing",
    n_workers=10,
    software="vsflow",
    worker_options={"nthreads": 1},
)

client = cluster.get_client()

Output()

In [5]:
destination_directory = "outputs"
input_files = ["fda.sdf", "fda copy.sdf"] + [f"fda copy {i}.sdf" for i in range(2, 20)]

# Results are written here as they arrive, so the folder must exist up front.
os.makedirs(destination_directory, exist_ok=True)


def _read_text(path):
    """Return the full text of ``path``, closing the file handle afterwards."""
    with open(path, 'r') as handle:
        return handle.read()


file_contents_with_names = [(file, _read_text(file)) for file in input_files]

# Send the (name, content) pairs to the cluster once, then run one task per file.
file_contents_with_names = client.scatter(file_contents_with_names)

pickle_object_futures = client.map(run_vsflow, file_contents_with_names)

for future in as_completed(pickle_object_futures):
    input_file_name, pickle_object = future.result()
    if pickle_object is None:
        # run_vsflow returns None on failure and prints the details wherever it ran.
        print(f"Skipping {input_file_name}: no database was produced.")
        continue
    output_path = os.path.join(destination_directory, os.path.splitext(input_file_name)[0] + ".vsdb")
    # run_vsflow wraps the raw .vsdb bytes with pickle.dumps. Unwrap them so the file on
    # disk is the database vsflow wrote, not a pickled bytes object. The payload comes
    # from our own workers, so pickle.loads is acceptable here.
    with open(output_path, 'wb') as out_file:
        out_file.write(pickle.loads(pickle_object))

In [None]:
# client.restart()

In [None]:
# for pickle_object in  map(run_vsflow, file_contents_with_names):
#     input_file_name, object = pickle_object
#     with open(os.path.join(destination_directory, os.path.splitext(input_file_name)[0] + ".vsdb"), 'wb') as out_file:
#         out_file.write(object)
#     break