Optional async #174

Merged (1 commit) on Aug 10, 2022
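This PR makes the async reader/writer opt-in: the `futures` and `async-stream` dependencies become optional, all `*_async` APIs move behind a new `async` cargo feature (also pulled in by a new `full` feature), and CI checks that the crate still compiles without it.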
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
@@ -24,7 +24,7 @@ jobs:
           deactivate
       - uses: Swatinem/rust-cache@v1
       - name: Generate code coverage
-        run: cargo llvm-cov --lcov --output-path lcov.info
+        run: cargo llvm-cov --features full --lcov --output-path lcov.info
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v1
        with:
6 changes: 4 additions & 2 deletions .github/workflows/test.yml
@@ -23,9 +23,11 @@ jobs:
           python tests/write_pyarrow.py
           deactivate
       - name: Run
-        run: cargo test
+        run: |
+          cargo test --features full
+          cargo check # compiles without async
       - name: Run lz4-flex
-        run: cargo test --no-default-features --features lz4_flex,bloom_filter,snappy,brotli,zstd,gzip
+        run: cargo test --no-default-features --features lz4_flex,bloom_filter,snappy,brotli,zstd,gzip,async

   clippy:
     name: Clippy
8 changes: 5 additions & 3 deletions Cargo.toml
@@ -15,12 +15,12 @@ name = "parquet2"
 bench = false

 [dependencies]
-parquet-format-safe = "0.1"
+parquet-format-safe = "0.2"
 bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] }
 streaming-decompression = "0.1"

-async-stream = { version = "0.3.2" }
-futures = { version = "0.3" }
+async-stream = { version = "0.3.2", optional = true }
+futures = { version = "0.3", optional = true }

 snap = { version = "^1.0", optional = true }
 brotli = { version = "^3.3", optional = true }
@@ -38,6 +38,8 @@ rand = "0.8"

 [features]
 default = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter"]
+full = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter", "async"]
+async = [ "async-stream", "futures", "parquet-format-safe/async" ]
 snappy = ["snap"]
 gzip = ["flate2/rust_backend"]
 gzip_zlib_ng = ["flate2/zlib-ng"]
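The feature wiring above is the heart of the PR: `async-stream` and `futures` become optional, and the new `async` feature (also pulled in by `full`) enables them together with `parquet-format-safe/async`. A minimal sketch of how a downstream crate would opt in (the version number is illustrative):

```toml
[dependencies]
# default codecs plus the now-optional async API
parquet2 = { version = "0.14", features = ["async"] }
# or everything at once, matching what CI tests:
# parquet2 = { version = "0.14", features = ["full"] }
```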
6 changes: 6 additions & 0 deletions src/read/mod.rs
@@ -3,6 +3,7 @@ mod indexes;
 pub mod levels;
 mod metadata;
 mod page;
+#[cfg(feature = "async")]
 mod stream;

 use std::io::{Read, Seek, SeekFrom};
@@ -11,8 +12,13 @@ use std::vec::IntoIter;

 pub use compression::{decompress, BasicDecompressor, Decompressor};
 pub use metadata::{deserialize_metadata, read_metadata};
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub use page::{get_page_stream, get_page_stream_from_column_start};
 pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader};

+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
 pub use stream::read_metadata as read_metadata_async;

 use crate::error::Error;
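With the feature enabled, the re-export above makes the async metadata reader available as `read_metadata_async`. A minimal usage sketch, assuming a `futures`-compatible source (the helper and its name are illustrative, not part of this diff):

```rust
use futures::io::{AsyncRead, AsyncSeek};
use parquet2::error::Error;
use parquet2::metadata::FileMetaData;
use parquet2::read::read_metadata_async; // only exists with the `async` feature

// Illustrative helper: fetch parquet metadata from any async reader,
// e.g. an async file handle or an object-store adapter.
async fn file_metadata<R>(reader: &mut R) -> Result<FileMetaData, Error>
where
    R: AsyncRead + AsyncSeek + Send + Unpin,
{
    read_metadata_async(reader).await
}
```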
6 changes: 4 additions & 2 deletions src/read/page/mod.rs
@@ -1,5 +1,6 @@
 mod indexed_reader;
 mod reader;
+#[cfg(feature = "async")]
 mod stream;

 use crate::{error::Error, page::CompressedPage};
@@ -11,5 +12,6 @@ pub trait PageIterator: Iterator<Item = Result<CompressedPage, Error>> {
     fn swap_buffer(&mut self, buffer: &mut Vec<u8>);
 }

-pub use stream::get_page_stream;
-pub use stream::get_page_stream_from_column_start;
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
+pub use stream::{get_page_stream, get_page_stream_from_column_start};
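The `cfg_attr(docsrs, doc(cfg(...)))` annotations used here and throughout the PR only take effect when rustdoc runs on nightly with `--cfg docsrs`; the usual companion (a sketch of the common pattern, not shown in this diff) is a crate-root opt-in:

```rust
// src/lib.rs
// Enable the unstable `doc_cfg` rustdoc feature only for docs.rs-style
// builds, i.e. RUSTDOCFLAGS="--cfg docsrs" on a nightly toolchain.
#![cfg_attr(docsrs, feature(doc_cfg))]
```

With that in place, items carrying `doc(cfg(feature = "async"))` render with an "available on crate feature `async` only" badge in the generated docs.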
17 changes: 12 additions & 5 deletions src/write/column_chunk.rs
@@ -1,12 +1,14 @@
 use std::collections::HashSet;
 use std::io::Write;

-use futures::AsyncWrite;
-use parquet_format_safe::thrift::protocol::{
-    TCompactOutputProtocol, TCompactOutputStreamProtocol, TOutputProtocol, TOutputStreamProtocol,
-};
+use parquet_format_safe::thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
 use parquet_format_safe::{ColumnChunk, ColumnMetaData, Type};

+#[cfg(feature = "async")]
+use futures::AsyncWrite;
+#[cfg(feature = "async")]
+use parquet_format_safe::thrift::protocol::{TCompactOutputStreamProtocol, TOutputStreamProtocol};
+
 use crate::statistics::serialize_statistics;
 use crate::FallibleStreamingIterator;
 use crate::{
@@ -17,7 +19,10 @@ use crate::{
     page::{CompressedPage, PageType},
 };

-use super::page::{write_page, write_page_async, PageWriteSpec};
+#[cfg(feature = "async")]
+use super::page::write_page_async;
+
+use super::page::{write_page, PageWriteSpec};
 use super::statistics::reduce;
 use super::DynStreamingIterator;

@@ -58,6 +63,8 @@
     Ok((column_chunk, specs, bytes_written))
 }

+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
 pub async fn write_column_chunk_async<W, E>(
     writer: &mut W,
     mut offset: u64,
12 changes: 10 additions & 2 deletions src/write/indexes/write.rs
@@ -1,7 +1,11 @@
-use futures::AsyncWrite;
 use std::io::Write;

-use parquet_format_safe::thrift::protocol::{TCompactOutputProtocol, TCompactOutputStreamProtocol};
+#[cfg(feature = "async")]
+use futures::AsyncWrite;
+#[cfg(feature = "async")]
+use parquet_format_safe::thrift::protocol::TCompactOutputStreamProtocol;
+
+use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;

 use crate::error::Result;
 pub use crate::metadata::KeyValue;
@@ -16,6 +20,8 @@ pub fn write_column_index<W: Write>(writer: &mut W, pages: &[PageWriteSpec]) ->
     Ok(index.write_to_out_protocol(&mut protocol)? as u64)
 }

+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
 pub async fn write_column_index_async<W: AsyncWrite + Unpin + Send>(
     writer: &mut W,
     pages: &[PageWriteSpec],
@@ -31,6 +37,8 @@ pub fn write_offset_index<W: Write>(writer: &mut W, pages: &[PageWriteSpec]) ->
     Ok(index.write_to_out_protocol(&mut protocol)? as u64)
 }

+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
 pub async fn write_offset_index_async<W: AsyncWrite + Unpin + Send>(
     writer: &mut W,
     pages: &[PageWriteSpec],
3 changes: 3 additions & 0 deletions src/write/mod.rs
@@ -6,7 +6,10 @@ pub(crate) mod page;
 mod row_group;
 pub(self) mod statistics;

+#[cfg(feature = "async")]
 mod stream;
+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
 pub use stream::FileStreamer;

 mod dyn_iter;
10 changes: 9 additions & 1 deletion src/write/page.rs
@@ -2,8 +2,12 @@ use std::convert::TryInto;
 use std::io::Write;
 use std::sync::Arc;

+#[cfg(feature = "async")]
 use futures::{AsyncWrite, AsyncWriteExt};
-use parquet_format_safe::thrift::protocol::{TCompactOutputProtocol, TCompactOutputStreamProtocol};
+#[cfg(feature = "async")]
+use parquet_format_safe::thrift::protocol::TCompactOutputStreamProtocol;
+
+use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;
 use parquet_format_safe::{DictionaryPageHeader, Encoding, PageType};

 use crate::compression::Compression;
@@ -91,6 +95,8 @@ pub fn write_page<W: Write>(
     })
 }

+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
 pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
     writer: &mut W,
     offset: u64,
@@ -197,6 +203,8 @@ fn write_page_header<W: Write>(mut writer: &mut W, header: &ParquetPageHeader) -
     Ok(header.write_to_out_protocol(&mut protocol)? as u64)
 }

+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
 /// writes the page header into `writer`, returning the number of bytes used in the process.
 async fn write_page_header_async<W: AsyncWrite + Unpin + Send>(
     mut writer: &mut W,
9 changes: 8 additions & 1 deletion src/write/row_group.rs
@@ -1,6 +1,8 @@
 use std::io::Write;

+#[cfg(feature = "async")]
 use futures::AsyncWrite;
+
 use parquet_format_safe::{ColumnChunk, RowGroup};

 use crate::{
@@ -9,8 +11,11 @@ use crate::{
     page::CompressedPage,
 };

+#[cfg(feature = "async")]
+use super::column_chunk::write_column_chunk_async;
+
 use super::{
-    column_chunk::{write_column_chunk, write_column_chunk_async},
+    column_chunk::write_column_chunk,
     page::{is_data_page, PageWriteSpec},
     DynIter, DynStreamingIterator,
 };
@@ -137,6 +142,8 @@
     ))
 }

+#[cfg(feature = "async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
 pub async fn write_row_group_async<
     'a,
     W,