This repository has been archived by the owner on Feb 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 221
/
decompress.rs
100 lines (90 loc) · 3.14 KB
/
decompress.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//! APIs to read from Avro format to arrow.
use std::io::Read;
use fallible_streaming_iterator::FallibleStreamingIterator;
use crate::error::{ArrowError, Result};
use super::BlockStreamIterator;
use super::Compression;
/// Decompresses an avro block.
///
/// * `block` - the raw (possibly compressed) bytes of the block.
/// * `decompress` - output buffer; its previous contents are discarded.
/// * `compression` - the file's codec, `None` when the block is stored uncompressed.
///
/// Returns whether the buffers were swapped. For uncompressed blocks no copy is made: the
/// two buffers are exchanged, so `decompress` holds the block's bytes and `block` holds the
/// previous contents of `decompress` (`true` is returned). Otherwise the decoded bytes are
/// written into `decompress`, `block` is left untouched, and `false` is returned.
///
/// # Errors
/// Errors when the block cannot be decoded, when a snappy block is too short to contain its
/// checksum trailer, or when the codec requires the `io_avro_compression` feature and it is
/// not active.
fn decompress_block(
    block: &mut Vec<u8>,
    decompress: &mut Vec<u8>,
    compression: Option<Compression>,
) -> Result<bool> {
    match compression {
        None => {
            // Uncompressed: hand over the block's allocation instead of copying it.
            std::mem::swap(block, decompress);
            Ok(true)
        }
        #[cfg(feature = "io_avro_compression")]
        Some(Compression::Deflate) => {
            decompress.clear();
            let mut decoder = libflate::deflate::Decoder::new(&block[..]);
            decoder.read_to_end(decompress)?;
            Ok(false)
        }
        #[cfg(feature = "io_avro_compression")]
        Some(Compression::Snappy) => {
            // Avro appends a 4-byte CRC32 of the uncompressed data to every snappy block; it is
            // not part of the snappy payload (and is not verified here). A block shorter than
            // that trailer is malformed: report it instead of underflowing `len() - 4`.
            let payload_len = block.len().checked_sub(4).ok_or_else(|| {
                ArrowError::ExternalFormat(
                    "Snappy-compressed avro block is shorter than its 4-byte checksum"
                        .to_string(),
                )
            })?;
            let payload = &block[..payload_len];

            let len = snap::raw::decompress_len(payload)
                .map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
            decompress.clear();
            decompress.resize(len, 0);
            let written = snap::raw::Decoder::new()
                .decompress(payload, decompress)
                .map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
            // Only expose the bytes the decoder actually produced.
            decompress.truncate(written);
            Ok(false)
        }
        #[cfg(not(feature = "io_avro_compression"))]
        Some(Compression::Deflate) => Err(ArrowError::Other(
            "The avro file is deflate-encoded but feature 'io_avro_compression' is not active."
                .to_string(),
        )),
        #[cfg(not(feature = "io_avro_compression"))]
        Some(Compression::Snappy) => Err(ArrowError::Other(
            "The avro file is snappy-encoded but feature 'io_avro_compression' is not active."
                .to_string(),
        )),
    }
}
/// A [`FallibleStreamingIterator`] yielding Avro blocks after they have been decompressed
/// with the file's codec.
pub struct Decompressor<R: Read> {
    /// Source of the raw (possibly compressed) blocks.
    blocks: BlockStreamIterator<R>,
    /// The file's codec; `None` when blocks are stored uncompressed.
    codec: Option<Compression>,
    /// The current decompressed block together with its row count.
    buf: (Vec<u8>, usize),
    /// `true` when `buf.0` currently holds the block iterator's own buffer (exchanged in for
    /// an uncompressed block), which has to be handed back before reading the next block.
    was_swapped: bool,
}
impl<R: Read> Decompressor<R> {
    /// Builds a [`Decompressor`] that pulls raw blocks from `blocks` and decodes them with
    /// `codec` (`None` meaning the blocks are stored uncompressed).
    pub fn new(blocks: BlockStreamIterator<R>, codec: Option<Compression>) -> Self {
        let empty_block = (Vec::new(), 0);
        Self {
            was_swapped: false,
            codec,
            blocks,
            buf: empty_block,
        }
    }

    /// Consumes the [`Decompressor`], returning the reader the blocks were read from.
    pub fn into_inner(self) -> R {
        let Self { blocks, .. } = self;
        blocks.into_inner().0
    }
}
impl<R: Read> FallibleStreamingIterator for Decompressor<R> {
    type Error = ArrowError;
    /// The decompressed bytes of the current block and its row count.
    type Item = (Vec<u8>, usize);

    /// Reads the next block from the underlying [`BlockStreamIterator`] and decompresses it.
    ///
    /// # Errors
    /// Propagates errors from reading the next block and from decompressing it. After an
    /// error, [`Self::get`] returns `None` until a later call succeeds.
    fn advance(&mut self) -> Result<()> {
        // Invalidate the current item up front so a failure below never leaves stale data
        // visible through `get`.
        self.buf.1 = 0;
        if self.was_swapped {
            // Hand the block iterator its own buffer back so it can reuse the allocation.
            // Clear the flag immediately: the buffers are back in place even if a later
            // step fails, and swapping again on the next call would mix them up.
            std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
            self.was_swapped = false;
        }
        self.blocks.advance()?;

        let rows = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
        if rows == 0 {
            // No further block: nothing to decompress. The block buffer may be empty here
            // (e.g. a file without blocks), which the snappy path cannot decode.
            return Ok(());
        }

        self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
        self.buf.1 = rows;
        Ok(())
    }

    /// Returns the current decompressed block, or `None` once the stream is exhausted.
    fn get(&self) -> Option<&Self::Item> {
        if self.buf.1 > 0 {
            Some(&self.buf)
        } else {
            None
        }
    }
}