
Adding dask_memusage.install introduces "ValueError: Inputs contain futures that were created by another client." #9

Open
CAROLZXYZXY opened this issue Apr 24, 2021 · 2 comments


CAROLZXYZXY commented Apr 24, 2021

Thank you for the wonderful tool!

I would like to profile the peak memory usage of my Dask application. It runs successfully without dask_memusage, but after I add dask_memusage.install it fails with "ValueError: Inputs contain futures that were created by another client."
I am using dask-memusage v1.1 and dask-core v2021.3.0.

Here is the relevant chunk of my code:

import gc
import time

import dask
import dask_memusage
from dask import delayed
from dask.distributed import Client, LocalCluster
from sklearn.neighbors import NearestNeighbors

from utility import get_batch_index
# `args` and `generate_data` are defined elsewhere in my script.


CLUSTER_KWARGS = {
    'n_workers': 4,
    'threads_per_worker': 1,
    'processes': False,
    'memory_limit': '8GB',
}

cluster = LocalCluster(**CLUSTER_KWARGS)
dask_memusage.install(cluster.scheduler, 'memory_stats/memusage.csv')
client = Client(cluster)

def kNN_graph(X, key_index, ref_index, n_neighbors=10):
    gc.collect()
    nbrs = NearestNeighbors(n_neighbors=n_neighbors).fit(X[ref_index[0]:ref_index[1], :])
    distance, indices = nbrs.kneighbors(X[key_index[0]:key_index[1], :])
    return (distance, indices)


contamination = 0.1  # percentage of outliers
n_train = args.n_train  # number of training points
n_test = 1000  # number of testing points
n_features = args.dim
    
# Generate sample data
X_train, y_train, X_test, y_test = \
    generate_data(n_train=n_train,
                  n_test=n_test,
                  n_features=n_features,
                  contamination=contamination,
                  random_state=42)

k = 5
batch_size = 5000
n_samples = n_train


start = time.time()
batch_index = get_batch_index(n_samples=n_samples, batch_size=batch_size)
n_batches = len(batch_index)

# save the intermediate results
full_list = []

# scatter the data
future_X = client.scatter(X_train)

delayed_knn = delayed(kNN_graph)

for i, index_A in enumerate(batch_index):
    for j, index_B in enumerate(batch_index):
        full_list.append(delayed_knn(future_X, index_A, index_B, k))
        
(full_list,) = dask.compute(full_list)  # dask.compute returns a tuple

itamarst (Owner) commented:
Hi,

Sorry I missed this. I notice you have both a cluster and a client; I suspect you want only one or the other.

itamarst (Owner) commented:
I will try to clarify the documentation with examples for both cases.
