fix clippy
Signed-off-by: zenghua <huazeng@dmetasoul.com>
zenghua committed May 20, 2024
1 parent c41f4c3 commit ff506fa
Showing 8 changed files with 30 additions and 30 deletions.
```diff
@@ -147,7 +147,7 @@ impl FileFormat for LakeSoulMetaDataParquetFormat {
         let mut column_nullable = HashSet::<String>::new();
 
         for config in &flatten_conf {
-            let (partition_desc, partition_columnar_value) = partition_desc_from_file_scan_config(&config)?;
+            let (partition_desc, partition_columnar_value) = partition_desc_from_file_scan_config(config)?;
             let partition_columnar_value = Arc::new(partition_columnar_value);
 
             let parquet_exec = Arc::new(ParquetExec::new(config.clone(), predicate.clone(), self.parquet_format.metadata_size_hint(state.config_options())));
```
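This hunk is the `needless_borrow` pattern clippy flags: iterating `&flatten_conf` already yields references, so `&config` adds a second borrow that the compiler immediately auto-dereferences. A minimal standalone sketch of the same fix (hypothetical `describe` function, not from this repo):

```rust
// Sketch only: `config` is already a reference, so `&config` is a needless extra borrow.
fn describe(config: &str) -> usize {
    config.len()
}

fn main() {
    let flatten_conf = vec![String::from("scan-a"), String::from("scan-b")];
    for config in &flatten_conf {
        // Before the fix: describe(&config)  -- clippy::needless_borrow
        // After the fix: pass the existing reference through unchanged.
        println!("{} bytes", describe(config));
    }
}
```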
10 changes: 5 additions & 5 deletions rust/lakesoul-datafusion/src/datasource/table_provider.rs
```diff
@@ -180,15 +180,15 @@ impl LakeSoulTableProvider {
                     },
                 })
             } else {
-                return Err(DataFusionError::Plan(
+                Err(DataFusionError::Plan(
                     // Return an error if schema of the input query does not match with the table schema.
                     format!("Expected single column references in output_ordering, got {}", expr)
-                ));
+                ))
             }
         } else {
-            return Err(DataFusionError::Plan(
+            Err(DataFusionError::Plan(
                 format!("Expected Expr::Sort in output_ordering, but got {}", expr)
-            ));
+            ))
         }
     })
     .collect::<Result<Vec<_>>>()?;
```
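Both changes in this closure drop a `return` and the trailing semicolon: the `Err(...)` branches are the closure's tail expressions, which is what clippy's `needless_return` asks for. A simplified sketch with plain `String` errors standing in for `DataFusionError`:

```rust
// Sketch only: in tail position of a function or closure, `Err(...)` needs no `return`.
fn check_sort_expr(expr: &str) -> Result<String, String> {
    if expr.starts_with("sort:") {
        Ok(expr.trim_start_matches("sort:").to_string())
    } else {
        // Before the fix: `return Err(...);`  -- clippy::needless_return
        Err(format!("Expected Expr::Sort in output_ordering, but got {}", expr))
    }
}

fn main() {
    let parsed: Result<Vec<_>, _> = ["sort:col_a", "col_b"].into_iter().map(check_sort_expr).collect();
    println!("{:?}", parsed);
}
```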
```diff
@@ -314,7 +314,7 @@ impl TableProvider for LakeSoulTableProvider {
             None
         };
 
-        let object_store_url = if let Some(url) = self.listing_table.table_paths().get(0) {
+        let object_store_url = if let Some(url) = self.listing_table.table_paths().first() {
             url.object_store()
         } else {
             return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))));
```
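Here `table_paths().get(0)` becomes `table_paths().first()`, presumably clippy's `get_first` lint; both return `Option<&T>`, `first()` simply states the intent. A sketch with plain strings standing in for the listing-table paths:

```rust
// Sketch only: `slice.first()` replaces `slice.get(0)`.
fn main() {
    let table_paths = vec!["s3://bucket/lakesoul/table_a", "s3://bucket/lakesoul/table_b"];
    // Before the fix: table_paths.get(0)
    match table_paths.first() {
        Some(url) => println!("resolve object store for {}", url),
        None => println!("no table paths: fall back to an empty execution plan"),
    }
}
```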
2 changes: 1 addition & 1 deletion rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs
```diff
@@ -136,7 +136,7 @@ where
     I: IntoIterator<Item = &'a str>,
 {
     let mut part_values = vec![];
-    for (part, pn) in partition_desc.split(",").zip(table_partition_cols) {
+    for (part, pn) in partition_desc.split(',').zip(table_partition_cols) {
         match part.split_once('=') {
             Some((name, val)) if name == pn => part_values.push(val),
             _ => {
```
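`split(",")` becomes `split(',')`: clippy's `single_char_pattern` prefers a `char` pattern for one-character separators. A sketch of the partition-description parsing this loop performs, with hypothetical values:

```rust
// Sketch only: a char pattern for a one-character separator.
fn main() {
    let partition_desc = "date=2024-05-20,region=cn";
    let table_partition_cols = ["date", "region"];
    let mut part_values = vec![];
    for (part, pn) in partition_desc.split(',').zip(table_partition_cols) {
        match part.split_once('=') {
            Some((name, val)) if name == pn => part_values.push(val),
            _ => println!("unexpected partition segment {:?} for column {}", part, pn),
        }
    }
    println!("{:?}", part_values);
}
```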
18 changes: 9 additions & 9 deletions rust/lakesoul-datafusion/src/planner/physical_planner.rs
```diff
@@ -62,17 +62,17 @@ impl PhysicalPlanner for LakeSoulPhysicalPlanner {
             Ok(provider) => {
                 let physical_input = self.create_physical_plan(input, session_state).await?;
 
-                if lakesoul_table.primary_keys().is_empty() {
-                    if !lakesoul_table
+                if lakesoul_table.primary_keys().is_empty()
+                    && !lakesoul_table
                         .schema()
                         .logically_equivalent_names_and_types(&Schema::from(input.schema().as_ref()))
-                    {
-                        return Err(DataFusionError::Plan(
-                            // Return an error if schema of the input query does not match with the table schema.
-                            "Inserting query must have the same schema with the table.".to_string(),
-                        ));
-                    }
-                }
+                {
+                    return Err(DataFusionError::Plan(
+                        // Return an error if schema of the input query does not match with the table schema.
+                        "Inserting query must have the same schema with the table.".to_string(),
+                    ));
+                }
 
                 let physical_input = if !lakesoul_table.primary_keys().is_empty() || !lakesoul_table.range_partitions().is_empty() {
                     let input_schema = physical_input.schema();
                     let input_dfschema = input.as_ref().schema();
```
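The nested schema check collapses into a single condition, as clippy's `collapsible_if` suggests: the error only applies when the table has no primary keys and the input schema does not match. A reduced sketch of the control flow, with booleans in place of the real schema comparison:

```rust
// Sketch only: one `&&` condition instead of two nested `if`s.
fn validate_insert(has_primary_keys: bool, schemas_match: bool) -> Result<(), String> {
    // Before the fix:
    //     if !has_primary_keys {
    //         if !schemas_match { return Err(...); }
    //     }
    if !has_primary_keys && !schemas_match {
        return Err("Inserting query must have the same schema with the table.".to_string());
    }
    Ok(())
}

fn main() {
    println!("{:?}", validate_insert(false, false)); // Err: schema mismatch matters here
    println!("{:?}", validate_insert(true, false));  // Ok: primary-key tables take another path
}
```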
4 changes: 2 additions & 2 deletions rust/lakesoul-io-c/src/lib.rs
```diff
@@ -877,9 +877,9 @@ pub extern "C" fn apply_partition_filter(
     let dst = unsafe {
         slice::from_raw_parts(filter_addr as *const u8, filter_len as usize)
     };
-    let filter = Plan::decode(&*dst).unwrap();
+    let filter = Plan::decode(dst).unwrap();
 
-    let ffi_schema = schema_addr as *mut FFI_ArrowSchema;
+    let ffi_schema = schema_addr;
    let schema_data = unsafe {
        std::ptr::replace(ffi_schema, FFI_ArrowSchema::empty())
    };
```
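Two small FFI cleanups: `Plan::decode(&*dst)` re-borrowed a slice that is already `&[u8]`, and `schema_addr as *mut FFI_ArrowSchema` cast a pointer to the type it already had (likely clippy's `needless_borrow` and `unnecessary_cast`, though the lint names are an assumption). A sketch with plain types standing in for the prost/Arrow ones:

```rust
// Sketch only: drop a re-borrow of a slice and a cast to the pointer's own type.
fn decode(bytes: &[u8]) -> usize {
    bytes.len() // stand-in for Plan::decode
}

fn main() {
    let buf = vec![1u8, 2, 3];
    let dst: &[u8] = &buf;
    // Before the fix: decode(&*dst)
    println!("decoded {} bytes", decode(dst));

    let mut schema_slot = 0u32;
    let schema_addr: *mut u32 = &mut schema_slot;
    // Before the fix: let ffi_schema = schema_addr as *mut u32;
    let ffi_schema = schema_addr;
    unsafe { *ffi_schema = 42 };
    println!("{}", schema_slot);
}
```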
10 changes: 5 additions & 5 deletions rust/lakesoul-io/src/helpers.rs
```diff
@@ -76,13 +76,13 @@ pub fn get_columnar_values(batch: &RecordBatch, range_partitions: Arc<Vec<String
     range_partitions
         .iter()
         .map(|range_col| {
-            if let Some(array) = batch.column_by_name(&range_col) {
+            if let Some(array) = batch.column_by_name(range_col) {
                 match ScalarValue::try_from_array(array, 0) {
                     Ok(scalar) => Ok((range_col.clone(), scalar)),
                     Err(e) => Err(e)
                 }
             } else {
-                Err(datafusion::error::DataFusionError::External(format!("").into()))
+                Err(datafusion::error::DataFusionError::External(format!("Invalid partition desc of {}", range_col).into()))
             }
         })
         .collect::<datafusion::error::Result<Vec<_>>>()
@@ -132,7 +132,7 @@ pub fn partition_desc_to_scalar_values(schema: SchemaRef, partition_desc: String
         Ok(vec![])
     } else {
         let mut part_values = Vec::with_capacity(schema.fields().len());
-        for part in partition_desc.split(",") {
+        for part in partition_desc.split(',') {
             match part.split_once('=') {
                 Some((name, val)) => {
                     part_values.push((name, val));
@@ -169,7 +169,7 @@ pub fn partition_desc_from_file_scan_config(
                 .iter()
                 .enumerate()
                 .map(|(idx, col)| {
-                    format!("{}={}", col.name().clone(), file.partition_values[idx].to_string())
+                    format!("{}={}", col.name().clone(), file.partition_values[idx])
                 })
                 .collect::<Vec<_>>()
                 .join(","),
@@ -325,7 +325,7 @@ fn batch_from_partition(wrapper: &JniWrapper, schema: SchemaRef, index_field: Fi
     let mut fields_with_index = schema
         .all_fields()
         .into_iter()
-        .map(|f| f.clone())
+        .cloned()
         .collect::<Vec<_>>();
     fields_with_index.push(index_field);
     let schema_with_index = SchemaRef::new(Schema::new(fields_with_index));
```
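Besides a more descriptive error message for a missing range column, this file drops a `.to_string()` inside `format!` arguments and replaces `.map(|f| f.clone())` with `.cloned()` (presumably clippy's `to_string_in_format_args` and `map_clone`). A sketch of both patterns with hypothetical values:

```rust
// Sketch only: `format!` already formats via `Display`, and `.cloned()` replaces a manual clone in `map`.
fn main() {
    let col_name = "date";
    let partition_value = 20240520;
    // Before the fix: format!("{}={}", col_name, partition_value.to_string())
    let partition_desc = format!("{}={}", col_name, partition_value);
    println!("{}", partition_desc);

    let fields = vec!["id".to_string(), "date".to_string()];
    // Before the fix: fields.iter().map(|f| f.clone()).collect()
    let fields_with_index: Vec<String> = fields.iter().cloned().collect();
    println!("{:?}", fields_with_index);
}
```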
5 changes: 3 additions & 2 deletions rust/lakesoul-io/src/lakesoul_writer.rs
```diff
@@ -491,6 +491,7 @@ impl AsyncBatchWriter for SortAsyncWriter {
     }
 }
 
+type PartitionedWriterInfo = Arc<Mutex<HashMap<String, (Vec<String>, u64)>>>;
 
 impl PartitioningAsyncWriter {
     pub fn try_new(
@@ -625,7 +626,7 @@ impl PartitioningAsyncWriter {
         config_builder: LakeSoulIOConfigBuilder,
         range_partitions: Arc<Vec<String>>,
         write_id: String,
-        partitioned_file_path_and_row_count: Arc<Mutex<HashMap<String, (Vec<String>, u64)>>>,
+        partitioned_file_path_and_row_count: PartitionedWriterInfo,
     ) -> Result<u64> {
         let mut data = input.execute(partition, context.clone())?;
         let schema_projection_excluding_range =
@@ -715,7 +716,7 @@ impl PartitioningAsyncWriter {
 
     async fn await_and_summary(
         join_handles: Vec<JoinHandle<Result<u64>>>,
-        partitioned_file_path_and_row_count: Arc<Mutex<HashMap<String, (Vec<String>, u64)>>>,
+        partitioned_file_path_and_row_count: PartitionedWriterInfo,
     ) -> Result<Vec<u8>> {
         let _ =
             futures::future::join_all(join_handles)
```
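The new `PartitionedWriterInfo` alias names the shared map of written files and row counts once, instead of repeating `Arc<Mutex<HashMap<String, (Vec<String>, u64)>>>` in each signature, which is how clippy's `type_complexity` warning is usually resolved. A sketch of how such an alias is used (std `Mutex` here for simplicity; the real writer is async):

```rust
// Sketch only: a type alias for the partition -> (files, row count) bookkeeping map.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type PartitionedWriterInfo = Arc<Mutex<HashMap<String, (Vec<String>, u64)>>>;

fn record_flush(info: &PartitionedWriterInfo, partition_desc: &str, file: &str, rows: u64) {
    let mut map = info.lock().unwrap();
    let entry = map.entry(partition_desc.to_string()).or_insert_with(|| (Vec::new(), 0));
    entry.0.push(file.to_string());
    entry.1 += rows;
}

fn main() {
    let info: PartitionedWriterInfo = Arc::new(Mutex::new(HashMap::new()));
    record_flush(&info, "date=2024-05-20", "part-00000.parquet", 128);
    record_flush(&info, "date=2024-05-20", "part-00001.parquet", 64);
    println!("{:?}", info.lock().unwrap());
}
```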
9 changes: 4 additions & 5 deletions rust/lakesoul-io/src/repartition/mod.rs
```diff
@@ -152,8 +152,7 @@ impl BatchPartitioner {
                 hash_buffer.clear();
                 hash_buffer.resize(batch.num_rows(), 0);
 
-                let mut range_buffer = Vec::<u32>::new();
-                range_buffer.resize(batch.num_rows(), 0);
+                let mut range_buffer = vec![0; batch.num_rows()];
 
                 create_hashes(&hash_arrays, hash_buffer)?;
                 create_hashes(&range_arrays, &mut range_buffer)?;
```
```diff
@@ -164,9 +163,9 @@
                     .collect();
 
                 for (index, (hash, range_hash)) in hash_buffer.iter().zip(range_buffer).enumerate() {
-                    if !indices[(*hash % *partitions as u32) as usize].contains_key(&range_hash) {
-                        indices[(*hash % *partitions as u32) as usize].insert(range_hash, UInt64Builder::with_capacity(batch.num_rows()));
-                    }
+                    indices[(*hash % *partitions as u32) as usize]
+                        .entry(range_hash)
+                        .or_insert_with(|| UInt64Builder::with_capacity(batch.num_rows()));
                     if let Some(entry) = indices[(*hash % *partitions as u32) as usize].get_mut(&range_hash) {
                         entry.append_value(index as u64);
                     }
```
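Two lints show up in the partitioner: `vec![0; n]` replaces `Vec::new()` followed by `resize` (`slow_vector_initialization`), and the `contains_key`-then-`insert` pair becomes a single `entry(...).or_insert_with(...)` call (`map_entry`), which avoids hashing the key twice. A reduced sketch with a plain `Vec<u64>` instead of `UInt64Builder`:

```rust
// Sketch only: pre-filled buffer allocation and the map entry API.
use std::collections::HashMap;

fn main() {
    let num_rows = 4;
    let range_hashes: Vec<u32> = vec![7, 7, 3, 7];

    // Before the fix: let mut range_buffer = Vec::<u32>::new(); range_buffer.resize(num_rows, 0);
    let mut range_buffer = vec![0u32; num_rows];
    range_buffer.copy_from_slice(&range_hashes);

    let mut indices: HashMap<u32, Vec<u64>> = HashMap::new();
    for (index, range_hash) in range_buffer.iter().enumerate() {
        // Before the fix: if !indices.contains_key(range_hash) { indices.insert(*range_hash, Vec::new()); }
        indices
            .entry(*range_hash)
            .or_insert_with(|| Vec::with_capacity(num_rows))
            .push(index as u64);
    }
    println!("{:?}", indices);
}
```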
