Default behaviour of TCP client waiting for acks is dangerous? #9

Closed
jonpither opened this Issue Jan 16, 2014 · 15 comments

jonpither commented Jan 16, 2014

We had an issue in prod where Riemann hit an OOM. The OOM was caused by a rule that rolled up all events for a given service throughout the day and sent out one email at midnight.

OK, it wasn't a great rule, and it wasn't sensible - we've since corrected it. Riemann should be as stateless as possible, right? Not hanging on to messages in this fashion.

But when Riemann went down, it took down all of our production servers that were talking to it. This is because the other servers were maintaining a TCP connection to Riemann and waiting for acks back on each message sent. When Riemann blew up and stopped acking, the threads on our prod servers ground to a halt (as shown in a thread dump).

In one sense this is totally our fault, as we didn't question whether the method of talking to Riemann was right for our use case. But I would also raise the question of whether it's correct for this client library to adopt TCP and do acks by default. Shouldn't, for safety's sake, a Riemann client be fire-and-forget by default?

aphyr commented Jan 16, 2014

Out of curiosity, when you read the docs for throttle, was it obvious that it would allow unbounded memory use? The talk and howto (http://riemann.io/howto.html#roll-up-and-throttle-events) are pretty explicit about this concern and when/how to address it, but if you didn't see a warning, maybe I should add extra docs somewhere else.
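
For reference, here is a minimal sketch of the bounded pattern that howto describes, where a throttle caps how many events per interval can ever reach the rollup, so the rolled-up buffer stays bounded. This is a hedged illustration of a riemann.config snippet, not the rule from the incident: the address, limits, and exact stream composition are placeholders to be tuned to your own alert volume.

(def email (mailer {:from "riemann@example.com"}))

(streams
  (where (state "critical")
    ; Let at most 100 events per hour into the rollup buffer...
    (throttle 100 3600
      ; ...and send at most 5 emails per hour, each summarizing the
      ; buffered events, so memory use stays bounded.
      (rollup 5 3600
        (email "oncall@example.com")))))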

As for riemann-clojure-client, as with all network clients and distributed systems in general, providing backpressure by default is, in my opinion, the correct move: it prevents Riemann from getting overloaded by misbehaving clients, prevents problems like "where the heck is my data", subtle undercounting, etc. That's why you should either:

  1. Use an IO threadpool if you intend network ops to be asynchronous, just like with any other network client,
  2. Use the ack parameter to send-event to control whether you want to wait for acknowledgements, or
  3. Use async-send-event, async-send-events, etc to return a deref-able future immediately (which, of course, costs memory).

All of these approaches exist to let you tune the latency/harvest/memory tradeoffs progressively, but each has its failure modes. If you use an IO threadpool, you've got to worry about the queue in front of it, which could OOM your process. If you send with acks, you could block the local thread. If you send without acks, you could lose arbitrary amounts of data and never know. If you use the async variants, you could consume arbitrary memory in the async FSM. Failing to consider the backpressure dynamics carefully could overload either Riemann or your process, but it's hard to make that call in advance for all users. :-/
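
To make options 2 and 3 concrete, here is a hedged Clojure sketch using the calls named above. The tcp-client options and the deref-with-timeout arity are assumptions that may vary between client versions; send-event's trailing ack flag is the same one used later in this thread.

(require '[riemann.client :as r])

(def c (r/tcp-client {:host "127.0.0.1" :port 5555}))

; Option 2: pass the ack flag explicitly. false means "don't wait for the
; server's acknowledgement", so this call won't block on a wedged Riemann.
(r/send-event c {:service "api latency" :metric 0.23} false)

; Option 3: the async variant returns a deref-able value immediately.
; Deref it later (ideally with a timeout) to observe success or failure;
; until then it sits in memory, which is the cost mentioned above.
(let [result (r/async-send-event c {:service "api latency" :metric 0.23})]
  (deref result 1000 ::timed-out))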

jonpither commented Jan 17, 2014

Thanks for the reply.

We've had Riemann running successfully in prod for a while, configured by devs who have read the docs. So it's not that the docs weren't clear - I actually think they're very clear about the recommended use of throttle, and we've followed that guidance for the most part.

What's happened is that, over time, a few teams have come to share a Riemann instance, and with any decent-sized group of people you'll get the odd person who dives in without reading up first, so mistakes will happen. In this case a long-running roll-up was added without a throttle.

This issue is about how a Riemann client set up in the default manner can put the client application at risk if something goes wrong with the Riemann server.

We've since switched to UDP and we're setting the ack parameter to true. After reading your reply and thinking on it, though, I think we should go back to using TCP and go for the IO threadpool option with a bounded queue. We can then add some behaviour so that when a thread fails to add to the queue because it's full, we can alert in a different, non-Riemann manner.

So I can understand why UDP and not acking may not be desirable as the default, because users of Riemann wouldn't easily spot events being lost or the server being overwhelmed.

But given the way the Clojure/Java client is shipped now, it does put users at risk in a different, potentially more serious way. In our case an entire cluster of clients struggled.

Do you think there's scope to make the IO threadpool part of the Riemann client itself, perhaps configurable with an additional hook to execute when the queue is full?

aphyr commented Jan 17, 2014

Be advised that UDP with ack true is meaningless; UDP doesn't actually support acks. I think the client should actually throw in those cases but maybe not, haha. Could be a bug. ;-)

In general, I do not advise sending inline with Riemann. Calls to send-event, just like any other network operation, can and will throw, interrupting your application logic. Pretty much every system I build has a monitoring thread or threadpool, and I do the sampling and IO in there. Application designs tend to vary though (I suggested four common patterns in the previous comment) so I've tried to avoid prescribing any particular pattern in the Riemann client. Instead, I've tried to make it easy for you to build and tune the appropriate strategy yourself--that's why there's blocking and asynchronous variants of each call, plus tunable behavior around the FSM for sequencing responses.

In Clojure, IO threadpools are pretty easy. (riemann/send-async client {...}) does it already. (send-off client riemann/send-event {...}) will also get you 80% of the way there, if you want to use the synchronous variant and log responses. You can dump the async responses into a future or agent for delayed error handling as well. For a full bounded-queue/elastic-bounded-threadpool implementation, you can build one out of a ThreadPoolExecutor and LBQ in about ten lines of code, most of which is tuning parameters. Since those parameters tend to be strongly application-dependent, I've avoided choosing them for you.
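
As a hedged sketch of the send-off idea: send-off needs an agent as its target, so the example below introduces a hypothetical riemann-io agent as a serialized IO mailbox, and assumes riemann.client is aliased as r as elsewhere in this thread.

; An agent used purely as an IO mailbox; its state is just the result of
; the last send.
(def riemann-io (agent nil))

(defn enqueue-event!
  "Queue an event for delivery on the send-off thread pool so the calling
  thread never blocks on Riemann. (Hypothetical helper, not part of the client.)"
  [client event]
  (send-off riemann-io
            (fn [_]
              (r/send-event client event))))

; Errors land on the agent rather than in your application thread:
; (agent-error riemann-io) ; => nil, or the Throwable from a failed send.
; Caveat: the agent's queue is unbounded, so the memory/backpressure
; trade-off discussed above still applies.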

The fact is that this is a network client, and just like every other network client its calls can and will fail, and I can't hide that from you. Heck, I may have done too much hiding in the client already--there's a complex series of queues and IO threadpools sitting inside the existing client which have, so far, not exploded and broken your app--but they certainly could. You haven't noticed because I and the Netty authors did our jobs reasonably well, and because your use case and failure modes haven't left the operational envelope of that system. But there definitely are failure modes in which you'll become suddenly and unhappily aware of the existence of that threadpool, those queues, and those memory semantics.

I've tried to choose the balance of performance, failure semantics, and memory pressure which will work for most users in the most intuitive way. I'm really sorry that didn't align with your assumptions, but given the feedback over the past couple years from the user base, I think that choosing unacknowledged asynchronous IO by default would confuse more people--originally, Riemann-Java-Client was asynchronous, and user demand pushed it towards the current semantics. The asynchronous functions are already built for you, and clearly named.

I don't think I'm going to change the default behavior of the client, but I'll keep this in mind if we start getting additional complaints.

aterreno commented Jan 21, 2014

Hi @aphyr, just commenting since I'm working with @jonpither.

I've tried to dig into the problem a little further. I did notice that with the UDP client the channel-closed exception gets swallowed (not in your code, fair enough, it's fire and forget).

We can't use that transport at all, since we are storing our client in an atom and there is no way to tell whether the connection is available. The typical test case is restarting the Riemann server: our code won't reconnect, since it can't sense that the server is gone.

Code looks like this:

(try
    (when (client system)
      (println "****SENDING")
      (r/send-event (client system) event false)
      (println "****SENT"))
    (catch java.lang.Throwable e
      (println (format "CAUGHT %s" e))))

And it will always print SENDING / SENT, with the stack trace from Netty in the middle:

java.nio.channels.ClosedChannelException
  at org.jboss.netty.channel.socket.nio.NioDatagramWorker.cleanUpWriteBuffer(NioDatagramWorker.java:729)
  at org.jboss.netty.channel.socket.nio.NioDatagramWorker.writeFromUserCode(NioDatagramWorker.java:444)
  at org.jboss.netty.channel.socket.nio.NioDatagramPipelineSink.eventSunk(NioDatagramPipelineSink.java:112)
  at org.jboss.netty.channel.Channels.write(Channels.java:632)
  at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70)
  at org.jboss.netty.channel.Channels.write(Channels.java:611)
  at org.jboss.netty.channel.Channels.write(Channels.java:578)
  at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259)
  at com.aphyr.riemann.client.UdpTransport.sendMaybeRecvMessage(UdpTransport.java:162)

Etc..

Taking up your suggestion now of an LBQ, or maybe using a core.async channel, to send the Riemann events.

Thanks a lot for your help,
toni

jonpither commented Jan 21, 2014

Hi @aphyr

Just want to say thanks for your response. We're looking at building something around TCP.

I was incorrect in my earlier post when I said we are now using UDP with the ack param set to true. As you stated, this doesn't make sense, and we're passing false through.

Jon.

aphyr commented Jan 21, 2014

Oy. @tazle was working on the riemann-java-client UDP transport as well, and had a whole series of problems with reconnection and error logic; he was the last person to give that code a redesign pass, so he might have ideas. Netty is complicated, haha. If you figure something out, I'll gladly take PRs. Sorry about all this. :(

aterreno commented Jan 21, 2014

Just in case somebody else ever has the same problem, here's how we solved it:

(ns clj-components.utils.bounded-executor
  "See: https://github.com/aphyr/riemann-clojure-client/issues/9#issuecomment-32624706 to understand what's going on here"
  (:import (java.util.concurrent ThreadPoolExecutor TimeUnit LinkedBlockingQueue RejectedExecutionHandler)))

(def reject-handler
  "Handles a rejection on the bounded executor, i.e. when the LBQ is full.
  The empty body means rejected tasks are dropped silently."
  (proxy [RejectedExecutionHandler] []
    (rejectedExecution [runnable executor])))

(def bounded-executor
  "Bounded executor; the current settings are calculated with our current production Riemann volumes in mind."
  (let [cores (.availableProcessors (Runtime/getRuntime))]
    ;; 1 core thread, up to `cores` threads, 5s keep-alive, and a 250-slot
    ;; bounded queue; when the queue is full, reject-handler drops the task.
    (ThreadPoolExecutor. 1 cores 5 TimeUnit/SECONDS (LinkedBlockingQueue. 250) reject-handler)))

(defn run-bounded
  "Executes f on the bounded executor."
  [f]
  ;; Clojure functions implement Runnable, so f can be submitted directly;
  ;; there's no need to wrap it in a Thread.
  (.execute ^ThreadPoolExecutor bounded-executor f))

Thanks a lot @aphyr for your time & effort. Really helpful.
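
As a follow-up to the code above: if you also want the "alert through a non-Riemann channel when the queue is full" behaviour discussed earlier, one hedged variation is to give the rejection handler a side effect. This sketch assumes the imports from the namespace above; log-dropped-event! and riemann-client are hypothetical placeholders, and r/send-event is the same riemann-clojure-client call used earlier in the thread.

; Sketch only: a rejection handler that alerts through a fallback channel
; instead of dropping rejected sends silently.
(defn log-dropped-event!
  "Hypothetical fallback alert - wire this to whatever non-Riemann channel
  you already trust (a logger, a metrics counter, an email, ...)."
  [runnable]
  (println "Riemann send queue full; dropping" runnable))

(def alerting-reject-handler
  ; Pass this instead of reject-handler when constructing the
  ; ThreadPoolExecutor above.
  (proxy [RejectedExecutionHandler] []
    (rejectedExecution [runnable executor]
      (log-dropped-event! runnable))))

; Usage: each send becomes a task on the bounded executor, so a slow or dead
; Riemann can only fill the 250-slot queue and the pool's worker threads -
; application threads never block on the network.
(run-bounded
  #(r/send-event riemann-client {:service "orders processed" :metric 1.0}))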

aphyr commented Jan 21, 2014

This is definitely one of those "you should do this, but there's no clear guide how to do it" things about Riemann (and distributed systems IO in general). Thanks for writing up your approach; if it's OK with you I'll ensconce it in the riemann.io howto. :)

tazle commented Jan 21, 2014

Hmm, was the ClosedChannelException issue with the Clojure UDP client? I've only used the Java UDP client directly.

aphyr commented Jan 21, 2014

The Clojure UDP client just forwards to the Java UDP client. I'm a little surprised that it closed in the first place, given that UDP is connectionless, but I'm definitely not spooled up on that code. O_o

tazle commented Jan 21, 2014

Right, the problem is that sendEvents in the Java client silently swallows exceptions. What I did to work around this was write SimpleUdpTransport, which doesn't generate the exceptions in the first place. It uses plain UDP sockets instead of Netty.

tazle commented Jan 21, 2014

The Clojure client seems to use the Netty-based UDP transport. Netty and UDP don't mix very well. You'd have to handle the exception in question in sendMaybeRecvMessage of UdpTransport, and figure out how to resurrect the channel so that it would be useful again, or create a new channel. Neither is appealing.

Looking at the transport code, I did add a way of reacting to the exception in the code that uses the client. If you supply an ExceptionReporter to the UdpTransport/TcpTransport, it'll get callbacks with any exceptions encountered on the Netty Channel. The default implementation prints the stack trace to stderr, since that's what the original code did before I added the reporter mechanism.

Of course it seems that you can't insert a reporter through the Clojure client.

aphyr commented Jan 21, 2014

(The Clojure client is a Java client, so you can just use the Java interop methods on it.)

tazle commented Jan 21, 2014

Right. You'd have to implement the equivalent of udp-client to supply additional parameters to the transport, or use the simple transport. I think you can reach the actual transport through public references too, starting with a RiemannClient, but that requires some pretty ugly casting.

aterreno commented Jan 22, 2014

@aphyr yeah, sure, go ahead and write up the approach in the howto ;)

Glad the conversation is going. I now understand from @tazle's comments why the Clojure UDP client is showing that strange behaviour.

@pyr pyr self-assigned this Feb 9, 2016

@pyr pyr added the question label Feb 9, 2016

@pyr pyr closed this Feb 9, 2016
