Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Writer.close() does not stop the requests for metadata, started by Writer.writeMessages() #803

Closed
arjun-1 opened this issue Dec 2, 2021 · 4 comments
Assignees
Labels

Comments

@arjun-1
Copy link

arjun-1 commented Dec 2, 2021

First of all, thank you for writing this great library.

Describe the bug
When calling writer.Close() after writer.WriteMessages(), the Writer keeps causing the periodic making of metadata requests. This happens through Writer's underlying Transport where the metadata requests originate from the discover() entry point. This is problematic in some of the integration tests that I am writing, where I expect no such leaking of resources.

Kafka Version
6.0.1

To Reproduce

func main() {
	c := make(chan bool)

	writer := kafka.Writer{
		Addr: kafka.TCP("localhost:29092"),
		Topic: "topic",
	}

	writer.WriteMessages(context.Background(), kafka.Message{
		Key:   []byte("key"),
		Value: []byte("value"),
	})

	writer.Close();
	<-c
}

Expected behavior
When calling writer.Close() after writer.WriteMessages(), I expect Writer to close all connections that it caused. And to stop any requests from happening that it previously caused.

@achille-roussel
Copy link
Contributor

This may also be related to #805, let me know if the change helped address your issues.

@arjun-1
Copy link
Author

arjun-1 commented Dec 6, 2021

This may also be related to #805, let me know if the change helped address your issues.

@achille-roussel responding to your remark #804 (comment) here: the snippet works. Any connections at the end of my test is now closed. In fact calling transport.CloseIdleConnections() did the trick.

What I don't understand though, why the same couldn't be achieved using writer.Close(). Supposedly, this calls transport.CloseIdleConnections conditionally:

if w.transport != nil {
	w.transport.CloseIdleConnections()
}

It turns out though that w.transport is nil in my case, even though I explicitly provide a Transport when constructing a new kafka.Writer struct. Is that by design?

EDIT:
Looks like this is unintended behaviour? When the Writer struct is created through NewWriter, both fields transport and Transport are provided:

Transport:    transport,
transport:    transport,

That does not happen when Writer was created via the struct. Nevertheless, CloseIdleConnections condition check is based only on transport, not Transport

@achille-roussel
Copy link
Contributor

This is intended actually to retain the historical behavior prior to v0.4 where the writer was managing the connections directly.

If you use a kafka.Writer without specifying the transport, kafka.DefaultTransport is used, and potentially shared by many writers, in which case it doesn't make sense to automatically close the connections when one writer is closed.

@arjun-1
Copy link
Author

arjun-1 commented Dec 7, 2021

This is intended actually to retain the historical behavior prior to v0.4 where the writer was managing the connections directly.

If you use a kafka.Writer without specifying the transport, kafka.DefaultTransport is used, and potentially shared by many writers, in which case it doesn't make sense to automatically close the connections when one writer is closed.

I see, makes sense. Your suggested workaround works for me, so this issue is no longer a problem for me

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants