This repository has been archived by the owner on Feb 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 221
/
extension.rs
55 lines (41 loc) · 1.66 KB
/
extension.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
use std::io::{Cursor, Seek, Write};
use std::sync::Arc;
use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::datatypes::*;
use arrow2::error::Result;
use arrow2::io::ipc::read;
use arrow2::io::ipc::write;
/// Demonstrates that an Arrow extension type survives an IPC round trip.
///
/// Declares a `date16` extension type backed by `UInt16`, writes a two-element array of that
/// type to an in-memory IPC file, reads the file back, and asserts that the first column's
/// data type is still the extension type.
fn main() -> Result<()> {
    // The extension type wraps a physical storage type (`UInt16`) under a logical name.
    let date16 = DataType::Extension("date16".to_string(), Box::new(DataType::UInt16), None);

    // Build plain u16 values and tag them with the extension type; the rest of the
    // pipeline handles the array like any other.
    let values = UInt16Array::from_slice([1, 2]).to(date16.clone());

    // Round-trip through an in-memory IPC file.
    let written = write_ipc(Cursor::new(vec![]), values)?;
    let bytes = written.into_inner();
    let chunk = read_ipc(&bytes)?;

    // The column read back must carry the same (extension) data type.
    let restored = &chunk.columns()[0];
    assert_eq!(restored.data_type(), &date16);

    // See https://arrow.apache.org/docs/format/Columnar.html#extension-types
    // for how other Arrow consumers are expected to interpret extension types.
    Ok(())
}
/// Writes `array` as the single column `"a"` of a one-batch Arrow IPC file into `writer`,
/// then returns the underlying writer once the file is complete.
///
/// The column's data type is taken from the array itself, so an extension type is recorded
/// in the written schema.
///
/// # Errors
/// Propagates any error from creating the IPC writer, building the batch, writing the batch,
/// or finalizing the file.
fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<W> {
    let schema = vec![Field::new("a", array.data_type().clone(), false)].into();
    let options = write::WriteOptions { compression: None };
    let mut writer = write::FileWriter::try_new(writer, &schema, None, options)?;
    let batch = Chunk::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;
    writer.write(&batch, None)?;
    // The IPC *file* format ends with a footer that readers such as `read_file_metadata`
    // locate from the end of the data. `finish` writes that footer. Without it, the bytes
    // returned by `into_inner` are not a readable IPC file.
    writer.finish()?;
    Ok(writer.into_inner())
}
/// Reads the first record batch from an in-memory Arrow IPC file.
///
/// # Errors
/// Returns an error if the file metadata cannot be parsed or the first batch fails to
/// decode. Also returns an error, rather than panicking, if the file contains no batches.
fn read_ipc(buf: &[u8]) -> Result<Chunk<Arc<dyn Array>>> {
    let mut cursor = Cursor::new(buf);
    let metadata = read::read_file_metadata(&mut cursor)?;
    let mut reader = read::FileReader::new(cursor, metadata, None);
    // `next()` is `None` when the file holds no batches; report that as an error
    // instead of unwrapping.
    reader.next().unwrap_or_else(|| {
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "IPC file contains no record batches",
        )
        .into())
    })
}