forked from jorgecarleitao/arrow2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
write_file_async.rs
62 lines (49 loc) · 1.85 KB
/
write_file_async.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use std::io::Cursor;
use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::datatypes::Schema;
use arrow2::error::Result;
use arrow2::io::ipc::read;
use arrow2::io::ipc::write::file_async::FileSink;
use arrow2::io::ipc::write::WriteOptions;
use arrow2::io::ipc::IpcField;
use futures::io::Cursor as AsyncCursor;
use futures::SinkExt;
use crate::io::ipc::common::read_arrow_stream;
use crate::io::ipc::common::read_gzip_json;
/// Writes `batches` through an async [`FileSink`] into an in-memory buffer and
/// returns the bytes that were produced.
///
/// The sink is built from `schema` and an owned copy of `ipc_fields`, with
/// compression disabled. Each batch is fed together with `ipc_fields`, and the
/// sink is closed before the buffer is handed back.
///
/// # Errors
///
/// Propagates any error returned while feeding a batch or closing the sink.
async fn write_(
    schema: &Schema,
    ipc_fields: &[IpcField],
    batches: &[Chunk<Box<dyn Array>>],
) -> Result<Vec<u8>> {
    let mut buffer = AsyncCursor::new(Vec::new());

    // The sink mutably borrows `buffer`; confining it to this scope releases
    // the borrow before the buffer is consumed below.
    {
        let mut sink = FileSink::new(
            &mut buffer,
            schema,
            Some(ipc_fields.to_vec()),
            WriteOptions { compression: None },
        );
        for chunk in batches.iter() {
            sink.feed((chunk, Some(ipc_fields)).into()).await?;
        }
        sink.close().await?;
    }

    Ok(buffer.into_inner())
}
/// Round-trips one fixture through the async file writer and the file reader.
///
/// The fixture identified by `version` and `file_name` is loaded with
/// `read_arrow_stream`, written with `write_`, and read back from the resulting
/// bytes. The schema, IPC fields and batches read back are then compared
/// against the expectation loaded with `read_gzip_json`.
///
/// # Panics
///
/// Panics if the expected JSON cannot be loaded or if any comparison fails.
async fn test_file(version: &str, file_name: &str) -> Result<()> {
    let (source_schema, source_fields, source_batches) =
        read_arrow_stream(version, file_name, None);
    let written = write_(&source_schema, &source_fields, &source_batches).await?;

    // Read the freshly written bytes back.
    let mut cursor = Cursor::new(written);
    let file_metadata = read::read_file_metadata(&mut cursor)?;
    let file_reader = read::FileReader::new(cursor, file_metadata, None);

    // read expected JSON output
    let (expected_schema, expected_ipc_fields, expected_batches) =
        read_gzip_json(version, file_name).unwrap();

    // Metadata is checked first: collecting the batches consumes the reader.
    assert_eq!(&file_reader.metadata().schema, &expected_schema);
    assert_eq!(
        file_reader.metadata().ipc_schema.fields,
        expected_ipc_fields
    );

    let actual_batches = file_reader.collect::<Result<Vec<_>>>()?;
    assert_eq!(actual_batches, expected_batches);

    Ok(())
}
/// Runs the async write round-trip check on the `generated_primitive` fixture
/// of version `1.0.0-littleendian`.
#[tokio::test]
async fn write_async() -> Result<()> {
    let version = "1.0.0-littleendian";
    let fixture = "generated_primitive";
    test_file(version, fixture).await
}