
IPs resolved from *.service.consul URLs seem to be cached #453

Closed
eliw00d opened this issue Aug 5, 2019 · 14 comments · Fixed by #457

@eliw00d
Contributor

eliw00d commented Aug 5, 2019

We use *.service.consul URLs so that the IPs can be resolved at runtime to the correct Kafka brokers. However, we started noticing stale IPs whenever KafkaJS tried to connect. If we deleted the pods in k8s and created new ones, the IPs were correct.

I haven't dug too deep into the source yet to know exactly where this happens, but maybe a configuration option to disable the caching would be nice? Or is there something I am missing?

@Nevon
Collaborator

Nevon commented Aug 5, 2019

I haven't done any testing yet, so don't take this as an authoritative answer, but I would suspect that there's something in your environment that is caching the IP.

KafkaJS uses the Net and TLS modules to create sockets, which by default use the dns.lookup method to resolve domains. dns.lookup has no caching mechanism and just delegates to the OS resolver.
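
A minimal sketch of that default path, assuming a placeholder hostname and port; the explicit lookup option below is simply Node's default spelled out for illustration:

// Node's net (and tls) sockets resolve hostnames with dns.lookup on every
// connect; dns.lookup delegates to the OS resolver and adds no caching.
const net = require('net')
const dns = require('dns')

const socket = net.connect({
  host: 'kafka-broker.service.consul', // placeholder hostname
  port: 9092,
  // This is already the default behavior; passing it explicitly just makes
  // the resolution step visible. A custom function here could add caching,
  // but Node itself does not.
  lookup: (hostname, options, callback) => dns.lookup(hostname, options, callback),
})

socket.on('connect', () => console.log('connected to', socket.remoteAddress))
socket.on('error', (err) => console.error('connection error:', err.message))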

@eliw00d
Contributor Author

eliw00d commented Aug 5, 2019

So, when we provide

brokers: [process.env.BROKER]

where process.env.BROKER is something like 'kafka-broker.service.consul:9092', we then get error messages like:

Error: connect ETIMEDOUT www.xxx.yyy.zzz:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1106:14)

But when we remote into the pod and ping 'kafka-broker.service.consul:9092', we get the correct IP. It could very well be something else in our environment, but I'm not sure why that would happen.
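
For context, a minimal sketch of a setup like the one described (the client id and the connect-on-deploy wrapper are illustrative, not the exact code; the broker hostname comes from the environment as above):

const { Kafka } = require('kafkajs')

// BROKER is expected to look like 'kafka-broker.service.consul:9092'
const kafka = new Kafka({
  clientId: 'my-client-id', // illustrative
  brokers: [process.env.BROKER],
})

const producer = kafka.producer()

async function main() {
  // A single connect when the pod starts; DNS resolution happens when the
  // socket is established (and on any later reconnects), not on every send.
  await producer.connect()
}

main().catch(console.error)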

@eliw00d
Contributor Author

eliw00d commented Aug 5, 2019

When you .connect() using something like 'kafka-broker.service.consul:9092', would it do a lookup at the time of connection and then only ever know about the resolved IPs from that initial connection? We only connect once, when the pod gets deployed.

@Nevon
Collaborator

Nevon commented Aug 6, 2019

The lookup would only happen when it tries to establish a connection, yes. So if the broker is replaced and the resolved IP changes, my assumption would be that the existing connection is closed and KafkaJS tries to reconnect, which should mean that a new lookup is done - but this is only an assumption at this point (the lookup is done internally by Net/TLS.Socket, not by us, so I'm not 100% sure).
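
A rough sketch of that assumption (hostname, port, and retry delay are placeholders): the resolved address is fixed for the lifetime of a socket, and only creating a new socket triggers a fresh lookup.

const net = require('net')

function connect() {
  const socket = net.connect({ host: 'kafka-broker.service.consul', port: 9092 })

  socket.on('connect', () => {
    // The address resolved at connect time is what this socket talks to
    // until it closes; a changed A record is invisible to a live socket.
    console.log('connected to', socket.remoteAddress)
  })

  socket.on('error', () => {
    // errors are followed by 'close', which drives the reconnect below
  })

  socket.on('close', () => {
    // A new socket runs dns.lookup again, so a replaced broker IP would
    // only be picked up here, on reconnection.
    setTimeout(connect, 1000)
  })
}

connect()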

I think this sounds like a promising line of reasoning though. It would be good to try to get a repro case up. Do you think you could explain a bit more about your setup and maybe share some debug logs from when this is happening?

And just to make sure, what's your consul dns_config? By default, it allows stale reads.

@tulios
Owner

tulios commented Aug 6, 2019

@eliw00d it works like this:

  1. When the client connects for the first time, it uses the brokers in the list to fetch metadata about the cluster
  2. The metadata includes the IPs of all nodes in the cluster; this information is cached and expires according to metadataMaxAge, which by default is 5 minutes. We also re-fetch metadata on some errors
  3. The library will fall back to the original brokers (seed brokers) if it can't use the metadata IPs

Like @Nevon said, we use Net and TLS, so we don't cache DNS; resolution just delegates to the OS.

Have you configured KAFKA_ADVERTISED_HOST_NAME in your cluster? You can try running your consumers with log level debug; it should give you more information about what is happening.
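
For reference, a sketch of how those two knobs could be set, assuming the same env-based broker list as above; the values shown are illustrative:

const { Kafka, logLevel } = require('kafkajs')

const kafka = new Kafka({
  brokers: [process.env.BROKER],
  logLevel: logLevel.DEBUG, // surfaces the Connection/BrokerPool debug logs
})

// metadataMaxAge is the expiry for the cached cluster metadata (the broker
// IPs from step 2); 5 minutes is the default, shown here explicitly.
const producer = kafka.producer({
  metadataMaxAge: 5 * 60 * 1000,
})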

@tulios tulios added the question label Aug 6, 2019
@eliw00d
Contributor Author

eliw00d commented Aug 6, 2019

I know that we changed the dns_config to not allow stale reads and I'm pretty sure we don't have KAFKA_ADVERTISED_HOST_NAME configured anywhere.

I will try logging to see if that helps narrow anything down, and try to get some more information for you guys.

@eliw00d
Contributor Author

eliw00d commented Aug 7, 2019

So, I put some logging in and re-deployed the pod. Then, I deleted the brokers it knew about and tried to do a producer.send(). This resulted in connection timeouts for all of the previously resolved IPs (172.23.1.78, 172.23.1.192, 172.23.1.194) but no attempts to do a new lookup (which would result in 172.23.1.110 and 172.23.1.182). It finally failed with a retries exceeded error. One thing I found interesting is this log:

{"level":"debug","time":1565189319221,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"Connection","broker":"my-kafka-broker.service.consul:9092","clientId":"my-client-id","correlationId":2,"size":459,"data":{"throttleTime":0,"brokers":[{"nodeId":2,"host":"172.23.1.110","port":9092,"rack":"my-rack"},{"nodeId":3,"host":"172.23.1.182","port":9092,"rack":"my-rack"}],"clusterId":"my-cluster-id","controllerId":2,"topicMetadata":[{"topicErrorCode":0,"topic":"my-topic","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":2,"replicas":[2,3,1],"isr":[3,2],"offlineReplicas":[1]},{"partitionErrorCode":0,"partitionId":2,"leader":2,"replicas":[1,2,3],"isr":[3,2],"offlineReplicas":[1]},{"partitionErrorCode":0,"partitionId":1,"leader":3,"replicas":[3,1,2],"isr":[3,2],"offlineReplicas":[1]}]},{"topicErrorCode":0,"topic":"my-topic","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":3,"replicas":[3,1,2],"isr":[3,2],"offlineReplicas":[1]},{"partitionErrorCode":0,"partitionId":2,"leader":2,"replicas":[2,3,1],"isr":[3,2],"offlineReplicas":[1]},{"partitionErrorCode":0,"partitionId":1,"leader":2,"replicas":[1,2,3],"isr":[3,2],"offlineReplicas":[1]}]}]},"msg":"Response Metadata(key: 3, version: 5)","v":1}

where you can clearly see the brokers have been updated. So, KafkaJS does have the updated metadata during this time.

It definitely seems like no new lookups were done when producer.send() was called. Would the result be different if I did something like this?

await producer.connect()
await producer.send({ topic, messages })
await producer.disconnect()

It already seems to connect/disconnect on its own before/after the call to producer.send(), so this seemed unnecessary.

Here is a snippet of the preceding logs (there were many similar loops):

{"level":"debug","time":1565189318072,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"Connection","broker":"172.23.1.192:9092","clientId":"my-client-id","ssl":false,"sasl":false,"msg":"Connecting","v":1}
{"level":"error","time":1565189319072,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"Connection","broker":"172.23.1.192:9092","clientId":"my-client-id","msg":"Connection timeout","v":1}
{"level":"error","time":1565189319072,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"BrokerPool","retryCount":5,"retryTime":7612,"msg":"Failed to connect to broker, reconnecting","v":1}
{"level":"debug","time":1565189319073,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"Connection","broker":"my-kafka-broker.service.consul:9092","clientId":"my-client-id","ssl":false,"sasl":false,"msg":"Connecting","v":1}
{"level":"debug","time":1565189319220,"pid":1,"hostname":"my-hostname","name":"kafkajs","namespace":"Connection","broker":"my-kafka-broker.service.consul:9092","clientId":"my-client-id","correlationId":2,"expectResponse":true,"size":86,"msg":"Request Metadata(key: 3, version: 5)","v":1}

@eliw00d
Contributor Author

eliw00d commented Aug 7, 2019

Okay, so I finally dug through the source and found this. If the nodeId matches the previous metadata's nodeId, it returns result without any changes. In our case, we reuse nodeIds, so it never gets a chance to check whether the host and port match or to assign new values to that broker.

Maybe that could be changed to:

this.brokers = this.metadata.brokers.reduce((result, { nodeId, host, port, rack }) => {
    const existingBroker = result[nodeId]
    if (existingBroker && existingBroker.host === host && existingBroker.port === port && existingBroker.rack === rack) {
      return result
    }
    // ...otherwise fall through and create a replacement broker for this nodeId, as before

or similar?
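
A self-contained illustration of the difference (not the library code; the IPs are the ones from the logs above):

// Why reusing nodeIds hides an IP change: keying only on nodeId skips the
// host/port comparison entirely, so the stale entry survives.
const cached = { 2: { host: '172.23.1.78', port: 9092, rack: 'my-rack' } }
const fresh = [{ nodeId: 2, host: '172.23.1.110', port: 9092, rack: 'my-rack' }]

// Current behavior: nodeId 2 already exists, so the stale host is kept.
const keepByNodeId = fresh.reduce(
  (result, { nodeId, host, port, rack }) =>
    result[nodeId] ? result : { ...result, [nodeId]: { host, port, rack } },
  { ...cached }
)

// Suggested behavior: also compare host/port/rack before reusing the entry.
const replaceOnChange = fresh.reduce((result, { nodeId, host, port, rack }) => {
  const existing = result[nodeId]
  if (existing && existing.host === host && existing.port === port && existing.rack === rack) {
    return result
  }
  return { ...result, [nodeId]: { host, port, rack } }
}, { ...cached })

console.log(keepByNodeId[2].host)    // '172.23.1.78'  (stale)
console.log(replaceOnChange[2].host) // '172.23.1.110' (updated)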

@devshawn

devshawn commented Aug 7, 2019

@tulios - In response to your points made above:

  2. The metadata includes the IPs of all nodes in the cluster; this information is cached and expires according to metadataMaxAge, which by default is 5 minutes. We also re-fetch metadata on some errors

This should likely refetch metadata if a broker connection fails. Otherwise, if a broker IP changes (such as when hosted in Kubernetes), you could have a missing broker for up to 5 minutes (or more).

  3. The library will fall back to the original brokers (seed brokers) if it can't use the metadata IPs

This can be dangerous. It should likely fall back to the original bootstrap servers and do another metadata lookup. See https://issues.apache.org/jira/browse/KAFKA-3068 for reasoning from the past.

Broker IPs should not be assumed to be near-static. If metadata is outdated, the cache should be updated if possible (i.e., the broker is up but its IP changed), or the request should fail if the broker is down (as expected).

KAFKA_ADVERTISED_HOST_NAME is set on the broker properly -- I'd take a close look at how the Java clients handle metadata caching and broker metadata changes.

@tulios
Owner

tulios commented Aug 7, 2019

@devshawn Seed brokers in this case = original bootstrap servers

@eliw00d we can consider this a bug; if nodeId, host, and port are the same we should reuse the broker, otherwise we should replace it.

@eliw00d
Contributor Author

eliw00d commented Aug 7, 2019

I don't know how often rack would change, if at all, but since it's used here, would it be safe to make it part of the criteria for replacing as well?

@eliw00d
Contributor Author

eliw00d commented Aug 7, 2019

What would be a good workaround for this in the interim?

@tulios
Owner

tulios commented Aug 8, 2019

@eliw00d we have the pre-release 1.11.0-beta.1 with the fix (PR #457), give it a try.

@eliw00d
Contributor Author

eliw00d commented Aug 8, 2019

@tulios
It works! Thank you! With the idempotent producer enabled, it actually times out our request while retrying all of the previous IPs, but it eventually succeeds in the background. So, we'll just have to figure that out separately. Thanks again!
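
A sketch of the producer options in play here, presumably something like the following; the retry values are illustrative, not the defaults in use, and the timeout behavior described above may involve other settings too:

const producer = kafka.producer({
  idempotent: true, // as in the setup described above
  retry: {
    initialRetryTime: 300, // illustrative
    retries: 5,            // illustrative
  },
})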
