Arc->Box (#1042)
jorgecarleitao committed Jun 12, 2022
1 parent d2f5935 commit e0f4cee
Showing 158 changed files with 855 additions and 913 deletions.
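
The pattern repeated throughout: trait objects that were shared as Arc<dyn Array> become uniquely owned Box<dyn Array>, so Arc::new(...)/.arced() construction turns into Box::new(...)/.boxed(). A minimal sketch of the new style, assuming the arrow2 API as of this commit and using only calls that appear in the diffs below:

    use arrow2::array::{Array, PrimitiveArray};
    use arrow2::chunk::Chunk;

    fn main() {
        // arrays are now uniquely owned boxed trait objects
        let array: Box<dyn Array> = PrimitiveArray::<i32>::from_slice(&[1, 2, 3]).boxed();

        // containers such as Chunk are now generic over Box<dyn Array>
        let chunk: Chunk<Box<dyn Array>> = Chunk::new(vec![array]);
        assert_eq!(chunk.arrays().len(), 1);
    }
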
1 change: 1 addition & 0 deletions Cargo.toml
@@ -17,6 +17,7 @@ bench = false
[dependencies]
either = "1.6"
num-traits = "0.2"
+dyn-clone = "1"
bytemuck = { version = "1", features = ["derive"] }
chrono = { version = "0.4", default_features = false, features = ["std"] }
chrono-tz = { version = "0.6", optional = true }
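
The one Cargo.toml addition supports the switch: cloning an Arc<dyn Array> was a cheap reference-count bump, while cloning a Box<dyn Array> requires cloning through a trait object, which the dyn-clone crate enables; presumably that is why the dependency arrives in this commit. A minimal sketch of the dyn-clone pattern with an illustrative trait (not arrow2's actual code):

    use dyn_clone::DynClone;

    // an object-safe trait whose boxed instances should be cloneable
    trait Shape: DynClone {
        fn area(&self) -> f64;
    }
    // generates `impl Clone for Box<dyn Shape>`
    dyn_clone::clone_trait_object!(Shape);

    #[derive(Clone)]
    struct Square(f64);
    impl Shape for Square {
        fn area(&self) -> f64 {
            self.0 * self.0
        }
    }

    fn main() {
        let a: Box<dyn Shape> = Box::new(Square(2.0));
        let b = a.clone(); // a deep clone through the trait object
        assert_eq!(a.area(), b.area());
    }
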
8 changes: 4 additions & 4 deletions arrow-parquet-integration-testing/src/main.rs
@@ -1,3 +1,6 @@
+use std::fs::File;
+use std::{collections::HashMap, io::Read};
+
use arrow2::array::Array;
use arrow2::io::ipc::IpcField;
use arrow2::{
@@ -15,15 +18,12 @@ use arrow2::{
};
use clap::Parser;
use flate2::read::GzDecoder;
-use std::fs::File;
-use std::sync::Arc;
-use std::{collections::HashMap, io::Read};

/// Read gzipped JSON file
pub fn read_gzip_json(
version: &str,
file_name: &str,
-) -> Result<(Schema, Vec<IpcField>, Vec<Chunk<Arc<dyn Array>>>)> {
+) -> Result<(Schema, Vec<IpcField>, Vec<Chunk<Box<dyn Array>>>)> {
let path = format!(
"../testing/arrow-testing/data/arrow-ipc-stream/integration/{}/{}.json.gz",
version, file_name
8 changes: 3 additions & 5 deletions arrow-pyarrow-integration-testing/src/c_stream.rs
@@ -34,15 +34,13 @@ pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
let array = Int32Array::from(&[Some(2), None, Some(1), None]);
let array = StructArray::from_data(
DataType::Struct(vec![Field::new("a", array.data_type().clone(), true)]),
-vec![Arc::new(array)],
+vec![array.boxed()],
None,
-);
+)
+.boxed();
// and a field with its datatype
let field = Field::new("a", array.data_type().clone(), true);

-// Arc it, since it will be shared with an external program
-let array: Arc<dyn Array> = Arc::new(array.clone());
-
// create an iterator of arrays
let arrays = vec![array.clone(), array.clone(), array];
let iter = Box::new(arrays.clone().into_iter().map(Ok)) as _;
5 changes: 2 additions & 3 deletions arrow-pyarrow-integration-testing/src/lib.rs
@@ -4,7 +4,6 @@ mod c_stream;

use std::error;
use std::fmt;
-use std::sync::Arc;

use pyo3::exceptions::PyOSError;
use pyo3::ffi::Py_uintptr_t;
@@ -50,7 +49,7 @@ impl From<PyO3Error> for PyErr {
}
}

-fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
+fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Box<dyn Array>> {
// prepare a pointer to receive the Array struct
let array = Box::new(ffi::ArrowArray::empty());
let schema = Box::new(ffi::ArrowSchema::empty());
@@ -73,7 +72,7 @@ fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
Ok(array.into())
}

-fn to_py_array(array: Arc<dyn Array>, py: Python) -> PyResult<PyObject> {
+fn to_py_array(array: Box<dyn Array>, py: Python) -> PyResult<PyObject> {
let array_ptr = Box::new(ffi::ArrowArray::empty());
let schema_ptr = Box::new(ffi::ArrowSchema::empty());

3 changes: 1 addition & 2 deletions benches/iter_list.rs
@@ -1,5 +1,4 @@
use std::iter::FromIterator;
-use std::sync::Arc;

use criterion::{criterion_group, criterion_main, Criterion};

@@ -28,7 +27,7 @@ fn add_benchmark(c: &mut Criterion) {
let array = ListArray::<i32>::from_data(
data_type,
offsets.into(),
-Arc::new(values),
+Box::new(values),
Some(validity),
);

10 changes: 4 additions & 6 deletions benches/write_csv.rs
@@ -1,5 +1,3 @@
-use std::sync::Arc;
-
use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::array::*;
@@ -8,9 +6,9 @@ use arrow2::error::Result;
use arrow2::io::csv::write;
use arrow2::util::bench_util::*;

-type ChunkArc = Chunk<Arc<dyn Array>>;
+type ChunkBox = Chunk<Box<dyn Array>>;

-fn write_batch(columns: &ChunkArc) -> Result<()> {
+fn write_batch(columns: &ChunkBox) -> Result<()> {
let mut writer = vec![];

assert_eq!(columns.arrays().len(), 1);
@@ -20,8 +18,8 @@ fn write_batch(columns: &ChunkArc) -> Result<()> {
write::write_chunk(&mut writer, columns, &options)
}

-fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
-Chunk::new(vec![Arc::new(array)])
+fn make_chunk(array: impl Array + 'static) -> Chunk<Box<dyn Array>> {
+Chunk::new(vec![Box::new(array)])
}

fn add_benchmark(c: &mut Criterion) {
2 changes: 1 addition & 1 deletion benches/write_ipc.rs
@@ -11,7 +11,7 @@ use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, cre
fn write(array: &dyn Array) -> Result<()> {
let field = Field::new("c1", array.data_type().clone(), true);
let schema = vec![field].into();
-let columns = Chunk::try_new(vec![clone(array).into()])?;
+let columns = Chunk::try_new(vec![clone(array)])?;

let writer = Cursor::new(vec![]);
let mut writer = FileWriter::try_new(writer, &schema, None, Default::default())?;
6 changes: 2 additions & 4 deletions benches/write_parquet.rs
@@ -1,5 +1,3 @@
-use std::sync::Arc;
-
use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::array::{clone, Array};
@@ -9,11 +7,11 @@ use arrow2::error::Result;
use arrow2::io::parquet::write::*;
use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};

-type ChunkArc = Chunk<Arc<dyn Array>>;
+type ChunkBox = Chunk<Box<dyn Array>>;

fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
let schema = Schema::from(vec![Field::new("c1", array.data_type().clone(), true)]);
-let columns: ChunkArc = Chunk::new(vec![clone(array).into()]);
+let columns: ChunkBox = Chunk::new(vec![clone(array)]);

let options = WriteOptions {
write_statistics: false,
6 changes: 2 additions & 4 deletions examples/avro_read_async.rs
@@ -1,5 +1,3 @@
-use std::sync::Arc;
-
use futures::pin_mut;
use futures::StreamExt;
use tokio::fs::File;
@@ -19,8 +17,8 @@ async fn main() -> Result<()> {
let mut reader = File::open(file_path).await?.compat();

let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
-let avro_schemas = Arc::new(avro_schemas);
-let projection = Arc::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());
+let avro_schemas = Box::new(avro_schemas);
+let projection = Box::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());

let blocks = block_stream(&mut reader, marker).await;

20 changes: 20 additions & 0 deletions examples/cow.rs
@@ -0,0 +1,20 @@
+// This example demos how to operate on arrays in-place.
+use arrow2::array::{Array, PrimitiveArray};
+
+fn main() {
+    // say we have received an array
+    let mut array: Box<dyn Array> = PrimitiveArray::from_vec(vec![1i32, 2]).boxed();
+
+    // we can apply a transformation to its values without allocating a new array as follows:
+    // 1. downcast it to the correct type (known via `array.data_type().to_physical_type()`)
+    let array = array
+        .as_any_mut()
+        .downcast_mut::<PrimitiveArray<i32>>()
+        .unwrap();
+
+    // 2. call `apply_values` with the function to apply over the values
+    array.apply_values(|x| x.iter_mut().for_each(|x| *x *= 10));
+
+    // confirm that it gives the right result :)
+    assert_eq!(array, &PrimitiveArray::from_vec(vec![10i32, 20]));
+}
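
This new example leans on the unique ownership that Box guarantees: an Arc only permits mutable access while its reference count is exactly one. A standalone illustration of the difference, using only std (not part of the commit):

    use std::sync::Arc;

    fn main() {
        let mut shared = Arc::new(vec![1i32, 2]);
        let alias = Arc::clone(&shared);
        // while another reference exists, Arc denies mutable access
        assert!(Arc::get_mut(&mut shared).is_none());
        drop(alias);
        // once ownership is unique again, in-place mutation succeeds
        Arc::get_mut(&mut shared).unwrap().iter_mut().for_each(|x| *x *= 10);
        assert_eq!(*shared, vec![10, 20]);

        // a Box is always uniquely owned, so it can be mutated directly
        let mut owned: Box<Vec<i32>> = Box::new(vec![1, 2]);
        owned.iter_mut().for_each(|x| *x *= 10);
        assert_eq!(*owned, vec![10, 20]);
    }
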
4 changes: 1 addition & 3 deletions examples/csv_read.rs
@@ -1,11 +1,9 @@
-use std::sync::Arc;
-
use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::csv::read;

-fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Arc<dyn Array>>> {
+fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Box<dyn Array>>> {
// Create a CSV reader. This is typically created on the thread that reads the file and
// thus owns the read head.
let mut reader = read::ReaderBuilder::new().from_path(path)?;
5 changes: 2 additions & 3 deletions examples/csv_read_parallel.rs
@@ -1,14 +1,13 @@
use crossbeam_channel::unbounded;

-use std::sync::Arc;
use std::thread;
use std::time::SystemTime;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::{error::Result, io::csv::read};

-fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
+fn parallel_read(path: &str) -> Result<Vec<Chunk<Box<dyn Array>>>> {
let batch_size = 100;
let has_header = true;
let projection = None;
@@ -19,7 +18,7 @@ fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
let mut reader = read::ReaderBuilder::new().from_path(path)?;
let (fields, _) =
read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
-let fields = Arc::new(fields);
+let fields = Box::new(fields);

let start = SystemTime::now();
// spawn a thread to produce `Vec<ByteRecords>` (IO bounded)
5 changes: 2 additions & 3 deletions examples/csv_write_parallel.rs
@@ -1,7 +1,6 @@
use std::io::Write;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
-use std::sync::Arc;
use std::thread;

use arrow2::{
@@ -11,7 +10,7 @@
io::csv::write,
};

-fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()> {
+fn parallel_write(path: &str, batches: [Chunk<Box<dyn Array>>; 2]) -> Result<()> {
let options = write::SerializeOptions::default();

// write a header
@@ -59,7 +58,7 @@ fn main() -> Result<()> {
Some(5),
Some(6),
]);
-let columns = Chunk::new(vec![array.arced()]);
+let columns = Chunk::new(vec![array.boxed()]);

parallel_write("example.csv", [columns.clone(), columns])
}
5 changes: 2 additions & 3 deletions examples/extension.rs
@@ -1,5 +1,4 @@
use std::io::{Cursor, Seek, Write};
-use std::sync::Arc;

use arrow2::array::*;
use arrow2::chunk::Chunk;
@@ -40,7 +39,7 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::new(writer, schema, None, options);

-let batch = Chunk::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;
+let batch = Chunk::try_new(vec![Box::new(array) as Box<dyn Array>])?;

writer.start()?;
writer.write(&batch, None)?;
@@ -49,7 +48,7 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
Ok(writer.into_inner())
}

-fn read_ipc(buf: &[u8]) -> Result<Chunk<Arc<dyn Array>>> {
+fn read_ipc(buf: &[u8]) -> Result<Chunk<Box<dyn Array>>> {
let mut cursor = Cursor::new(buf);
let metadata = read::read_file_metadata(&mut cursor)?;
let mut reader = read::FileReader::new(cursor, metadata, None);
5 changes: 2 additions & 3 deletions examples/ffi.rs
@@ -2,10 +2,9 @@ use arrow2::array::{Array, PrimitiveArray};
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::ffi;
-use std::sync::Arc;

unsafe fn export(
-array: Arc<dyn Array>,
+array: Box<dyn Array>,
array_ptr: *mut ffi::ArrowArray,
schema_ptr: *mut ffi::ArrowSchema,
) {
@@ -22,7 +21,7 @@ unsafe fn import(array: Box<ffi::ArrowArray>, schema: &ffi::ArrowSchema) -> Resu

fn main() -> Result<()> {
// let's assume that we have an array:
-let array = PrimitiveArray::<i32>::from([Some(1), None, Some(123)]).arced();
+let array = PrimitiveArray::<i32>::from([Some(1), None, Some(123)]).boxed();

// the goal is to export this array and import it back via FFI.
// to import, we initialize the structs that will receive the data
5 changes: 2 additions & 3 deletions examples/ipc_file_read.rs
@@ -1,5 +1,4 @@
use std::fs::File;
-use std::sync::Arc;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
@@ -9,7 +8,7 @@ use arrow2::io::ipc::read;
use arrow2::io::print;

/// Simplest way: read all record batches from the file. This can be used e.g. for random access.
-fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Arc<dyn Array>>>)> {
+fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Box<dyn Array>>>)> {
let mut file = File::open(path)?;

// read the files' metadata. At this point, we can distribute the read whatever we like.
@@ -25,7 +24,7 @@ fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Arc<dyn Array>>>)> {
}

/// Random access way: read a single record batch from the file. This can be used e.g. for random access.
-fn read_batch(path: &str) -> Result<(Schema, Chunk<Arc<dyn Array>>)> {
+fn read_batch(path: &str) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
let mut file = File::open(path)?;

// read the files' metadata. At this point, we can distribute the read whatever we like.
5 changes: 2 additions & 3 deletions examples/ipc_file_write.rs
@@ -1,13 +1,12 @@
use std::fs::File;
-use std::sync::Arc;

use arrow2::array::{Array, Int32Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::write;

-fn write_batches(path: &str, schema: Schema, columns: &[Chunk<Arc<dyn Array>>]) -> Result<()> {
+fn write_batches(path: &str, schema: Schema, columns: &[Chunk<Box<dyn Array>>]) -> Result<()> {
let file = File::create(path)?;

let options = write::WriteOptions { compression: None };
@@ -35,7 +34,7 @@ fn main() -> Result<()> {
let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
let b = Utf8Array::<i32>::from_slice(&["a", "b", "c", "d", "e"]);

-let batch = Chunk::try_new(vec![a.arced(), b.arced()])?;
+let batch = Chunk::try_new(vec![a.boxed(), b.boxed()])?;

// write it
write_batches(file_path, schema, &[batch])?;
3 changes: 1 addition & 2 deletions examples/json_read.rs
@@ -1,12 +1,11 @@
/// Example of reading a JSON file.
use std::fs;
-use std::sync::Arc;

use arrow2::array::Array;
use arrow2::error::Result;
use arrow2::io::json::read;

-fn read_path(path: &str) -> Result<Arc<dyn Array>> {
+fn read_path(path: &str) -> Result<Box<dyn Array>> {
// read the file into memory (IO-bounded)
let data = fs::read(path)?;

3 changes: 1 addition & 2 deletions examples/ndjson_read.rs
@@ -1,13 +1,12 @@
use std::fs::File;
use std::io::{BufReader, Seek};
-use std::sync::Arc;

use arrow2::array::Array;
use arrow2::error::Result;
use arrow2::io::ndjson::read;
use arrow2::io::ndjson::read::FallibleStreamingIterator;

-fn read_path(path: &str) -> Result<Vec<Arc<dyn Array>>> {
+fn read_path(path: &str) -> Result<Vec<Box<dyn Array>>> {
let batch_size = 1024; // number of rows per array
let mut reader = BufReader::new(File::open(path)?);

3 changes: 1 addition & 2 deletions examples/parquet_read_async.rs
@@ -1,4 +1,3 @@
-use std::sync::Arc;
use std::time::SystemTime;

use futures::future::BoxFuture;
@@ -15,7 +14,7 @@ async fn main() -> Result<()> {

use std::env;
let args: Vec<String> = env::args().collect();
-let file_path = Arc::new(args[1].clone());
+let file_path = Box::new(args[1].clone());

// # Read metadata
let mut reader = BufReader::new(File::open(file_path.as_ref()).await?).compat();