Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EoS #173 transactional producer #206

Merged
merged 25 commits into from
Nov 28, 2018
Merged

Conversation

ianwsperber
Copy link
Contributor

@ianwsperber ianwsperber commented Nov 16, 2018

Resolves #173

An implementation of the transactional producer. I chose to deviate from the API discussed in #173 because the "transaction object" seemed like too much syntactic sugar. Given that we can still use the normal send & sendBatch methods to deliver messages in the transaction (I don't think that was clear in the original design, there was no need for an add method or internal buffer), returning a transaction object just seemed to add the overheard of an additional variable.

I'm really not opposed to the transaction object if we think it's more convenient, but I thought it best to first provide an API with the least amount of abstractions.

I'm holding off on adding documentation until we're in agreement on the design 😄

NOTE: I haven't spent much time thinking about error states yet. There could be some work to be done there still.

@ianwsperber ianwsperber self-assigned this Nov 16, 2018
transactional,
transactionalId,
}) => {
if (transactional && !transactionalId) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have many questions on how to pick a good transactionalId; I'm not saying we should set this by default but it isn't the most trivial value ever, we should think about making this easier later.

Some reference:
https://www.confluent.io/blog/transactions-apache-kafka/

The key to fencing out zombies properly is to ensure that the input topics and partitions in the read-process-write cycle is always the same for a given transactional.id. If this isn’t true, then it is possible for some messages to leak through the fencing provided by transactions.

Copy link
Owner

@tulios tulios left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outstanding work! Sorry for the delay, I was traveling. I'm guessing that you will need the consumer to start using the transactional producer, right? It will be nice to give the whole flow a run before we commit to any APIs, but it might be more work than it is worth it

getProducerId() {
return producerId
beginTransaction() {
transactionalGuard()
Copy link
Owner

@tulios tulios Nov 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[...] returning a transaction object just seemed to add the overheard of an additional variable.

The idea behind the transaction object was to create a design where users won't accidentally call the wrong methods and it also prevents this kind of code, where you have to check if it is transactional before each operation. I'm all for developer ergonomics but I'm open to any design, WDYT @Nevon ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we should aim for good ergonomics, but I'm afraid a transaction object will create confusion around who is responsible for the transaction. At the end of the day it's still the producer that sends messages and advances the state of the transaction. Will be good to have more feedback on the best design from @Nevon et al

Copy link
Collaborator

@Nevon Nevon Nov 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we get so many questions that are just misunderstandings of the APIs or calling things out of order, I would strongly advocate that if we can make it impossible to misuse the external API, we should make it so, rather than giving the users a footgun and then yelling at them when they shoot themselves. 😅

At the end of the day it's still the producer that sends messages and advances the state of the transaction

It sounds to me like the problem then is that the publicly exposed producer interface is the same one that we use internally. The idea behind returning the transaction object was that then there wouldn't be a way to try to send from a transactional producer without having started a transaction, because that API wouldn't be exposed.

Some fantasy example code (ignore the option names, parameters, etc.):

const producer = kafka.producer({ transactional: true })
const transaction = producer.transaction() // this is basically like `beginTransaction`
await transaction.send([ ... ])
await transaction.commit()

producer.send() // `send` is not a function, so there's nothing to call

You know more of the details of how the transactional producer works, so maybe this design isn't feasible, but I would really like to avoid us doing the same thing that the Java client does, as I think it's neither user-friendly nor nice for the code, as you end up with all these guards everywhere.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although thinking about it some more, if each producer can only have one transaction in progress at any given time, the user could still try to create more transactions before committing or rolling back the previous one, so I guess it would still be prone to misuse. 🤔

src/producer/transactionManager.js Outdated Show resolved Hide resolved
src/producer/transactionManager.js Outdated Show resolved Hide resolved
},
}

// Enforce the state machine
stateMachine.guard(transactionManager, 'initProducerId', [STATES.UNINITIALIZED, STATES.READY])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this; it makes the whole definition clear. What about moving it to the state machine initialization? I feel that having this bit far away from the creation of the state machine will make the code harder to grasp on the long run

Copy link
Contributor Author

@ianwsperber ianwsperber Nov 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we reference the state machine within these methods to transition state I think it'd be a bit weird to add the guards during instantiation. However I could add a method to add all the guard on instantiation of the transaction manager, which should provide the clarity you're asking for (and allow us to use a "proxy" object, as you mentioned in another comment)

@ianwsperber ianwsperber changed the base branch from master to eos November 20, 2018 18:20
@ianwsperber
Copy link
Contributor Author

@tulios I changed the target branch to eos. We can merge sensitive work into there until the API is stable and the flow has been validated 😄

@Nevon
Copy link
Collaborator

Nevon commented Nov 21, 2018

@ianwsperber You can point it back to master. We just won't release a new version while working on this. It beats having a long-lived giant feature branch. 😄

@@ -58,6 +67,10 @@ module.exports = class Client {
logger: this[PRIVATE.LOGGER],
cluster,
createPartitioner,
idempotent,
transactional,
transactionalId,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there ever a case in which the producer would have a transactionalId but not have transactional: true?

Maybe these options belong in a single object instead, like:

kafka.producer({
  transactional: {
    id,
    timeout
  }
})

Then you wouldn't need the transactional flag at all (at least not exposed in the public API). Then again, if the id and the timeout are all optional, then how do you signal that you want a transactional producer? Passing in an empty object feels weird. It just feels even weirder to have a bunch of options that can be set in such a way that they contradict each other.

I'm not sure how the idempotent option fits into this, as again, a transactional producer has to be idempotent, so it wouldn't make sense to configure it like:

kafka.producer({
  idempotent: false,
  transactional: { id: 'foo' }
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's reasonable to maintain a separate flag for transactional and transactionalId. This would allow a user to, for example, maintain a transactionalId value that it always present and a transactional flag that is conditionally switched. Also it's more explicit.

The idempotent flag cannot be false if the transactional flag is true. We can validate for this condition but it is odd. If we dislike the pattern than one route would be to expose a separate transaction producer & idempotent producer class or factory method.

@ianwsperber
Copy link
Contributor Author

@Nevon Ok, I can point back to master, but this does introduce risk if there's a need for a hotfix or a desire to release another feature

@ianwsperber ianwsperber changed the base branch from eos to master November 21, 2018 18:03
@tulios
Copy link
Owner

tulios commented Nov 22, 2018

@ianwsperber we can always cherry-pick fixes and release from a tag. I don't want to block you on this, so when you feel that it's good enough, comment here, and we can merge. Things can always be improved or changed on later PRs.

@ianwsperber
Copy link
Contributor Author

@tulios @Nevon I had some time to reflect on the transactional producer design and came around to your suggested API.

// As long as we provide a "transactionalId" we can create a transaction. No "transactional" flag.
const producer = Kafka.producer({ transactionalId: 'foo', idempotent: false, ... })

// Begin a transaction. First time the producer begins a transaction we'll also initialize the PID
const transaction = await producer.transaction()

// Send message with our transactional id, PID & epoch
await transaction.send({ topic: 'foo', messages: [{value: 'bar' }] })
// We could still send messages outside the transaction
await producer.send({ topic: 'foo', messages: [{value: 'bar' }] }) 

// End the transaction
await transaction.commit()
// This API still can't guarantee the method call order will be correct,
// so we raise an exception if the user attempts to use the transaction after it has ended.
await transaction.send({ ... }) // Rejects

I realized that providing a producer.transaction() method we could obviate the need for a transactional flag - we simply opt in to transactions when we call transaction(). Whether or not the user flagged the producer as idempotent, when they call the producer.transaction() method we'll create a separate, transactional producer id, which we'll use only on messages sent from the transaction object. We'll also enforce the idempotent semantics on the transaction object's send methods regardless of whether the parent producer is idempotent.

Not sure if we would also want A. To ensure there is only one active transaction per producer B. Allow the user to provide a transactional ID to the transaction method call. I think A could be a good idea, to prevent the user from accidentally spawning competing transactions. I think B is problematic, because if the user calls transaction multiple times with differing values we'd lose the 1-1 relation between producer and transaction (instead it'd be 1-N) - simpler if for now we only allow setting the transactional ID in producer creation.

LMK what you think of the revised API!!!

@Nevon
Copy link
Collaborator

Nevon commented Nov 25, 2018

I think it looks really good. I agree with you that we should probably stop the user from creating multiple active transactions at the same time. As for allowing them to specify a transaction id in the transaction method call, I think we can hold off on that for now unless we see a very clear use-case for it. We can always add that functionality later if we decide, but it's harder to remove it if we realize it's a bad idea.

@tulios
Copy link
Owner

tulios commented Nov 27, 2018

@ianwsperber I like the new spec, and I agree that we should only accept one transaction per producer and the user should provide the transactionId to the producer function.

…actional-producer

* 'master' of github.com:tulios/kafkajs:
  Replace forEach with forOf which is almost 3x faster
  Add support to protocol ListOffsets v1
  Keep socket request internal to the request queue
  Document purpose of SocketRequest
  Throw error for invalid request state transitions
  Expose maxInFlightRequests
  Update connection to use the request queue
  Abstract the request queue (inflight + pending)
  Move request abstraction to network/requestQueue
  add support to maxInFlightRequests on the connection
  Use INT_32_MAX_VALUE for correlation id
@ianwsperber
Copy link
Contributor Author

@tulios @Nevon Made updates to only allow one ongoing transaction per producer

@tulios tulios merged commit 6faebb3 into master Nov 28, 2018
@tulios
Copy link
Owner

tulios commented Nov 28, 2018

Great work @ianwsperber!

@tulios tulios deleted the eos-173-transactional-producer branch November 29, 2018 08:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants