Optional async (#174)
Made async optional
jorgecarleitao committed Aug 10, 2022
1 parent e556d78 commit da24ddc
Showing 10 changed files with 62 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
@@ -24,7 +24,7 @@ jobs:
deactivate
- uses: Swatinem/rust-cache@v1
- name: Generate code coverage
run: cargo llvm-cov --lcov --output-path lcov.info
run: cargo llvm-cov --features full --lcov --output-path lcov.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
with:
6 changes: 4 additions & 2 deletions .github/workflows/test.yml
@@ -23,9 +23,11 @@ jobs:
python tests/write_pyarrow.py
deactivate
- name: Run
run: cargo test
run: |
cargo test --features full
cargo check # compiles without async
- name: Run lz4-flex
run: cargo test --no-default-features --features lz4_flex,bloom_filter,snappy,brotli,zstd,gzip
run: cargo test --no-default-features --features lz4_flex,bloom_filter,snappy,brotli,zstd,gzip,async

clippy:
name: Clippy
8 changes: 5 additions & 3 deletions Cargo.toml
@@ -15,12 +15,12 @@ name = "parquet2"
bench = false

[dependencies]
parquet-format-safe = "0.1"
parquet-format-safe = "0.2"
bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] }
streaming-decompression = "0.1"

async-stream = { version = "0.3.2" }
futures = { version = "0.3" }
async-stream = { version = "0.3.2", optional = true }
futures = { version = "0.3", optional = true }

snap = { version = "^1.0", optional = true }
brotli = { version = "^3.3", optional = true }
@@ -38,6 +38,8 @@ rand = "0.8"

[features]
default = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter"]
full = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter", "async"]
async = [ "async-stream", "futures", "parquet-format-safe/async" ]
snappy = ["snap"]
gzip = ["flate2/rust_backend"]
gzip_zlib_ng = ["flate2/zlib-ng"]
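
With this change, async support becomes opt-in: async-stream, futures, and the async half of parquet-format-safe are compiled only when the new `async` feature is enabled (for example `cargo build --features async`), while `full` is a convenience feature equal to the defaults plus `async`. The rest of the commit guards async items with the attribute pair sketched below on a made-up item — an illustration of the pattern, not code from the crate:

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn some_async_entry_point() {
    // only compiled when the `async` feature is enabled; the doc(cfg) hint
    // additionally marks the item as feature-gated in rendered documentation
}
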
6 changes: 6 additions & 0 deletions src/read/mod.rs
@@ -3,6 +3,7 @@ mod indexes;
pub mod levels;
mod metadata;
mod page;
#[cfg(feature = "async")]
mod stream;

use std::io::{Read, Seek, SeekFrom};
@@ -11,8 +12,13 @@ use std::vec::IntoIter;

pub use compression::{decompress, BasicDecompressor, Decompressor};
pub use metadata::{deserialize_metadata, read_metadata};
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use page::{get_page_stream, get_page_stream_from_column_start};
pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader};

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use stream::read_metadata as read_metadata_async;

use crate::error::Error;
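
With the feature enabled, the async read entry points are re-exported as before. A rough downstream usage sketch, assuming `read_metadata_async` keeps its existing shape (a mutable AsyncRead + AsyncSeek reader in, file metadata out) and that the consumer declares parquet2 with `features = ["async"]`:

use parquet2::error::Result;
use parquet2::read::read_metadata_async;

async fn inspect(data: Vec<u8>) -> Result<()> {
    // futures::io::Cursor provides an in-memory AsyncRead + AsyncSeek source
    let mut reader = futures::io::Cursor::new(data);
    let metadata = read_metadata_async(&mut reader).await?;
    println!("row groups: {}", metadata.row_groups.len());
    Ok(())
}
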
6 changes: 4 additions & 2 deletions src/read/page/mod.rs
@@ -1,5 +1,6 @@
mod indexed_reader;
mod reader;
#[cfg(feature = "async")]
mod stream;

use crate::{error::Error, page::CompressedPage};
@@ -11,5 +12,6 @@ pub trait PageIterator: Iterator<Item = Result<CompressedPage, Error>> {
fn swap_buffer(&mut self, buffer: &mut Vec<u8>);
}

pub use stream::get_page_stream;
pub use stream::get_page_stream_from_column_start;
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use stream::{get_page_stream, get_page_stream_from_column_start};
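
The `doc(cfg(...))` half of the attribute pair is inert in normal builds; it only takes effect when documentation is generated with `--cfg docsrs`, which in turn requires the nightly `doc_cfg` feature at the crate root. A sketch of the conventional companion setup — an assumption about the surrounding crate configuration, not part of this diff:

// in lib.rs, typically paired with RUSTDOCFLAGS="--cfg docsrs" on docs.rs
#![cfg_attr(docsrs, feature(doc_cfg))]
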
17 changes: 12 additions & 5 deletions src/write/column_chunk.rs
@@ -1,12 +1,14 @@
use std::collections::HashSet;
use std::io::Write;

use futures::AsyncWrite;
use parquet_format_safe::thrift::protocol::{
TCompactOutputProtocol, TCompactOutputStreamProtocol, TOutputProtocol, TOutputStreamProtocol,
};
use parquet_format_safe::thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use parquet_format_safe::{ColumnChunk, ColumnMetaData, Type};

#[cfg(feature = "async")]
use futures::AsyncWrite;
#[cfg(feature = "async")]
use parquet_format_safe::thrift::protocol::{TCompactOutputStreamProtocol, TOutputStreamProtocol};

use crate::statistics::serialize_statistics;
use crate::FallibleStreamingIterator;
use crate::{
@@ -17,7 +19,10 @@ use crate::{
page::{CompressedPage, PageType},
};

use super::page::{write_page, write_page_async, PageWriteSpec};
#[cfg(feature = "async")]
use super::page::write_page_async;

use super::page::{write_page, PageWriteSpec};
use super::statistics::reduce;
use super::DynStreamingIterator;

@@ -58,6 +63,8 @@
Ok((column_chunk, specs, bytes_written))
}

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_column_chunk_async<W, E>(
writer: &mut W,
mut offset: u64,
12 changes: 10 additions & 2 deletions src/write/indexes/write.rs
@@ -1,7 +1,11 @@
use futures::AsyncWrite;
use std::io::Write;

use parquet_format_safe::thrift::protocol::{TCompactOutputProtocol, TCompactOutputStreamProtocol};
#[cfg(feature = "async")]
use futures::AsyncWrite;
#[cfg(feature = "async")]
use parquet_format_safe::thrift::protocol::TCompactOutputStreamProtocol;

use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;

use crate::error::Result;
pub use crate::metadata::KeyValue;
@@ -16,6 +20,8 @@ pub fn write_column_index<W: Write>(writer: &mut W, pages: &[PageWriteSpec]) ->
Ok(index.write_to_out_protocol(&mut protocol)? as u64)
}

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_column_index_async<W: AsyncWrite + Unpin + Send>(
writer: &mut W,
pages: &[PageWriteSpec],
@@ -31,6 +37,8 @@ pub fn write_offset_index<W: Write>(writer: &mut W, pages: &[PageWriteSpec]) ->
Ok(index.write_to_out_protocol(&mut protocol)? as u64)
}

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_offset_index_async<W: AsyncWrite + Unpin + Send>(
writer: &mut W,
pages: &[PageWriteSpec],
3 changes: 3 additions & 0 deletions src/write/mod.rs
@@ -6,7 +6,10 @@ pub(crate) mod page;
mod row_group;
pub(self) mod statistics;

#[cfg(feature = "async")]
mod stream;
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use stream::FileStreamer;

mod dyn_iter;
10 changes: 9 additions & 1 deletion src/write/page.rs
@@ -2,8 +2,12 @@ use std::convert::TryInto;
use std::io::Write;
use std::sync::Arc;

#[cfg(feature = "async")]
use futures::{AsyncWrite, AsyncWriteExt};
use parquet_format_safe::thrift::protocol::{TCompactOutputProtocol, TCompactOutputStreamProtocol};
#[cfg(feature = "async")]
use parquet_format_safe::thrift::protocol::TCompactOutputStreamProtocol;

use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;
use parquet_format_safe::{DictionaryPageHeader, Encoding, PageType};

use crate::compression::Compression;
@@ -91,6 +95,8 @@ pub fn write_page<W: Write>(
})
}

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
writer: &mut W,
offset: u64,
@@ -197,6 +203,8 @@ fn write_page_header<W: Write>(mut writer: &mut W, header: &ParquetPageHeader) -
Ok(header.write_to_out_protocol(&mut protocol)? as u64)
}

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
/// writes the page header into `writer`, returning the number of bytes used in the process.
async fn write_page_header_async<W: AsyncWrite + Unpin + Send>(
mut writer: &mut W,
9 changes: 8 additions & 1 deletion src/write/row_group.rs
@@ -1,6 +1,8 @@
use std::io::Write;

#[cfg(feature = "async")]
use futures::AsyncWrite;

use parquet_format_safe::{ColumnChunk, RowGroup};

use crate::{
@@ -9,8 +11,11 @@ use crate::{
page::CompressedPage,
};

#[cfg(feature = "async")]
use super::column_chunk::write_column_chunk_async;

use super::{
column_chunk::{write_column_chunk, write_column_chunk_async},
column_chunk::write_column_chunk,
page::{is_data_page, PageWriteSpec},
DynIter, DynStreamingIterator,
};
@@ -137,6 +142,8 @@
))
}

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_row_group_async<
'a,
W,
