Skip to content

Commit

Permalink
Faster parquet bloom filter writer (apache#3320)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Dec 12, 2022
1 parent ad94368 commit bc12b27
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions parquet/src/bloom_filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::format::{
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
use std::io::{BufWriter, Write};
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
Expand Down Expand Up @@ -177,22 +177,33 @@ impl Sbbf {
}

/// Write the bloom filter data (header and then bitset) to the output
///
/// The header is serialized through a `TCompactOutputProtocol` that wraps the
/// writer and the protocol is flushed; the bitset is then written by
/// `write_bitset`, and finally the writer itself is flushed.
///
/// # Errors
///
/// A failure while serializing the header is reported as
/// `ParquetError::General` with the underlying error in the message.
/// Failures from either flush or from `write_bitset` are propagated with `?`.
pub(crate) fn write<W: Write>(&self, writer: W) -> Result<(), ParquetError> {
// Use a BufWriter to avoid costs of writing individual blocks
let mut writer = BufWriter::new(writer);
pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
let mut protocol = TCompactOutputProtocol::new(&mut writer);
let header = self.header();
header.write_to_out_protocol(&mut protocol).map_err(|e| {
ParquetError::General(format!("Could not write bloom filter header: {}", e))
})?;
// Flush the protocol before the bitset is written to the same underlying writer.
protocol.flush()?;
self.write_bitset(&mut writer)?;
writer.flush()?;
Ok(())
}

/// Write the bitset in serialized form to the writer.
///
/// This variant is compiled only for little-endian targets. The fallback for
/// other targets serializes every word with `to_le_bytes`; on a little-endian
/// target the in-memory bytes already have that order, so the whole block
/// slice is handed to the writer in a single `write_all` call.
///
/// # Errors
///
/// Returns `ParquetError::General` if the underlying writer fails.
///
/// # Panics
///
/// Panics if viewing the blocks as bytes leaves an unaligned prefix or suffix.
#[cfg(target_endian = "little")]
fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
    // SAFETY:
    // `Block` can be safely transmuted to `u8`: every bit pattern is a valid
    // `u8`, and `u8` has an alignment of 1.
    // NOTE(review): this also relies on `Block` containing no padding bytes —
    // confirm against the definition of `Block`.
    let (prefix, bytes, suffix) = unsafe { self.0.as_slice().align_to::<u8>() };
    assert!(prefix.is_empty() && suffix.is_empty());

    // `bytes` is already a `&[u8]`; pass it directly rather than re-borrowing.
    writer.write_all(bytes).map_err(|e| {
        ParquetError::General(format!("Could not write bloom filter bit set: {}", e))
    })
}

#[cfg(not(target_endian = "little"))]
fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
let mut writer = std::io::BufWriter::new(writer);
for block in &self.0 {
for word in block {
writer.write_all(&word.to_le_bytes()).map_err(|e| {
Expand All @@ -203,6 +214,7 @@ impl Sbbf {
})?;
}
}
writer.flush()?;
Ok(())
}

Expand Down

0 comments on commit bc12b27

Please sign in to comment.