Merge batches from multiple datafiles in the same Fragment #815
Conversation
changhiskhan left a comment:

minor comments/questions
```python
def add_columns(
    self,
    value_func: Callable[[pa.RecordBatch], pa.RecordBatch],
```
So the value_func's output will be the augmented Fragment that then gets written as the new version, right? Going to assume the API will improve in subsequent PRs.
So the idea is that value_func will take care of matching the input data and the merge data?
value_func will be "the producer" of the new data. This is similar to a pandas UDF in Spark: it takes a RecordBatch as input and produces a RecordBatch of the same size as output.
```rust
/// Create a new version of [`Dataset`] from a collection of fragments.
pub async fn create_version_from_fragments(
```
could we use this to manually create a version after distributed writes?
Yes, this is exactly the purpose of exposing this function as pub.
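A rough sketch of that distributed-write pattern (all names here are hypothetical, just to show the shape, not the actual Lance API): workers each write their own datafiles and return lightweight fragment metadata; a single coordinator then commits one new version from the collected fragments.

```python
from dataclasses import dataclass

@dataclass
class FragmentMeta:
    # Lightweight metadata a worker reports after writing its datafile.
    id: int
    path: str

def worker_write(worker_id: int) -> FragmentMeta:
    # Each worker writes independently; only metadata flows back.
    return FragmentMeta(id=worker_id, path=f"data/frag-{worker_id}.lance")

def create_version_from_fragments(prev_version: int, frags: list) -> int:
    # Coordinator-side commit: record every fragment under one new version.
    assert frags, "a version needs at least one fragment"
    return prev_version + 1

frags = [worker_write(i) for i in range(4)]
new_version = create_version_from_fragments(1, frags)
```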
```rust
)
.await
{
    let file_fragment = FileFragment::new(dataset.clone(), frag.clone());
```
Nice, the FileFragment abstraction really simplifies this code.
```rust
// TODO: use tokio::async buffer to make parallel reads.
let mut batches = vec![];
for (reader, schema) in self.readers.iter() {
    let batch = reader
```
can these readers be constructed with the schema so you don't need to pass tuples around?
It can be done via another refactor. Currently FileReader does take the schema. Can we make FileReader hold the projection schema?
it's fine, not a blocker 🤷