This repository has been archived by the owner on Feb 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 221
/
common_sync.rs
61 lines (49 loc) · 1.74 KB
/
common_sync.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::io::Write;
use crate::error::Result;
use super::super::CONTINUATION_MARKER;
use super::common::pad_to_64;
use super::common::EncodedData;
/// Writes one encoded IPC message to `writer` and returns the byte counts written for the
/// metadata section and for the body.
///
/// The bytes are emitted in this order:
/// 1. the prefix written by [`write_continuation`] (accounted for as 8 bytes),
/// 2. the flatbuffer bytes in `encoded.ipc_message`,
/// 3. zero padding so that prefix + flatbuffer end on an 8-byte boundary,
/// 4. the body in `encoded.arrow_data`, via `write_body_buffers` (skipped when empty).
///
/// # Returns
///
/// `(metadata_len, body_len)`: `metadata_len` is the aligned size of prefix + flatbuffer +
/// padding; `body_len` is the value reported by `write_body_buffers`, or `0` when there is no
/// body.
///
/// # Errors
///
/// Returns an error if the padded flatbuffer length cannot be represented by the `i32` length
/// prefix (nothing is written in that case), or if any write to `writer` fails.
pub fn write_message<W: Write>(writer: &mut W, encoded: EncodedData) -> Result<(usize, usize)> {
    // Bytes occupied by the prefix that `write_continuation` emits before the flatbuffer.
    const PREFIX_SIZE: usize = 8;
    // Prefix + flatbuffer are padded up to a multiple of this many bytes.
    const ALIGNMENT: usize = 8;
    // Source of padding bytes; at most `ALIGNMENT - 1` of them are ever needed.
    const ZEROS: [u8; ALIGNMENT] = [0; ALIGNMENT];

    let flatbuf = encoded.ipc_message;
    let flatbuf_size = flatbuf.len();

    // Round `flatbuf_size + PREFIX_SIZE` up to the next multiple of `ALIGNMENT`.
    let mask = ALIGNMENT - 1;
    let aligned_size = (flatbuf_size + PREFIX_SIZE + mask) & !mask;
    let padding_bytes = aligned_size - flatbuf_size - PREFIX_SIZE;

    // The length prefix is an `i32`: refuse to write a message whose length would be
    // silently truncated by the cast, since that would corrupt the stream.
    let metadata_len = aligned_size - PREFIX_SIZE;
    if metadata_len > i32::MAX as usize {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "IPC message metadata is too large for its i32 length prefix",
        )
        .into());
    }
    write_continuation(writer, metadata_len as i32)?;

    // write the flatbuf
    if !flatbuf.is_empty() {
        writer.write_all(&flatbuf)?;
    }

    // write padding
    writer.write_all(&ZEROS[..padding_bytes])?;

    // write arrow data
    let body_len = if encoded.arrow_data.is_empty() {
        0
    } else {
        write_body_buffers(writer, &encoded.arrow_data)?
    };

    Ok((aligned_size, body_len))
}
/// Writes `data` to `writer`, followed by as many zero bytes as `pad_to_64` requests for its
/// length, and then flushes the writer.
///
/// Returns the total number of bytes written: the payload plus its padding.
fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
    let padding = vec![0u8; pad_to_64(data.len())];
    let written = data.len() + padding.len();

    // Payload first, then the trailing zero padding (if any is required).
    writer.write_all(data)?;
    if !padding.is_empty() {
        writer.write_all(&padding)?;
    }
    writer.flush()?;

    Ok(written)
}
/// Writes the message prefix to `writer`: `CONTINUATION_MARKER` followed by `total_len` encoded
/// as a little-endian `i32`, then flushes the writer.
///
/// Always returns `Ok(8)` on success, the prefix size reported to callers (presumably a 4-byte
/// marker plus the 4-byte length; the marker's definition lives outside this module).
pub fn write_continuation<W: Write>(writer: &mut W, total_len: i32) -> Result<usize> {
    let length_bytes = total_len.to_le_bytes();

    // Marker first, then the length; each part is written with its own `write_all` call.
    let prefix_parts: [&[u8]; 2] = [&CONTINUATION_MARKER, &length_bytes];
    for part in prefix_parts.iter() {
        writer.write_all(part)?;
    }
    writer.flush()?;

    Ok(8)
}