
Need the ability to produce/consume messages beyond 1MB in size. #2342

Closed
ajhunyady opened this issue May 3, 2022 · 13 comments

@ajhunyady
Contributor

Currently, when a client produces a record of 5881006 bytes, the producer panics:

I have a Rust producer and pass a JSON string. It results in an error: Record too large. What's wrong?
thread 'main' panicked at 'Failed to send message.: Producer(RecordTooLarge(16384))', src/main.rs:39:9
@ajhunyady ajhunyady added the bug label May 3, 2022
@ajhunyady ajhunyady changed the title Need the ability to configure message size beyond 1MB Need the ability to produce/consume messages beyond 1MB in size. May 3, 2022
@ajhunyady
Contributor Author

ajhunyady commented May 3, 2022

```rust
pub const STORAGE_MAX_BATCH_SIZE: u32 = 1048588;
```

This value is not configurable, as large records (greater than 1 MB) impact performance.
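
For illustration, a minimal sketch of how a client could guard against this limit before producing (the `send_checked` helper, the `anyhow` error handling, and the constant name are assumptions for the example, not Fluvio API):

```rust
use fluvio::{Fluvio, RecordKey};

// Mirrors STORAGE_MAX_BATCH_SIZE above; used here as an illustrative client-side ceiling.
const MAX_RECORD_SIZE: usize = 1_048_588;

// Check the payload size up front instead of panicking on RecordTooLarge.
async fn send_checked(topic: &str, payload: Vec<u8>) -> anyhow::Result<()> {
    if payload.len() > MAX_RECORD_SIZE {
        anyhow::bail!(
            "record of {} bytes exceeds the {} byte limit",
            payload.len(),
            MAX_RECORD_SIZE
        );
    }
    let fluvio = Fluvio::connect().await?;
    let producer = fluvio.topic_producer(topic).await?;
    producer.send(RecordKey::NULL, payload).await?;
    Ok(())
}
```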

@nacardin
Contributor

nacardin commented May 3, 2022

For now, we can increase the default size.

@nacardin nacardin added this to the 0.9.26 milestone May 3, 2022
@ajhunyady
Contributor Author

@nacardin according to @tjtelan and @sehz, there are performance considerations that need to be taken into account.

  • Can we quantify the performance degradation?
  • Is it safe, or does it expose the product to various side effects?

@sehz sehz added the SPU, API, Performance, and consumer labels and removed the bug label May 3, 2022
@sehz
Contributor

sehz commented May 5, 2022

Record size must be less than the batch size and the segment size.
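
To make the constraint concrete, a tiny illustrative check (the sizes below are made-up example values, not Fluvio defaults):

```rust
// A record must fit inside a batch, and a batch must fit inside a segment.
fn record_fits(record_len: usize, batch_size: usize, segment_size: usize) -> bool {
    record_len < batch_size && batch_size <= segment_size
}

fn main() {
    // The 5_881_006-byte record from this issue vs. a 10 MB batch and a 1 GB segment:
    assert!(record_fits(5_881_006, 10_000_000, 1_000_000_000));
    // ...and vs. the current ~1 MB limit:
    assert!(!record_fits(5_881_006, 1_048_588, 1_000_000_000));
}
```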

@thinkrapido

Hi @AJ, I've applied compression, but the record is still too large (7131124 bytes uncompressed, 1253783 bytes compressed).

It is not possible for us to control the input because it comes from an RPC out of our control.

This use case is a real show stopper for us.
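
For reference, a sketch of what compressing the payload on the client could look like (the `flate2` crate and the `gzip` helper are assumptions here; the replies below suggest producer-level compression instead):

```rust
use std::io::Write;
use flate2::{write::GzEncoder, Compression};

// Gzip the payload before producing; the ~7 MB JSON mentioned above
// compresses to roughly 1.2 MB.
fn gzip(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(payload)?;
    encoder.finish()
}
```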

@ajhunyady
Contributor Author

@thinkrapido we have changed the configuration to process 2.0 MB records, so you should be able to produce/consume your records if you compress them.

Are you currently running a local cluster or against InfinyOn Cloud?

If you are running a local cluster, you can deploy a development build to test it. https://github.com/infinyon/fluvio/blob/master/DEVELOPER.md

Otherwise, this feature will be available in the next release of InfinyOn Cloud.

@morenol
Contributor

morenol commented May 7, 2022

Keep in mind that, if you are using the Rust API, you should increase the batch_size too:

```rust
let config = TopicProducerConfigBuilder::default()
    .batch_size(10_000_000)
    .compression(Compression::Gzip)
    .build()?;
let producer = fluvio.topic_producer_with_config("my-topic", config).await?;
producer.send(RecordKey::NULL, large_record).await?;
```

@thinkrapido

```rust
async fn main() -> ! {
    // kafka
    let consumer = consumer();
    consumer.subscribe(&["modern_blocks_json"])
        .unwrap_or_else(|_| panic!("Failed to subscribe to topics {:?}", "modern_blocks_json"));

    // fluvio
    let fluvio = Fluvio::connect().await.expect("Failed to connect to fluvio");
    let config = TopicProducerConfigBuilder::default()
        .batch_size(10_000_000)
        .compression(Compression::Gzip)
        .build().unwrap();
    let producer = fluvio.topic_producer_with_config("modern-blocks-json", config).await.expect("Failed to create a producer");

    loop {
        let r = consumer.recv().await;
        if let Ok(m) = r {
            let message = m
                .payload()
                .map(|p| std::str::from_utf8(p).ok())
                .unwrap_or(None)
                .unwrap_or(r#"{}"#)
                .to_string();
            // let payload = message.as_bytes();
            // println!("{}", payload.len());
            // let compressed = Compression::Snappy.compress(payload).unwrap();
            // println!("{}", compressed.len());
            // producer.send(RecordKey::NULL, compressed).await.expect("Failed to send message.");
            producer.send(RecordKey::NULL, message).await.expect("Failed to send message.");
            print!(".");
        }
    }
}
```

thread 'main' panicked at 'Failed to send message.: Producer(RecordTooLarge(10000000))', src/main.rs:50:59
note: run with RUST_BACKTRACE=1 environment variable to display a backtrace
....

@thinkrapido

Still not working with fluvio = "0.12.9".

@morenol
Contributor

morenol commented May 7, 2022

It looks like the record is even larger than 10_000_000 bytes before compression. Could you try a batch_size larger than that, maybe 15_000_000?
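
A sketch of that adjustment on top of the earlier snippet (only the batch_size value changes; 15_000_000 is the value suggested above):

```rust
// batch_size has to cover the record's uncompressed size, since the size
// check appears to happen before compression is applied.
let config = TopicProducerConfigBuilder::default()
    .batch_size(15_000_000)
    .compression(Compression::Gzip)
    .build()?;
let producer = fluvio.topic_producer_with_config("modern-blocks-json", config).await?;
```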

@ajhunyady
Contributor Author

@thinkrapido we have some ideas beyond raising the buffer size, but we need to understand the shape of your data. Let's schedule some time to talk.

@ajhunyady ajhunyady reopened this May 8, 2022
@ajhunyady
Contributor Author

Try buffer sizes up to 32 MB of raw data.

@ajhunyady
Contributor Author

Closed by #2356
