This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Simplified Schema and Field #728

Merged
merged 1 commit on Jan 5, 2022
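Across these files, the change replaces the constructor- and accessor-based API (Schema::new, schema.fields(), field.name()) with plain public struct members and From/Into conversions. Below is a minimal sketch of the simplified API as exercised by this diff, assuming the arrow2 crate at this revision; the field name and metadata entries are illustrative only.

use arrow2::datatypes::{DataType, Field, Metadata, Schema};

fn main() {
    // Fields are constructed as before; name and data type are now public members.
    let field = Field::new("c1", DataType::Int32, true);

    // A Schema is built via From<Vec<Field>> (or .into()) instead of Schema::new.
    let schema = Schema::from(vec![field]);
    assert_eq!(schema.fields.len(), 1);
    assert_eq!(schema.fields[0].name, "c1");

    // Schema-level metadata uses the Metadata alias rather than a bare HashMap.
    let mut metadata = Metadata::new();
    metadata.insert(
        "origin".to_string(),
        "illustrative example".to_string(),
    );
    let schema = schema.with_metadata(metadata);
    assert_eq!(schema.fields.len(), 1);
}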
6 changes: 3 additions & 3 deletions arrow-parquet-integration-testing/src/main.rs
@@ -116,7 +116,7 @@ fn main() -> Result<()> {

let schema = if let Some(projection) = &projection {
let fields = schema
.fields()
.fields
.iter()
.enumerate()
.filter_map(|(i, f)| {
@@ -127,7 +127,7 @@ fn main() -> Result<()> {
}
})
.collect::<Vec<_>>();
Schema::new(fields)
Schema::from(fields)
} else {
schema
};
@@ -168,7 +168,7 @@ fn main() -> Result<()> {
};

let encodings = schema
.fields()
.fields
.iter()
.map(|x| match x.data_type() {
DataType::Dictionary(..) => Encoding::RleDictionary,
2 changes: 1 addition & 1 deletion benches/avro_read.rs
@@ -51,7 +51,7 @@ fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
codec,
),
avro_schema,
schema.fields().clone(),
schema.fields,
);

let mut rows = 0;
7 changes: 3 additions & 4 deletions benches/filter_kernels.rs
@@ -1,5 +1,3 @@
use std::sync::Arc;

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -125,8 +123,9 @@ fn add_benchmark(c: &mut Criterion) {

let data_array = create_primitive_array::<f32>(size, 0.0);

let columns = Chunk::try_new(vec![Arc::new(data_array) as ArrayRef]).unwrap();
c.bench_function("filter single record batch", |b| {
let columns = Chunk::try_new(vec![&data_array as &dyn Array]).unwrap();

c.bench_function("filter single chunk", |b| {
b.iter(|| filter_chunk(&columns, &filter_array))
});
}
4 changes: 2 additions & 2 deletions benches/write_ipc.rs
@@ -3,14 +3,14 @@ use std::io::Cursor;

use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::ipc::write::*;
use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};

fn write(array: &dyn Array) -> Result<()> {
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::new(vec![field]);
let schema = vec![field].into();
let columns = Chunk::try_new(vec![clone(array).into()])?;

let writer = Cursor::new(vec![]);
4 changes: 2 additions & 2 deletions benches/write_parquet.rs
@@ -1,19 +1,19 @@
use std::io::Cursor;
use std::sync::Arc;

use arrow2::datatypes::{Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::array::{clone, Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::error::Result;
use arrow2::io::parquet::write::*;
use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};

type ChunkArc = Chunk<Arc<dyn Array>>;

fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
let schema = Schema::from(vec![Field::new("c1", array.data_type().clone(), true)]);
let columns: ChunkArc = Chunk::new(vec![clone(array).into()]);

let options = WriteOptions {
2 changes: 1 addition & 1 deletion examples/avro_read_async.rs
@@ -31,7 +31,7 @@ async fn main() -> Result<()> {
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = Block::new(0, vec![]);
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, schema.fields(), &avro_schemas)
deserialize(&decompressed, &schema.fields, &avro_schemas)
});
let batch = handle.await.unwrap()?;
assert!(!batch.is_empty());
2 changes: 1 addition & 1 deletion examples/avro_write.rs
@@ -51,7 +51,7 @@ fn main() -> Result<()> {
Some(6),
]);
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::new(vec![field]);
let schema = vec![field].into();

let mut file = File::create(path)?;
write_avro(&mut file, &[(&array) as &dyn Array], &schema, None)?;
15 changes: 5 additions & 10 deletions examples/csv_read.rs
@@ -10,9 +10,9 @@ fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Arc<dyn A
// thus owns the read head.
let mut reader = read::ReaderBuilder::new().from_path(path)?;

// Infers the schema using the default inferer. The inferer is just a function that maps a string
// Infers the fields using the default inferer. The inferer is just a function that maps bytes
// to a `DataType`.
let schema = read::infer_schema(&mut reader, None, true, &read::infer)?;
let fields = read::infer_schema(&mut reader, None, true, &read::infer)?;

// allocate space to read from CSV to. The size of this vec denotes how many rows are read.
let mut rows = vec![read::ByteRecord::default(); 100];
@@ -23,15 +23,10 @@ fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Arc<dyn A
let rows_read = read::read_rows(&mut reader, 0, &mut rows)?;
let rows = &rows[..rows_read];

// parse the batches into a `RecordBatch`. This is CPU-intensive, has no IO,
// parse the rows into a `Chunk`. This is CPU-intensive, has no IO,
// and can be performed on a different thread by passing `rows` through a channel.
read::deserialize_batch(
rows,
schema.fields(),
projection,
0,
read::deserialize_column,
)
// `deserialize_column` is a function that maps rows and a column index to an Array
read::deserialize_batch(rows, &fields, projection, 0, read::deserialize_column)
}

fn main() -> Result<()> {
12 changes: 2 additions & 10 deletions examples/csv_read_async.rs
@@ -1,5 +1,3 @@
use std::sync::Arc;

use tokio::fs::File;
use tokio_util::compat::*;

@@ -17,18 +15,12 @@ async fn main() -> Result<()> {

let mut reader = AsyncReaderBuilder::new().create_reader(file);

let schema = Arc::new(infer_schema(&mut reader, None, true, &infer).await?);
let fields = infer_schema(&mut reader, None, true, &infer).await?;

let mut rows = vec![ByteRecord::default(); 100];
let rows_read = read_rows(&mut reader, 0, &mut rows).await?;

let columns = deserialize_batch(
&rows[..rows_read],
schema.fields(),
None,
0,
deserialize_column,
)?;
let columns = deserialize_batch(&rows[..rows_read], &fields, None, 0, deserialize_column)?;
println!("{:?}", columns.arrays()[0]);
Ok(())
}
8 changes: 4 additions & 4 deletions examples/csv_read_parallel.rs
@@ -17,8 +17,8 @@ fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
let (tx, rx) = unbounded();

let mut reader = read::ReaderBuilder::new().from_path(path)?;
let schema = read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
let schema = Arc::new(schema);
let fields = read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
let fields = Arc::new(fields);

let start = SystemTime::now();
// spawn a thread to produce `Vec<ByteRecords>` (IO bounded)
@@ -39,14 +39,14 @@ fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
// use 3 consumers to decompress, decode and deserialize.
for _ in 0..3 {
let rx_consumer = rx.clone();
let consumer_schema = schema.clone();
let consumer_fields = fields.clone();
let child = thread::spawn(move || {
let (rows, line_number) = rx_consumer.recv().unwrap();
let start = SystemTime::now();
println!("consumer start - {}", line_number);
let batch = read::deserialize_batch(
&rows,
consumer_schema.fields(),
&consumer_fields,
projection,
0,
read::deserialize_column,
2 changes: 1 addition & 1 deletion examples/extension.rs
@@ -35,7 +35,7 @@ fn main() -> Result<()> {
}

fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<W> {
let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);
let schema = vec![Field::new("a", array.data_type().clone(), false)].into();

let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(writer, &schema, None, options)?;
2 changes: 1 addition & 1 deletion examples/ipc_file_read.rs
@@ -30,7 +30,7 @@ fn main() -> Result<()> {
let file_path = &args[1];

let (schema, batches) = read_batches(file_path)?;
let names = schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>();
let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
println!("{}", print::write(&batches, &names));
Ok(())
}
2 changes: 1 addition & 1 deletion examples/ipc_file_write.rs
@@ -26,7 +26,7 @@ fn main() -> Result<()> {
let file_path = &args[1];

// create a batch
let schema = Schema::new(vec![
let schema = Schema::from(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]);
2 changes: 1 addition & 1 deletion examples/json_read.rs
@@ -16,7 +16,7 @@ fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn
let fields = if let Some(projection) = projection {
fields
.into_iter()
.filter(|field| projection.contains(&field.name().as_ref()))
.filter(|field| projection.contains(&field.name.as_ref()))
.collect()
} else {
fields
10 changes: 4 additions & 6 deletions examples/metadata.rs
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use arrow2::datatypes::{DataType, Field, Metadata, Schema};

fn main() {
@@ -20,17 +18,17 @@ fn main() {
let field1 = field1.with_metadata(metadata);

// a schema (a table)
let schema = Schema::new(vec![field1, field2]);
let schema = Schema::from(vec![field1, field2]);

assert_eq!(schema.fields().len(), 2);
assert_eq!(schema.fields.len(), 2);

// which can also contain extra metadata:
let mut metadata = HashMap::new();
let mut metadata = Metadata::new();
metadata.insert(
"Office Space".to_string(),
"Deals with real issues in the workplace.".to_string(),
);
let schema = schema.with_metadata(metadata);

assert_eq!(schema.fields().len(), 2);
assert_eq!(schema.fields.len(), 2);
}
2 changes: 1 addition & 1 deletion examples/parquet_read.rs
@@ -22,7 +22,7 @@ fn read_field(path: &str, row_group: usize, field: usize) -> Result<Box<dyn Arra
let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]);

// get the columns' field
let field = &arrow_schema.fields()[field];
let field = &arrow_schema.fields[field];

// This is the actual work. In this case, pages are read and
// decompressed, decoded and deserialized to arrow.
2 changes: 1 addition & 1 deletion examples/parquet_read_parallel.rs
@@ -65,7 +65,7 @@ fn parallel_read(path: &str, row_group: usize) -> Result<Chunk<Arc<dyn Array>>>
let mut arrays = vec![];
while let Ok((field_i, parquet_field, column_chunks)) = rx_consumer.recv() {
let start = SystemTime::now();
let field = &arrow_schema_consumer.fields()[field_i];
let field = &arrow_schema_consumer.fields[field_i];
println!("consumer {} start - {}", i, field_i);

let columns = read::ReadColumnIterator::new(parquet_field, column_chunks);
4 changes: 2 additions & 2 deletions examples/parquet_write.rs
@@ -5,7 +5,7 @@ use arrow2::error::ArrowError;
use arrow2::io::parquet::write::to_parquet_schema;
use arrow2::{
array::{Array, Int32Array},
datatypes::{Field, Schema},
datatypes::Field,
error::Result,
io::parquet::write::{
array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator,
@@ -14,7 +14,7 @@ use arrow2::{
};

fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()> {
let schema = Schema::new(vec![field]);
let schema = vec![field].into();

let options = WriteOptions {
write_statistics: true,
2 changes: 1 addition & 1 deletion examples/parquet_write_record.rs
@@ -50,7 +50,7 @@ fn main() -> Result<()> {
Some(6),
]);
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::new(vec![field]);
let schema = Schema::from(vec![field]);
let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

write_batch("test.parquet", schema, columns)
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-json-integration-test.rs
@@ -95,7 +95,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>
.schema
.fields
.iter()
.map(|f| f.name())
.map(|f| &f.name)
.collect::<Vec<_>>();

let schema = json_write::serialize_schema(&metadata.schema, &metadata.ipc_schema.fields);
@@ -221,7 +221,7 @@ async fn consume_flight_location(

for (counter, expected_batch) in expected_data.iter().enumerate() {
let data =
receive_batch_flight_data(&mut resp, schema.fields(), ipc_schema, &mut dictionaries)
receive_batch_flight_data(&mut resp, &schema.fields, ipc_schema, &mut dictionaries)
.await
.unwrap_or_else(|| {
panic!(
@@ -234,7 +234,7 @@ async fn consume_flight_location(
let metadata = counter.to_string().into_bytes();
assert_eq!(metadata, data.app_metadata);

let actual_batch = deserialize_batch(&data, schema.fields(), ipc_schema, &dictionaries)
let actual_batch = deserialize_batch(&data, &schema.fields, ipc_schema, &dictionaries)
.expect("Unable to convert flight data to Arrow batch");

assert_eq!(expected_batch.columns().len(), actual_batch.columns().len());
@@ -245,8 +245,7 @@ async fn consume_flight_location(
.zip(actual_batch.columns().iter())
.enumerate()
{
let field = schema.field(i);
let field_name = field.name();
let field_name = &schema.fields[i].name;
assert_eq!(expected, actual, "Data for field {}", field_name);
}
}
@@ -299,8 +299,7 @@ async fn record_batch_from_message(
0,
);

arrow_batch_result
.map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e)))
arrow_batch_result.map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e)))
}

async fn dictionary_from_message(
@@ -351,7 +350,7 @@ async fn save_uploaded_chunks(
let batch = record_batch_from_message(
message,
&data.data_body,
schema.fields(),
&schema.fields,
&ipc_schema,
&mut dictionaries,
)
Expand All @@ -363,7 +362,7 @@ async fn save_uploaded_chunks(
dictionary_from_message(
message,
&data.data_body,
schema.fields(),
&schema.fields,
&ipc_schema,
&mut dictionaries,
)
2 changes: 1 addition & 1 deletion src/array/display.rs
@@ -184,7 +184,7 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> Strin
Box::new(move |row: usize| {
let mut string = displays
.iter()
.zip(a.fields().iter().map(|f| f.name()))
.zip(a.fields().iter().map(|f| &f.name))
.map(|(f, name)| (f(row), name))
.fold("{".to_string(), |mut acc, (v, name)| {
acc.push_str(&format!("{}: {}, ", name, v));
2 changes: 1 addition & 1 deletion src/array/struct_/mod.rs
@@ -218,7 +218,7 @@ impl std::fmt::Debug for StructArray {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "StructArray{{")?;
for (field, column) in self.fields().iter().zip(self.values()) {
writeln!(f, "{}: {:?},", field.name(), column)?;
writeln!(f, "{}: {:?},", field.name, column)?;
}
write!(f, "}}")
}