Skip to content

Commit

Permalink
Add thread-safety and forkPingThread
Browse files Browse the repository at this point in the history
  • Loading branch information
jaspervdj committed Dec 5, 2014
1 parent 616dc2c commit 536849d
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 20 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
- 0.9.2.0
* Make sending and receiving messages thread-safe by default
* Export `forkPingThread`
* Fix Windows `withSocketsDo` issue

- 0.9.1.0
* Don't use Network.ByteString.Lazy.sendAll on Windows

Expand Down
4 changes: 4 additions & 0 deletions example/server.lhs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ Our application starts by accepting the connection. In a more realistic
application, you probably want to check the path and headers provided by the
pending request.

We also fork a pinging thread in the background. This will ensure the connection
stays alive on some browsers.

> application state pending = do
> conn <- WS.acceptRequest pending
> WS.forkPingThread conn 30

When a client is succesfully connected, we read the first message. This should
be in the format of "Hi, I am Jasper", where Jasper is the requested username.
Expand Down
4 changes: 3 additions & 1 deletion src/Network/WebSockets.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ module Network.WebSockets
, HandshakeException (..)
, ConnectionException (..)


-- * Running a standalone server
, ServerApp
, runServer
Expand All @@ -61,6 +60,9 @@ module Network.WebSockets
, runClientWith
, runClientWithSocket
, runClientWithStream

-- * Utilities
, forkPingThread
) where


Expand Down
10 changes: 7 additions & 3 deletions src/Network/WebSockets/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Network.WebSockets.Client

--------------------------------------------------------------------------------
import qualified Blaze.ByteString.Builder as Builder
import Control.Concurrent.MVar (newMVar)
import Control.Exception (finally, throw)
import Data.IORef (newIORef)
import qualified Data.Text as T
Expand Down Expand Up @@ -99,13 +100,16 @@ runClientWithStream stream host path opts customHeaders app = do
Response _ _ <- return $ finishResponse protocol request response
parse <- decodeMessages protocol stream
write <- encodeMessages protocol ClientConnection stream
sentRef <- newIORef False

parseState <- newMVar (Available parse)
writeState <- newMVar (Available write)
sentRef <- newIORef False
app Connection
{ connectionOptions = opts
, connectionType = ClientConnection
, connectionProtocol = protocol
, connectionParse = parse
, connectionWrite = write
, connectionParse = parseState
, connectionWrite = writeState
, connectionSentClose = sentRef
}
where
Expand Down
82 changes: 69 additions & 13 deletions src/Network/WebSockets/Connection.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
--------------------------------------------------------------------------------
-- | This module exposes connection internals and should only be used if you
-- really know what you are doing.
{-# LANGUAGE OverloadedStrings #-}
module Network.WebSockets.Connection
( PendingConnection (..)
Expand All @@ -7,6 +9,7 @@ module Network.WebSockets.Connection
, acceptRequestWith
, rejectRequest

, Available (..)
, Connection (..)

, ConnectionOptions (..)
Expand All @@ -22,17 +25,23 @@ module Network.WebSockets.Connection
, sendClose
, sendCloseCode
, sendPing

, forkPingThread
) where


--------------------------------------------------------------------------------
import qualified Blaze.ByteString.Builder as Builder
import Control.Exception (throw)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, putMVar, takeMVar)
import Control.Exception (AsyncException, fromException,
handle, onException, throw)
import Control.Monad (unless)
import qualified Data.ByteString as B
import Data.IORef (IORef, newIORef, readIORef,
writeIORef)
import Data.List (find)
import qualified Data.Text as T
import Data.Word (Word16)


Expand Down Expand Up @@ -91,15 +100,18 @@ acceptRequestWith pc ar = case find (flip compatible request) protocols of
let subproto = maybe [] (\p -> [("Sec-WebSocket-Protocol", p)]) $ acceptSubprotocol ar
response = finishRequest protocol request subproto
sendResponse pc response
parse <- decodeMessages protocol (pendingStream pc)
write <- encodeMessages protocol ServerConnection (pendingStream pc)
sentRef <- newIORef False
parse <- decodeMessages protocol (pendingStream pc)
write <- encodeMessages protocol ServerConnection (pendingStream pc)

parseState <- newMVar (Available parse)
writeState <- newMVar (Available write)
sentRef <- newIORef False
let connection = Connection
{ connectionOptions = pendingOptions pc
, connectionType = ServerConnection
, connectionProtocol = protocol
, connectionParse = parse
, connectionWrite = write
, connectionParse = parseState
, connectionWrite = writeState
, connectionSentClose = sentRef
}

Expand All @@ -116,13 +128,20 @@ rejectRequest :: PendingConnection -> B.ByteString -> IO ()
rejectRequest pc message = sendResponse pc $ response400 [] message


--------------------------------------------------------------------------------
-- | Type protecting the 'connectionParse' and 'connectionWrite' IO actions. By
-- using this inside an 'MVar' we can protect from concurrent writes/reads, and
-- writes/reads to closed channels.
data Available a = Available !a | Unavailable


--------------------------------------------------------------------------------
data Connection = Connection
{ connectionOptions :: !ConnectionOptions
, connectionType :: !ConnectionType
, connectionProtocol :: !Protocol
, connectionParse :: !(IO (Maybe Message))
, connectionWrite :: !(Message -> IO ())
, connectionParse :: !(MVar (Available (IO (Maybe Message))))
, connectionWrite :: !(MVar (Available (Message -> IO ())))
, connectionSentClose :: !(IORef Bool)
-- ^ According to the RFC, both the client and the server MUST send
-- a close control message to each other. Either party can initiate
Expand All @@ -133,8 +152,11 @@ data Connection = Connection


--------------------------------------------------------------------------------
-- | Set options for a 'Connection'.
data ConnectionOptions = ConnectionOptions
{ connectionOnPong :: !(IO ())
-- ^ Whenever a 'pong' is received, this IO action is executed. It can be
-- used to tickle connections or fire missiles.
}


Expand All @@ -148,10 +170,16 @@ defaultConnectionOptions = ConnectionOptions
--------------------------------------------------------------------------------
receive :: Connection -> IO Message
receive conn = do
mbMsg <- connectionParse conn
case mbMsg of
Nothing -> throw ConnectionClosed
Just msg -> return msg
state <- takeMVar m
case state of
Unavailable -> putMVar m Unavailable >> throw ConnectionClosed
Available parse -> do
mbMsg <- parse `onException` putMVar m Unavailable
case mbMsg of
Nothing -> putMVar m Unavailable >> throw ConnectionClosed
Just msg -> putMVar m (Available parse) >> return msg
where
m = connectionParse conn


--------------------------------------------------------------------------------
Expand Down Expand Up @@ -196,10 +224,17 @@ receiveData conn = do
--------------------------------------------------------------------------------
send :: Connection -> Message -> IO ()
send conn msg = do
state <- takeMVar m
case msg of
(ControlMessage (Close _ _)) -> writeIORef (connectionSentClose conn) True
_ -> return ()
connectionWrite conn msg
case state of
Unavailable -> putMVar m Unavailable >> throw ConnectionClosed
Available write -> do
write msg `onException` putMVar m Unavailable
putMVar m (Available write)
where
m = connectionWrite conn


--------------------------------------------------------------------------------
Expand Down Expand Up @@ -247,3 +282,24 @@ sendCloseCode conn code =
-- | Send a ping
sendPing :: WebSocketsData a => Connection -> a -> IO ()
sendPing conn = send conn . ControlMessage . Ping . toLazyByteString


--------------------------------------------------------------------------------
-- | Forks a ping thread, sending a ping message every @n@ seconds over the
-- connection. The thread dies silently if the connection crashes or is closed.
forkPingThread :: Connection -> Int -> IO ()
forkPingThread conn n
| n <= 0 = return ()
| otherwise = do
_ <- forkIO (ignore `handle` go 1)
return ()
where
go :: Int -> IO ()
go i = do
threadDelay (n * 1000 * 1000)
sendPing conn (T.pack $ show i)
go (i + 1)

ignore e = case fromException e of
Just async -> throw (async :: AsyncException)
Nothing -> return ()
6 changes: 5 additions & 1 deletion tests/javascript/server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,8 @@ application pc = do
--------------------------------------------------------------------------------
-- | Accepts clients, spawns a single handler for each one.
main :: IO ()
main = WS.runServer "0.0.0.0" 8000 application
main = WS.runServerWith "0.0.0.0" 8000 options application
where
options = WS.defaultConnectionOptions
{ WS.connectionPingInterval = 2
}
4 changes: 2 additions & 2 deletions websockets.cabal
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Name: websockets
Version: 0.9.1.0
Version: 0.9.2.0

Synopsis:
A sensible and clean way to write WebSocket-capable servers in Haskell.
Expand Down Expand Up @@ -108,7 +108,7 @@ Test-suite websockets-tests
case-insensitive >= 0.3 && < 1.3,
containers >= 0.3 && < 0.6,
mtl >= 2.0 && < 2.3,
network >= 2.3 && < 2.6,
network >= 2.3 && < 2.7,
random >= 1.0 && < 1.1,
SHA >= 1.5 && < 1.7,
text >= 0.10 && < 1.3,
Expand Down

0 comments on commit 536849d

Please sign in to comment.