Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow 1 struct-based API + concurrent streams + column selections + File reader #407

Merged
merged 17 commits into from
Jan 25, 2024

Conversation

H-Plus-Time
Copy link
Contributor

I've structured the commits as feature-separated breaks (the later ones have more radical/arguable design decisions), so I can just branch off if need be.

The file reader interface doesn't seem to increase the bundle much at all, though the first draft (sans generic trait) bloats the source file by a bit.

The very last commit contains a decent compromise for wasm-bindgen objects + generic traits (similar to lifetimes, trait objects and generics on anything bindgen'd are both verboten).

All three of the TODOs in #393 should be solved, along with #81 (the dot-separated column names ended up being quote robust)

reader.clone(),
meta.clone(),
)
.with_batch_size(batch_size.clone())
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to have a way to "unset" a batch size, where it uses the same batch size as the input file, but not sure that exists. The current default that arrow-rs sets is 1024 I think which seems way too small.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I came across this granularity mismatch today actually, wrt data page sizes in pyarrow.

It looks like the ParquetOffsetIndex is the critical piece of metadata for sub-column-chunk IO/addressing (you actually have to explicitly request it in the ArrowReaderOptions passed into load_async, and it needs to be present in the file). That gives you [row_group][column][data_page] row offsets, byte offsets and compressed page size (the first property would be all that's needed to spit out a mode(batch_size) - the data pages are usually all exactly the same number of rows except for the last one).

Side note on that: the offset index makes a gigantic difference, and it only seems to be coming in recently to implementations (polars, pyarrow both stick to row group/column level IO). For highly targeted searches (as in <.1% of the dataset) on lopsided datasets (like multipolygon geoparquets, where one column accounts for ~95% of the file), setting a small (8x smaller than the default) page/batch size on write got me the following results:

  • Polars, pyarrow - read the filter columns, mapped hit rows to data pages, data pages to row groups -> downloaded the entire row group column chunk regardless of how small the data pages were, ignoring the offset index.
  • arrow-rs, via this very project - read the filter columns, mapped hit rows to data pages -> downloaded just the data page with hits (10% of the row group column chunk).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read this through a couple times and I'm still not entirely sure I understand 😅. This is for efficient random access within a column?

)
.with_batch_size(batch_size.clone())
.with_projection(projection_mask.as_ref().unwrap_or(&ProjectionMask::all()).clone());
builder
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In future PRs it'll be nice to expose the other builder methods as well, for total user control over the reading and parsing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, limit, offset, row groups (for the stream endpoints) look like the low hanging fruit.

Row selection (per-row mask, pretty sure relative to the start of the first row group) would be relatively straightforward (though probably necessary to offer a couple of ways to transfer it - proper arrow BooleanArray vs an anonymous Vec (interpreted as a bit-packed boolean array)).

Row filtering is looking like it needs brittle wrapping structs (even for Rust-Rust. JS integration will almost certainly need expression parsing (along the lines of arquero's black magic) or a DSL).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not eager to add expression parsing or a DSL. For geoparquet wasm (being developed in https://github.com/geoarrow/geoarrow-rs), I think row filtering based on a bounding box is appropriate, but otherwise for now I don't want to add arbitrary row filtering if I can help it, pushing that to the end user in JS

builder
}
async fn inner_read_row_group(&self, reader: &T, meta: &ArrowReaderMetadata, batch_size: &usize, projection_mask: &Option<ProjectionMask>, i: usize) -> Result<Table> {
let builder = Self::generate_builder(reader, meta, batch_size, projection_mask);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here, each row group call creates its own builder? I was wondering if having a separate builder for each request would have downsides like no HTTP connection pooling.

Comment on lines 64 to 68
let buffered_stream = stream::iter((0..num_row_groups).map(move |i| {
let builder = Self::generate_builder(&reader, &meta, &batch_size, &projection_mask)
.with_row_groups(vec![i]);
builder.build().unwrap().try_collect::<Vec<_>>()
})).buffered(concurrency);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here for concurrency you give each row group to a different builder? That's instead of passing all the row groups to a single builder, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, correct, only way to avoid rewriting ParquetRecordBatchStream (that or convince maintainers to add another with_X to the builder interface for number of prefetched row groups (effectively what the 'concurrency' parameter here is controlling, with the side effect that decoding is coupled with those prefetches).

@kylebarron
Copy link
Owner

I'm moving apartments tomorrow, going to a conference next week, then vacation the next, so I may be slow to respond! I'll get to this soon!

@kylebarron
Copy link
Owner

Ugh sorry I forgot about this pr 😭. Let me read through this again. @H-Plus-Time is this good to merge?

src/arrow1/metadata.rs Outdated Show resolved Hide resolved
@kylebarron
Copy link
Owner

oo I've never successfully pushed to a PR from a fork before! 😅

Comment on lines 336 to 337
unsafe impl Send for WrappedFile {}
unsafe impl Sync for WrappedFile {}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some reasoning here, or a Safety comment? I didn't think any JS objects would be safe to transfer across threads without SharedArrayBuffer, but also it's confusing because we're not working across threads here.

I thought the messaging of oneshot::channel didn't need Send and Sync?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this because AsyncParquetFile has a Sync bound?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AsyncFileReader trait, yeah - I've narrowed down the unsafe part to just the unsafe impl Send part (pushing shortly).

The difference between AsyncParquetFile and AsyncParquetLocalFile is the hold on the non-Send value (I've tried quite a few variations, Arc<Mutex<File>> included), the AsyncFileReader just outright bans non-Send readers (even if the async runtime is explicitly single-thread or non-workstealing).

Apparently it's possible to feature flag trait constraints, so situations like this (async traits on anything that holds a JSValue reference) could be solved by opting into a 'non_threadsafe' feature.

I'll add a safety note to both the unsafe block and the AsyncParquetLocalFile along the lines of "Don't use in multi-threaded environments".

}

#[wasm_bindgen]
pub struct AsyncParquetFile {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw in a future PR I think in the future I'd like to have AsyncParquetDataset as well, which could just be a HashMap<String, AsyncParquetFile>, and would allow managing multi-file remote Parquet stores

.boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could deduplicate this with the HTTP metadata fetching a bit.

@kylebarron
Copy link
Owner

I'd like to get this in, then remove arrow2's implementation and work towards an 0.6 release.

I think #407 (comment) is my only pertinent question before merging. Were you able to test with a File object?

@H-Plus-Time
Copy link
Contributor Author

H-Plus-Time commented Jan 25, 2024

I'd like to get this in, then remove arrow2's implementation and work towards an 0.6 release.

I think #407 (comment) is my only pertinent question before merging. Were you able to test with a File object?

Yep, I've tested with a File object, though nothing automated. I've put in a quick manual test, but I'd be happy to add playwright tests (probably in a later PR though).

@kylebarron kylebarron merged commit d1b0237 into kylebarron:main Jan 25, 2024
7 checks passed
@kylebarron
Copy link
Owner

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants