Skip to content

Commit

Permalink
Support empty RecordBatch (apache#1536)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Apr 12, 2022
1 parent 68038f5 commit d1dd90d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 28 deletions.
89 changes: 61 additions & 28 deletions arrow/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::error::{ArrowError, Result};
pub struct RecordBatch {
schema: SchemaRef,
columns: Vec<Arc<dyn Array>>,
row_count: usize,
}

impl RecordBatch {
Expand Down Expand Up @@ -77,8 +78,7 @@ impl RecordBatch {
/// ```
pub fn try_new(schema: SchemaRef, columns: Vec<ArrayRef>) -> Result<Self> {
let options = RecordBatchOptions::default();
Self::validate_new_batch(&schema, columns.as_slice(), &options)?;
Ok(RecordBatch { schema, columns })
Self::try_new_impl(schema, columns, &options)
}

/// Creates a `RecordBatch` from a schema and columns, with additional options,
Expand All @@ -90,8 +90,7 @@ impl RecordBatch {
columns: Vec<ArrayRef>,
options: &RecordBatchOptions,
) -> Result<Self> {
Self::validate_new_batch(&schema, columns.as_slice(), options)?;
Ok(RecordBatch { schema, columns })
Self::try_new_impl(schema, columns, options)
}

/// Creates a new empty [`RecordBatch`].
Expand All @@ -101,23 +100,21 @@ impl RecordBatch {
.iter()
.map(|field| new_empty_array(field.data_type()))
.collect();
RecordBatch { schema, columns }

RecordBatch {
schema,
columns,
row_count: 0,
}
}

/// Validate the schema and columns using [`RecordBatchOptions`]. Returns an error
/// if any validation check fails.
fn validate_new_batch(
schema: &SchemaRef,
columns: &[ArrayRef],
/// if any validation check fails, otherwise returns the created [`RecordBatch`]
fn try_new_impl(
schema: SchemaRef,
columns: Vec<ArrayRef>,
options: &RecordBatchOptions,
) -> Result<()> {
// check that there are some columns
if columns.is_empty() {
return Err(ArrowError::InvalidArgumentError(
"at least one column must be defined to create a record batch"
.to_string(),
));
}
) -> Result<RecordBatch> {
// check that number of fields in schema match column length
if schema.fields().len() != columns.len() {
return Err(ArrowError::InvalidArgumentError(format!(
Expand All @@ -128,7 +125,13 @@ impl RecordBatch {
}

// check that all columns have the same row count
let row_count = columns[0].data().len();
let row_count = options
.row_count
.or(columns.first().map(|col| col.len()))
.ok_or(ArrowError::InvalidArgumentError(
"must either specify a row count or at least one column".to_string(),
))?;

if columns.iter().any(|c| c.len() != row_count) {
return Err(ArrowError::InvalidArgumentError(
"all columns in a record batch must have the same length".to_string(),
Expand Down Expand Up @@ -163,7 +166,11 @@ impl RecordBatch {
i)));
}

Ok(())
Ok(RecordBatch {
schema,
columns,
row_count,
})
}

/// Returns the [`Schema`](crate::datatypes::Schema) of the record batch.
Expand Down Expand Up @@ -218,10 +225,6 @@ impl RecordBatch {

/// Returns the number of rows in each column.
///
/// # Panics
///
/// Panics if the `RecordBatch` contains no columns.
///
/// # Example
///
/// ```
Expand All @@ -243,7 +246,7 @@ impl RecordBatch {
/// # }
/// ```
pub fn num_rows(&self) -> usize {
self.columns[0].data().len()
self.row_count
}

/// Get a reference to a column's array by index.
Expand All @@ -267,10 +270,6 @@ impl RecordBatch {
///
/// Panics if `offset` with `length` is greater than column length.
pub fn slice(&self, offset: usize, length: usize) -> RecordBatch {
if self.schema.fields().is_empty() {
assert!((offset + length) == 0);
return RecordBatch::new_empty(self.schema.clone());
}
assert!((offset + length) <= self.num_rows());

let columns = self
Expand All @@ -282,6 +281,7 @@ impl RecordBatch {
Self {
schema: self.schema.clone(),
columns,
row_count: length,
}
}

Expand Down Expand Up @@ -402,15 +402,20 @@ impl RecordBatch {

/// Options that control the behaviour used when creating a [`RecordBatch`].
#[derive(Debug)]
#[non_exhaustive]
pub struct RecordBatchOptions {
/// Match field names of structs and lists. If set to `true`, the names must match.
pub match_field_names: bool,

/// Optional row count, useful for specifying a row count for a RecordBatch with no columns
pub row_count: Option<usize>,
}

impl Default for RecordBatchOptions {
fn default() -> Self {
Self {
match_field_names: true,
row_count: None,
}
}
}
Expand All @@ -426,6 +431,7 @@ impl From<&StructArray> for RecordBatch {
let columns = struct_array.boxed_fields.clone();
RecordBatch {
schema: Arc::new(schema),
row_count: struct_array.len(),
columns,
}
} else {
Expand Down Expand Up @@ -644,6 +650,7 @@ mod tests {
// creating the batch without field name validation should pass
let options = RecordBatchOptions {
match_field_names: false,
row_count: None,
};
let batch = RecordBatch::try_new_with_options(schema, vec![a], &options);
assert!(batch.is_ok());
Expand Down Expand Up @@ -934,4 +941,30 @@ mod tests {

assert_eq!(expected, record_batch.project(&[0, 2]).unwrap());
}

#[test]
fn test_no_column_record_batch() {
let schema = Arc::new(Schema::new(vec![]));

let err = RecordBatch::try_new(schema.clone(), vec![]).unwrap_err();
assert!(err
.to_string()
.contains("must either specify a row count or at least one column"));

let mut options = RecordBatchOptions::default();
options.row_count = Some(10);

let ok =
RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();
assert_eq!(ok.num_rows(), 10);

let a = ok.slice(2, 5);
assert_eq!(a.num_rows(), 5);

let b = ok.slice(5, 0);
assert_eq!(b.num_rows(), 0);

assert_ne!(a, b);
assert_eq!(b, RecordBatch::new_empty(schema))
}
}
1 change: 1 addition & 0 deletions arrow/src/util/data_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub fn create_random_batch(
columns,
&RecordBatchOptions {
match_field_names: false,
row_count: None,
},
)
}
Expand Down

0 comments on commit d1dd90d

Please sign in to comment.