workers exceeding max_mem setting #100

Open
rsignell-usgs opened this issue Oct 4, 2021 · 20 comments

@rsignell-usgs
Member

A colleague and I were struggling to rechunk a zarr dataset using this workflow: https://nbviewer.jupyter.org/gist/rsignell-usgs/89b61c3dc53d5107e70cf5574fc3c833

After much trial and error, we discovered that we needed to increase the worker size to 8GB and decrease max_mem to 3GB to avoid workers running out of memory and the cluster dying with "killed_worker".

Watching the dask dashboard shows a number of the workers spiking over 5GB, despite setting max_mem to 3GB:
[screenshot: Dask dashboard worker memory panel, with several workers above 5GB]

When we looked at the worker logs we saw tons of these warnings:

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 5.66 GiB -- Worker memory limit: 8.00 GiB

Is this expected behavior?

@rabernat
Member

rabernat commented Oct 4, 2021

Thanks for reporting Rich!

The ability to track unmanaged memory is new in Dask, so overall this is progress: at least we know that rechunker is not explicitly using this memory, since it is designed not to!

I would try the trick discussed here: dask/distributed#2602 (comment)

Set {"MALLOC_TRIM_THRESHOLD_": "0"} in the environment variables on your Dask workers and see if that improves things.

@rsignell-usgs
Member Author

rsignell-usgs commented Oct 5, 2021

Thanks @rabernat for the idea.

I'm not sure I enabled it correctly, however.

I added this line to my notebook just after I imported dask, before I created the cluster:

dask.config.set({"MALLOC_TRIM_THRESHOLD_": "0"})

It didn't seem to have any impact -- I'm getting the same behavior as before, with memory going well above 3GB and lots of the same unmanaged-memory warnings in the logs.

Do I need a dask worker plugin or something?

@rabernat
Member

rabernat commented Oct 5, 2021

Yeah, that's not right. You need to set an environment variable on the workers themselves, and the way you do this depends on how you are creating your Dask cluster.

How are you creating your cluster? Dask Gateway?

@rsignell-usgs
Member Author

rsignell-usgs commented Oct 5, 2021

@rabernat, yes, Dask Gateway. But the Dask Gateway deployment on QHub, for some reason, is not configured to accept a dict of environment variables at cluster creation (right, @dharhas?).

So this time I created the cluster and then used client.run() to set that environment variable on the Dask workers:

def set_env(k, v):
    # runs on each worker, but only after the worker process has started
    import os
    os.environ[k] = v

client.run(set_env, 'MALLOC_TRIM_THRESHOLD_', '0')

but in the worker logs I still see lots of:

distributed.worker - INFO - Run out-of-band function 'set_env'

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 5.68 GiB -- Worker memory limit: 8.00 GiB

BTW, the workflow should be reproducible, as we are reading from an S3 requester-pays bucket. One would need to supply one's own S3 bucket for writing, of course.

@rabernat
Member

rabernat commented Oct 5, 2021

client.run(set_env, 'MALLOC_TRIM_THRESHOLD_', '0')

That's still not the right way to do it: the variable needs to be in the worker's environment before the worker process starts (glibc reads it when the allocator initializes), so setting os.environ inside an already-running worker has no effect. You need to use cluster options (as in the notebook linked from the issue referenced above).

from dask_gateway import Gateway

g = Gateway()
options = g.cluster_options()
options.environment = {"MALLOC_TRIM_THRESHOLD_": "0"}  # passed into the workers' environment at startup
cluster = g.new_cluster(options)

@dharhas

dharhas commented Oct 5, 2021

@rsignell-usgs, you can set environment variables on dask_gateway in QHub via the environment_vars kwarg, but it looks like you need to upgrade to QHub 0.3.12.

@martindurant

Would it be fair to say that max_mem, which defines the upper limit of a chunk's memory footprint, is expected to be much smaller than the actual peak memory usage? There will always be temporary arrays allocated, and bytes objects created during remote writes.

@rabernat
Member

rabernat commented Oct 6, 2021

Would it be fair to say that max_mem, which defines the upper limit of a chunk's memory footprint, is expected to be much smaller than the actual peak memory usage? There will always be temporary arrays allocated, and bytes objects created during remote writes.

Yes, I think that's correct. But here I think we have a bigger problem related to garbage collection.

@rsignell-usgs
Member Author

Quansight is releasing a new version of QHub later this week, at which point I will upgrade the ESIP QHub. That will give us a much easier way to set environment variables on the workers, which will make it easier to try out the {"MALLOC_TRIM_THRESHOLD_": "0"} idea.

@fmaussion
Member

I'm struggling a lot at the moment with distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak.

Before I report the issue in more detail, I wanted to try this MALLOC_TRIM_THRESHOLD_ method. Does anyone know how I can set it on a LocalCluster instance? The Dask docs are not particularly helpful, and I don't use dask_gateway as in your examples, @rabernat (which, by the way, also seem to fail on recent dask_gateway versions, as far as I could tell with a local gateway server).

Thanks a lot!

@rabernat
Member

rabernat commented May 2, 2022

The details of how I was setting MALLOC_TRIM_THRESHOLD_: dask/distributed#2602 (comment)

It really does seem to be a magic flag for Dask. I don't know how to set it on a LocalCluster.

@fmaussion
Member

Thanks Ryan - I've asked on Dask Discourse and will report back.

@dcherian

dcherian commented May 2, 2022

@fmaussion
Member

Thanks for your help. I'll add my voice to the issues raised there. I wanted to showcase Dask for my advanced programming class, but this really won't help. Even the simplest computations (that would fit in memory) are killing my local cluster: https://nbviewer.ipython.org/gist/fmaussion/5212e3155256e84e53d033e61085ca30

@dcherian

dcherian commented May 3, 2022

Try chunking inside the open_*dataset calls by passing chunks. Otherwise, it will read the whole file into memory and then chunk and distribute it.

I guess we should say this here.
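
An editorial sketch of this suggestion (the file pattern and chunk sizes are made up):

import xarray as xr

# With chunks= at open time, each file is represented as dask arrays of
# roughly the requested size, so no task ever has to hold a whole file's
# variable in memory.
ds = xr.open_mfdataset("model_output_*.nc", chunks={"time": 100})

# Without chunks=, each file becomes a single dask chunk, and a later
# ds.chunk(...) call can only split those chunks after they have been
# materialized whole in worker memory.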

@fmaussion
Member

Try chunking inside the open_*dataset calls by passing chunks. Otherwise, it will read the whole file into memory and then chunk and distribute it.

Well this is embarrassing - I did not know that. This saved my class if not my dignity ;-)

I guess we should say this here.

I will open a PR immediately. This should be priority number one for multi-file datasets. I guess this problem does not occur with single-file open_dataset calls?

@dcherian

dcherian commented May 3, 2022

I guess this problem does not occur with single-file open_dataset calls?

I think it still does. "chunk as early as possible, and avoid rechunking as much as possible" is the principle in my experience.

@fmaussion
Member

I think it still does

Why? By default the variables are lazily loaded rather than dask arrays, and a subsequent call to .chunk() will convert them, no?

"chunk as early as possible, and avoid rechunking as much as possible" is the principle in my experience.

This just made it to the docs ;-)
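
A small editorial sketch of the distinction being described (the file name is hypothetical):

import xarray as xr

# Single file: variables are lazily indexed by the backend, not dask arrays,
# and nothing is read from disk yet.
ds = xr.open_dataset("single_file.nc")

# .chunk() wraps those lazy variables in dask arrays of the requested size;
# data is then read chunk by chunk when something is computed.
ds = ds.chunk({"time": 100})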

@dcherian

dcherian commented May 3, 2022

Why? By default the variables are lazily loaded rather than dask arrays, and a subsequent call to .chunk() will convert them, no?

makes sense!

@NoeLahaye

I am following up on this issue (although I am not sure whether this is the right place -- maybe it is more Dask-related).
I am still struggling with this problem: unmanaged memory gets very high, raising warnings and eventually causing the workers to be killed. I usually use a LocalCluster or PBSCluster. Is setting MALLOC_TRIM_THRESHOLD_ to 0 still the way to go, and if so, how? Beyond the issue mentioned by Ryan above, I have been going through various Dask issues (this one, seemingly fixed by that one) and parts of the docs, but I haven't managed to reach a conclusion that gets me through this.
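
An editorial sketch of two approaches that are commonly suggested, not a confirmed answer from this thread: for a LocalCluster, the variable can be set in the launching process before the cluster is created, since the spawned worker processes inherit that environment; for dask-jobqueue clusters such as PBSCluster, an export line can be injected into the job script via the env_extra argument (renamed job_script_prologue in newer releases). Argument names should be checked against the installed versions.

import os
from dask.distributed import Client, LocalCluster

# LocalCluster: worker processes inherit this process's environment, so the
# variable must be set before the cluster (and its workers) is created.
os.environ["MALLOC_TRIM_THRESHOLD_"] = "0"
cluster = LocalCluster(n_workers=4)
client = Client(cluster)

# dask-jobqueue sketch (argument name depends on the installed version):
# from dask_jobqueue import PBSCluster
# cluster = PBSCluster(cores=4, memory="16GB",
#                      env_extra=["export MALLOC_TRIM_THRESHOLD_=0"])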
