Simplified datatypes (#728)
jorgecarleitao committed Jan 5, 2022
1 parent 9ed6aba commit 9b7118b
Showing 66 changed files with 258 additions and 617 deletions.
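The pattern across these diffs: Schema::new(vec![...]) becomes Schema::from(vec![...]) or vec![...].into(), accessor methods such as schema.fields() and field.name() become direct access to public fields (schema.fields, field.name), and schema metadata uses the Metadata type instead of a bare HashMap. A minimal sketch of the new-style usage, pieced together from the diffs below rather than taken from the commit itself:

use arrow2::datatypes::{DataType, Field, Metadata, Schema};

fn new_style_schema() -> Schema {
    let field = Field::new("c1", DataType::Int32, true);

    // construction: Schema::from / Into instead of Schema::new
    let schema = Schema::from(vec![field]);

    // access: public fields instead of accessor methods
    assert_eq!(schema.fields.len(), 1);
    assert_eq!(schema.fields[0].name, "c1");

    // schema-level metadata: the dedicated Metadata type instead of HashMap
    let mut metadata = Metadata::new();
    metadata.insert("key".to_string(), "value".to_string());
    schema.with_metadata(metadata)
}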
6 changes: 3 additions & 3 deletions arrow-parquet-integration-testing/src/main.rs
@@ -116,7 +116,7 @@ fn main() -> Result<()> {

let schema = if let Some(projection) = &projection {
let fields = schema
.fields()
.fields
.iter()
.enumerate()
.filter_map(|(i, f)| {
@@ -127,7 +127,7 @@ fn main() -> Result<()> {
}
})
.collect::<Vec<_>>();
Schema::new(fields)
Schema::from(fields)
} else {
schema
};
@@ -168,7 +168,7 @@ fn main() -> Result<()> {
};

let encodings = schema
.fields()
.fields
.iter()
.map(|x| match x.data_type() {
DataType::Dictionary(..) => Encoding::RleDictionary,
2 changes: 1 addition & 1 deletion benches/avro_read.rs
@@ -51,7 +51,7 @@ fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
codec,
),
avro_schema,
schema.fields().clone(),
schema.fields,
);

let mut rows = 0;
7 changes: 3 additions & 4 deletions benches/filter_kernels.rs
@@ -1,5 +1,3 @@
use std::sync::Arc;

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -125,8 +123,9 @@ fn add_benchmark(c: &mut Criterion) {

let data_array = create_primitive_array::<f32>(size, 0.0);

let columns = Chunk::try_new(vec![Arc::new(data_array) as ArrayRef]).unwrap();
c.bench_function("filter single record batch", |b| {
let columns = Chunk::try_new(vec![&data_array as &dyn Array]).unwrap();

c.bench_function("filter single chunk", |b| {
b.iter(|| filter_chunk(&columns, &filter_array))
});
}
4 changes: 2 additions & 2 deletions benches/write_ipc.rs
@@ -3,14 +3,14 @@ use std::io::Cursor;

use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::ipc::write::*;
use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};

fn write(array: &dyn Array) -> Result<()> {
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::new(vec![field]);
let schema = vec![field].into();
let columns = Chunk::try_new(vec![clone(array).into()])?;

let writer = Cursor::new(vec![]);
4 changes: 2 additions & 2 deletions benches/write_parquet.rs
@@ -1,19 +1,19 @@
use std::io::Cursor;
use std::sync::Arc;

use arrow2::datatypes::{Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::array::{clone, Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::error::Result;
use arrow2::io::parquet::write::*;
use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};

type ChunkArc = Chunk<Arc<dyn Array>>;

fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
let schema = Schema::from(vec![Field::new("c1", array.data_type().clone(), true)]);
let columns: ChunkArc = Chunk::new(vec![clone(array).into()]);

let options = WriteOptions {
2 changes: 1 addition & 1 deletion examples/avro_read_async.rs
@@ -31,7 +31,7 @@ async fn main() -> Result<()> {
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = Block::new(0, vec![]);
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, schema.fields(), &avro_schemas)
deserialize(&decompressed, &schema.fields, &avro_schemas)
});
let batch = handle.await.unwrap()?;
assert!(!batch.is_empty());
2 changes: 1 addition & 1 deletion examples/avro_write.rs
@@ -51,7 +51,7 @@ fn main() -> Result<()> {
Some(6),
]);
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::new(vec![field]);
let schema = vec![field].into();

let mut file = File::create(path)?;
write_avro(&mut file, &[(&array) as &dyn Array], &schema, None)?;
15 changes: 5 additions & 10 deletions examples/csv_read.rs
@@ -10,9 +10,9 @@ fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Arc<dyn A
// thus owns the read head.
let mut reader = read::ReaderBuilder::new().from_path(path)?;

// Infers the schema using the default inferer. The inferer is just a function that maps a string
// Infers the fields using the default inferer. The inferer is just a function that maps bytes
// to a `DataType`.
let schema = read::infer_schema(&mut reader, None, true, &read::infer)?;
let fields = read::infer_schema(&mut reader, None, true, &read::infer)?;

// allocate space to read from CSV to. The size of this vec denotes how many rows are read.
let mut rows = vec![read::ByteRecord::default(); 100];
@@ -23,15 +23,10 @@ fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Arc<dyn A
let rows_read = read::read_rows(&mut reader, 0, &mut rows)?;
let rows = &rows[..rows_read];

// parse the batches into a `RecordBatch`. This is CPU-intensive, has no IO,
// parse the rows into a `Chunk`. This is CPU-intensive, has no IO,
// and can be performed on a different thread by passing `rows` through a channel.
read::deserialize_batch(
rows,
schema.fields(),
projection,
0,
read::deserialize_column,
)
// `deserialize_column` is a function that maps rows and a column index to an Array
read::deserialize_batch(rows, &fields, projection, 0, read::deserialize_column)
}

fn main() -> Result<()> {
12 changes: 2 additions & 10 deletions examples/csv_read_async.rs
@@ -1,5 +1,3 @@
use std::sync::Arc;

use tokio::fs::File;
use tokio_util::compat::*;

@@ -17,18 +15,12 @@ async fn main() -> Result<()> {

let mut reader = AsyncReaderBuilder::new().create_reader(file);

let schema = Arc::new(infer_schema(&mut reader, None, true, &infer).await?);
let fields = infer_schema(&mut reader, None, true, &infer).await?;

let mut rows = vec![ByteRecord::default(); 100];
let rows_read = read_rows(&mut reader, 0, &mut rows).await?;

let columns = deserialize_batch(
&rows[..rows_read],
schema.fields(),
None,
0,
deserialize_column,
)?;
let columns = deserialize_batch(&rows[..rows_read], &fields, None, 0, deserialize_column)?;
println!("{:?}", columns.arrays()[0]);
Ok(())
}
8 changes: 4 additions & 4 deletions examples/csv_read_parallel.rs
@@ -17,8 +17,8 @@ fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
let (tx, rx) = unbounded();

let mut reader = read::ReaderBuilder::new().from_path(path)?;
let schema = read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
let schema = Arc::new(schema);
let fields = read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
let fields = Arc::new(fields);

let start = SystemTime::now();
// spawn a thread to produce `Vec<ByteRecords>` (IO bounded)
@@ -39,14 +39,14 @@ fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
// use 3 consumers of to decompress, decode and deserialize.
for _ in 0..3 {
let rx_consumer = rx.clone();
let consumer_schema = schema.clone();
let consumer_fields = fields.clone();
let child = thread::spawn(move || {
let (rows, line_number) = rx_consumer.recv().unwrap();
let start = SystemTime::now();
println!("consumer start - {}", line_number);
let batch = read::deserialize_batch(
&rows,
consumer_schema.fields(),
&consumer_fields,
projection,
0,
read::deserialize_column,
2 changes: 1 addition & 1 deletion examples/extension.rs
@@ -35,7 +35,7 @@ fn main() -> Result<()> {
}

fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<W> {
let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);
let schema = vec![Field::new("a", array.data_type().clone(), false)].into();

let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(writer, &schema, None, options)?;
2 changes: 1 addition & 1 deletion examples/ipc_file_read.rs
@@ -30,7 +30,7 @@ fn main() -> Result<()> {
let file_path = &args[1];

let (schema, batches) = read_batches(file_path)?;
let names = schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>();
let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
println!("{}", print::write(&batches, &names));
Ok(())
}
2 changes: 1 addition & 1 deletion examples/ipc_file_write.rs
@@ -26,7 +26,7 @@ fn main() -> Result<()> {
let file_path = &args[1];

// create a batch
let schema = Schema::new(vec![
let schema = Schema::from(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]);
2 changes: 1 addition & 1 deletion examples/json_read.rs
@@ -16,7 +16,7 @@ fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn
let fields = if let Some(projection) = projection {
fields
.into_iter()
.filter(|field| projection.contains(&field.name().as_ref()))
.filter(|field| projection.contains(&field.name.as_ref()))
.collect()
} else {
fields
10 changes: 4 additions & 6 deletions examples/metadata.rs
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use arrow2::datatypes::{DataType, Field, Metadata, Schema};

fn main() {
@@ -20,17 +18,17 @@ fn main() {
let field1 = field1.with_metadata(metadata);

// a schema (a table)
let schema = Schema::new(vec![field1, field2]);
let schema = Schema::from(vec![field1, field2]);

assert_eq!(schema.fields().len(), 2);
assert_eq!(schema.fields.len(), 2);

// which can also contain extra metadata:
let mut metadata = HashMap::new();
let mut metadata = Metadata::new();
metadata.insert(
"Office Space".to_string(),
"Deals with real issues in the workplace.".to_string(),
);
let schema = schema.with_metadata(metadata);

assert_eq!(schema.fields().len(), 2);
assert_eq!(schema.fields.len(), 2);
}
2 changes: 1 addition & 1 deletion examples/parquet_read.rs
@@ -22,7 +22,7 @@ fn read_field(path: &str, row_group: usize, field: usize) -> Result<Box<dyn Arra
let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]);

// get the columns' field
let field = &arrow_schema.fields()[field];
let field = &arrow_schema.fields[field];

// This is the actual work. In this case, pages are read and
// decompressed, decoded and deserialized to arrow.
2 changes: 1 addition & 1 deletion examples/parquet_read_parallel.rs
@@ -65,7 +65,7 @@ fn parallel_read(path: &str, row_group: usize) -> Result<Chunk<Arc<dyn Array>>>
let mut arrays = vec![];
while let Ok((field_i, parquet_field, column_chunks)) = rx_consumer.recv() {
let start = SystemTime::now();
let field = &arrow_schema_consumer.fields()[field_i];
let field = &arrow_schema_consumer.fields[field_i];
println!("consumer {} start - {}", i, field_i);

let columns = read::ReadColumnIterator::new(parquet_field, column_chunks);
4 changes: 2 additions & 2 deletions examples/parquet_write.rs
@@ -5,7 +5,7 @@ use arrow2::error::ArrowError;
use arrow2::io::parquet::write::to_parquet_schema;
use arrow2::{
array::{Array, Int32Array},
datatypes::{Field, Schema},
datatypes::Field,
error::Result,
io::parquet::write::{
array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator,
@@ -14,7 +14,7 @@ use arrow2::{
};

fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()> {
let schema = Schema::new(vec![field]);
let schema = vec![field].into();

let options = WriteOptions {
write_statistics: true,
2 changes: 1 addition & 1 deletion examples/parquet_write_record.rs
@@ -50,7 +50,7 @@ fn main() -> Result<()> {
Some(6),
]);
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::new(vec![field]);
let schema = Schema::from(vec![field]);
let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

write_batch("test.parquet", schema, columns)
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-json-integration-test.rs
@@ -95,7 +95,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>
.schema
.fields
.iter()
.map(|f| f.name())
.map(|f| &f.name)
.collect::<Vec<_>>();

let schema = json_write::serialize_schema(&metadata.schema, &metadata.ipc_schema.fields);
@@ -221,7 +221,7 @@ async fn consume_flight_location(

for (counter, expected_batch) in expected_data.iter().enumerate() {
let data =
receive_batch_flight_data(&mut resp, schema.fields(), ipc_schema, &mut dictionaries)
receive_batch_flight_data(&mut resp, &schema.fields, ipc_schema, &mut dictionaries)
.await
.unwrap_or_else(|| {
panic!(
@@ -234,7 +234,7 @@ async fn consume_flight_location(
let metadata = counter.to_string().into_bytes();
assert_eq!(metadata, data.app_metadata);

let actual_batch = deserialize_batch(&data, schema.fields(), ipc_schema, &dictionaries)
let actual_batch = deserialize_batch(&data, &schema.fields, ipc_schema, &dictionaries)
.expect("Unable to convert flight data to Arrow batch");

assert_eq!(expected_batch.columns().len(), actual_batch.columns().len());
@@ -245,8 +245,7 @@ async fn consume_flight_location(
.zip(actual_batch.columns().iter())
.enumerate()
{
let field = schema.field(i);
let field_name = field.name();
let field_name = &schema.fields[i].name;
assert_eq!(expected, actual, "Data for field {}", field_name);
}
}
@@ -299,8 +299,7 @@ async fn record_batch_from_message(
0,
);

arrow_batch_result
.map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e)))
arrow_batch_result.map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e)))
}

async fn dictionary_from_message(
@@ -351,7 +350,7 @@ async fn save_uploaded_chunks(
let batch = record_batch_from_message(
message,
&data.data_body,
schema.fields(),
&schema.fields,
&ipc_schema,
&mut dictionaries,
)
@@ -363,7 +362,7 @@ async fn save_uploaded_chunks(
dictionary_from_message(
message,
&data.data_body,
schema.fields(),
&schema.fields,
&ipc_schema,
&mut dictionaries,
)
2 changes: 1 addition & 1 deletion src/array/display.rs
@@ -184,7 +184,7 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> Strin
Box::new(move |row: usize| {
let mut string = displays
.iter()
.zip(a.fields().iter().map(|f| f.name()))
.zip(a.fields().iter().map(|f| &f.name))
.map(|(f, name)| (f(row), name))
.fold("{".to_string(), |mut acc, (v, name)| {
acc.push_str(&format!("{}: {}, ", name, v));
2 changes: 1 addition & 1 deletion src/array/struct_/mod.rs
@@ -218,7 +218,7 @@ impl std::fmt::Debug for StructArray {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "StructArray{{")?;
for (field, column) in self.fields().iter().zip(self.values()) {
writeln!(f, "{}: {:?},", field.name(), column)?;
writeln!(f, "{}: {:?},", field.name, column)?;
}
write!(f, "}}")
}