Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactions #216

Closed
5 of 7 tasks
VictorDenisov opened this issue Feb 24, 2019 · 15 comments
Closed
5 of 7 tasks

Transactions #216

VictorDenisov opened this issue Feb 24, 2019 · 15 comments
Assignees

Comments

@VictorDenisov
Copy link
Contributor

VictorDenisov commented Feb 24, 2019

I would like to work on transactions for kafka-go: https://www.confluent.io/blog/transactions-apache-kafka/
Are there any objections?

Tasks:

@achille-roussel
Copy link
Contributor

The plan sounds solid to me 👍

@Andy320
Copy link

Andy320 commented Mar 19, 2019

@VictorDenisov @achille-roussel
Thanks for your hard working about this major feature. And may I know the ETA?Looking forward to it in our project.

@VictorDenisov
Copy link
Contributor Author

It's difficult to say. Of course I'm just like you are interested in delivering it as soon as possible.

@VictorDenisov
Copy link
Contributor Author

@achille-roussel I have a question about this function:

func (batch *Batch) Read(b []byte) (int, error) {

and this function
func (batch *Batch) readMessage(

Was there any specific reason to implement readMessage with key and value readers as arguments? I would like to refactor it to return full message instead of reading it using key function.

I'm asking because transactions introduce control messages and control messages should not surface to the client app. The key of control messages specifies the type of the control message - aborted or committed transaction. It means I need to read the key of a control message within

func (r *messageSetReaderV2) readMessage(min int64,
.
If this message is an abort message then I need to abandon the whole chunk of messages between the previous control message and the current control message. It means I need to read the messages, store them somewhere and once I understand what message I want to return to the client I need to invoke the key function for the key of the message.

Long story short - it would make more sense to me to return full message from readMessage function.
Do you have any comment?

@sneko
Copy link

sneko commented Jun 24, 2019

@VictorDenisov great initiative! 👍

I saw you closed some of your PRs, is it still something you're working on?

Thank you,

@VictorDenisov
Copy link
Contributor Author

@sneko Yes. The work is still in progress. There is a PR for reading transactional topics: #261 . Currently I'm working on writing transactional message. I would give you a link to my branch, but it's mostly dysfunctional yet. I'll post it here once I have a hint of working code.

@VictorDenisov
Copy link
Contributor Author

@sneko I merged reading transactional messages(#261) to the master branch of my personal repository: https://github.com/VictorDenisov/kafka-go so that it can be imported in go programs. If you decide to give it a try let me know how it goes.

@sneko
Copy link

sneko commented Nov 13, 2019

Sorry for the delay of my answer @VictorDenisov , I need to finish other things before digging into Kafka transactions, but I will 😄

During last months, did you get critical issue with the last version you pushed onto your repo?

Thanks 🐰

@VictorDenisov
Copy link
Contributor Author

@sneko I actually ended up using confluent's kafka go. It has a way worse api, but it has all the features. One producer per topic turned out to be a major blocker to me.

@sunshine69
Copy link

hi there, what is the staus of transactional support for kafka-go?

I saw the field in the conn config but it is hard for me to use. Basically if I want t conn like this

	tcpConn, err := net.Dial("tcp", brokers[0])
	if err != nil {
		log.Fatalf("failed to dial %v", err)
	}
	myConfig := kafka.ConnConfig{
		ClientID:        kafkaClientId,
		Topic:           kafkaTopicForward,
		Partition:       kafkaPartition,
		TransactionalID: "my-transaction-id",
	}
	connForward := kafka.NewConnWith(tcpConn, myConfig)

How can I set the partition for the connForward in that example? Without this, the method

_, err = connForward.WriteMessages(
			kafka.Message{
				Key:     m.Key,
				Value:   m.Value,
				Headers: m.Headers,
			})

will return error that it tries to connect to non leader partition.

@sunshine69
Copy link

Bugger. I found it in the conn config struct. Should work, trying now ...

@sunshine69
Copy link

And then I realized that I already set it correctly but I got error

2020/11/21 17:19:27 failed to write messages:[6] Not Leader For Partition: the client attempted to send messages to a replica that is not the leader for some partition, the client's metadata are likely out of date

@sunshine69
Copy link

and if I use the conn like this (and I guess transaction is not enabled?)

connForward, err := kafka.DialLeader(ctx, "tcp", brokers[0], kafkaTopicForward, kafkaPartition)

then the write is ok without errors

@VictorDenisov
Copy link
Contributor Author

I'm not sure about the state of transactions in this client. Last time I left it, it could only consume records with transactions and couldn't really write transactional records.

@achille-roussel
Copy link
Contributor

I will close this issue as the work to support transactions is being tracked elsewhere in #755

Thanks a lot for your valuable contributions!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants