Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions rust/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,16 @@ impl From<&Manifest> for Version {
}

/// Create a new [FileWriter] with the related `data_file_path` under `<DATA_DIR>`.
async fn new_file_writer<'a>(
object_store: &'a ObjectStore,
async fn new_file_writer(
object_store: &ObjectStore,
data_file_path: &str,
schema: &'a Schema,
) -> Result<FileWriter<'a>> {
schema: &Schema,
) -> Result<FileWriter> {
let full_path = object_store
.base_path()
.child(DATA_DIR)
.child(data_file_path);
FileWriter::try_new(object_store, &full_path, schema).await
FileWriter::try_new(object_store, &full_path, schema.clone()).await
}

/// Get the manifest file path for a version.
Expand Down
14 changes: 9 additions & 5 deletions rust/src/encodings/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ mod tests {
false,
)]));
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();

let array = Int32Array::from_iter_values(0..1000);

Expand Down Expand Up @@ -746,7 +746,7 @@ mod tests {
false,
)]));
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();

let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::<Vec<_>>());
for i in 0..10 {
Expand Down Expand Up @@ -775,7 +775,7 @@ mod tests {
false,
)]));
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();

for i in (0..100).step_by(4) {
let slice: FixedSizeListArray = fixed_size_list.slice(i, 4);
Expand Down Expand Up @@ -806,7 +806,9 @@ mod tests {
false,
)]));
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();

let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::<Vec<_>>());
let batch =
Expand Down Expand Up @@ -835,7 +837,9 @@ mod tests {
false,
)]));
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();

let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
let batch =
Expand Down
2 changes: 1 addition & 1 deletion rust/src/index/vector/graph/persisted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub(crate) async fn write_graph<V: Vertex + Clone>(
]));
let schema = Schema::try_from(arrow_schema.as_ref())?;

let mut writer = FileWriter::try_new(object_store, path, &schema).await?;
let mut writer = FileWriter::try_new(object_store, path, schema).await?;
for nodes in graph.nodes.as_slice().chunks(params.batch_size) {
let mut vertex_builder =
FixedSizeBinaryBuilder::with_capacity(nodes.len(), binary_size as i32);
Expand Down
22 changes: 13 additions & 9 deletions rust/src/io/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ mod tests {
let path = Path::from("/foo");

// Write 10 batches.
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
for batch_id in 0..10 {
let value_range = batch_id * 10..batch_id * 10 + 10;
let columns: Vec<ArrayRef> = vec![
Expand Down Expand Up @@ -668,7 +668,7 @@ mod tests {
}
schema.set_dictionary(&batches[0]).unwrap();

let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
for batch in batches.iter() {
file_writer.write(&[&batch]).await.unwrap();
}
Expand Down Expand Up @@ -722,7 +722,7 @@ mod tests {
)]));
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
file_writer.write(&[&batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -766,7 +766,7 @@ mod tests {
.collect::<Vec<_>>();
let batches_ref = batches.iter().collect::<Vec<_>>();

let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
file_writer.write(batches_ref.as_slice()).await.unwrap();
file_writer.finish().await.unwrap();

Expand All @@ -786,7 +786,7 @@ mod tests {
let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
file_writer.write(&[&batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -964,7 +964,9 @@ mod tests {
// write to a lance file
let store = ObjectStore::memory();
let path = Path::from("/takes");
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();
file_writer.write(&[&batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -1061,7 +1063,9 @@ mod tests {
let store = ObjectStore::memory();
let path = Path::from("/take_list");
let schema: Schema = (&arrow_schema).try_into().unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();
file_writer.write(&[&batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -1129,7 +1133,7 @@ mod tests {
.unwrap();

let schema: Schema = (&arrow_schema).try_into().unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
file_writer.write(&[&batch.clone()]).await.unwrap();
file_writer.finish().await.unwrap();

Expand All @@ -1153,7 +1157,7 @@ mod tests {
// write to a lance file
let store = ObjectStore::memory();
let path = Path::from("/read_range");
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
file_writer.write(&[&batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down
45 changes: 27 additions & 18 deletions rust/src/io/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,20 @@ pub async fn write_manifest(
/// // Need to close file writer to flush buffer and footer.
/// file_writer.shutdown();
/// ```
pub struct FileWriter<'a> {
pub struct FileWriter {
object_writer: ObjectWriter,
schema: &'a Schema,
schema: Schema,
batch_id: i32,
page_table: PageTable,
metadata: Metadata,
}

impl<'a> FileWriter<'a> {
impl FileWriter {
pub async fn try_new(
object_store: &ObjectStore,
path: &Path,
schema: &'a Schema,
) -> Result<FileWriter<'a>> {
schema: Schema,
) -> Result<FileWriter> {
let object_writer = object_store.create(path).await?;
Ok(Self {
object_writer,
Expand All @@ -129,18 +129,27 @@ impl<'a> FileWriter<'a> {
/// All RecordBatch will be treated as one RecordBatch on disk
///
/// Returns [Err] if the schema does not match with the batch.
pub async fn write(&mut self, batch: &[&RecordBatch]) -> Result<()> {
for field in self.schema.fields.iter() {
let arrs: Result<Vec<_>> = batch
pub async fn write(&mut self, batches: &[&RecordBatch]) -> Result<()> {
// Copy a list of fields to avoid borrow checker error.
let fields = self
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why tho the code doesn't seem to modify the fields?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, so self.schema has a immuntable reference to self, but later it needs mutable reference self to call self.write_array

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave a comment here, in case some Rust guru knows how to do it better later.

.schema
.fields
.iter()
.map(|f| f.clone())
.collect::<Vec<_>>();
for field in fields.iter() {
let arrs = batches
.iter()
.map(|b| {
let column_id = b.schema().index_of(&field.name)?;
Ok(b.column(column_id))
.map(|batch| {
batch.column_by_name(&field.name).ok_or_else(|| {
Error::IO(format!("FileWriter::write: Field {} not found", field.name))
})
})
.collect();
self.write_array(field, arrs?.as_slice()).await?;
.collect::<Result<Vec<_>>>()?;

self.write_array(field, &arrs).await?;
}
let batch_length = batch.iter().map(|b| b.num_rows() as i32).sum();
let batch_length = batches.iter().map(|b| b.num_rows() as i32).sum();
self.metadata.push_batch_length(batch_length);
self.batch_id += 1;
Ok(())
Expand Down Expand Up @@ -346,7 +355,7 @@ impl<'a> FileWriter<'a> {
self.metadata.page_table_position = pos;

// Step 2. Write manifest and dictionary values.
let mut manifest = Manifest::new(self.schema, Arc::new(vec![]));
let mut manifest = Manifest::new(&self.schema, Arc::new(vec![]));
let pos = write_manifest(&mut self.object_writer, &mut manifest, None).await?;

// Step 3. Write metadata.
Expand Down Expand Up @@ -562,7 +571,7 @@ mod tests {

let store = ObjectStore::memory();
let path = Path::from("/foo");
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
file_writer.write(&[&batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -592,7 +601,7 @@ mod tests {

let store = ObjectStore::memory();
let path = Path::from("/foo");
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
file_writer.write(&[&batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -627,7 +636,7 @@ mod tests {
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let store = ObjectStore::memory();
let path = Path::from("/foo");
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
file_writer.write(&[&batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down