[Bug]: Offset::end() with topic replicas reads from beginning. #3788

Closed
gibbz00 opened this issue Dec 19, 2023 · 6 comments

@gibbz00

gibbz00 commented Dec 19, 2023

As the title suggests, I can't get a consumer using Offset::end() to work when the topic's replication factor is greater than 1.

Here's what I'm using to reproduce this:

use fluvio::{metadata::topic::TopicSpec, FluvioAdmin, Offset, RecordKey};
use futures::TryStreamExt;
use std::time::Duration;

const DELAY_MILLIS: u64 = 1000;
const MAX_RECORDS: u8 = 10;
const TOPIC_NAME: &str = "dectest-offset";
const TOPIC_REPLICAS: u32 = 3;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    reset_topic().await?;

    println!("Number of topic replicas: {}", TOPIC_REPLICAS);

    tokio::spawn(consume(TestOffset::Beginning));

    let producer = fluvio::producer(TOPIC_NAME).await?;
    for index in 0..MAX_RECORDS {
        producer.send(RecordKey::NULL, index.to_string()).await?;
        println!("[PRODUCER] sent: {}", index);
        tokio::time::sleep(Duration::from_millis(DELAY_MILLIS)).await;

        if index == 5 {
            tokio::spawn(consume(TestOffset::End));
        }
    }

    Ok(())
}

enum TestOffset {
    Beginning,
    End,
}

async fn consume(offset: TestOffset) -> anyhow::Result<()> {
    let mut stream = fluvio::consumer(TOPIC_NAME, 0)
        .await?
        .stream(match offset {
            TestOffset::Beginning => Offset::beginning(),
            TestOffset::End => Offset::end(),
        })
        .await?;

    while let Some(record) = stream.try_next().await? {
        let index = record.get_value().as_utf8_lossy_string().parse::<u8>()?;

        println!(
            "[CONSUMER_{}] recieved: {}",
            match offset {
                TestOffset::Beginning => "BEGINNING",
                TestOffset::End => "END",
            },
            index
        );

        if index == MAX_RECORDS - 1 {
            break;
        }
    }

    Ok(())
}

async fn reset_topic() -> anyhow::Result<()> {
    let admin = FluvioAdmin::connect().await?;
    let _ = admin.delete::<TopicSpec>(TOPIC_NAME).await;
    admin
        .create(TOPIC_NAME.to_string(), false, TopicSpec::new_computed(1, TOPIC_REPLICAS, None))
        .await?;

    Ok(())
}

And here's the output:

Number of topic replicas: 1
[PRODUCER] sent: 0
[CONSUMER_BEGINNING] recieved: 0
... repeated normally
[PRODUCER] sent: 5
[CONSUMER_BEGINNING] recieved: 5
[PRODUCER] sent: 6
[CONSUMER_BEGINNING] recieved: 6
[CONSUMER_END] recieved: 6
... repeated normally
[PRODUCER] sent: 9
[CONSUMER_END] recieved: 9
[CONSUMER_BEGINNING] recieved: 9
Number of topic replicas: 3
[PRODUCER] sent: 0
[CONSUMER_BEGINNING] recieved: 0
... repeated normally
[CONSUMER_BEGINNING] recieved: 4
[PRODUCER] sent: 5
[CONSUMER_BEGINNING] recieved: 5
[PRODUCER] sent: 6
[CONSUMER_END] recieved: 0
[CONSUMER_END] recieved: 1
[CONSUMER_END] recieved: 2
[CONSUMER_END] recieved: 3
[CONSUMER_END] recieved: 4
[CONSUMER_END] recieved: 5
[CONSUMER_BEGINNING] recieved: 6
[CONSUMER_END] recieved: 6
[PRODUCER] sent: 7
[CONSUMER_END] recieved: 7
[CONSUMER_BEGINNING] recieved: 7
[PRODUCER] sent: 8
[CONSUMER_END] recieved: 8
[CONSUMER_BEGINNING] recieved: 8
[PRODUCER] sent: 9
[CONSUMER_BEGINNING] recieved: 9
[CONSUMER_END] recieved: 9

About 10% of the time, though, it runs correctly and shows the same output as the first case.
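The two outcomes can be told apart mechanically from a captured log. This sketch (the helper names are hypothetical, not part of the example or of Fluvio) finds the first index printed by the END consumer and checks it against the record after which that consumer was spawned, which is 5 in the example above.

```rust
/// Hypothetical helper: first index printed by the END consumer, or None.
/// The prefix matches the example's println! output, including its
/// "recieved" spelling, so it works on the logs exactly as captured.
fn first_end_index(log: &str) -> Option<u8> {
    log.lines()
        .filter_map(|line| line.strip_prefix("[CONSUMER_END] recieved: "))
        .find_map(|rest| rest.trim().parse::<u8>().ok())
}

/// The END consumer is spawned after record `spawned_after` was sent, so a
/// correct Offset::end() should never hand it that record or an older one.
/// Returns None when the END consumer printed nothing at all.
fn end_offset_respected(log: &str, spawned_after: u8) -> Option<bool> {
    first_end_index(log).map(|first| first > spawned_after)
}
```

On the replicas = 1 log above this yields `Some(true)`; on the replicas = 3 log the first END line is `recieved: 0`, so it yields `Some(false)`.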

Replicas were enabled by applying this resource:

apiVersion: fluvio.infinyon.com/v1
kind: SpuGroup
metadata:
  name: main
spec:
  replicas: 3

Versions

[dependencies]
anyhow = "1.0.75"
fluvio = { version = "0.21.3", default-features = false, features = ["rustls"] }
tokio = { version = "1.35.0", features = ["full"] }
futures = "0.3.29"
Fluvio CLI           : 0.11.2
Fluvio CLI Arch      : aarch64-unknown-linux-musl
Fluvio CLI SHA256    : 84eeb3d4806ce3c6fe1c5fc79652d8a3dd5a67c23b13c88f79f042f5b244f0db
Fluvio channel frontend SHA256 : 84eeb3d4806ce3c6fe1c5fc79652d8a3dd5a67c23b13c88f79f042f5b244f0db
Fluvio Platform      : 0.11.2 (k3d-cluster)
Git Commit           : cddc05388b7a1278837adc506430f339f311212d
gibbz00 added the bug label on Dec 19, 2023
@ajhunyady
Contributor

@gibbz00 thanks for reporting, we'll take a look.

@digikata
Contributor

digikata commented Dec 20, 2023

Thanks for the example code @gibbz00! I used it to reproduce the behavior and then analyzed the issue a bit further.

Overall, it looks like this comes from our replica startup logic. If a client starts reading end offsets before the FLV_SHORT_RECONCILLATION interval (10 seconds by default) has elapsed, it can get different end offset values depending on which SPU it is connected to. Once syncing starts, the replicas stay in sync at a finer interval.
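The explanation above can be illustrated with a deliberately simplified model. Everything in it is an assumption for illustration, not Fluvio's replication code: each replica reports its own end offset, a follower keeps reporting a stale value of 0 until the reconciliation interval has passed, and "end" resolves against whichever replica the client is connected to.

```rust
/// Toy model only, not Fluvio's implementation.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Replica {
    /// End offset this replica would report to a client.
    end_offset: i64,
}

/// Assumed behaviour: a follower has not synced before `reconcile_secs`
/// have elapsed since topic creation, so it still reports 0.
fn follower_end(leader_end: i64, elapsed_secs: u64, reconcile_secs: u64) -> i64 {
    if elapsed_secs < reconcile_secs {
        0
    } else {
        leader_end
    }
}

/// "End" as seen by a client connected to replica `connected`.
fn resolve_end(replicas: &[Replica], connected: usize) -> i64 {
    replicas[connected].end_offset
}
```

With the leader at end offset 6 about six seconds after topic creation and the default 10 s interval, a client resolved against a follower gets 0 and replays records 0 to 5, which is consistent with the replicas = 3 log in the report.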

The FLV_SHORT_RECONCILLATION interval can be customized by setting an environment variable of the same name to a value in seconds. If the cluster is started with FLV_SHORT_RECONCILLATION=3, the example works as expected. There is a tradeoff in how short or long the setting is and in how much network chatter it creates.
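A client that cannot change the cluster setting could instead hold off opening an Offset::end() stream until a new topic is older than the interval. This is a sketch under stated assumptions: the client does not see the cluster's environment, so it is assumed to be configured with the same value (read here from its own environment under the same variable name, purely as a convention of this sketch), and the helper names and safety margin are hypothetical.

```rust
use std::time::{Duration, Instant};

/// Default interval stated in the thread.
const DEFAULT_RECONCILIATION_SECS: u64 = 10;

/// Interpret a FLV_SHORT_RECONCILLATION-style value in whole seconds,
/// falling back to the default when it is missing or not a number.
fn reconciliation_interval(raw: Option<&str>) -> Duration {
    let secs = raw
        .and_then(|s| s.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_RECONCILIATION_SECS);
    Duration::from_secs(secs)
}

/// Read the value from this process's environment (assumption: the
/// client is configured the same way the cluster was started).
fn reconciliation_interval_from_env() -> Duration {
    let raw = std::env::var("FLV_SHORT_RECONCILLATION").ok();
    reconciliation_interval(raw.as_deref())
}

/// How long to wait before opening an Offset::end() stream so that at
/// least `interval + margin` has passed since the topic was created.
/// Returns Duration::ZERO once that point is already in the past.
fn wait_before_end_stream(
    topic_created: Instant,
    now: Instant,
    interval: Duration,
    margin: Duration,
) -> Duration {
    (topic_created + interval + margin).saturating_duration_since(now)
}
```

In the original example one could record an `Instant` right after `reset_topic()` and `tokio::time::sleep(...)` for the returned duration before calling `.stream(Offset::end())`.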

It is a little unexpected that we don't start the sync earlier, and I have opened #3790 to look at that over the longer term (and to fix the misspelling of "reconciliation").

@digikata
Contributor

digikata commented Dec 20, 2023

Here are some small modifications that rule out a race between producer and consumer and control when reading from End starts:

use std::sync::Arc;
use fluvio::{metadata::topic::TopicSpec, FluvioAdmin, Offset, RecordKey};
use futures::TryStreamExt;
use std::time::Duration;

const DELAY_MILLIS: u64 = 1000;
const MAX_RECORDS: u8 = 15;
const REC_TRIGGER_OFFSET: u8 = 11;  // record sent after the FLV_SHORT_RECONCILLATION interval (in seconds) has elapsed
const TOPIC_NAME: &str = "dectest-offset";
const TOPIC_REPLICAS: u32 = 3;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    reset_topic().await?;

    println!("Number of topic replicas: {}", TOPIC_REPLICAS);

    let notify_start = Arc::new(tokio::sync::Notify::new());
    tokio::spawn(consume(TestOffset::Beginning, notify_start.clone()));
    tokio::spawn(consume(TestOffset::End, notify_start.clone()));

    let producer = fluvio::producer(TOPIC_NAME).await?;
    for index in 0..MAX_RECORDS {
        producer.send(RecordKey::NULL, index.to_string()).await?;
        println!("[PRODUCER] sent: {}", index);
        tokio::time::sleep(Duration::from_millis(DELAY_MILLIS)).await;
    }

    Ok(())
}

#[derive(PartialEq)]
enum TestOffset {
    Beginning,
    End,
}

async fn consume(offset: TestOffset, onotify: Arc<tokio::sync::Notify>) -> anyhow::Result<()> {
    // the END consumer waits for the start signal before opening its stream
    if offset == TestOffset::End {
        onotify.notified().await;
    }

    let mut stream = fluvio::consumer(TOPIC_NAME, 0)
        .await?
        .stream(match offset {
            TestOffset::Beginning => Offset::beginning(),
            TestOffset::End => Offset::end(),
        })
        .await?;

    while let Some(record) = stream.try_next().await? {
        let index = record.get_value().as_utf8_lossy_string().parse::<u8>()?;

        println!(
            "[CONSUMER_{}] recieved: {}",
            match offset {
                TestOffset::Beginning => "BEGINNING",
                TestOffset::End => "END",
            },
            index
        );

        if offset == TestOffset::Beginning && index == REC_TRIGGER_OFFSET {
            onotify.notify_waiters();
        }

        if index == MAX_RECORDS - 1 {
            break;
        }
    }

    Ok(())
}

async fn reset_topic() -> anyhow::Result<()> {
    let admin = FluvioAdmin::connect().await?;
    let _ = admin.delete::<TopicSpec>(TOPIC_NAME).await;
    admin
        .create(TOPIC_NAME.to_string(), false, TopicSpec::new_computed(1, TOPIC_REPLICAS, None))
        .await?;

    Ok(())
}
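The value REC_TRIGGER_OFFSET = 11 follows from simple arithmetic: the producer sends one record per DELAY_MILLIS, so record i goes out roughly i × DELAY_MILLIS ms after the loop starts, and the first index sent strictly after a 10 s interval is 11. A small helper (hypothetical, not part of the example) makes the calculation reusable when either setting changes. It measures from the start of the producer loop, which begins a little after the topic is created, so the result errs on the safe side.

```rust
/// Record `i` is sent about `i * delay_millis` ms after the producer loop
/// starts (one send, then one sleep, per iteration). Returns the first
/// index sent strictly after `interval_secs` have elapsed, i.e. the
/// smallest i with i * delay_millis > interval_secs * 1000.
fn first_index_after_interval(interval_secs: u64, delay_millis: u64) -> u64 {
    assert!(delay_millis > 0, "delay_millis must be non-zero");
    interval_secs * 1000 / delay_millis + 1
}
```

With FLV_SHORT_RECONCILLATION=3 and the same delay it gives 4. MAX_RECORDS must stay above the result so the END consumer still has records left to read.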

@gibbz00
Author

gibbz00 commented Dec 20, 2023

Hi, thanks for the response and taking your time to analyze the issue 😊

I can also confirm your conclusion by running your code here (though only with FLV_SHORT_RECONCILLATION set to 3, not consistently with 10). Shortening it will have to do for now :3

Again, a big thank you!

@ajhunyady
Contributor

ajhunyady commented Dec 20, 2023

@gibbz00, I'd love to connect on Discord. We are rolling out Stateful Services and you may want to join our private release. Please DM me if you are interested.

@digikata
Contributor

I'll close this, but if you run into more problems or have additional questions, feel free to reopen it or create a new issue.
