In [1]:
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

cluster = LocalCluster()
cluster.adapt(minimum=1, maximum=8)
client = Client(cluster)

In [3]:
from pathlib import Path
from qm9.main import download_qm9_data, make_dataset, data_root

file_path: Path | str = data_root.joinpath("data")
manifest_file: str = "manifest.csv"
if not file_path.exists():
    make_dataset(file_path, manifest_file)
manifest_path = file_path.joinpath(manifest_file)

## Pandas DataFrame

In [4]:
from timeit import default_timer as timer
import pandas as pd
from qm9.main import make_fingerprint_feature

def pandas_worker():
    df = pd.read_csv(manifest_path)
    results = df['smiles'].apply(make_fingerprint_feature)
    print(results)
    
t = timer()
results = pandas_worker()
et = timer() - t
print(f"elapsed time: {et:.3f} secs")


0                                  4P///wAQAAABAAAAwTE9Cg==
1                                  4P///wAQAAABAAAAiQx1Lw==
2                                  4P///wAQAAABAAAAWQqlMQ==
3                                  4P///wAQAAACAAAAfQ/NGrEP
4                              4P///wAQAAADAAAApSWlBFEFXQg=
                                ...                        
130181    4P///wAQAAAVAAAA1QGBAVoRAtUCkQFCUQA0tA55A60HCQ...
130182    4P///wAQAAAWAAAALikG2uoEkQLYqQAAxQddAt0BSNaY2Q...
130183    4P///wAQAAATAAAAuQElAXUABAEEMQDBAwSxCAUJjBi4UK...
130184    4P///wAQAAAWAAAAXQY0RgSurQKhAwDFBzkBMQAhAj7Q2Q...
130185    4P///wAQAAAXAAAAWQcEbECtAqBdAX4AEQnZBApIvQSKEh...
Name: smiles, Length: 130186, dtype: object
elapsed time: 10.805 secs


In [12]:
def dask_worker():
    ddf = dd.read_csv(manifest_path)
    ddf = ddf.repartition(npartitions=8)
    futures = ddf.smiles.apply(make_fingerprint_feature, meta=pd.Series(dtype=str))
    results = futures.compute()
    return results

t = timer()
results = dask_worker()
et = timer() - t
print(f"elapsed time: {et:.3f} secs")
results

elapsed time: 5.517 secs


0                                  4P///wAQAAABAAAAwTE9Cg==
1                                  4P///wAQAAABAAAAiQx1Lw==
2                                  4P///wAQAAABAAAAWQqlMQ==
3                                  4P///wAQAAACAAAAfQ/NGrEP
4                              4P///wAQAAADAAAApSWlBFEFXQg=
                                ...                        
130181    4P///wAQAAAVAAAA1QGBAVoRAtUCkQFCUQA0tA55A60HCQ...
130182    4P///wAQAAAWAAAALikG2uoEkQLYqQAAxQddAt0BSNaY2Q...
130183    4P///wAQAAATAAAAuQElAXUABAEEMQDBAwSxCAUJjBi4UK...
130184    4P///wAQAAAWAAAAXQY0RgSurQKhAwDFBzkBMQAhAj7Q2Q...
130185    4P///wAQAAAXAAAAWQcEbECtAqBdAX4AEQnZBApIvQSKEh...
Length: 130186, dtype: object

2022-07-16 02:06:25,642 [776] ERROR Timed out during handshake while connecting to tcp://127.0.0.1:46037 after 30 s
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/usr/local/lib/python3.10/site-packages/tornado/iostream.py", line 1140, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
ConnectionResetError: [Errno 104] Connection reset by peer

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/core.py", line 328, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File 