From 3bed3357d6a2d305d69a4df414c5756bf8f88691 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 1 May 2023 21:08:50 -0700 Subject: [PATCH] do not use lifetime on FileWriter --- rust/src/dataset.rs | 10 +++--- rust/src/encodings/plain.rs | 14 +++++--- rust/src/index/vector/graph/persisted.rs | 2 +- rust/src/io/reader.rs | 22 +++++++----- rust/src/io/writer.rs | 45 ++++++++++++++---------- 5 files changed, 55 insertions(+), 38 deletions(-) diff --git a/rust/src/dataset.rs b/rust/src/dataset.rs index 3dfafc06fe4..42598cae521 100644 --- a/rust/src/dataset.rs +++ b/rust/src/dataset.rs @@ -83,16 +83,16 @@ impl From<&Manifest> for Version { } /// Create a new [FileWriter] with the related `data_file_path` under ``. -async fn new_file_writer<'a>( - object_store: &'a ObjectStore, +async fn new_file_writer( + object_store: &ObjectStore, data_file_path: &str, - schema: &'a Schema, -) -> Result> { + schema: &Schema, +) -> Result { let full_path = object_store .base_path() .child(DATA_DIR) .child(data_file_path); - FileWriter::try_new(object_store, &full_path, schema).await + FileWriter::try_new(object_store, &full_path, schema.clone()).await } /// Get the manifest file path for a version. diff --git a/rust/src/encodings/plain.rs b/rust/src/encodings/plain.rs index 45ab86e0da8..41f3e305d5e 100644 --- a/rust/src/encodings/plain.rs +++ b/rust/src/encodings/plain.rs @@ -715,7 +715,7 @@ mod tests { false, )])); let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); let array = Int32Array::from_iter_values(0..1000); @@ -746,7 +746,7 @@ mod tests { false, )])); let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::>()); for i in 0..10 { @@ -775,7 +775,7 @@ mod tests { false, )])); let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); for i in (0..100).step_by(4) { let slice: FixedSizeListArray = fixed_size_list.slice(i, 4); @@ -806,7 +806,9 @@ mod tests { false, )])); let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema.clone()) + .await + .unwrap(); let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::>()); let batch = @@ -835,7 +837,9 @@ mod tests { false, )])); let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema.clone()) + .await + .unwrap(); let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::>()); let batch = diff --git a/rust/src/index/vector/graph/persisted.rs b/rust/src/index/vector/graph/persisted.rs index adbd8c42bd9..5b5aff4d967 100644 --- a/rust/src/index/vector/graph/persisted.rs +++ b/rust/src/index/vector/graph/persisted.rs @@ -219,7 +219,7 @@ pub(crate) async fn write_graph( ])); let schema = Schema::try_from(arrow_schema.as_ref())?; - let mut writer = FileWriter::try_new(object_store, path, &schema).await?; + let mut writer = FileWriter::try_new(object_store, path, schema).await?; for nodes in graph.nodes.as_slice().chunks(params.batch_size) { let mut vertex_builder = FixedSizeBinaryBuilder::with_capacity(nodes.len(), binary_size as i32); diff --git a/rust/src/io/reader.rs b/rust/src/io/reader.rs index 86423cba8e8..8167df7e703 100644 --- a/rust/src/io/reader.rs +++ b/rust/src/io/reader.rs @@ -591,7 +591,7 @@ mod tests { let path = Path::from("/foo"); // Write 10 batches. - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); for batch_id in 0..10 { let value_range = batch_id * 10..batch_id * 10 + 10; let columns: Vec = vec![ @@ -668,7 +668,7 @@ mod tests { } schema.set_dictionary(&batches[0]).unwrap(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); for batch in batches.iter() { file_writer.write(&[&batch]).await.unwrap(); } @@ -722,7 +722,7 @@ mod tests { )])); let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); file_writer.write(&[&batch]).await.unwrap(); file_writer.finish().await.unwrap(); @@ -766,7 +766,7 @@ mod tests { .collect::>(); let batches_ref = batches.iter().collect::>(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); file_writer.write(batches_ref.as_slice()).await.unwrap(); file_writer.finish().await.unwrap(); @@ -786,7 +786,7 @@ mod tests { let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); file_writer.write(&[&batch]).await.unwrap(); file_writer.finish().await.unwrap(); @@ -964,7 +964,9 @@ mod tests { // write to a lance file let store = ObjectStore::memory(); let path = Path::from("/takes"); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema.clone()) + .await + .unwrap(); file_writer.write(&[&batch]).await.unwrap(); file_writer.finish().await.unwrap(); @@ -1061,7 +1063,9 @@ mod tests { let store = ObjectStore::memory(); let path = Path::from("/take_list"); let schema: Schema = (&arrow_schema).try_into().unwrap(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema.clone()) + .await + .unwrap(); file_writer.write(&[&batch]).await.unwrap(); file_writer.finish().await.unwrap(); @@ -1129,7 +1133,7 @@ mod tests { .unwrap(); let schema: Schema = (&arrow_schema).try_into().unwrap(); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); file_writer.write(&[&batch.clone()]).await.unwrap(); file_writer.finish().await.unwrap(); @@ -1153,7 +1157,7 @@ mod tests { // write to a lance file let store = ObjectStore::memory(); let path = Path::from("/read_range"); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); file_writer.write(&[&batch]).await.unwrap(); file_writer.finish().await.unwrap(); diff --git a/rust/src/io/writer.rs b/rust/src/io/writer.rs index 004b23ff358..2981455218d 100644 --- a/rust/src/io/writer.rs +++ b/rust/src/io/writer.rs @@ -101,20 +101,20 @@ pub async fn write_manifest( /// // Need to close file writer to flush buffer and footer. /// file_writer.shutdown(); /// ``` -pub struct FileWriter<'a> { +pub struct FileWriter { object_writer: ObjectWriter, - schema: &'a Schema, + schema: Schema, batch_id: i32, page_table: PageTable, metadata: Metadata, } -impl<'a> FileWriter<'a> { +impl FileWriter { pub async fn try_new( object_store: &ObjectStore, path: &Path, - schema: &'a Schema, - ) -> Result> { + schema: Schema, + ) -> Result { let object_writer = object_store.create(path).await?; Ok(Self { object_writer, @@ -129,18 +129,27 @@ impl<'a> FileWriter<'a> { /// All RecordBatch will be treated as one RecordBatch on disk /// /// Returns [Err] if the schema does not match with the batch. - pub async fn write(&mut self, batch: &[&RecordBatch]) -> Result<()> { - for field in self.schema.fields.iter() { - let arrs: Result> = batch + pub async fn write(&mut self, batches: &[&RecordBatch]) -> Result<()> { + // Copy a list of fields to avoid borrow checker error. + let fields = self + .schema + .fields + .iter() + .map(|f| f.clone()) + .collect::>(); + for field in fields.iter() { + let arrs = batches .iter() - .map(|b| { - let column_id = b.schema().index_of(&field.name)?; - Ok(b.column(column_id)) + .map(|batch| { + batch.column_by_name(&field.name).ok_or_else(|| { + Error::IO(format!("FileWriter::write: Field {} not found", field.name)) + }) }) - .collect(); - self.write_array(field, arrs?.as_slice()).await?; + .collect::>>()?; + + self.write_array(field, &arrs).await?; } - let batch_length = batch.iter().map(|b| b.num_rows() as i32).sum(); + let batch_length = batches.iter().map(|b| b.num_rows() as i32).sum(); self.metadata.push_batch_length(batch_length); self.batch_id += 1; Ok(()) @@ -346,7 +355,7 @@ impl<'a> FileWriter<'a> { self.metadata.page_table_position = pos; // Step 2. Write manifest and dictionary values. - let mut manifest = Manifest::new(self.schema, Arc::new(vec![])); + let mut manifest = Manifest::new(&self.schema, Arc::new(vec![])); let pos = write_manifest(&mut self.object_writer, &mut manifest, None).await?; // Step 3. Write metadata. @@ -562,7 +571,7 @@ mod tests { let store = ObjectStore::memory(); let path = Path::from("/foo"); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); file_writer.write(&[&batch]).await.unwrap(); file_writer.finish().await.unwrap(); @@ -592,7 +601,7 @@ mod tests { let store = ObjectStore::memory(); let path = Path::from("/foo"); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); file_writer.write(&[&batch]).await.unwrap(); file_writer.finish().await.unwrap(); @@ -627,7 +636,7 @@ mod tests { let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); let store = ObjectStore::memory(); let path = Path::from("/foo"); - let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap(); + let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap(); file_writer.write(&[&batch]).await.unwrap(); file_writer.finish().await.unwrap();