odd performance degradation when row chunks are set too small #29
P.S. The reason I'm messing around with chunk size is that I'm trying to get a handle on RAM footprint with far larger (MeerKAT) datasets. These are at about 5e+9 points per plot (i.e. x50 larger than the test above). In this regime:
So, in summary: not an urgent problem (for me), but it is a nagging puzzle. The defaults are fine for my fat nodes, so I'm not touching them for now... People trying to plot MeerKAT data on smaller boxes will probably twist the chunk size knob and get slapped in the face by this.
I'd need to inspect the actual graph, but my working hypothesis is that datashader is creating an image for each input chunk in the MS and then trying to stack them all in one go. Hence when the input MS chunks are small, many images are created, leading to OutOfMemory errors when numpy tries to create the output for the stack operation.

  File "/scratch/oms/projects/datashader/datashader/compiler.py", line 147, in combine
    bases = tuple(np.stack(bs) for bs in zip(*base_tuples))
  File "/scratch/oms/projects/datashader/datashader/compiler.py", line 147, in <genexpr>
    bases = tuple(np.stack(bs) for bs in zip(*base_tuples))
  File "<__array_function__ internals>", line 6, in stack
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/numpy/core/shape_base.py", line 433, in stack
    return _nx.concatenate(expanded_arrays, axis=axis, out=out)
  File "<__array_function__ internals>", line 6, in concatenate
MemoryError: Unable to allocate 106. GiB for an array with shape (1544, 900, 1280, 16) and data type int32
Error shutting down executor: 'NoneType' object is not callable

Here is the datashader code, and here is my commented analysis of it:

def chunk(df):
    """ Function applied per input chunk """
    aggs = create(shape)          # Create output array
    extend(aggs, df, st, bounds)  # Aggregate data into output array
    return aggs                   # Return output array

name = tokenize(df.__dask_tokenize__(), canvas, glyph, summary)
# Input graph keys, referencing input chunks
keys = df.__dask_keys__()
# Output graph keys
keys2 = [(name, i) for i in range(len(keys))]
# Map chunk function over input chunks (referred to by input keys) onto output keys
dsk = dict((k2, (chunk, k)) for (k2, k) in zip(keys2, keys))
# Call finalize(*[(combine, keys2)], **dict(...))
# This calls combine on all output chunks (referenced by keys2)
# which internally calls np.stack
dsk[name] = (apply, finalize, [(combine, keys2)],
             dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label]))

Put another way, all images produced by each separate chunk node are parents of the combine node. A tree reduction is probably the standard way to resolve this, but I'm not sure what the implications of such a reduction would be for datashader's internal API. @jbednar, would it be possible for you to comment here and let us know whether this is something we should raise on the datashader repo? @o-smirnov and @IanHeywood have blazed away to produce some beautiful Radio Astronomy plots that they may want to share. Our input data can be very large, so they're interested in seeing how low they can set the memory budgets.
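For concreteness, the size of the failing allocation is consistent with this picture. A minimal back-of-the-envelope check, assuming the leading 1544 in the MemoryError is the number of input chunks feeding the single combine node, each contributing one (900, 1280, 16) int32 aggregate that np.stack then tries to materialise at once:

import numpy as np

# ~1544 per-chunk aggregates, each a (900, 1280, 16) int32 canvas,
# stacked into one array by the combine node
n_chunks = 1544
canvas_shape = (900, 1280, 16)
n_bytes = n_chunks * np.prod(canvas_shape) * np.dtype(np.int32).itemsize
print(n_bytes / 2**30)  # ~106 GiB, matching the MemoryError in the traceback above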
Cool, thanks @sjperkins. That makes a lot of sense, and explains where the mysterious extra outer dimension comes from. This also suggests there may be more efficient ways of implementing our particular reduction. I'm going to fork the DS repo anyway, as I see a bug in colorize which I want to fix. We may as well play around with this while we're at it, right?
If the following is anything like the dask Array reduction function, it should construct a reduction tree from a dataframe. https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.Series.reduction
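For reference, a minimal sketch of how that API might be used to get a tree-shaped reduction over dataframe partitions (a toy sum, not datashader's actual aggregation; the column name and data are made up for illustration):

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": range(1000)}), npartitions=16)

total = ddf.x.reduction(
    chunk=lambda s: s.sum(),      # applied to each partition
    combine=lambda s: s.sum(),    # applied to groups of partial results (tree levels)
    aggregate=lambda s: s.sum(),  # applied once at the root
    split_every=4,                # fan-in of the reduction tree
)
print(total.compute())  # 499500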
Yes, that's basically it, as long as you replace "image" with "rasterized array of values" (as it's not turned into an RGB image until later, after all such output arrays are combined).

Datashader's algorithms are designed for a particular situation that may not apply to what you want to do. Specifically, Datashader assumes that your chunk size is >> your output array size, which is typically a good assumption if you are rendering to a computer screen (and therefore have a relatively small total output array size) and if you choose a chunk size about as large as a worker node can handle (to minimize communication overhead). Under those conditions, it should have good performance.

But what if you choose a tiny chunk size, such that the output array >> your chunk size? In that case the output array size will dominate your memory needs both per worker and (as you apparently are seeing) overall (as you're then multiplying that output array size times a very large number of chunks). I'm not sure what to suggest here, as I haven't studied your situation in detail, other than not to select a chunk size that's smaller than your output array size. Essentially, your output image resolution will determine your minimum viable worker-node memory capacity; you need to be able to hold a copy of the entire output array plus one chunk, on any worker.

Trying to extend Datashader to distribute the output array as well as the input array would only be practical if you have a spatially indexed data structure, because otherwise any given chunk needs to write to anywhere on the output array. Datashader already does support spatial indexing via spatialpandas, but (a) it currently only exploits that on the input side, not on the output, (b) building the spatial index itself then has this same problem and is very time consuming, (c) you can only have a spatial index for a single pair of coordinates, so you would be limited to only one type of plot per data structure, and (d) rendering to partial arrays on the output side would take a good bit of coding. Could be done, but not something I'd attempt myself!
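To put rough numbers on that per-worker minimum, using the canvas from the traceback in this issue (900 x 1280 with 16 categories, int32 counts), each worker must hold at least one full output aggregate plus one input chunk:

import numpy as np

# one full output aggregate for a (900, 1280, 16) int32 canvas
agg_bytes = 900 * 1280 * 16 * np.dtype(np.int32).itemsize
print(agg_bytes / 2**20)  # ~70 MiB per worker, before counting the input chunk itself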
Sure! But please keep any PRs fixing colorize separate from any fixing the other above issues, as colorize fixes are likely to be simple and local, unlike trying to implement a distributed output array! :-)
Looking back up at the comments above, it sounds like you might have identified a bottleneck specifically in how the output arrays are being reduced, not in the fact that there is an output array per chunk? If so, then that might indeed be something feasible to implement, though it's very tricky to get it right across CPU and GPU cases.
@jbednar Thanks for your detailed response!
Yes, we're rasterising Radio Astronomy data, which in its raw form is stored as complex data in the frequency domain. There's a lot of it (terabytes for MeerKAT and petabytes for the future SKA). For interest's sake, producing an RA image involves gridding these complex values and Fourier transforming the grid to produce an image, but this is a separate case from what we're trying to achieve with DataShader. Here, @o-smirnov is producing plots to inspect the raw complex data (no gridding or FFT involved), which I suspect is highly categorical: a measurement is characterised by the TIME it was taken, the ANTENNAs that observed it, the FEED on each antenna, and the multiple CHANNELS/FREQUENCIES at which it was taken, amongst others.
The raw complex data is on the order of 1e12 -- 1e13 data points (for the moment), while the plots are far smaller (512**2 or 1024**2 pixels, @o-smirnov?). This is why we're trying to use ever smaller chunks for the raw data and running into the issue at hand.
Yes, exactly:

import numpy as np
import dask.array as da

A = da.zeros(20, chunks=1)
tree = A.sum(split_every=2)
combine = da.blockwise(np.sum, (), A, ("x",), dtype=A.dtype)
tree.visualize("tree.png")
combine.visualize("combine.png")

Visually, I think datashader's current strategy is the combination strategy (the combine.png graph), whereas in terms of optimal memory usage I am proposing the tree reduction strategy (the tree.png graph).

I've personally had very good experiences with the tree reduction on our data in the dask CPU case (single node). I would expect the memory usage of the reduction to be O(T x I x F), where T is the number of threads, I is the image size and F is some fudge factor that in my experience doesn't exceed 2.0. By contrast, the combination strategy would incur a memory usage of O(I x C), where C is the number of input data chunks. In the distributed case, a tree reduction should also do well by reducing data movement. I can't speak from personal experience about how the dask scheduler handles things in the constrained memory environment of a GPU.

It's currently a bit unclear to me how a tree reduction would interact with the various datashader reduction operators and APIs. ds.mean() would need to track counts throughout the reduction (I've coded this kind of thing myself), ds.any() or ds.all() would not, and ds.mode() is very difficult in any parallel paradigm (but I suspect not important in the rendering case). My question would be: are the datashader operators and APIs specifically coded with the combination strategy in mind, or do you think they'd be able to handle the tree reduction strategy?
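To illustrate the ds.mean() case, here is a minimal sketch (plain Python/NumPy, not datashader's actual API) of carrying (sum, count) pairs through a tree reduction so the mean is only formed at the root:

import functools
import numpy as np

def chunk_agg(values):
    # per-chunk aggregate for a mean: carry (sum, count), not the mean itself
    return values.sum(), values.size

def combine(a, b):
    # merge two partial aggregates; associative, so it can be applied as a tree
    return a[0] + b[0], a[1] + b[1]

def tree_reduce(parts, split_every=2):
    # reduce partial aggregates level by level, split_every at a time
    while len(parts) > 1:
        parts = [functools.reduce(combine, parts[i:i + split_every])
                 for i in range(0, len(parts), split_every)]
    return parts[0]

chunks = [np.random.rand(1024) for _ in range(8)]
total, count = tree_reduce([chunk_agg(c) for c in chunks])
print(total / count)  # the mean, formed only once at the root of the tree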
Sure @jbednar! It's basically this issue: holoviz/datashader#899, but I'm now realizing it's a bit broader than that. I've been trying to use the new ...

Regarding the performance issue, @sjperkins will try to implement a tree reduction on another branch, and we'll see how far that gets us. Great job, and thanks, on the whole DS framework -- it's really doing great stuff for us.
As another note to self, to be tested later. This (1e+10 points):
...blows my memory past the 512GB on the node. With a chunk size of 10000, it's chugging along, finishing in 168s. This is a very low-IO case, since it's only reading UVWs here.
Note that if ANTENNA1 is in group_cols, or in the TAQL used to select out rows, there'll probably be strided disk access in a TIME-ordered MS.
No, it's in neither -- it's colouring by its values. The problem is the nchunks x ncanvas allocation. Any luck putting a tree reduction in?
I think this is solved by the tree reduction in our datashader fork. Let's try to get the fork merged...
@sjperkins here's a puzzle for you. My default row chunk size is set to 100000, and everything was rosy. Here's a little test plot with a test MS.
5 seconds, nice and smooth. Then in a fit of mischief, I set it to 1024. OK that's small and suboptimal, but it should still run, right? Except the process proceeded to chew up all my RAM slowly, then went boom:
That allocation of a (1544, 900, 1280, 16) array is especially puzzling. The last three numbers are familiar -- the datashader canvas size is (900, 1280, 16) here. I don't know where the extra 1544 dimension comes from (and it's not a number I recognize!)
And when I go back to larger chunks, the problem goes away (it wouldn't be able to allocate an array that size on my puny desktop, so clearly it's not doing it when the script runs normally!). I really don't understand how chunk size can affect this logic, yet clearly it does...