Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Write 64bit aligned (#1201)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 3, 2022
1 parent 72de99c commit 3db00fc
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/io/ipc/write/common.rs
Expand Up @@ -370,8 +370,8 @@ pub struct EncodedData {

/// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
#[inline]
pub(crate) fn pad_to_8(len: usize) -> usize {
(((len + 7) & !7) - len) as usize
pub(crate) fn pad_to_64(len: usize) -> usize {
(((len + 63) & !63) - len) as usize
}

/// An array [`Chunk`] with optional accompanying IPC fields.
Expand Down
8 changes: 4 additions & 4 deletions src/io/ipc/write/common_async.rs
Expand Up @@ -4,7 +4,7 @@ use futures::AsyncWriteExt;
use crate::error::Result;

use super::super::CONTINUATION_MARKER;
use super::common::pad_to_8;
use super::common::pad_to_64;
use super::common::EncodedData;

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
Expand All @@ -14,10 +14,10 @@ pub async fn write_message<W: AsyncWrite + Unpin + Send>(
) -> Result<(usize, usize)> {
let arrow_data_len = encoded.arrow_data.len();

let a = 8 - 1;
let a = 64 - 1;
let buffer = encoded.ipc_message;
let flatbuf_size = buffer.len();
let prefix_size = 8;
let prefix_size = 8; // the message length
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
let padding_bytes = aligned_size - flatbuf_size - prefix_size;

Expand Down Expand Up @@ -57,7 +57,7 @@ async fn write_body_buffers<W: AsyncWrite + Unpin + Send>(
data: &[u8],
) -> Result<usize> {
let len = data.len();
let pad_len = pad_to_8(data.len());
let pad_len = pad_to_64(data.len());
let total_len = len + pad_len;

// write body buffer
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/write/common_sync.rs
Expand Up @@ -3,7 +3,7 @@ use std::io::Write;
use crate::error::Result;

use super::super::CONTINUATION_MARKER;
use super::common::pad_to_8;
use super::common::pad_to_64;
use super::common::EncodedData;

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
Expand Down Expand Up @@ -38,7 +38,7 @@ pub fn write_message<W: Write>(writer: &mut W, encoded: EncodedData) -> Result<(

fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
let len = data.len();
let pad_len = pad_to_8(data.len());
let pad_len = pad_to_64(data.len());
let total_len = len + pad_len;

// write body buffer
Expand Down
8 changes: 4 additions & 4 deletions src/io/ipc/write/serialize.rs
Expand Up @@ -7,7 +7,7 @@ use crate::{

use super::super::compression;
use super::super::endianess::is_native_little_endian;
use super::common::{pad_to_8, Compression};
use super::common::{pad_to_64, Compression};

fn write_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
Expand Down Expand Up @@ -564,8 +564,8 @@ pub fn write(
}

#[inline]
fn pad_buffer_to_8(buffer: &mut Vec<u8>, length: usize) {
let pad_len = pad_to_8(length);
fn pad_buffer_to_64(buffer: &mut Vec<u8>, length: usize) {
let pad_len = pad_to_64(length);
buffer.extend_from_slice(&vec![0u8; pad_len]);
}

Expand Down Expand Up @@ -748,7 +748,7 @@ fn write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
fn finish_buffer(arrow_data: &mut Vec<u8>, start: usize, offset: &mut i64) -> ipc::Buffer {
let buffer_len = (arrow_data.len() - start) as i64;

pad_buffer_to_8(arrow_data, arrow_data.len() - start);
pad_buffer_to_64(arrow_data, arrow_data.len() - start);
let total_len = (arrow_data.len() - start) as i64;

let buffer = ipc::Buffer {
Expand Down

0 comments on commit 3db00fc

Please sign in to comment.