feat: add scanner.plan_splits function #5792
hamersaw wants to merge 6 commits into lance-format:main from
Conversation
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Before I plumb this through to the Lance Spark connector, I just wanted to get some input from interested parties: @majin1102 / @fangbo, I know you expressed interest in a solution in this thread. This does currently work with zone maps. @fangbo, you'll recognize a large bit of code from your PR - thanks! @Jay-ju, IIUC your PR here is targeted at estimating row counts to achieve similar ends. I really like the idea of index hinting, as in my testing I noticed the filtering index choices were not always what I expected them to be.
python/python/lance/dataset.py
Outdated
    return self._scanner.analyze_plan()

def plan_splits(
    self, max_split_size_bytes: Optional[int] = None
Will need to update this to include both max_split_size_bytes and max_row_count options, with one trumping the other if both are provided. I'm interested in whether people think this paradigm is useful? My intuition is that since we are estimating row sizes based on the schema, we could be VERY wrong (we just use 64B for everything that is not a known size - a string / blob could be 1B - 1M+). In these scenarios a user will know their data better and can use a max_row_count to target a partition size. So basically, hopefully in most use-cases we're close and estimation works well, but there are knobs to fine-tune in the other cases.
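To make the two-knob interaction concrete, here is a minimal sketch of how the limits could be resolved into a single per-split row budget. The function name, parameters, and the 64-byte fallback constant are illustrative assumptions for this discussion, not the actual Lance API; the "stricter limit wins" rule follows the later commit that takes the min when both are provided.

```python
from typing import Optional

# Assumed fallback for variable-width fields (string / blob) whose size
# is unknown from the schema alone; illustrative only.
DEFAULT_UNKNOWN_FIELD_SIZE = 64

def effective_max_rows(
    estimated_row_size_bytes: int,
    max_split_size_bytes: Optional[int] = None,
    max_row_count: Optional[int] = None,
) -> Optional[int]:
    """Resolve the byte limit and the row limit into one row budget.

    The byte limit is converted to rows via the schema-based size
    estimate; if both knobs are set, the stricter (smaller) one wins.
    Returns None when neither knob is provided.
    """
    candidates = []
    if max_split_size_bytes is not None:
        candidates.append(max_split_size_bytes // estimated_row_size_bytes)
    if max_row_count is not None:
        candidates.append(max_row_count)
    return min(candidates) if candidates else None
```

Under this sketch, a user whose rows are dominated by large blobs (where the 64B estimate is badly wrong) would simply pass max_row_count and ignore the byte-based estimate entirely.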
…nd use the min if both provided Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
} else {
    Arc::new(self.dataset.fragments().as_ref().clone())
};
Does this change need to take care of the scanner range, e.g. scan_range_before_filter?
Great point. I'm going to have to look into this, but I think it probably should!
Signed-off-by: Daniel Rammer <hamersaw@protonmail.com>
Closing in favor of #5863.
This PR adds a plan_splits function to the Scanner struct. The goal is for this to serve as a single endpoint where distributed compute frameworks can effectively partition a Lance dataset for parallelized processing. The main goals are:

(1) Prune fragments that do not satisfy a filter (if one exists): We use an index lookup to determine which fragments contain matching rows (and which do not) to prune unnecessary fragments.

(2) Bin-pack fragments into splits: Distributed compute frameworks typically work best with a "sweet-spot" partition size. Within Lance, this means a partition should typically contain multiple fragments. We expose a user-configurable strategy, namely max row count or split size, and then estimate row sizes based on the schema to determine the size of the resultant split.
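The bin-packing step described above can be sketched as a simple greedy pass over the (already filter-pruned) fragments. This is an illustrative model, not the actual Lance implementation: the Fragment dataclass, function name, and greedy strategy are assumptions for the sake of discussion.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Fragment:
    """Minimal stand-in for a Lance fragment: an id and a row count."""
    id: int
    num_rows: int

def plan_splits(
    fragments: List[Fragment],
    estimated_row_size_bytes: int,
    max_split_size_bytes: int,
) -> List[List[int]]:
    """Greedily pack fragments into splits of roughly the target size.

    Each fragment's size is estimated as num_rows * the schema-derived
    per-row estimate; a split is flushed once adding the next fragment
    would push it past max_split_size_bytes.
    """
    splits: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for frag in fragments:
        frag_bytes = frag.num_rows * estimated_row_size_bytes
        if current and current_bytes + frag_bytes > max_split_size_bytes:
            splits.append(current)
            current, current_bytes = [], 0
        current.append(frag.id)
        current_bytes += frag_bytes
    if current:
        splits.append(current)
    return splits
```

A greedy pass like this preserves fragment order (useful for ordered scans) at the cost of occasionally uneven splits; a max_row_count strategy would work the same way with the byte estimate replaced by a row counter.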