How to do application-level ping timeout properly? #159
Comments
The extracted code (with ClassyPrelude as the implicit Prelude):

```haskell
import qualified Control.Exception as Unsafe
import qualified Data.Aeson as J
import GHC.Conc (myThreadId)
import qualified Network.Wai as W
import qualified Network.Wai.Handler.WebSockets as W
import qualified Network.WebSockets as W
import qualified Servant.Server.Internal.ServantErr as W (responseServantErr)

data KillWaiThread =
  KillWaiThread
  deriving (Show)

instance Exception KillWaiThread

-- |The whole point of this module is to have application-level pings for
-- greater control in the application, i.e. to solve
-- <https://github.com/jaspervdj/websockets/issues/159>.
pingingWsApp :: (W.Connection -> IO ()) -> W.Application
pingingWsApp action __unused1 __unused2 = do
  onTcpPong <- newTVarIO (pure ())
  W.websocketsOr
    W.defaultConnectionOptions
      { W.connectionOnPong = join (readTVarIO onTcpPong)
      }
    (wsApp onTcpPong)
    backup
    __unused1
    __unused2
  where
    backup _ respond' = respond' . W.responseServantErr $ toServantErr NotAWebSocket
    -----------------------------------------------
    wsApp onTcpPong pendingConn =
      ignoreKillWaiThread . ignoreClosedTCP $ do
        conn <- W.acceptRequest pendingConn
        waiThreadId <- myThreadId
        pingTimeoutKiller :: TVar (Maybe (Async ())) <- newTVarIO Nothing
        flip finally (traverse_ cancel =<< readTVarIO pingTimeoutKiller) $ do
          atomically . writeTVar onTcpPong $ do
            newKiller <-
              async $ do
                threadDelay' (2 * connectionTimeout)
                sendDisconnect conn ("Ping timeout." :: Text)
                -- This is ugly, but what else can we do with the API provided
                -- by 'Network.WebSockets'? We use 'Unsafe.throwTo' because we
                -- don't want the exception wrapped; we'll catch it later in
                -- 'ignoreKillWaiThread'.
                Unsafe.throwTo waiThreadId KillWaiThread
            previousKiller <- atomically $ swapTVar pingTimeoutKiller (Just newKiller)
            cancel `traverse_` previousKiller
          W.forkPingThread conn (round connectionTimeout)
          action conn
    -----------------------------------------------
    ignoreClosedTCP = handle (\(_ :: W.ConnectionException) -> pure ()) -- don't log them, no point
    -----------------------------------------------
    ignoreKillWaiThread = Unsafe.handle (\(_ :: KillWaiThread) -> pure ()) -- don't log them, no point
    -----------------------------------------------
    sendDisconnect conn why = do
      sendJson conn . J.object $ ["tag" J..= ("Error" :: Text), "description" J..= why]
      W.sendClose conn ("{}" :: ByteString)
```
When adapting ↑, it's important to get masked and unmasked exceptions right here…

Also because of the API of …
@michalrus thanks for the code above! Do you happen to know if any progress has been made since this was posted, or is your solution still the best way to do this?
It looks solid. I agree that the API is not ideal, and I'm happy to change parts of it if there's a way that offers existing users a clean migration path. I think it would be a bit cleaner to have a single killer thread that shares a reference to a timeout, where every pong that is received prolongs this timeout; that way it's not necessary to cancel and restart the killer threads. That is the way it's done in the snap backend of websockets, but of course snap also provides a different API than warp.
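A minimal sketch of that single-killer-thread idea, assuming a shared deadline held in a `TVar` (hypothetical names, using the `stm` and `time` packages; this is not part of the websockets API):

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)

-- One watchdog thread polls a shared deadline. Every pong pushes the
-- deadline further out, so no threads need to be cancelled and restarted.
watchdog :: TVar UTCTime -> IO () -> IO ()
watchdog deadlineVar onTimeout = loop
  where
    loop = do
      now <- getCurrentTime
      deadline <- readTVarIO deadlineVar
      if now >= deadline
        then onTimeout                -- deadline passed: ping timed out
        else do
          -- Re-check periodically; a pong may have moved the deadline
          -- in the meantime.
          threadDelay 1000000
          loop

-- Intended to be called from connectionOnPong: prolong the deadline
-- instead of cancelling and restarting a killer thread.
prolong :: NominalDiffTime -> TVar UTCTime -> IO ()
prolong extra deadlineVar = do
  now <- getCurrentTime
  atomically . writeTVar deadlineVar $ addUTCTime extra now
```

Compared to the cancel-and-restart approach above, this only ever allocates one extra thread per connection, and a pong costs a single `TVar` write.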
@jaspervdj thanks! Question: is there some reason why the ping thread itself doesn't wait for pongs and cause the connection to be closed when a pong doesn't return for a long time? This seems like a natural job for the ping thread (in addition to keeping the connection alive through proxies and the like). That was my expectation anyway for how the ping thread would behave; it was an unpleasant discovery that this library doesn't actually have a mechanism to detect disconnects by itself and that you have to build something out of TVars like the above. I think it would cause less astonishment for users to let the ping thread do this out of the box (with an option to turn it on and off). I'm currently working on another way to manage pongs using a …
Okay, I have a couple thoughts about the current API (particularly when writing a server):
Suggestion: what if you provide a way to set the …
Hmm, this is a bit surprising to me. Once that is fixed, I think it should be doable to build an application-level ping timeout on top of …
Maybe I'm misunderstanding, but the …

FWIW, having just been through the process of writing my own ping/pong system, I don't think …

```haskell
runApp pending = do
  conn <- WS.acceptRequest pending
  withPinger conn $ \pongChan -> do
    -- Run main application loop
    forever $ WS.receive conn >>= \case
      WS.ControlMessage (WS.Pong _) -> writeChan pongChan ()
      -- ... other cases, including DataMessage
```

```haskell
withPinger conn action = do
  pongChan <- newChan
  mainAsync <- async $ action pongChan
  pingerAsync <- async $ runPinger conn pongChan
  waitEitherCatch mainAsync pingerAsync >>= \case
    -- If the application async died for any reason, kill the pinger async
    Left _ -> cancel pingerAsync
    -- The pinger thread should never throw an exception. If it does, kill the app thread
    Right (Left err) -> cancel mainAsync
    -- The pinger thread exited due to a pong timeout. Tell the app thread about it.
    Right (Right ()) -> cancelWith mainAsync PongTimeout

runPinger conn pongChan = fix $ \loop -> do
  WS.sendPing conn (mempty :: B.ByteString)
  threadDelay pingWaitTime
  -- See if we got a pong in that time
  timeout 1000000 (readChan pongChan) >>= \case
    Just () -> loop
    Nothing -> return ()
```
@thomasjm That makes sense. I think …
@jaspervdj Please let me know if there are specific parts of this that my team could help with. We're using the library pretty heavily in production and have run into quite a few issues with connection state. (It's possible we could sponsor some of the work if that would help.) FWIW we're just using …
Hi @nbouscal -- sorry for dragging this out. I am currently vacationing in Mexico so I don't have time to look at this properly, but I did have some downtime on a bus this morning, so I put together #199 with a draft of what I think I want it to look like. Note that because of the circumstances, I have only checked that this compiles; I haven't tried to run it at all. Could you help me out by reviewing and sense-checking that PR? Thanks for your patience!
No worries, hope you're enjoying your vacation! I'll take a look this weekend. Thanks!
There's a similar implementation out there at https://github.com/digitallyinduced/ihp/blob/dbb4ec64fe7b460fd80041e0b7a2867d90529d78/IHP/WebSocket.hs#L124 |
I've written an implementation that reuses …
There is no clean or built-in way to do this, see jaspervdj/websockets#159. This implementation works by keeping track of the last received pong, and then checking whether the previous ping has been answered with a pong when sending a new ping. If that isn't the case, the connection is terminated early through the use of `withInteruptablePingThread`.
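The check described above ("was the previous ping answered before sending a new one?") can be sketched as follows. All names here are hypothetical, and the send action is abstracted as a plain `IO ()` so the sketch does not depend on a concrete connection type; with the websockets package it would be something like `WS.sendPing conn B.empty`:

```haskell
import Control.Concurrent (threadDelay)
import Data.IORef

-- 'pongCount' is expected to be incremented elsewhere (e.g. from
-- connectionOnPong). Before each new ping we check that the previous
-- ping was answered; if not, the loop returns so the caller can tear
-- the connection down.
pingUntilTimeout :: IO () -> IORef Int -> Int -> IO ()
pingUntilTimeout sendPing pongCount intervalMicros = go 0
  where
    go pingsSent = do
      pongs <- readIORef pongCount
      if pongs < pingsSent
        then pure ()                  -- previous ping unanswered: give up
        else do
          sendPing
          threadDelay intervalMicros
          go (pingsSent + 1)
```

This keeps all state in a single counter, with no extra killer threads to cancel and restart.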
Looking at the current implementation of ping/pong handling, I suggest the following changes: …
I've found a way to implement ping-pong generically for any connection and it also simplifies the code! Please take a look at #239
I have a working solution with the current API, but it feels sooooo hacky. 😺

- Since `connectionOnPong` doesn't get its `Connection` (why?), we're binding `onPong <- newTVarIO (pure ())`.
- To `connectionOnPong`, we're passing `join (readTVarIO onPong)`.
- After getting a `Connection`, we're putting an action into this `onPong`, which:
  - starts an `Async`, which:
    - `threadDelay`s `(2 * tcpTimeout)`,
    - and then does `throwTo waiThread PingTimeout` (this is awful);
  - cancels the previous `Async`.
- All of this is wrapped in `finally`, which cancels that `Async` as well, so that after a clean disconnection we don't run the code meant for the ping timeout.

This way, on average after `1.5 * tcpTimeout` (*), a dropped connection is detected and killed, and the whole system can learn about it, run appropriate clean-up code etc.

(*) Even if we were scheduling this killer `Async` right after sending a Ping (which seems more in line with the naming, "ping timeout"), it'd still happen after `1.5 * tcpTimeout`, because on average, the Ping message would be sent `0.5 * tcpTimeout` after the connection got dropped.

When we don't do that, dropping the connection with `iptables` does absolutely nothing; it just stays there as open, even after Warp's `setTimeout` timeout passes! I don't understand why, though.

How to do this properly? Because my way surely doesn't feel appropriate (although it works well).