diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 93dac8d4bc..0027271cb1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -309,12 +309,14 @@ jobs: os: [ubuntu-latest] rust: [stable] rust-target: [x86_64-unknown-linux-gnu] + compression: [none, gzip, snappy, lz4] env: RUST_BACKTRACE: full RUSTV: ${{ matrix.rust }} TARGET: ${{ matrix.rust-target }} RELEASE: true RELEASE_NAME: release + FLV_CLIENT_DEFAULT_COMPRESSION_CODEC: ${{ matrix.compression }} steps: - uses: actions/checkout@v2 - name: Install Rust ${{ matrix.rust }} @@ -444,6 +446,7 @@ jobs: FLUVIO_BIN: ~/bin/fluvio TEST_BIN: ~/bin/fluvio-test SERVER_LOG: fluvio=debug + FLV_CLIENT_DEFAULT_COMPRESSION_CODEC: ${{ matrix.compression }} strategy: fail-fast: false matrix: @@ -461,7 +464,7 @@ jobs: batch-failure, batch, ] - + compression: [none, gzip, snappy, lz4] steps: - uses: actions/checkout@v2 - name: Download artifact - fluvio @@ -604,6 +607,7 @@ jobs: TEST_BIN: ~/bin/fluvio-test UNINSTALL: noclean SERVER_LOG: fluvio=debug + FLV_CLIENT_DEFAULT_COMPRESSION_CODEC: ${{ matrix.compression }} strategy: fail-fast: false matrix: @@ -613,6 +617,7 @@ jobs: test: [smoke-test-k8, smoke-test-k8-tls, smoke-test-k8-tls-root-unclean] k8: [k3d, minikube] spu: [2] + compression: [none, gzip, snappy, lz4] steps: - uses: actions/checkout@v2 # Download artifacts @@ -717,11 +722,14 @@ jobs: name: Upgrade cluster test on (${{ matrix.run }}) needs: build_image runs-on: ${{ matrix.os }} + env: + FLV_CLIENT_DEFAULT_COMPRESSION_CODEC: ${{ matrix.compression }} strategy: fail-fast: false matrix: os: [ubuntu-latest] run: [r1] + compression: [none, gzip, snappy, lz4] steps: - uses: actions/checkout@v2 - name: Setup K3d @@ -822,10 +830,13 @@ jobs: needs: build_image #if: ${{ false }} runs-on: ${{ matrix.os }} + env: + FLV_CLIENT_DEFAULT_COMPRESSION_CODEC: ${{ matrix.compression }} strategy: fail-fast: false matrix: os: [ubuntu-latest] + compression: [none, gzip, snappy, lz4] steps: - uses: actions/checkout@v2 - name: Install Rust ${{ matrix.rust }} @@ -1038,4 +1049,4 @@ jobs: runs-on: ubuntu-latest steps: - name: Done - run: echo "Done!" \ No newline at end of file + run: echo "Done!" diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c18f11efa..83babe4579 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * Report SPU error codes to FutureRecordMetadata ([#2228](https://github.com/infinyon/fluvio/issues/2228)) * Optimize partition size computation ([#2230](https://github.com/infinyon/fluvio/issues/2230)) * Fix fluvio-test configuration to support data generator ([#2237](https://github.com/infinyon/fluvio/pull/2237)) +* Add compression support. 
([#2082](https://github.com/infinyon/fluvio/issues/2082)) ## Platform Version 0.9.20 - 2022-02-17 * Add `connector update -c config` to update the running configuration of a given existing managed connector ([#2188](https://github.com/infinyon/fluvio/pull/2188)) diff --git a/Cargo.lock b/Cargo.lock index c78c0972c9..8455d9dbd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1535,6 +1535,7 @@ dependencies = [ "derive_builder", "dirs 4.0.0", "event-listener", + "fluvio-compression", "fluvio-dataplane-protocol", "fluvio-future", "fluvio-protocol", @@ -1747,6 +1748,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "fluvio-compression" +version = "0.1.0" +dependencies = [ + "flate2", + "lz4_flex", + "serde", + "snap", + "thiserror", +] + [[package]] name = "fluvio-controlplane" version = "0.0.0" @@ -1786,6 +1798,7 @@ dependencies = [ "crc32c", "derive_builder", "eyre", + "fluvio-compression", "fluvio-dataplane-protocol", "fluvio-future", "fluvio-protocol", @@ -1990,7 +2003,7 @@ dependencies = [ [[package]] name = "fluvio-smartengine" -version = "0.2.5" +version = "0.2.6" dependencies = [ "anyhow", "flate2", @@ -2065,6 +2078,7 @@ dependencies = [ "event-listener", "flate2", "fluvio", + "fluvio-compression", "fluvio-controlplane", "fluvio-controlplane-metadata", "fluvio-dataplane-protocol", @@ -3169,6 +3183,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lz4_flex" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42c51df9d8d4842336c835df1d85ed447c4813baa237d033d95128bf5552ad8a" +dependencies = [ + "twox-hash", +] + [[package]] name = "mach" version = "0.3.2" @@ -4467,6 +4490,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "snap" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" + [[package]] name = "socket2" version = "0.4.4" @@ -5018,6 +5047,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "twox-hash" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee73e6e4924fe940354b8d4d98cad5231175d615cd855b758adc658c0aac6a0" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "typenum" version = "1.15.0" diff --git a/Cargo.toml b/Cargo.toml index 6377378b2a..97f92d2921 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "crates/fluvio-cli", "crates/fluvio-cli-common", "crates/fluvio-cluster", + "crates/fluvio-compression", "crates/fluvio-controlplane", "crates/fluvio-controlplane-metadata", "crates/fluvio-dataplane-protocol", diff --git a/crates/fluvio-compression/Cargo.toml b/crates/fluvio-compression/Cargo.toml new file mode 100644 index 0000000000..e510a4ceb2 --- /dev/null +++ b/crates/fluvio-compression/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "fluvio-compression" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +authors = ["Fluvio Contributors "] +categories = ["compression"] +keywords = ["fluvio", "compression"] +repository = "https://github.com/infinyon/fluvio" +description = "Fluvio Compression library" + +[features] +default = [] + +[dependencies] +serde = { version = "1.0.110", features = ['derive'] } +flate2 = { version = "1.0.20"} +snap = { version = "1" } +lz4_flex = { version = "0.9", default-features = false, features = ["safe-decode", "safe-encode", 
"frame"] } +thiserror = "1.0.30" diff --git a/crates/fluvio-compression/LICENSE-APACHE b/crates/fluvio-compression/LICENSE-APACHE new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/crates/fluvio-compression/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. 
Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/crates/fluvio-compression/README.md b/crates/fluvio-compression/README.md
new file mode 100644
index 0000000000..e49e03dd43
--- /dev/null
+++ b/crates/fluvio-compression/README.md
@@ -0,0 +1,7 @@
+# Fluvio Compression
+
+Library with handlers to compress and uncompress data in the Fluvio protocol.
+
+In Fluvio, compression is performed on the producer side; consumers, and the SPU when SmartModules are in use, then uncompress the data using the compression information carried in the batch attributes.
+
+Currently, the supported compression codecs are None (default), Gzip, Snappy, and LZ4.
diff --git a/crates/fluvio-compression/src/error.rs b/crates/fluvio-compression/src/error.rs
new file mode 100644
index 0000000000..4cf151fd4e
--- /dev/null
+++ b/crates/fluvio-compression/src/error.rs
@@ -0,0 +1,15 @@
+use snap::write::{IntoInnerError, FrameEncoder};
+
+#[derive(thiserror::Error, Debug)]
+pub enum CompressionError {
+    #[error(transparent)]
+    IoError(#[from] std::io::Error),
+    #[error("unknown compression format: {0}")]
+    UnknownCompressionFormat(String),
+    #[error("error flushing Snap encoder: {0}")]
+    SnapError(#[from] Box<IntoInnerError<FrameEncoder<Vec<u8>>>>),
+    #[error("error flushing Lz4 encoder: {0}")]
+    Lz4Error(#[from] lz4_flex::frame::Error),
+    #[error("Unreachable error")]
+    UnreachableError,
+}
diff --git a/crates/fluvio-compression/src/gzip.rs b/crates/fluvio-compression/src/gzip.rs
new file mode 100644
index 0000000000..5a49c03417
--- /dev/null
+++ b/crates/fluvio-compression/src/gzip.rs
@@ -0,0 +1,37 @@
+use std::io::{Read, Write};
+
+use flate2::Compression;
+use flate2::read::GzDecoder;
+use flate2::write::GzEncoder;
+
+use crate::error::CompressionError;
+
+pub fn compress(src: &[u8]) -> Result<Vec<u8>, CompressionError> {
+    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
+    encoder.write_all(src)?;
+    Ok(encoder.finish()?)
+}
+
+pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {
+    let mut decoder = GzDecoder::new(src);
+    let mut buffer: Vec<u8> = Vec::new();
+    decoder.read_to_end(&mut buffer)?;
+    Ok(buffer)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_compress_decompress() {
+        let text = "FLUVIO_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
+        let compressed = compress(text.as_bytes()).unwrap();
+
+        assert!(compressed.len() < text.as_bytes().len());
+
+        let uncompressed = String::from_utf8(uncompress(compressed.as_slice()).unwrap()).unwrap();
+
+        assert_eq!(uncompressed, text);
+    }
+}
diff --git a/crates/fluvio-compression/src/lib.rs b/crates/fluvio-compression/src/lib.rs
new file mode 100644
index 0000000000..2374779a92
--- /dev/null
+++ b/crates/fluvio-compression/src/lib.rs
@@ -0,0 +1,88 @@
+use std::str::FromStr;
+
+mod error;
+
+mod gzip;
+mod snappy;
+mod lz4;
+
+pub use error::CompressionError;
+use serde::{Serialize, Deserialize};
+
+/// The compression algorithm used to compress and decompress records in fluvio batches
+#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+#[repr(i8)]
+pub enum Compression {
+    None = 0,
+    Gzip = 1,
+    Snappy = 2,
+    Lz4 = 3,
+}
+
+impl Default for Compression {
+    fn default() -> Self {
+        Compression::None
+    }
+}
+
+impl TryFrom<i8> for Compression {
+    type Error = CompressionError;
+    fn try_from(v: i8) -> Result<Self, Self::Error> {
+        match v {
+            0 => Ok(Compression::None),
+            1 => Ok(Compression::Gzip),
+            2 => Ok(Compression::Snappy),
+            3 => Ok(Compression::Lz4),
+            _ => Err(CompressionError::UnknownCompressionFormat(format!(
+                "i8 representation: {}",
+                v
+            ))),
+        }
+    }
+}
+
+impl FromStr for Compression {
+    type Err = CompressionError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "none" => Ok(Compression::None),
+            "gzip" => Ok(Compression::Gzip),
+            "snappy" => Ok(Compression::Snappy),
+            "lz4" => Ok(Compression::Lz4),
+            _ => Err(CompressionError::UnknownCompressionFormat(s.into())),
+        }
+    }
+}
+
+impl Compression {
+    /// Compress the given data, returning the compressed data
+    pub fn compress(&self, src: &[u8]) -> Result<Vec<u8>, CompressionError> {
+        match *self {
+            Compression::None => Ok(src.to_vec()),
+            Compression::Gzip => gzip::compress(src),
+            Compression::Snappy => snappy::compress(src),
+            Compression::Lz4 => lz4::compress(src),
+        }
+    }
+
+    /// Uncompress the given data, returning the uncompressed data if any compression was applied; otherwise returns None
+    pub fn uncompress(&self, src: &[u8]) -> Result<Option<Vec<u8>>, CompressionError> {
+        match *self {
+            Compression::None => Ok(None),
+            Compression::Gzip => {
+                let output = gzip::uncompress(src)?;
+                Ok(Some(output))
+            }
+            Compression::Snappy => {
+                let output = snappy::uncompress(src)?;
+                Ok(Some(output))
+            }
+            Compression::Lz4 => {
+                let output = lz4::uncompress(src)?;
+                Ok(Some(output))
+            }
+        }
+    }
+}
diff --git a/crates/fluvio-compression/src/lz4.rs b/crates/fluvio-compression/src/lz4.rs
new file mode 100644
index 0000000000..3dc8aeeba5
--- /dev/null
+++ b/crates/fluvio-compression/src/lz4.rs
@@ -0,0 +1,35 @@
+use std::io::{Read, Write};
+
+use crate::error::CompressionError;
+use lz4_flex::frame::{FrameDecoder, FrameEncoder};
+
+pub fn compress(src: &[u8]) -> Result<Vec<u8>, CompressionError> {
+    let buf = Vec::with_capacity(src.len());
+    let mut encoder = FrameEncoder::new(buf);
+    encoder.write_all(src)?;
+    Ok(encoder.finish()?)
+}
+
+pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {
+    let mut buffer: Vec<u8> = Vec::new();
+    FrameDecoder::new(src).read_to_end(&mut buffer)?;
+
+    Ok(buffer)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_compress_decompress() {
+        let text = "FLUVIO_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
+        let compressed = compress(text.as_bytes()).unwrap();
+
+        assert!(compressed.len() < text.as_bytes().len());
+
+        let uncompressed = String::from_utf8(uncompress(compressed.as_slice()).unwrap()).unwrap();
+
+        assert_eq!(uncompressed, text);
+    }
+}
diff --git a/crates/fluvio-compression/src/snappy.rs b/crates/fluvio-compression/src/snappy.rs
new file mode 100644
index 0000000000..a053fc1d20
--- /dev/null
+++ b/crates/fluvio-compression/src/snappy.rs
@@ -0,0 +1,35 @@
+use std::io::{Read, Write};
+
+use crate::error::CompressionError;
+use snap::{read::FrameDecoder, write::FrameEncoder};
+
+pub fn compress(src: &[u8]) -> Result<Vec<u8>, CompressionError> {
+    let buf = Vec::with_capacity(src.len());
+    let mut encoder = FrameEncoder::new(buf);
+    encoder.write_all(src)?;
+    Ok(encoder.into_inner().map_err(Box::new)?)
+}
+
+pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {
+    let mut buffer: Vec<u8> = Vec::new();
+    FrameDecoder::new(src).read_to_end(&mut buffer)?;
+
+    Ok(buffer)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_compress_decompress() {
+        let text = "FLUVIO_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
+        let compressed = compress(text.as_bytes()).unwrap();
+
+        assert!(compressed.len() < text.as_bytes().len());
+
+        let uncompressed = String::from_utf8(uncompress(compressed.as_slice()).unwrap()).unwrap();
+
+        assert_eq!(uncompressed, text);
+    }
+}
diff --git a/crates/fluvio-dataplane-protocol/Cargo.toml b/crates/fluvio-dataplane-protocol/Cargo.toml
index f7e84229ce..406f66148d 100644
--- a/crates/fluvio-dataplane-protocol/Cargo.toml
+++ b/crates/fluvio-dataplane-protocol/Cargo.toml
@@ -26,6 +26,7 @@ eyre = { version = "0.6", default-features = false }
 thiserror = "1"
 
 # Fluvio dependencies
+fluvio-compression = { version = "0.1.0", path = "../fluvio-compression" }
 fluvio-future = { version = "0.3.1" }
 fluvio-protocol = { path = "../fluvio-protocol", version = "0.7", features = [
     "derive",
diff --git a/crates/fluvio-dataplane-protocol/src/batch.rs b/crates/fluvio-dataplane-protocol/src/batch.rs
index f2f3e5245a..3ac06970dc 100644
--- a/crates/fluvio-dataplane-protocol/src/batch.rs
+++ b/crates/fluvio-dataplane-protocol/src/batch.rs
@@ -2,8 +2,11 @@ use std::io::Error;
 use std::mem::size_of;
 use std::fmt::Debug;
 
+use fluvio_compression::CompressionError;
 use tracing::trace;
 
+use fluvio_compression::Compression;
+
 use crate::core::bytes::Buf;
 use crate::core::bytes::BufMut;
 
@@ -15,6 +18,8 @@ use crate::Offset;
 use crate::Size;
 use crate::record::Record;
 
+pub const COMPRESSION_CODEC_MASK: i16 = 0x07;
+
 pub trait BatchRecords: Default + Debug + Encoder + Decoder + Send + Sync {
     /// how many bytes does record wants to process
     #[deprecated]
@@ -26,8 +31,36 @@
 /// A type describing in-memory records
 pub type MemoryRecords = Vec<Record>;
 
+/// A type describing Raw records
+/// This struct encodes and decodes its bytes as-is: just the raw bytes of its internal vector.
+/// When decoding, be sure that the src buffer holds exactly the expected number of bytes.
+#[derive(Debug, Default)]
+pub struct RawRecords(pub Vec<u8>);
+
+impl Encoder for RawRecords {
+    fn write_size(&self, _version: Version) -> usize {
+        self.0.len()
+    }
+
+    fn encode<T: BufMut>(&self, buf: &mut T, _version: Version) -> Result<(), Error> {
+        buf.put_slice(&self.0);
+        Ok(())
+    }
+}
+
+impl Decoder for RawRecords {
+    fn decode<T: Buf>(&mut self, buf: &mut T, _version: Version) -> Result<(), Error> {
+        let len = buf.remaining();
+
+        self.0.resize(len, 0);
+        buf.copy_to_slice(&mut self.0);
+        Ok(())
+    }
+}
 impl BatchRecords for MemoryRecords {}
 
+impl BatchRecords for RawRecords {}
+
 /// size of the offset and length
 pub const BATCH_PREAMBLE_SIZE: usize = size_of::<Offset>() // Offset
     + size_of::<i32>(); // i32
@@ -104,6 +137,10 @@ impl<R> Batch<R> {
         self.get_header().last_offset_delta
     }
 
+    pub fn get_compression(&self) -> Result<Compression, CompressionError> {
+        self.get_header().get_compression()
+    }
+
     /// decode from buf stored in the file
     /// read all excluding records
     pub fn decode_from_file_buf<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
@@ -118,6 +155,33 @@
     }
 }
 
+impl TryFrom<Batch<RawRecords>> for Batch<MemoryRecords> {
+    type Error = CompressionError;
+    fn try_from(batch: Batch<RawRecords>) -> Result<Self, Self::Error> {
+        let records = batch.memory_records()?;
+        Ok(Batch {
+            base_offset: batch.base_offset,
+            batch_len: (BATCH_HEADER_SIZE + records.write_size(0)) as i32,
+            header: batch.header,
+            records,
+        })
+    }
+}
+
+impl TryFrom<Batch<MemoryRecords>> for Batch<RawRecords> {
+    type Error = CompressionError;
+    fn try_from(f: Batch<MemoryRecords>) -> Result<Self, Self::Error> {
+        let mut buf = Vec::new();
+        f.records.encode(&mut buf, 0)?;
+        let records = RawRecords(buf);
+        Ok(Batch {
+            base_offset: f.base_offset,
+            batch_len: f.batch_len,
+            header: f.header,
+            records,
+        })
+    }
+}
 impl<R> Batch<R>
 where
     R: Encoder,
@@ -158,6 +222,22 @@
         self.header.last_offset_delta = self.records().len() as i32 - 1;
     }
 }
+impl Batch<RawRecords> {
+    pub fn memory_records(&self) -> Result<MemoryRecords, CompressionError> {
+        let compression = self.get_compression()?;
+
+        let mut records: MemoryRecords = Default::default();
+        if let Compression::None = compression {
+            records.decode(&mut &self.records.0[..], 0)?;
+        } else {
+            let decompressed = compression
+                .uncompress(&self.records.0[..])?
+                .ok_or(CompressionError::UnreachableError)?;
+            records.decode(&mut &decompressed[..], 0)?;
+        }
+        Ok(records)
+    }
+}
 impl<T: Into<MemoryRecords>> From<T> for Batch {
     fn from(records: T) -> Self {
@@ -190,7 +270,21 @@
     {
         trace!("decoding batch");
         self.decode_from_file_buf(src, version)?;
-        self.records.decode(src, version)?;
+
+        let batch_len = self.batch_len as usize - BATCH_HEADER_SIZE;
+        let mut buf = src.take(batch_len);
+        if buf.remaining() < batch_len {
+            return Err(Error::new(
+                std::io::ErrorKind::UnexpectedEof,
+                format!(
+                    "not enough buf records, expected: {}, found: {}",
+                    batch_len,
+                    buf.remaining()
+                ),
+            ));
+        }
+
+        self.records.decode(&mut buf, version)?;
         Ok(())
     }
 }
@@ -252,6 +346,17 @@ pub struct BatchHeader {
     pub first_sequence: i32,
 }
 
+impl BatchHeader {
+    fn get_compression(&self) -> Result<Compression, CompressionError> {
+        let compression_bits = self.attributes & COMPRESSION_CODEC_MASK;
+        Compression::try_from(compression_bits as i8)
+    }
+
+    pub fn set_compression(&mut self, compression: Compression) {
+        let compression_bits = compression as i16 & COMPRESSION_CODEC_MASK;
+        self.attributes = (self.attributes & !COMPRESSION_CODEC_MASK) | compression_bits;
+    }
+}
 impl Default for BatchHeader {
     fn default() -> Self {
         BatchHeader {
diff --git a/crates/fluvio-dataplane-protocol/src/error_code.rs b/crates/fluvio-dataplane-protocol/src/error_code.rs
index de5c25f563..33f37acc35 100644
--- a/crates/fluvio-dataplane-protocol/src/error_code.rs
+++ b/crates/fluvio-dataplane-protocol/src/error_code.rs
@@ -166,6 +166,11 @@ pub enum ErrorCode {
     DerivedStreamInvalid(String),
     #[error("can't do recursive derivedstream yet: {0}->{1}")]
     DerivedStreamRecursion(String, String),
+
+    // Compression errors
+    #[fluvio(tag = 9000)]
+    #[error("a compression error occurred in the SPU")]
+    CompressionError,
 }
 
 impl Default for ErrorCode {
diff --git a/crates/fluvio-dataplane-protocol/src/produce/request.rs b/crates/fluvio-dataplane-protocol/src/produce/request.rs
index 6b36341aa3..d079916592 100644
--- a/crates/fluvio-dataplane-protocol/src/produce/request.rs
+++ b/crates/fluvio-dataplane-protocol/src/produce/request.rs
@@ -1,6 +1,7 @@
 use std::fmt::Debug;
 use std::marker::PhantomData;
 
+use crate::batch::RawRecords;
 use crate::core::Encoder;
 use crate::core::Decoder;
 use crate::derive::FluvioDefault;
@@ -10,9 +11,9 @@ use crate::record::RecordSet;
 
 use super::ProduceResponse;
 
-pub type DefaultProduceRequest = ProduceRequest<RecordSet>;
-pub type DefaultPartitionRequest = PartitionProduceData<RecordSet>;
-pub type DefaultTopicRequest = TopicProduceData<RecordSet>;
+pub type DefaultProduceRequest = ProduceRequest<RecordSet<RawRecords>>;
+pub type DefaultPartitionRequest = PartitionProduceData<RecordSet<RawRecords>>;
+pub type DefaultTopicRequest = TopicProduceData<RecordSet<RawRecords>>;
 
 #[derive(Encoder, Decoder, FluvioDefault, Debug)]
 pub struct ProduceRequest
diff --git a/crates/fluvio-dataplane-protocol/src/record.rs b/crates/fluvio-dataplane-protocol/src/record.rs
index 1c1614e34c..4be4f0a3fb 100644
--- a/crates/fluvio-dataplane-protocol/src/record.rs
+++ b/crates/fluvio-dataplane-protocol/src/record.rs
@@ -14,6 +14,7 @@ use bytes::BufMut;
 
 use crate::batch::BatchRecords;
 use crate::batch::MemoryRecords;
+use crate::batch::RawRecords;
 use crate::core::{Encoder, Decoder};
 use crate::core::DecoderVarInt;
 use crate::core::EncoderVarInt;
@@ -21,6 +22,7 @@ use crate::core::Version;
 use crate::batch::Batch;
 
 use crate::Offset;
+use fluvio_compression::CompressionError;
 
 /// maximum text to display
 static MAX_STRING_DISPLAY: Lazy<usize> = Lazy::new(|| {
@@ -209,6 +211,18 @@ pub struct RecordSet<R = MemoryRecords> {
     pub batches: Vec<Batch<R>>,
 }
 
+impl TryFrom<RecordSet> for RecordSet<RawRecords> {
+    type Error
= CompressionError; + fn try_from(set: RecordSet) -> Result { + let batches: Result, _> = set + .batches + .into_iter() + .map(|batch| batch.try_into()) + .collect(); + Ok(Self { batches: batches? }) + } +} + impl fmt::Display for RecordSet { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{} batches", self.batches.len()) @@ -230,7 +244,10 @@ impl RecordSet { /// total records pub fn total_records(&self) -> usize { - self.batches.iter().map(|batch| batch.records_len()).sum() + self.batches + .iter() + .map(|batches| batches.records_len()) + .sum() } /// return base offset diff --git a/crates/fluvio-protocol/Cargo.toml b/crates/fluvio-protocol/Cargo.toml index ca9a738ee8..a8857b4a32 100644 --- a/crates/fluvio-protocol/Cargo.toml +++ b/crates/fluvio-protocol/Cargo.toml @@ -4,7 +4,7 @@ edition = "2021" version = "0.7.4" authors = ["Fluvio Contributors "] description = "Fluvio streaming protocol" -repository = "https://github.com/infinyon/fluvio-protocol" +repository = "https://github.com/infinyon/fluvio" license = "Apache-2.0" categories = ["encoding", "api-bindings"] diff --git a/crates/fluvio-protocol/src/core/decoder.rs b/crates/fluvio-protocol/src/core/decoder.rs index a93b238c96..7db6781edd 100644 --- a/crates/fluvio-protocol/src/core/decoder.rs +++ b/crates/fluvio-protocol/src/core/decoder.rs @@ -14,7 +14,7 @@ use crate::Version; // trait for encoding and decoding using Kafka Protocol pub trait Decoder: Sized + Default { - /// decode Kafka compliant protocol values from buf + /// decode Fluvio compliant protocol values from buf fn decode_from(src: &mut T, version: Version) -> Result where T: Buf, diff --git a/crates/fluvio-protocol/src/core/encoder.rs b/crates/fluvio-protocol/src/core/encoder.rs index d990202934..185d3ca72c 100644 --- a/crates/fluvio-protocol/src/core/encoder.rs +++ b/crates/fluvio-protocol/src/core/encoder.rs @@ -15,7 +15,7 @@ use crate::Version; use super::varint::variant_encode; use super::varint::variant_size; -// trait for encoding and decoding using Kafka Protocol +// trait for encoding and decoding using Fluvio Protocol pub trait Encoder { /// size of this object in bytes fn write_size(&self, version: Version) -> usize; diff --git a/crates/fluvio-smartengine/Cargo.toml b/crates/fluvio-smartengine/Cargo.toml index ffac735732..587fa61b96 100644 --- a/crates/fluvio-smartengine/Cargo.toml +++ b/crates/fluvio-smartengine/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-smartengine" -version = "0.2.5" +version = "0.2.6" edition = "2021" license = "Apache-2.0" authors = ["Fluvio Contributors "] diff --git a/crates/fluvio-smartengine/src/smartmodule/file_batch.rs b/crates/fluvio-smartengine/src/smartmodule/file_batch.rs index 798f796616..0238e7576e 100644 --- a/crates/fluvio-smartengine/src/smartmodule/file_batch.rs +++ b/crates/fluvio-smartengine/src/smartmodule/file_batch.rs @@ -78,7 +78,7 @@ impl Iterator for FileBatchIterator { ))); } - let mut batch = Batch::default(); + let mut batch: Batch = Batch::default(); if let Err(err) = batch.decode_from_file_buf(&mut Cursor::new(header), 0) { return Some(Err(IoError::new( ErrorKind::Other, @@ -94,29 +94,50 @@ impl Iterator for FileBatchIterator { "fbatch header" ); - let mut records = vec![0u8; remainder]; + let mut raw_records = vec![0u8; remainder]; self.offset += BATCH_FILE_HEADER_SIZE as i64; - let bytes_read = match pread(self.fd, &mut records, self.offset) + let bytes_read = match pread(self.fd, &mut raw_records, self.offset) .map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {}", 
err))) { Ok(bytes) => bytes, Err(err) => return Some(Err(err)), }; - if bytes_read < records.len() { - warn!(bytes_read, record_len = records.len()); + if bytes_read < raw_records.len() { + warn!(bytes_read, record_len = raw_records.len()); return Some(Err(IoError::new( ErrorKind::UnexpectedEof, format!( "not enough for batch records {} out of {}", bytes_read, - records.len() + raw_records.len() ), ))); } + let compression = match batch.get_compression() { + Ok(compression) => compression, + Err(err) => { + return Some(Err(IoError::new( + ErrorKind::Other, + format!("unknown compression value for batch {}", err), + ))) + } + }; + + let records = match compression.uncompress(&raw_records) { + Ok(Some(records)) => records, + Ok(None) => raw_records, + Err(err) => { + return Some(Err(IoError::new( + ErrorKind::Other, + format!("uncompress error {}", err), + ))) + } + }; + self.offset += bytes_read as i64; debug!(file_offset = self.offset, "fbatch end"); diff --git a/crates/fluvio-smartmodule/examples/Cargo.lock b/crates/fluvio-smartmodule/examples/Cargo.lock index 173b15b7df..049558f6dd 100644 --- a/crates/fluvio-smartmodule/examples/Cargo.lock +++ b/crates/fluvio-smartmodule/examples/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aho-corasick" version = "0.7.18" @@ -70,6 +76,15 @@ dependencies = [ "rustc_version 0.2.3", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "dtoa" version = "0.4.8" @@ -86,6 +101,29 @@ dependencies = [ "once_cell", ] +[[package]] +name = "flate2" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f" +dependencies = [ + "cfg-if", + "crc32fast", + "libc", + "miniz_oxide", +] + +[[package]] +name = "fluvio-compression" +version = "0.0.0" +dependencies = [ + "flate2", + "lz4_flex", + "serde", + "snap", + "thiserror", +] + [[package]] name = "fluvio-dataplane-protocol" version = "0.10.0" @@ -95,6 +133,7 @@ dependencies = [ "content_inspector", "crc32c", "eyre", + "fluvio-compression", "fluvio-future", "fluvio-protocol", "flv-util", @@ -520,12 +559,31 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "lz4_flex" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42c51df9d8d4842336c835df1d85ed447c4813baa237d033d95128bf5552ad8a" +dependencies = [ + "twox-hash", +] + [[package]] name = "memchr" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "miniz_oxide" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "once_cell" version = "1.8.0" @@ -783,6 +841,18 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" +[[package]] +name = "snap" +version = "1.0.5" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "syn" version = "1.0.81" @@ -846,6 +916,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "twox-hash" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee73e6e4924fe940354b8d4d98cad5231175d615cd855b758adc658c0aac6a0" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "ucd-trie" version = "0.1.3" diff --git a/crates/fluvio-spu-schema/src/server/stream_fetch.rs b/crates/fluvio-spu-schema/src/server/stream_fetch.rs index 90fb08a87a..5f0608f45f 100644 --- a/crates/fluvio-spu-schema/src/server/stream_fetch.rs +++ b/crates/fluvio-spu-schema/src/server/stream_fetch.rs @@ -9,6 +9,7 @@ use std::marker::PhantomData; use std::io::{self, Read}; use std::borrow::Cow; +use dataplane::batch::RawRecords; use dataplane::core::{Encoder, Decoder}; use dataplane::api::Request; use dataplane::fetch::FetchablePartitionResponse; @@ -21,9 +22,8 @@ use flate2::{ bufread::{GzEncoder, GzDecoder}, }; -pub type DefaultStreamFetchResponse = StreamFetchResponse; - -pub type DefaultStreamFetchRequest = StreamFetchRequest; +pub type DefaultStreamFetchResponse = StreamFetchResponse>; +pub type DefaultStreamFetchRequest = StreamFetchRequest>; use super::SpuServerApiKey; diff --git a/crates/fluvio-spu/Cargo.toml b/crates/fluvio-spu/Cargo.toml index 04efd44585..dc0eb3bdde 100644 --- a/crates/fluvio-spu/Cargo.toml +++ b/crates/fluvio-spu/Cargo.toml @@ -46,6 +46,7 @@ fluvio-types = { features = [ "events", ], path = "../fluvio-types" } fluvio-storage = { path = "../fluvio-storage" } +fluvio-compression = { version = "0.1.0", path = "../fluvio-compression" } fluvio-controlplane = { path = "../fluvio-controlplane" } fluvio-controlplane-metadata = { path = "../fluvio-controlplane-metadata" } fluvio-spu-schema = { path = "../fluvio-spu-schema", features = [ diff --git a/crates/fluvio-spu/src/replication/follower/state.rs b/crates/fluvio-spu/src/replication/follower/state.rs index 2442488541..331be23ffb 100644 --- a/crates/fluvio-spu/src/replication/follower/state.rs +++ b/crates/fluvio-spu/src/replication/follower/state.rs @@ -3,6 +3,7 @@ use std::fmt::Debug; use std::collections::{HashMap, hash_map::Entry}; use std::ops::{Deref, DerefMut}; +use dataplane::batch::BatchRecords; use fluvio_storage::config::ReplicaConfig; use tracing::{debug, warn, instrument}; use async_rwlock::{RwLock}; @@ -202,9 +203,9 @@ where } /// update from leader with new record set - pub async fn update_from_leader( + pub async fn update_from_leader( &self, - records: &mut RecordSet, + records: &mut RecordSet, leader_hw: Offset, ) -> Result { let mut changes = false; @@ -242,7 +243,10 @@ where /// try to write records /// ensure records has correct baseoffset - async fn write_recordsets(&self, records: &mut RecordSet) -> Result { + async fn write_recordsets( + &self, + records: &mut RecordSet, + ) -> Result { let storage_leo = self.leo(); if records.base_offset() != storage_leo { // this could happend if records were sent from leader before hw was sync diff --git a/crates/fluvio-spu/src/replication/follower/sync.rs b/crates/fluvio-spu/src/replication/follower/sync.rs index 79c0e42b3c..b62b396aca 100644 --- 
a/crates/fluvio-spu/src/replication/follower/sync.rs +++ b/crates/fluvio-spu/src/replication/follower/sync.rs @@ -7,6 +7,7 @@ use std::io::Error as IoError; use std::marker::PhantomData; use bytes::BytesMut; +use dataplane::batch::RawRecords; use tracing::trace; use dataplane::core::{Encoder, Decoder, Version}; @@ -19,7 +20,7 @@ use dataplane::store::FileWrite; use super::api_key::FollowerPeerApiEnum; pub type FileSyncRequest = SyncRequest; -pub type DefaultSyncRequest = SyncRequest; +pub type DefaultSyncRequest = SyncRequest>; pub type PeerFilePartitionResponse = PeerFetchablePartitionResponse; pub type PeerFileTopicResponse = PeerFetchableTopicResponse; diff --git a/crates/fluvio-spu/src/replication/leader/replica_state.rs b/crates/fluvio-spu/src/replication/leader/replica_state.rs index 0cfa6bdd71..b80606dec3 100644 --- a/crates/fluvio-spu/src/replication/leader/replica_state.rs +++ b/crates/fluvio-spu/src/replication/leader/replica_state.rs @@ -11,8 +11,9 @@ use tracing::{debug, error, warn}; use tracing::instrument; use async_rwlock::{RwLock}; -use dataplane::{record::RecordSet}; +use dataplane::record::RecordSet; use dataplane::{Offset, Isolation, ReplicaKey}; +use dataplane::batch::BatchRecords; use fluvio_controlplane_metadata::partition::{Replica, ReplicaStatus, PartitionStatus}; use fluvio_controlplane::LrsRequest; use fluvio_storage::{FileReplica, StorageError, ReplicaStorage, OffsetInfo, ReplicaStorageConfig}; @@ -311,9 +312,9 @@ where /// write records to storage /// then update our follower's leo #[instrument(skip(self, records, notifiers))] - pub async fn write_record_set( + pub async fn write_record_set( &self, - records: &mut RecordSet, + records: &mut RecordSet, notifiers: &FollowerNotifier, ) -> Result { let base_offset = self diff --git a/crates/fluvio-spu/src/services/public/produce_handler.rs b/crates/fluvio-spu/src/services/public/produce_handler.rs index c1dcde5975..28a1a490e8 100644 --- a/crates/fluvio-spu/src/services/public/produce_handler.rs +++ b/crates/fluvio-spu/src/services/public/produce_handler.rs @@ -1,13 +1,14 @@ use std::io::Error; +use dataplane::batch::BatchRecords; use fluvio_storage::StorageError; use tracing::{debug, trace, error}; use tracing::instrument; use dataplane::ErrorCode; use dataplane::produce::{ - DefaultProduceRequest, ProduceResponse, TopicProduceResponse, PartitionProduceResponse, - PartitionProduceData, TopicProduceData, + ProduceResponse, TopicProduceResponse, PartitionProduceResponse, PartitionProduceData, + DefaultProduceRequest, DefaultTopicRequest, }; use dataplane::api::RequestMessage; use dataplane::api::ResponseMessage; @@ -46,7 +47,7 @@ pub async fn handle_produce_request( )] async fn handle_produce_topic( ctx: &DefaultSharedGlobalContext, - topic_request: TopicProduceData, + topic_request: DefaultTopicRequest, ) -> Result { trace!("Handling produce request for topic:"); let topic = &topic_request.name; @@ -70,10 +71,10 @@ async fn handle_produce_topic( skip(ctx, replica_id, partition_request), fields(%replica_id), )] -async fn handle_produce_partition( +async fn handle_produce_partition( ctx: &DefaultSharedGlobalContext, replica_id: ReplicaKey, - mut partition_request: PartitionProduceData, + mut partition_request: PartitionProduceData>, ) -> Result { trace!("Handling produce request for partition:"); diff --git a/crates/fluvio-spu/src/services/public/stream_fetch.rs b/crates/fluvio-spu/src/services/public/stream_fetch.rs index 17d225b661..c0470476fa 100644 --- a/crates/fluvio-spu/src/services/public/stream_fetch.rs +++ 
b/crates/fluvio-spu/src/services/public/stream_fetch.rs @@ -6,7 +6,7 @@ use futures_util::StreamExt; use tracing::{debug, error, instrument, trace}; use tokio::select; -use dataplane::record::FileRecordSet; +use dataplane::{record::FileRecordSet, batch::RawRecords}; use fluvio_types::event::{StickyEvent, offsets::OffsetPublisher}; use fluvio_future::task::spawn; use fluvio_socket::{ExclusiveFlvSink, SocketError}; @@ -18,6 +18,7 @@ use dataplane::{ }; use dataplane::{Offset, Isolation, ReplicaKey}; use dataplane::fetch::FilePartitionResponse; +use fluvio_compression::CompressionError; use fluvio_spu_schema::server::stream_fetch::{ DefaultStreamFetchRequest, FileStreamFetchRequest, StreamFetchRequest, StreamFetchResponse, }; @@ -185,6 +186,18 @@ impl StreamFetchHandler { Ok(()) } StreamFetchError::Socket(err) => Err(err), + StreamFetchError::Compression(err) => { + error!(%err, "compression error"); + send_back_error( + &sink, + &replica, + &header, + stream_id, + ErrorCode::CompressionError, + ) + .await?; + Ok(()) + } } } else { Ok(()) @@ -425,13 +438,15 @@ impl StreamFetchHandler { return Ok((starting_offset, false)); } - let records = &file_partition_response.records; - let mut file_batch_iterator = FileBatchIterator::from_raw_slice(records.raw_slice()); - - // If a SmartModule is provided, we need to read records from file to memory - // In-memory records are then processed by SmartModule and returned to consumer let output = match smartmodule_instance { Some(smartmodule_instance) => { + // If a SmartModule is provided, we need to read records from file to memory + // In-memory records are then processed by SmartModule and returned to consumer + + let records = &file_partition_response.records; + let mut file_batch_iterator = + FileBatchIterator::from_raw_slice(records.raw_slice()); + let (batch, smartmodule_error) = smartmodule_instance .process_batch( &mut file_batch_iterator, @@ -492,8 +507,8 @@ impl StreamFetchHandler { mut next_offset: Offset, batch: Batch, smartmodule_error: Option, - ) -> Result<(Offset, bool), SocketError> { - type DefaultPartitionResponse = FetchablePartitionResponse; + ) -> Result<(Offset, bool), StreamFetchError> { + type DefaultPartitionResponse = FetchablePartitionResponse>; let error_code = match smartmodule_error { Some(error) => ErrorCode::SmartModuleRuntimeError(error), @@ -528,7 +543,7 @@ impl StreamFetchHandler { error_code, high_watermark: file_partition_response.high_watermark, log_start_offset: file_partition_response.log_start_offset, - records, + records: records.try_into()?, next_filter_offset: next_offset, // we mark last offset in the response that we should sync up ..Default::default() @@ -563,7 +578,7 @@ async fn send_back_error( stream_id: u32, error_code: ErrorCode, ) -> Result<(), SocketError> { - type DefaultPartitionResponse = FetchablePartitionResponse; + type DefaultPartitionResponse = FetchablePartitionResponse>; let partition_response = DefaultPartitionResponse { error_code, partition_index: replica.partition, @@ -590,6 +605,7 @@ async fn send_back_error( } enum StreamFetchError { + Compression(CompressionError), Socket(SocketError), Fetch(ErrorCode), } @@ -606,6 +622,11 @@ impl From for StreamFetchError { } } +impl From for StreamFetchError { + fn from(err: CompressionError) -> Self { + Self::Compression(err) + } +} pub mod publishers { use std::{ diff --git a/crates/fluvio-spu/src/services/public/tests/produce.rs b/crates/fluvio-spu/src/services/public/tests/produce.rs index 3cd47aca89..9c30e3811f 100644 --- 
a/crates/fluvio-spu/src/services/public/tests/produce.rs +++ b/crates/fluvio-spu/src/services/public/tests/produce.rs @@ -1,7 +1,9 @@ use std::{env::temp_dir, time::Duration}; use dataplane::{ - produce::{DefaultProduceRequest, TopicProduceData, PartitionProduceData}, + produce::{ + DefaultProduceRequest, DefaultPartitionRequest, TopicProduceData, PartitionProduceData, + }, api::RequestMessage, }; use fluvio_controlplane_metadata::partition::Replica; @@ -46,13 +48,15 @@ async fn test_produce_basic() { // Make three produce requests with records and check that returned offset is correct let records_per_request = 9; for i in 0..3 { - let records = create_filter_records(records_per_request); + let records = create_filter_records(records_per_request) + .try_into() + .expect("filter records"); let mut produce_request = DefaultProduceRequest { ..Default::default() }; - let partition_produce = PartitionProduceData { + let partition_produce = DefaultPartitionRequest { partition_index: 0, records, }; @@ -86,7 +90,9 @@ async fn test_produce_basic() { let partitions = (0..2) .map(|_| PartitionProduceData { partition_index: 0, - records: create_filter_records(records_per_request), + records: create_filter_records(records_per_request) + .try_into() + .expect("partition"), }) .collect::>(); let topic_produce_request = TopicProduceData { diff --git a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs index 3a61c3adb4..4487314a88 100644 --- a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs +++ b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs @@ -150,10 +150,30 @@ async fn test_stream_fetch_basic() { let batch = &partition.records.batches[0]; assert_eq!(batch.base_offset, 0); assert_eq!(batch.get_last_offset(), 1); - assert_eq!(batch.records().len(), 2); - assert_eq!(batch.records()[0].value().as_ref(), TEST_RECORD); - assert_eq!(batch.records()[1].value().as_ref(), TEST_RECORD); - assert_eq!(batch.records()[1].get_offset_delta(), 1); + assert_eq!(batch.memory_records().expect("records").len(), 2); + assert_eq!( + batch + .memory_records() + .expect("failed to get memory records")[0] + .value() + .as_ref(), + TEST_RECORD + ); + assert_eq!( + batch + .memory_records() + .expect("failed to get memory records")[1] + .value() + .as_ref(), + TEST_RECORD + ); + assert_eq!( + batch + .memory_records() + .expect("failed to get memory records")[1] + .get_offset_delta(), + 1 + ); } drop(response); @@ -184,9 +204,23 @@ async fn test_stream_fetch_basic() { let batch = &partition.records.batches[0]; assert_eq!(batch.base_offset, 0); assert_eq!(batch.get_last_offset(), 1); - assert_eq!(batch.records().len(), 2); - assert_eq!(batch.records()[0].value().as_ref(), TEST_RECORD); - assert_eq!(batch.records()[1].value().as_ref(), TEST_RECORD); + assert_eq!(batch.memory_records().expect("records").len(), 2); + assert_eq!( + batch + .memory_records() + .expect("failed to get memory records")[0] + .value() + .as_ref(), + TEST_RECORD + ); + assert_eq!( + batch + .memory_records() + .expect("failed to get memory records")[1] + .value() + .as_ref(), + TEST_RECORD + ); } drop(response); @@ -225,9 +259,23 @@ async fn test_stream_fetch_basic() { let batch = &partition.records.batches[0]; assert_eq!(batch.base_offset, 2); assert_eq!(batch.get_last_offset(), 3); - assert_eq!(batch.records().len(), 2); - assert_eq!(batch.records()[0].value().as_ref(), TEST_RECORD); - assert_eq!(batch.records()[1].value().as_ref(), TEST_RECORD); + 
assert_eq!(batch.memory_records().expect("records").len(), 2); + assert_eq!( + batch + .memory_records() + .expect("failed to get memory records")[0] + .value() + .as_ref(), + TEST_RECORD + ); + assert_eq!( + batch + .memory_records() + .expect("failed to get memory records")[1] + .value() + .as_ref(), + TEST_RECORD + ); } } @@ -428,12 +476,22 @@ async fn test_stream_fetch_filter( assert_eq!(partition.records.batches.len(), 1); let batch = &partition.records.batches[0]; assert_eq!(batch.base_offset, 0); - assert_eq!(batch.records().len(), 1); + assert_eq!(batch.memory_records().expect("records").len(), 1); assert_eq!( - batch.records()[0].value().as_ref(), + batch + .memory_records() + .expect("failed to get memory records")[0] + .value() + .as_ref(), "a".repeat(100).as_bytes() ); - assert_eq!(batch.records()[0].get_offset_delta(), 1); + assert_eq!( + batch + .memory_records() + .expect("failed to get memory records")[0] + .get_offset_delta(), + 1 + ); } drop(response); @@ -486,9 +544,13 @@ async fn test_stream_fetch_filter( assert_eq!(partition.records.batches.len(), 1); let batch = &partition.records.batches[0]; assert_eq!(batch.base_offset, 4); // first base offset where we had filtered records - assert_eq!(batch.records().len(), 2); + assert_eq!(batch.memory_records().expect("records").len(), 2); assert_eq!( - batch.records()[0].value().as_ref(), + batch + .memory_records() + .expect("failed to get memory records")[0] + .value() + .as_ref(), "a".repeat(100).as_bytes() ); } @@ -579,7 +641,7 @@ async fn test_stream_fetch_filter_individual( .await .expect("create stream"); - let mut records: RecordSet = BatchProducer::builder() + let mut records = BatchProducer::builder() .records(1u16) .record_generator(Arc::new(|_, _| Record::new("1"))) .build() @@ -595,7 +657,7 @@ async fn test_stream_fetch_filter_individual( _ = fluvio_future::timer::sleep(std::time::Duration::from_millis(1000)) => (), } - let mut records: RecordSet = BatchProducer::builder() + let mut records = BatchProducer::builder() .records(1u16) .record_generator(Arc::new(|_, _| Record::new("2"))) .build() @@ -607,7 +669,9 @@ async fn test_stream_fetch_filter_individual( .expect("write"); let response = stream.next().await.expect("first").expect("response"); - let records = response.partition.records.batches[0].records(); + let records = response.partition.records.batches[0] + .memory_records() + .expect("failed to get memory records"); assert_eq!(records.len(), 1); assert_eq!(records[0].value.as_ref(), "2".as_bytes()); @@ -707,7 +771,7 @@ async fn test_stream_filter_error_fetch( Record::new(value) } - let mut records: RecordSet = BatchProducer::builder() + let mut records = BatchProducer::builder() .records(11u16) .record_generator(Arc::new(generate_record)) .build() @@ -728,7 +792,9 @@ async fn test_stream_filter_error_fetch( let response = stream.next().await.expect("first").expect("response"); assert_eq!(response.partition.records.batches.len(), 1); - let records = response.partition.records.batches[0].records(); + let records = response.partition.records.batches[0] + .memory_records() + .expect("memory records"); assert_eq!(records.len(), 5); assert_eq!(records[0].value.as_ref(), "0".as_bytes()); assert_eq!(records[1].value.as_ref(), "2".as_bytes()); @@ -868,9 +934,13 @@ async fn test_stream_filter_max( assert_eq!(partition.records.batches.len(), 1); let batch = &partition.records.batches[0]; assert_eq!(batch.base_offset, 0); - assert_eq!(batch.records().len(), 2); + 
assert_eq!(batch.memory_records().expect("records").len(), 2); assert_eq!( - batch.records()[0].value().as_ref(), + batch + .memory_records() + .expect("failed to get memory records")[0] + .value() + .as_ref(), "a".repeat(100).as_bytes() ); } @@ -902,9 +972,13 @@ async fn test_stream_filter_max( assert_eq!(partition.records.batches.len(), 1); let batch = &partition.records.batches[0]; assert_eq!(batch.base_offset, 20); - assert_eq!(batch.records().len(), 1); + assert_eq!(batch.memory_records().expect("records").len(), 1); assert_eq!( - batch.records()[0].value().as_ref(), + batch + .memory_records() + .expect("failed to get memory records")[0] + .value() + .as_ref(), "a".repeat(100).as_bytes() ); } @@ -996,7 +1070,7 @@ async fn test_stream_fetch_map_error( .await .expect("create stream"); - let mut records: RecordSet = BatchProducer::builder() + let mut records = BatchProducer::builder() .records(10u16) .record_generator(Arc::new(|i, _| { if i < 9 { @@ -1018,7 +1092,9 @@ async fn test_stream_fetch_map_error( let response = stream.next().await.expect("first").expect("response"); assert_eq!(response.partition.records.batches.len(), 1); - let records = response.partition.records.batches[0].records(); + let records = response.partition.records.batches[0] + .memory_records() + .expect("records"); assert_eq!(records.len(), 9); assert_eq!(records[0].value.as_ref(), "0".as_bytes()); assert_eq!(records[1].value.as_ref(), "2".as_bytes()); @@ -1167,9 +1243,9 @@ async fn test_stream_aggregate_fetch_single_batch( assert_eq!(partition.records.batches.len(), 1); let batch = &partition.records.batches[0]; assert_eq!(batch.base_offset, 0); - assert_eq!(batch.records().len(), 5); + assert_eq!(batch.memory_records().expect("records").len(), 5); - let records = batch.records(); + let records = batch.memory_records().expect("records"); assert_eq!("A0", records[0].value().as_str().expect("string")); assert_eq!("A01", records[1].value().as_str().expect("string")); @@ -1328,9 +1404,9 @@ async fn test_stream_aggregate_fetch_multiple_batch( assert_eq!(partition.records.batches.len(), 1); let batch = &partition.records.batches[0]; assert_eq!(batch.base_offset, 0); - assert_eq!(batch.records().len(), 6); + assert_eq!(batch.memory_records().expect("records").len(), 6); - let records = batch.records(); + let records = batch.memory_records().expect("records"); debug!("final records {:#?}", records); assert_eq!("A0", records[0].value().as_str().expect("string")); @@ -1653,7 +1729,12 @@ async fn test_stream_fetch_array_map( let batch = &response.partition.records.batches[0]; // Output: 10 records containing integers 0-9 - for (i, record) in batch.records().iter().enumerate() { + for (i, record) in batch + .memory_records() + .expect("memory records") + .iter() + .enumerate() + { assert_eq!( record.value.as_ref(), RecordData::from(i.to_string()).as_ref() @@ -1771,7 +1852,7 @@ async fn test_stream_fetch_filter_map( assert_eq!(response.partition.records.batches.len(), 1); let batch = &response.partition.records.batches[0]; - assert_eq!(batch.records().len(), 2); + assert_eq!(batch.memory_records().expect("records").len(), 2); // Output: // @@ -1780,7 +1861,7 @@ async fn test_stream_fetch_filter_map( // 33 -> _ // 44 -> 22 // 55 -> _ - let records = batch.records(); + let records = batch.memory_records().expect("records"); assert_eq!(records[0].value, RecordData::from(11.to_string())); assert_eq!(records[1].value, RecordData::from(22.to_string())); @@ -1900,12 +1981,22 @@ async fn test_stream_fetch_filter_with_params( let 
diff --git a/crates/fluvio-test/src/tests/smoke/message.rs b/crates/fluvio-test/src/tests/smoke/message.rs
index 32b8989969..cd37e559ce 100644
--- a/crates/fluvio-test/src/tests/smoke/message.rs
+++ b/crates/fluvio-test/src/tests/smoke/message.rs
@@ -51,14 +51,15 @@ pub fn validate_message(iter: u32, offset: i64, test_case: &SmokeTestCase, data:
     for i in 0..prefix_len {
         assert!(
             data[i] == prefix[i],
-            "prefix failed, iter: {}, index: {}, data: {}, prefix: {}, data len: {}, offset: {}, topic: {}",
+            "prefix failed, iter: {}, index: {}, data: {}, prefix: {}, data len: {}, offset: {}, topic: {}, full_data: {}",
             iter,
             i,
             data[i],
             prefix_string,
             data.len(),
             offset,
-            test_case.environment.base_topic_name().as_str()
+            test_case.environment.base_topic_name().as_str(),
+            std::str::from_utf8(data).expect("failed to parse"),
         );
     }
diff --git a/crates/fluvio/Cargo.toml b/crates/fluvio/Cargo.toml
index b6d9b9fd90..918561d19e 100644
--- a/crates/fluvio/Cargo.toml
+++ b/crates/fluvio/Cargo.toml
@@ -56,6 +56,7 @@ fluvio-sc-schema = { version = "0.12.1", path = "../fluvio-sc-schema", default-f
 fluvio-socket = { path = "../fluvio-socket", version = "0.11.0" }
 fluvio-protocol = { path = "../fluvio-protocol", version = "0.7" }
 dataplane = { version = "0.10.0", path = "../fluvio-dataplane-protocol", package = "fluvio-dataplane-protocol" }
+fluvio-compression = { version = "0.1.0", path = "../fluvio-compression" }

 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
 dirs = "4.0.0"
diff --git a/crates/fluvio/src/consumer.rs b/crates/fluvio/src/consumer.rs
index 9b41c82dd2..93af760cb4 100644
--- a/crates/fluvio/src/consumer.rs
+++ b/crates/fluvio/src/consumer.rs
@@ -17,7 +17,7 @@ pub use fluvio_spu_schema::server::stream_fetch::{
 use dataplane::Isolation;
 use dataplane::ReplicaKey;
 use dataplane::ErrorCode;
-use dataplane::batch::Batch;
+use dataplane::batch::{Batch};
 use fluvio_types::event::offsets::OffsetPublisher;

 use crate::FluvioError;
@@ -242,7 +242,18 @@ where
             // the records down the consumer stream, THEN an Err with the error inside.
             // This way the consumer always gets to read all records that were properly
             // processed before hitting an error, so that the error does not obscure those records.
-            let batches = response.partition.records.batches.into_iter().map(Ok);
+            let batches = response
+                .partition
+                .records
+                .batches
+                .into_iter()
+                .map(|raw_batch| {
+                    let batch: Result<Batch, _> = raw_batch.try_into();
+                    match batch {
+                        Ok(batch) => Ok(batch),
+                        Err(err) => Err(ErrorCode::Other(err.to_string())),
+                    }
+                });
             let error = {
                 let code = response.partition.error_code;
                 match code {
@@ -339,7 +350,9 @@ where
             .await?;

         let ft_stream = async move {
-            if let Some(Ok(response)) = stream.next().await {
+            if let Some(Ok(raw_response)) = stream.next().await {
+                let response: DefaultStreamFetchResponse = raw_response;
+
                 let stream_id = response.stream_id;
                 trace!("first stream response: {:#?}", response);
@@ -502,7 +515,7 @@ where
             .records
             .batches
             .iter()
-            .map(|it| it.records().len())
+            .map(|it| it.records_len())
             .sum();
         let diff = self.remaining - count as i64;
         self.remaining = diff.max(0);
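[Editor's note] Two details in the consumer changes above are worth calling out. First, each raw batch is decoded with `try_into`, and a decompression failure is wrapped as `ErrorCode::Other` so it travels down the same stream as ordinary partition errors instead of aborting the stream outright. Second, offset bookkeeping switches from `records().len()` to `records_len()`, which suggests the count comes from batch metadata rather than from decoding the payload; in the Kafka-style batch format, a header whose `last_offset_delta` is N describes N + 1 records. A sketch of that counting rule (the field name matches the accumulator code later in this diff, but treating it as the count source is an assumption):

    // Assumed counting rule: record count derived from the batch header,
    // so no decompression is needed just to advance consumer offsets.
    struct BatchHeader {
        last_offset_delta: i32,
    }

    fn records_len(header: &BatchHeader) -> usize {
        (header.last_offset_delta + 1) as usize
    }

    fn main() {
        // A batch holding records with offset deltas 0..=4:
        let header = BatchHeader { last_offset_delta: 4 };
        assert_eq!(records_len(&header), 5);
    }
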
diff --git a/crates/fluvio/src/error.rs b/crates/fluvio/src/error.rs
index 7cf9eee5d0..db6e673c6c 100644
--- a/crates/fluvio/src/error.rs
+++ b/crates/fluvio/src/error.rs
@@ -1,5 +1,6 @@
 use std::io::Error as IoError;

+use fluvio_compression::CompressionError;
 use fluvio_socket::SocketError;
 use fluvio_sc_schema::ApiError;
 use semver::Version;
@@ -54,6 +55,8 @@ To interact with this cluster, please install the matching CLI version using the
     Producer(#[from] ProducerError),
     #[error("Error building producer config: {0}")]
     TopicProducerConfigBuilder(#[from] TopicProducerConfigBuilderError),
+    #[error("Compression error: {0}")]
+    Compression(#[from] CompressionError),
     #[error("Unknown error: {0}")]
     Other(String),
 }
diff --git a/crates/fluvio/src/lib.rs b/crates/fluvio/src/lib.rs
index 693eec1ce7..d75ff6fafe 100644
--- a/crates/fluvio/src/lib.rs
+++ b/crates/fluvio/src/lib.rs
@@ -109,6 +109,8 @@ pub use offset::Offset;
 pub use crate::admin::FluvioAdmin;
 pub use crate::fluvio::Fluvio;

+pub use fluvio_compression::Compression;
+
 pub(crate) mod built_info {
     include!(concat!(env!("OUT_DIR"), "/built.rs"));
 }
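[Editor's note] Because `CompressionError` converts into the client error via `#[from]`, and `Compression` is re-exported at the crate root, application code can select a codec and handle codec failures without depending on fluvio-compression directly. A hypothetical caller-side sketch (the variant name comes from the diff above; everything else is illustrative):

    use fluvio::{Compression, FluvioError};

    // Pick the codec through the client's re-export.
    fn pick_codec() -> Compression {
        Compression::Gzip
    }

    // Handle a codec failure like any other client error.
    fn report(err: FluvioError) {
        match err {
            FluvioError::Compression(e) => eprintln!("codec failure: {}", e),
            other => eprintln!("{}", other),
        }
    }

    fn main() {
        let _ = pick_codec();
    }
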
diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs
index 84769dbec8..7365f13fdc 100644
--- a/crates/fluvio/src/producer/accumulator.rs
+++ b/crates/fluvio/src/producer/accumulator.rs
@@ -4,6 +4,8 @@ use std::sync::Arc;

 use async_lock::Mutex;
 use async_channel::Sender;
+use dataplane::batch::{RawRecords, Batch};
+use fluvio_compression::{Compression, CompressionError};
 use tracing::trace;

 use dataplane::{Offset, ErrorCode};
@@ -29,10 +31,11 @@ const ENCODING_PROTOCOL_VERSION: i16 = 0;
 pub(crate) struct RecordAccumulator {
     batch_size: usize,
     batches: Arc<HashMap<PartitionId, BatchHandler>>,
+    compression: Compression,
 }

 impl RecordAccumulator {
-    pub(crate) fn new(batch_size: usize, partition_n: i32) -> Self {
+    pub(crate) fn new(batch_size: usize, partition_n: i32, compression: Compression) -> Self {
         let mut batches = HashMap::default();
         for i in 0..partition_n {
             batches.insert(
@@ -43,6 +46,7 @@ impl RecordAccumulator {
         Self {
             batches: Arc::new(batches),
             batch_size,
+            compression,
         }
     }

@@ -76,7 +80,7 @@ impl RecordAccumulator {
             "Batch is full. Creating a new batch for partition"
         );

-        let mut batch = ProducerBatch::new(self.batch_size);
+        let mut batch = ProducerBatch::new(self.batch_size, self.compression);

         match batch.push_record(record) {
             Some(push_record) => {
@@ -115,14 +119,15 @@ where
 {
 pub(crate) struct ProducerBatch {
     pub(crate) notify: Sender<(Offset, ErrorCode)>,
     batch_metadata: Arc<BatchMetadata>,
+    compression: Compression,
     write_limit: usize,
-    current_size: usize,
+    current_size_uncompressed: usize,
     is_full: bool,
     create_time: Instant,
     pub(crate) records: Vec<Record>,
 }

 impl ProducerBatch {
-    fn new(write_limit: usize) -> Self {
+    fn new(write_limit: usize, compression: Compression) -> Self {
         let now = Instant::now();
         let (sender, receiver) = async_channel::bounded(1);
         let batch_metadata = Arc::new(BatchMetadata::new(receiver));
@@ -130,14 +135,19 @@ impl ProducerBatch {
         Self {
             notify: sender,
             batch_metadata,
+            compression,
             is_full: false,
             write_limit,
             create_time: now,
-            current_size: 0,
+            current_size_uncompressed: 0,
             records: vec![],
         }
     }

+    pub(crate) fn compression(&self) -> Compression {
+        self.compression
+    }
+
     pub(crate) fn create_time(&self) -> &Instant {
         &self.create_time
     }
@@ -149,12 +159,16 @@ impl ProducerBatch {
         let relative_offset = self.records.len() as i64;
         let record_size = record.write_size(ENCODING_PROTOCOL_VERSION);

-        if self.current_size + record_size > self.write_limit {
+        if self.estimated_size() + record_size > self.write_limit {
             self.is_full = true;
             return None;
         }

-        self.current_size += record_size;
+        if self.estimated_size() + record_size == self.write_limit {
+            self.is_full = true;
+        }
+
+        self.current_size_uncompressed += record_size;

         self.records.push(record);

@@ -165,10 +179,49 @@
     }

     pub(crate) fn is_full(&self) -> bool {
-        self.is_full || self.write_limit <= self.current_size
+        self.is_full || self.write_limit <= self.estimated_size()
+    }
+
+    fn estimated_size(&self) -> usize {
+        (self.current_size_uncompressed as f32
+            * match self.compression {
+                Compression::None => 1.0,
+                Compression::Gzip | Compression::Snappy | Compression::Lz4 => 0.5,
+            }) as usize
     }
 }

+impl TryFrom<ProducerBatch> for Batch<RawRecords> {
+    type Error = CompressionError;
+    fn try_from(p_batch: ProducerBatch) -> Result<Self, Self::Error> {
+        let mut batch = Self::default();
+        let compression = p_batch.compression();
+        let records = p_batch.records;
+
+        let records: Vec<_> = records
+            .into_iter()
+            .enumerate()
+            .map(|(i, mut record)| {
+                record.preamble.set_offset_delta(i as Offset);
+                record
+            })
+            .collect();
+
+        let len = records.len() as i32;
+
+        let header = batch.get_mut_header();
+        header.last_offset_delta = if len > 0 { len - 1 } else { len };
+
+        header.set_compression(compression);
+
+        let mut buf = vec![];
+        records.encode(&mut buf, 0)?;
+        let compressed_records = compression.compress(&buf)?;
+        *batch.mut_records() = RawRecords(compressed_records);
+
+        Ok(batch)
+    }
+}

 pub(crate) struct BatchEvents {
     batch_full: EventHandler,
     new_batch: EventHandler,
@@ -216,7 +269,7 @@ mod test {
         let size = record.write_size(ENCODING_PROTOCOL_VERSION);

         // Producer batch that can store three instances of Record::from(("key", "value"))
-        let mut pb = ProducerBatch::new(size * 3 + 1);
+        let mut pb = ProducerBatch::new(size * 3 + 1, Compression::None);

         assert!(pb.push_record(record.clone()).is_some());
         assert!(pb.push_record(record.clone()).is_some());
@@ -233,7 +286,7 @@
         let size = record.write_size(ENCODING_PROTOCOL_VERSION);

         // Producer batch that can store three instances of Record::from(("key", "value"))
-        let mut pb = ProducerBatch::new(size * 3);
+        let mut pb = ProducerBatch::new(size * 3, Compression::None);

         assert!(pb.push_record(record.clone()).is_some());
         assert!(pb.push_record(record.clone()).is_some());
@@ -248,7 +301,7 @@
     async fn test_record_accumulator() {
         let record = Record::from(("key", "value"));
         let size = record.write_size(ENCODING_PROTOCOL_VERSION);
-        let accumulator = RecordAccumulator::new(size * 3, 1);
+        let accumulator = RecordAccumulator::new(size * 3, 1, Compression::None);

         let timeout = std::time::Duration::from_millis(200);
         let batches = accumulator
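[Editor's note] The `TryFrom` conversion above is where compression actually happens: the accumulated records are encoded once into a buffer, that buffer is compressed as a whole, and the codec is recorded in the batch header so consumers know how to reverse it. Batch sizing, meanwhile, works on an estimate: `estimated_size` discounts the uncompressed byte count by a flat factor of 0.5 for every real codec, a deliberately rough heuristic so that `write_limit` bounds the approximate wire size rather than the pre-compression size. Restated as plain arithmetic:

    // The batch-fullness arithmetic from estimated_size() above, restated:
    // uncompressed bytes are discounted by an assumed 2:1 ratio for any
    // real codec, and a batch closes once the estimate reaches write_limit.
    fn estimated_size(current_uncompressed: usize, compressed_codec: bool) -> usize {
        let factor = if compressed_codec { 0.5 } else { 1.0 };
        (current_uncompressed as f32 * factor) as usize
    }

    fn main() {
        let write_limit = 16_384; // the default batch size from config.rs below
        assert!(estimated_size(20_000, true) < write_limit); // estimate: 10_000
        assert!(estimated_size(20_000, false) >= write_limit); // estimate: 20_000
    }
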
diff --git a/crates/fluvio/src/producer/config.rs b/crates/fluvio/src/producer/config.rs
index 7497524be0..38c93d0019 100644
--- a/crates/fluvio/src/producer/config.rs
+++ b/crates/fluvio/src/producer/config.rs
@@ -1,11 +1,26 @@
 use std::time::Duration;

+use tracing::debug;
+
 use derive_builder::Builder;
+use once_cell::sync::Lazy;
+
+use fluvio_compression::Compression;

 use crate::producer::partitioning::{Partitioner, SiphashRoundRobinPartitioner};

 const DEFAULT_LINGER_MS: u64 = 100;
 const DEFAULT_BATCH_SIZE_BYTES: usize = 16_384;

+static DEFAULT_COMPRESSION_CODEC: Lazy<Compression> = Lazy::new(|| {
+    use std::env;
+    let var_value =
+        env::var("FLV_CLIENT_DEFAULT_COMPRESSION_CODEC").unwrap_or_else(|_| "none".to_string());
+    let compression: Compression = var_value.parse().unwrap_or_else(|e| {
+        debug!(?e, "unknown compression format in FLV_CLIENT_DEFAULT_COMPRESSION_CODEC, using Compression::None");
+        Compression::None
+    });
+    compression
+});

 fn default_batch_size() -> usize {
     DEFAULT_BATCH_SIZE_BYTES
@@ -18,6 +33,11 @@ fn default_linger_duration() -> Duration {
 fn default_partitioner() -> Box<dyn Partitioner + Send + Sync> {
     Box::new(SiphashRoundRobinPartitioner::new())
 }
+
+fn default_compression() -> Compression {
+    *DEFAULT_COMPRESSION_CODEC
+}
+
 /// Options used to adjust the behavior of the Producer.
 /// Create this struct with [`TopicProducerConfigBuilder`].
 ///
@@ -34,14 +54,19 @@ pub struct TopicProducerConfig {
     /// Partitioner assigns the partition to each record that needs to be send
     #[builder(default = "default_partitioner()")]
     pub(crate) partitioner: Box<dyn Partitioner + Send + Sync>,
+
+    /// Compression algorithm used by Fluvio producer to compress data.
+    #[builder(default = "default_compression()")]
+    pub(crate) compression: Compression,
 }

 impl Default for TopicProducerConfig {
     fn default() -> Self {
         Self {
-            linger: Duration::from_millis(DEFAULT_LINGER_MS),
-            batch_size: DEFAULT_BATCH_SIZE_BYTES,
-            partitioner: Box::new(SiphashRoundRobinPartitioner::new()),
+            linger: default_linger_duration(),
+            batch_size: default_batch_size(),
+            partitioner: default_partitioner(),
+            compression: default_compression(),
         }
     }
 }
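[Editor's note] The default codec is resolved once per process: `Lazy` memoizes the first read of `FLV_CLIENT_DEFAULT_COMPRESSION_CODEC` (the same variable the CI matrix in this change sets), and an unparseable value silently falls back to `Compression::None` with only a debug log. The resolution logic restated as a runnable snippet against the fluvio-compression crate, without the `Lazy` caching:

    use fluvio_compression::Compression;

    fn main() {
        // What CI does per matrix entry, done in-process for illustration.
        std::env::set_var("FLV_CLIENT_DEFAULT_COMPRESSION_CODEC", "snappy");

        let raw = std::env::var("FLV_CLIENT_DEFAULT_COMPRESSION_CODEC")
            .unwrap_or_else(|_| "none".to_string());
        // parse() uses Compression's FromStr impl; bad input falls back to None.
        let codec: Compression = raw.parse().unwrap_or(Compression::None);
        assert!(matches!(codec, Compression::Snappy));
    }

Because the real implementation caches through `Lazy`, changing the variable after the first read has no effect for the rest of the process.
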
diff --git a/crates/fluvio/src/producer/mod.rs b/crates/fluvio/src/producer/mod.rs
index be37509234..d0769192e2 100644
--- a/crates/fluvio/src/producer/mod.rs
+++ b/crates/fluvio/src/producer/mod.rs
@@ -288,7 +288,9 @@ impl TopicProducer {
             .ok_or_else(|| FluvioError::TopicNotFound(topic.to_string()))?
             .spec;
         let partition_count = topic_spec.partitions();
-        let record_accumulator = RecordAccumulator::new(config.batch_size, partition_count);
+        let compression = config.compression;
+        let record_accumulator =
+            RecordAccumulator::new(config.batch_size, partition_count, compression);
         let producer_pool = ProducerPool::shared(
             topic.clone(),
             spu_pool.clone(),
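[Editor's note] With the accumulator wired to the config, choosing a codec from application code is a one-liner on the builder. A usage sketch against this branch's client API; `topic_producer_with_config` and the generated `compression` setter are assumed from the builder derive, and the topic name is hypothetical:

    use fluvio::{Compression, Fluvio, TopicProducerConfigBuilder};

    async fn produce_compressed() -> Result<(), fluvio::FluvioError> {
        let fluvio = Fluvio::connect().await?;
        // Explicit codec overrides the env-var default resolved above.
        let config = TopicProducerConfigBuilder::default()
            .compression(Compression::Gzip)
            .build()?;
        let producer = fluvio.topic_producer_with_config("my-topic", config).await?;
        producer.send("key", "value").await?;
        producer.flush().await?;
        Ok(())
    }
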
diff --git a/crates/fluvio/src/producer/partition_producer.rs b/crates/fluvio/src/producer/partition_producer.rs
index 3431c37e67..d59833bec1 100644
--- a/crates/fluvio/src/producer/partition_producer.rs
+++ b/crates/fluvio/src/producer/partition_producer.rs
@@ -4,7 +4,6 @@ use std::time::Duration;

 use async_lock::{Mutex, RwLock};
 use dataplane::ReplicaKey;
-use dataplane::batch::Batch;
 use dataplane::produce::{DefaultPartitionRequest, DefaultTopicRequest, DefaultProduceRequest};
 use fluvio_future::timer::sleep;
 use fluvio_types::SpuId;
@@ -200,11 +199,12 @@ impl PartitionProducer {
         let mut batch_notifiers = vec![];

         for p_batch in batches_ready {
-            let batch = p_batch.records;
+            let notify = p_batch.notify.clone();
+            let batch = p_batch.try_into()?;

-            let batch = Batch::from(batch);
             partition_request.records.batches.push(batch);
-            batch_notifiers.push(p_batch.notify);
+
+            batch_notifiers.push(notify);
         }

         topic_request.partitions.push(partition_request);
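[Editor's note] One subtlety in the loop above: `try_into` consumes the `ProducerBatch` (compressing its records in the process), so the completion notifier has to be cloned out before the conversion; the `?` then lets a `CompressionError` abort the flush and surface through the client error type. The ownership ordering in miniature:

    // Ownership-ordering sketch: copy out what you still need before a
    // consuming conversion. Mirrors the notify/try_into sequence above.
    struct Owned {
        tag: String,
    }

    impl TryFrom<Owned> for Vec<u8> {
        type Error = std::convert::Infallible;
        fn try_from(o: Owned) -> Result<Self, Self::Error> {
            Ok(o.tag.into_bytes())
        }
    }

    fn main() -> Result<(), std::convert::Infallible> {
        let value = Owned { tag: "b1".into() };
        let tag = value.tag.clone(); // copy out first...
        let bytes: Vec<u8> = value.try_into()?; // ...then consume the value
        println!("{}: {} bytes", tag, bytes.len());
        Ok(())
    }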