Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

big rewrite, NEXT: handle heartbeat correctly.

  • Loading branch information...
commit 4228521d7dbafd27b732ccfde39b6c238b8ec7e6 1 parent d350ebf
@yihuang authored
View
144 Apps.hs
@@ -1,89 +1,85 @@
-{-# LANGUAGE OverloadedStrings, PatternGuards #-}
+{-# LANGUAGE OverloadedStrings, TemplateHaskell #-}
module Apps where
-import Data.Char
-import Data.Monoid
-import Data.ByteString (ByteString)
-import qualified Data.ByteString.Char8 as S
-import Data.Map (Map)
+import Control.Exception (fromException)
+import Control.Monad (forever, when, forM_)
+import Control.Monad.IO.Class (liftIO)
+import Control.Applicative
+import Control.Concurrent.MVar
+
+import Data.Map (Map)
import qualified Data.Map as M
+import Data.Monoid (mappend, mconcat)
+import Data.Attoparsec
+import Data.Attoparsec.Char8 (skipSpace)
+import Data.ByteString (ByteString)
+import qualified Data.ByteString.Char8 as S
+
+import Network.WebSockets.Lite
+
+echo :: WSLite ()
+echo = forever $ recvBS >>= send
-import Control.Exception
-import Control.Monad
-import Control.Monad.IO.Class
-import Control.Concurrent
+close' :: WSLite ()
+close' = return ()
-import Network.WebSockets
+data ChatMessage = ChatJoin ByteString
+ | ChatData ByteString
+ | ChatError ByteString
-import Network.Sockjs.Types
-import Network.Sockjs
+chatParser :: Parser ChatMessage
+chatParser = ChatJoin <$> (string "join" *> skipSpace *> takeByteString)
+ <|> ChatData <$> takeByteString
-echo :: TextProtocol p => Request -> WebSockets p ()
-echo req = do
- acceptRequest req
- _ <- startHBThread
- sendSockjs SockjsOpen
- forever $ do
- msg <- receiveSockjs
- sendSockjsData msg
+instance UpProtocol ChatMessage where
+ decode s = parseOnly chatParser s
-close :: TextProtocol p => Request -> WebSockets p ()
-close req = do
- acceptRequest req
- sendSockjs SockjsOpen
+instance DownProtocol ChatMessage where
+ encode (ChatData s) = s
+ encode (ChatError e) = "error: " `mappend` e
+ encode (ChatJoin name) = name `mappend` " joined"
-type ServerState p = Map ByteString (Sink p)
+type ChatState = MVar (Map ByteString Sink)
-clientExists :: Protocol p => ByteString -> ServerState p -> Bool
-clientExists name = maybe False (const True) . M.lookup name
+newChatState :: IO ChatState
+newChatState = newMVar M.empty
-chat :: TextProtocol p => MVar (ServerState p) -> Request -> WebSockets p ()
-chat state req = do
- acceptRequest req
- sendSockjs SockjsOpen
+chat :: ChatState -> WSLite ()
+chat clients = do
+ name <- recvJoin
sink <- getSink
- msg <- receiveSockjs
- clients <- liftIO $ readMVar state
- case msg of
- _ | not (prefix `S.isPrefixOf` msg) ->
- sendSockjsData "Wrong Annoucement!"
- | any ($ name)
- [S.null, S.any isPunctuation, S.any isSpace] ->
- sendSockjsData $
- "Name cannot " `mappend`
- "contain punctuation or whitespace, and " `mappend`
- "cannot be empty"
- | clientExists name clients ->
- sendSockjsData "User already exists"
- | otherwise -> do
- liftIO $ modifyMVar_ state $ \s -> do
- let s' = M.insert name sink s
- sendSink sink $ sockjsData $
- "Welcome! Users: " `mappend`
- S.intercalate ", " (M.keys s)
- broadcast (name `mappend` " joined") s'
- return s'
- _ <- startHBThread
- talk state name
- where
- prefix = "Hi! I am "
- name = S.drop (S.length prefix) msg
-
-broadcast :: TextProtocol p => ByteString -> ServerState p -> IO ()
-broadcast message clients =
- mapM_ (flip sendSink (sockjsData message)) $ M.elems clients
-
-talk :: TextProtocol p => MVar (ServerState p) -> ByteString -> WebSockets p ()
-talk state user = flip catchWsError catchDisconnect $ do
- msg <- receiveSockjs
- liftIO $ readMVar state >>= broadcast
- (user `mappend` ": " `mappend` msg)
- talk state user
+ exists <- liftIO $ modifyMVar clients $ \cs -> do
+ case M.lookup name cs of
+ Nothing -> return (M.insert name sink cs, False)
+ Just _ -> return (cs, True)
+ when exists $ fail' "User already exists."
+
+ flip catchError (handleDisconnect name) $ do
+ welcome name
+ broadcast $ ChatJoin name
+ forever $ do
+ msg <- recv
+ case msg of
+ ChatData s -> broadcast $ ChatData $ mconcat [name, ": ", s]
+ _ -> fail' "invalid message."
where
- catchDisconnect e = case fromException e of
- Just ConnectionClosed -> liftIO $ modifyMVar_ state $ \s -> do
- let s' = M.delete user s
- broadcast (user `mappend` " disconnected") s'
- return s'
+ fail' s = send (ChatError s) >> close
+ recvJoin = do msg <- recv
+ case msg of
+ ChatJoin name -> return name
+ _ -> fail' "invalid message."
+
+ broadcast msg = do
+ sinks <- M.elems <$> liftIO (readMVar clients)
+ forM_ sinks (flip sendSink msg)
+
+ welcome name = do
+ users <- filter (/=name) . M.keys <$> liftIO (readMVar clients)
+ send $ ChatData $ "Welcome! Users: " `mappend` S.intercalate ", " users
+
+ handleDisconnect name e = case fromException e of
+ Just ConnectionClosed -> do
+ liftIO $ modifyMVar_ clients $ return . M.delete name
+ broadcast $ ChatData $ mconcat [name, " disconnected."]
_ -> return ()
View
3  Network/Sockjs/Timer.hs → Control/Concurrent/Timer.hs
@@ -1,4 +1,5 @@
-module Network.Sockjs.Timer where
+module Control.Concurrent.Timer where
+
import Control.Concurrent
type TimerId = ThreadId
View
7 Control/Monad/Utils.hs
@@ -0,0 +1,7 @@
+module Control.Monad.Utils where
+
+ifM :: Monad m => m Bool -> m a -> m a -> m a
+ifM mb m1 m2 = do
+ b <- mb
+ if b then m1 else m2
+
View
12 Data/ByteString/Utils.hs
@@ -0,0 +1,12 @@
+module Data.ByteString.Utils where
+
+import Data.ByteString (ByteString)
+import qualified Data.ByteString.Lazy as L
+import qualified Data.ByteString as S
+
+toLazy :: ByteString -> L.ByteString
+toLazy = L.fromChunks . (:[])
+
+toStrict :: L.ByteString -> ByteString
+toStrict = S.concat . L.toChunks
+
View
78 Data/Enumerator/Utils.hs
@@ -0,0 +1,78 @@
+module Data.Enumerator.Utils where
+
+import Control.Applicative
+import Control.Monad.IO.Class (liftIO)
+
+import Data.ByteString (ByteString)
+import qualified Data.ByteString.Lazy as L
+import Data.Int (Int64)
+import Data.Enumerator
+import qualified Data.Enumerator.List as EL
+import Blaze.ByteString.Builder (Builder)
+import qualified Blaze.ByteString.Builder as B
+
+import Control.Concurrent.STM
+import Network.WebSockets.Lite.Emulate
+
+-- | An `Enumerator' only do IO.
+ioEnum :: IO () -> Enumerator a IO b
+ioEnum io step = liftIO io >> returnI step
+
+enumSingle :: Monad m => a -> Enumerator a m b
+enumSingle = enumChunks . (:[])
+
+enumChunks :: Monad m => [a] -> Enumerator a m b
+enumChunks xs = checkContinue0 $ \_ f -> f (Chunks xs) >>== returnI
+
+-- | set a limit to the stream size, but don't break chunk.
+limit :: Monad m => Int64 -> Enumeratee ByteString ByteString m b
+limit n step | n <= 0 = return step
+limit n (Continue k) = continue loop where
+ loop (Chunks []) = continue loop
+ loop (Chunks xs) = iter where
+ len = L.length (L.fromChunks xs)
+ iter = if len <= n
+ then k (Chunks xs) >>== limit (n - len)
+ else k (Chunks xs) >>== (`yield` Chunks [])
+ loop EOF = k EOF >>== (`yield` EOF)
+limit _ step = return step
+
+-- | fetch multiple (at least one) items from TChan at a time, if TChan is empty, block on it.
+readTChan' :: TChan a -> STM [a]
+readTChan' ch = (:) <$> readTChan ch <*> readRest ch
+ where
+ readRest ch = do
+ empty <- isEmptyTChan ch
+ if empty
+ then return []
+ else (:) <$> readTChan ch <*> readRest ch
+
+-- | fetch multiple (at least one) chunks from TChan at a time, and combine them into one.
+enumStreamChanContents :: StreamChan a -> Enumerator [a] IO b
+enumStreamChanContents ch = checkContinue0 $ \loop f -> do
+ streams <- liftIO $ atomically $ readTChan' ch
+ let chunks = takeWhile isChunk streams
+ datas = concat [xs | (Chunks xs) <- chunks]
+ if null chunks
+ then f EOF >>== returnI
+ else f (Chunks [datas]) >>== loop
+ where isChunk (Chunks _) = True
+ isChunk _ = False
+
+-- | like `Enumerator.List.concatMap' , but terminate when return Nothing
+concatMapMaybe :: Monad m => (ao -> Maybe [ai])
+ -> Enumeratee ao ai m b
+concatMapMaybe f = checkDone (continue . step) where
+ step k EOF = yield (Continue k) EOF
+ step k (Chunks xs) = loop k xs
+
+ loop k [] = continue (step k)
+ loop k (x:xs) = do
+ case f x of
+ Nothing -> k EOF >>==
+ (`yield` (Chunks xs))
+ Just fx -> k (Chunks fx) >>==
+ checkDoneEx (Chunks xs) (`loop` xs)
+
+chunking :: Monad m => Enumeratee ByteString Builder m a
+chunking = EL.concatMap ((:[B.flush]) . B.fromByteString)
View
31 Main.hs
@@ -1,46 +1,39 @@
{-# LANGUAGE OverloadedStrings, TemplateHaskell #-}
-import System.IO.Unsafe (unsafePerformIO)
import System.Environment (getArgs)
-
import Data.Maybe
-import qualified Data.Map as M
-
import Control.Applicative
-import Control.Concurrent
-import Network.Wai
+import Network.Wai (Application)
import Network.Wai.Handler.Warp
-import Network.WebSockets (TextProtocol)
import Data.FileEmbed (embedDir)
import qualified Network.Wai.Handler.WebSockets as WaiWS
import qualified Network.Wai.Application.Static as Static
import Network.Wai.Application.Sockjs
-import Apps (echo, chat, close, ServerState, chat)
-
-serverState :: TextProtocol p => MVar (ServerState p)
-serverState = unsafePerformIO $ newMVar M.empty
+import Apps (echo, chat, close', newChatState, ChatState, chat)
staticApp :: Application
staticApp = Static.staticApp Static.defaultFileServerSettings
-- { Static.ssFolder = Static.embeddedLookup $ Static.toEmbedded $(embedDir "static") }
{ Static.ssFolder = Static.fileSystemLookup "static" }
-wsApps :: TextProtocol p => AppRoute p
-wsApps = [ ( ["echo"], (echo, Nothing) )
- , ( ["chat"], (chat serverState, Just ["websocket"]) )
- , ( ["close"], (close, Nothing) )
- , ( ["disabled_websocket_echo"], (echo, Just ["websocket"]) )
- ]
+mkApps :: ChatState -> WSLiteRoute
+mkApps st = [ ( ["echo"], (echo, Nothing) )
+ , ( ["chat"], (chat st, Just ["websocket"]) )
+ , ( ["close"], (close', Nothing) )
+ , ( ["disabled_websocket_echo"], (echo, Just ["websocket"]) )
+ ]
main :: IO ()
main = do
port <- read . fromMaybe "8080" . listToMaybe <$> getArgs
sockjsState <- newSockjsState
+ chatState <- newChatState
putStrLn $ "http://localhost:"++show port++"/static/client.html"
+ let apps = mkApps chatState
runSettings defaultSettings
{ settingsPort = port
- , settingsIntercept = WaiWS.intercept (websocketApp wsApps)
- } $ httpRoute [(["static"], staticApp)] (sockjsApp sockjsState wsApps)
+ , settingsIntercept = WaiWS.intercept (wsApps apps)
+ } $ waiRoute [(["static"], staticApp)] (waiApps sockjsState apps)
View
124 Network/Sockjs.hs
@@ -1,55 +1,83 @@
-{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ExistentialQuantification, OverloadedStrings, DeriveDataTypeable #-}
module Network.Sockjs
- ( sendSockjs
- , sendSockjsData
- , sockjsData
- , sendSinkSockjs
- , receiveSockjs
- , startHBThread
+ ( SockjsMessage (..)
+ , renderSockjs
+ , SockjsException (..)
+ , SockjsRequest (..)
+ , decodeSockjs
+ , decodeValue
) where
-import Control.Applicative
-import Control.Monad
-import Control.Monad.IO.Class
-import Control.Concurrent
+import Debug.Trace
+import Data.Maybe
+import Prelude hiding ( (++) )
+import Data.Typeable
import Data.Monoid
-import Data.ByteString (ByteString)
+import Blaze.ByteString.Builder (Builder)
import qualified Blaze.ByteString.Builder as B
+import Data.ByteString (ByteString)
+import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString.Lazy as L
+import Data.Enumerator (Enumeratee)
+import qualified Data.Enumerator.List as EL
import Data.Aeson
-import Network.WebSockets
-import Network.Sockjs.Types
-
-heartBeatInterval :: Int
-heartBeatInterval = 25
-
-sendSockjs :: TextProtocol p => SockjsMessage -> WebSockets p ()
-sendSockjs = sendTextData . B.toLazyByteString . renderSockjs
-
-sendSockjsData :: TextProtocol p => ByteString -> WebSockets p ()
-sendSockjsData = sendSockjs . SockjsData . (:[])
-
-sockjsData :: (TextProtocol p, WebSocketsData a) => a -> Message p
-sockjsData = textData . B.toLazyByteString . renderSockjs . SockjsData . (:[]) . mconcat . L.toChunks . toLazyByteString
-
-sendSinkSockjs :: TextProtocol p => Sink p -> SockjsMessage -> IO ()
-sendSinkSockjs sink = sendSink sink . textData . B.toLazyByteString . renderSockjs
-
-receiveSockjs :: (TextProtocol p, FromJSON a, Monoid a) => WebSockets p a
-receiveSockjs = mconcat <$> receiveSockjs'
-
-receiveSockjs' :: (TextProtocol p, FromJSON a) => WebSockets p [a]
-receiveSockjs' = do
- msg <- receiveData
- case msg of
- "" -> receiveSockjs'
- _ -> maybe (throwWsError (SockjsError "Broken JSON encoding [receive]."))
- return
- (unSockjsRequest <$> decodeValue msg)
-
-startHBThread :: TextProtocol p => WebSockets p ThreadId
-startHBThread = do
- sink <- getSink
- liftIO . forkIO . forever $ do
- threadDelay (heartBeatInterval*1000*1000)
- sendSinkSockjs sink SockjsHeartbeat
+import Data.Aeson.Encode (fromValue)
+import Data.Aeson.Parser (value)
+import Data.Attoparsec
+import Control.Exception
+import Control.Applicative
+
+(++) :: Monoid a => a -> a -> a
+(++) = mappend
+
+data SockjsRequest a = SockjsRequest { unSockjsRequest :: [a] }
+ deriving (Show) -- for debug
+
+instance FromJSON a => FromJSON (SockjsRequest a) where
+ parseJSON js@(Array _) = SockjsRequest <$> parseJSON js
+ parseJSON js = SockjsRequest . (:[]) <$> parseJSON js
+
+data SockjsMessage = SockjsOpen
+ | SockjsHeartbeat
+ | SockjsData [ByteString]
+ | SockjsClose Int ByteString
+ deriving (Show)
+
+renderSockjs :: SockjsMessage -> Builder
+renderSockjs msg = case msg of
+ SockjsOpen -> B.fromByteString "o"
+ SockjsHeartbeat -> B.fromByteString "h"
+ (SockjsData xs) -> mconcat $
+ [ B.fromByteString "a"
+ , (fromValue . toJSON $ xs)
+ ]
+ (SockjsClose code reason) -> B.fromLazyByteString . L.fromChunks $
+ [ "c["
+ , S.pack (show code)
+ , ",\""
+ , reason
+ , "\"]"
+ ]
+
+decodeValue :: (FromJSON a) => ByteString -> Maybe a
+decodeValue s = case parse value s of
+ Done _ v -> case fromJSON v of
+ Success a -> Just a
+ _ -> Nothing
+ _ -> Nothing
+
+decodeSockjs :: ByteString -> Maybe SockjsMessage
+decodeSockjs s = case S.uncons s of
+ Just ('o', _) -> Just SockjsOpen
+ Just ('h', _) -> Just SockjsHeartbeat
+ Just ('a', s') -> SockjsData <$> decodeValue s'
+ Just ('c', s') -> do (code, reason) <- decodeValue s'
+ return $ SockjsClose code reason
+ _ -> trace ("unknown message:"++show s) Nothing
+
+data SockjsException = SockjsReadEOF
+ | SockjsError String
+ deriving (Show, Typeable)
+
+instance Exception SockjsException
+
View
76 Network/Sockjs/Types.hs
@@ -1,76 +0,0 @@
-{-# LANGUAGE ExistentialQuantification, OverloadedStrings, DeriveDataTypeable #-}
-module Network.Sockjs.Types
- ( SockjsMessage (..)
- , renderSockjs
- , SockjsException (..)
- , SockjsRequest (..)
- , decodeSockjs
- , decodeValue
- ) where
-
-import Debug.Trace
-import Prelude hiding ( (++) )
-import Data.Typeable
-import Data.Monoid
-import Blaze.ByteString.Builder (Builder)
-import qualified Blaze.ByteString.Builder as B
-import qualified Blaze.ByteString.Builder.Char8 as B
-import Data.ByteString (ByteString)
-import qualified Data.ByteString.Lazy.Char8 as L
-import Data.Aeson
-import Data.Aeson.Encode (fromValue)
-import Data.Aeson.Parser (value)
-import qualified Data.Attoparsec.Lazy as L
-import Control.Exception
-import Control.Applicative
-
-(++) :: Monoid a => a -> a -> a
-(++) = mappend
-
-data SockjsRequest a = SockjsRequest { unSockjsRequest :: [a] }
- deriving (Show) -- for debug
-
-instance FromJSON a => FromJSON (SockjsRequest a) where
- parseJSON js@(Array _) = SockjsRequest <$> parseJSON js
- parseJSON js = SockjsRequest . (:[]) <$> parseJSON js
-
-data SockjsMessage = SockjsOpen
- | SockjsHeartbeat
- | SockjsData [ByteString]
- | SockjsClose Int ByteString
- deriving (Show)
-
-renderSockjs :: SockjsMessage -> Builder
-renderSockjs msg = case msg of
- SockjsOpen -> B.fromByteString "o"
- SockjsHeartbeat -> B.fromByteString "h"
- (SockjsData xs) -> B.fromByteString "a"
- ++ (fromValue . toJSON $ xs)
- ++ B.fromByteString ""
- (SockjsClose code reason) -> B.fromByteString "c["
- ++ B.fromString (show code)
- ++ B.fromByteString ",\""
- ++ B.fromByteString reason
- ++ B.fromByteString "\"]"
-
-decodeValue :: (FromJSON a) => L.ByteString -> Maybe a
-decodeValue s = case L.parse value s of
- L.Done _ v -> case fromJSON v of
- Success a -> Just a
- _ -> Nothing
- _ -> Nothing
-
-decodeSockjs :: L.ByteString -> Maybe SockjsMessage
-decodeSockjs s = case L.uncons s of
- Just ('o', _) -> Just SockjsOpen
- Just ('h', _) -> Just SockjsHeartbeat
- Just ('a', s') -> SockjsData <$> decodeValue s'
- Just ('c', s') -> do (code, reason) <- decodeValue s'
- return $ SockjsClose code reason
- _ -> trace ("unknown message:"++show s) Nothing
-
-data SockjsException = SockjsReadEOF
- | SockjsError String
- deriving (Show, Typeable)
-
-instance Exception SockjsException
View
407 Network/Wai/Application/Sockjs.hs
@@ -1,69 +1,66 @@
{-# LANGUAGE OverloadedStrings, TupleSections, ViewPatterns, Rank2Types, ExistentialQuantification #-}
module Network.Wai.Application.Sockjs
- ( AppRoute
- , sockjsApp
- , websocketApp
- , httpRoute
+ ( WSLiteRoute
+ , waiApps
+ , wsApps
+ , waiRoute
, newSockjsState
) where
-- imports {{{
-import Prelude hiding (catch)
-import Data.Map (Map)
+import Data.Map (Map)
import qualified Data.Map as M
-import Data.Monoid
-import Data.List
-import Data.Maybe
-import Data.IORef
-import Data.Int (Int64)
-
-import Control.Exception
-import Control.Monad.IO.Class
-import Control.Monad
-import Control.Applicative
-import Control.Concurrent
-import Control.Concurrent.STM
-
-import Data.ByteString (ByteString)
+import Data.Monoid
+import Data.List
+import Data.Maybe
+import Data.IORef
+import Data.Int (Int64)
+
+import Prelude hiding (catch)
+import Control.Exception (catch, throw, mask, onException, SomeException)
+import Control.Monad.IO.Class (liftIO)
+import Control.Monad (forever, msum, when)
+import Control.Applicative ( (<$>) )
+import Control.Concurrent
+import Control.Concurrent.STM
+import Control.Concurrent.Timer
+
+import Data.ByteString (ByteString)
+import Data.ByteString.Utils (toLazy, toStrict)
import qualified Data.Text as T
-import Data.Text (Text)
+import Data.Text (Text)
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString.Lazy as L
-
-import Data.Enumerator hiding (map, foldl, mapM, concatMap)
-import qualified Data.Enumerator as E
-import qualified Data.Enumerator.List as EL
-
-import Blaze.ByteString.Builder (Builder)
+import Blaze.ByteString.Builder (Builder)
import qualified Blaze.ByteString.Builder as B
-
-import qualified Data.Attoparsec.Lazy as L
-import Data.Aeson (json, Value(Array), encode)
-import Data.Digest.Pure.MD5 (md5)
+import Data.Enumerator ( Enumerator, Enumeratee, run_, ($$), ($=), (=$), (>==>), Stream(..), returnI )
+import qualified Data.Enumerator.Utils as E
+import qualified Data.Enumerator.List as EL
+import qualified Data.Aeson as JSON
+import qualified Data.Aeson.Encode as JSON
+import Data.Digest.Pure.MD5 (md5)
import qualified Data.Binary as Binary
-import Network.HTTP.Types
-import Network.Wai
-
-import Network.WebSockets hiding (Request, Response, requestHeaders)
+import Network.HTTP.Types
+import Network.Wai
+import Web.Cookie
import qualified Network.WebSockets as WS
-import Network.WebSockets.Emulate
-
-import Web.Cookie
-
-import Network.Sockjs.Types
-import Network.Sockjs.Timer
+import Network.WebSockets.Lite
+import Network.WebSockets.Lite.Emulate
+import Network.Sockjs
-- }}}
responseLimit :: Int64
responseLimit = 128*1024
+heartBeatInterval :: Int
+heartBeatInterval = 5
-type AppRoute p = [([Text], (WSApp p, Maybe [Text]))]
+type WSLiteRoute = [([Text], (WSLite (), Maybe [Text]))]
-sockjsApp :: MVar SessionMap -> AppRoute EmulateProtocol -> Application
-sockjsApp msm apps req = case (requestMethod req, matchResult) of
+waiApps :: MVar SessionMap -> WSLiteRoute -> Application
+waiApps state apps req = case (requestMethod req, matchResult) of
("OPTIONS", Just _) -> return $ optionsResponse req
(m, Just ((app, disallows), path)) -> case (m, path) of
@@ -76,64 +73,64 @@ sockjsApp msm apps req = case (requestMethod req, matchResult) of
return $ chunkingTestResponse req
(_, [server, sid, path'])
- | maybe False (elem path') disallows ||
- T.null server || T.null sid ||
- T.any (=='.') server || T.any (=='.') sid ->
+ | maybe False (elem path') disallows
+ || T.null server || T.null sid
+ || T.any (=='.') server || T.any (=='.') sid ->
return $ notFound []
("POST", [_, sid, "xhr"]) ->
- downstream msm 1 sid app returnI hsJavascript newlineL req
+ downstream state sid app 1 returnI hsJavascript newline req
("POST", [_, sid, "xhr_streaming"]) ->
- let prelude = enumChunks
- [ B.fromByteString $ S.replicate 2048 'h'
- , B.fromByteString "\n"
- , B.flush
- ]
- in downstream msm responseLimit sid app prelude hsJavascript newlineL req
+ let prelude = E.enumSingle $ S.replicate 2048 'h' `mappend` "\n"
+ in downstream state sid app responseLimit prelude hsJavascript newline req
("GET", [_, sid, "jsonp"]) ->
- let wrap cb s = mconcat [toLazy cb, "(", encode s, ");\r\n"]
- in withCallback $ \cb ->
- downstream msm 1 sid app returnI hsJavascript (wrap cb) req
+ let wrap callback b = mconcat
+ [ B.fromByteString callback
+ , B.fromByteString "("
+ , JSON.fromValue . JSON.toJSON . B.toByteString $ b
+ , B.fromByteString ");\r\n"
+ ]
+ in withCallback $ \callback ->
+ downstream state sid app 1 returnI hsJavascript (wrap callback) req
("GET", [_, sid, "htmlfile"]) ->
- withCallback $ \cb ->
- downstream msm responseLimit sid app (htmlfile cb) hsHtml wrap req
+ withCallback $ \callback ->
+ downstream state sid app responseLimit (htmlfile callback) hsHtml wrap req
where
- wrap s = mconcat ["<script>\np(", encode s, ");\n</script>\r\n"]
- htmlfile cb = enumChunks
- [ B.fromByteString
- "<!doctype html>\n\
- \<html><head>\n\
- \ <meta http-equiv=\"X-UA-Compatible\" content=\"IE=edge\" />\n\
- \ <meta http-equiv=\"Content-Type\" content=\"text/html; charset=UTF-8\" />\n\
- \</head><body><h2>Don't panic!</h2>\n\
- \ <script>\n\
- \ document.domain = document.domain;\n\
- \ var c = parent.", B.fromByteString cb, B.fromByteString ";\n\
- \ c.start();\n\
- \ function p(d) {c.message(d);};\n\
- \ window.onload = function() {c.stop();};\n\
- \ </script>"
- , B.fromByteString $ S.replicate 658 ' ' -- ensure 1024 length
- , B.flush
- ]
+ wrap b = mconcat [ B.fromByteString "<script>\np("
+ , JSON.fromValue . JSON.toJSON . B.toByteString $ b
+ , B.fromByteString ");\n</script>\r\n"
+ ]
+ htmlfile callback = E.enumSingle $
+ "<!doctype html>\n\
+ \<html><head>\n\
+ \ <meta http-equiv=\"X-UA-Compatible\" content=\"IE=edge\" />\n\
+ \ <meta http-equiv=\"Content-Type\" content=\"text/html; charset=UTF-8\" />\n\
+ \</head><body><h2>Don't panic!</h2>\n\
+ \ <script>\n\
+ \ document.domain = document.domain;\n\
+ \ var c = parent." `mappend` callback `mappend` ";\n\
+ \ c.start();\n\
+ \ function p(d) {c.message(d);};\n\
+ \ window.onload = function() {c.stop();};\n\
+ \ </script>" `mappend` S.replicate 658 ' ' -- ensure 1024 length
("GET", [_, sid, "eventsource"]) ->
let wrap b = mconcat
- [ "data: "
+ [ B.fromByteString "data: "
, b
- , "\r\n\r\n"
+ , B.fromByteString "\r\n\r\n"
]
- prelude = enumChunks [B.fromByteString "\r\n", B.flush]
- in downstream msm responseLimit sid app prelude hsEventStream wrap req
+ prelude = E.enumSingle "\r\n"
+ in downstream state sid app responseLimit prelude hsEventStream wrap req
("POST", [_, sid, "xhr_send"]) ->
- upstream msm sid id noContent req
+ upstream state sid id (noContent req) req
("POST", [_, sid, "jsonp_send"]) ->
- upstream msm sid parse (\req' -> ok (hsCookie req') $ B.fromByteString "ok") req
+ upstream state sid parse (ok (hsCookie req) $ B.fromByteString "ok") req
where
ct = fromMaybe "application/x-www-form-urlencoded" $ lookup "Content-Type" $ requestHeaders req
parse body = case ct of
@@ -147,7 +144,7 @@ sockjsApp msm apps req = case (requestMethod req, matchResult) of
_ -> return $ notFound []
where
withCallback f = case lookup "c" (queryString req) of
- Just (Just cb) | not (S.null cb) -> f cb
+ Just (Just callback) | not (S.null callback) -> f callback
_ -> return $ serverError "\"callback\" parameter required"
matchResult = msum (map match apps)
@@ -155,35 +152,12 @@ sockjsApp msm apps req = case (requestMethod req, matchResult) of
isIframe p = T.isPrefixOf "iframe" p && T.isSuffixOf ".html" p
--- enumerator utils {{{
-
-toLazy :: ByteString -> L.ByteString
-toLazy = L.fromChunks . (:[])
-
-toStrict :: L.ByteString -> ByteString
-toStrict = S.concat . L.toChunks
-
-ifM :: Monad m => m Bool -> m a -> m a -> m a
-ifM mb m1 m2 = do
- b <- mb
- if b then m1 else m2
-
-ioEnum :: IO () -> Enumerator a IO b
-ioEnum io step = liftIO io >> E.returnI step
-
-enumChunks :: Monad m => [a] -> Enumerator a m b
-enumChunks xs = E.checkContinue0 $ \_ f -> f (E.Chunks xs) >>== E.returnI
--- }}}
-
-- response utils {{{
-newlineL :: L.ByteString -> L.ByteString
-newlineL = (`mappend` "\n")
-
-newlineB :: Builder -> Builder
-newlineB = (`mappend` B.fromByteString "\n")
+newline :: Builder -> Builder
+newline = (`mappend` B.fromByteString "\n")
-hsJavascript, hsPlain, hsHtml, hsEventStream, hsCache, hsNoCache :: Headers
+hsJavascript, hsPlain, hsHtml, hsEventStream, hsCache, hsNoCache :: [Header]
hsJavascript = [("Content-Type", "application/javascript; charset=UTF-8")]
hsPlain = [("Content-Type", "text/plain; charset=UTF-8")]
hsHtml = [("Content-Type", "text/html; charset=UTF-8")]
@@ -191,35 +165,35 @@ hsEventStream = [("Content-Type", "text/event-stream; charset=UTF-8")]
hsCache = [("Cache-Control", "public; max-age=31536000;")
,("Expires", "31536000")]
hsNoCache = [("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0")]
-hsAC :: Request -> Headers
+hsAC :: Request -> [Header]
hsAC req = [("access-control-max-age", "31536000")
,("access-control-allow-origin", origin)
,("access-control-allow-credentials", "true")]
where origin = fromMaybe "*" $ lookup "Origin" $ requestHeaders req
-hsCookie :: Request -> Headers
+hsCookie :: Request -> [Header]
hsCookie req = [("Set-Cookie", mconcat ["JSESSIONID=", jsid, "; path=/"])]
where jsid = fromMaybe "dummy" $
lookup "Cookie" (requestHeaders req) >>=
lookup "JSESSIONID" . parseCookies
-hsETag :: ByteString -> Headers
+hsETag :: ByteString -> [Header]
hsETag etag = [("ETag", etag)]
-hsCommon :: Request -> Headers
+hsCommon :: Request -> [Header]
hsCommon req = hsCookie req
++ hsAC req
-ok :: Headers -> Builder -> Response
+ok :: [Header] -> Builder -> Response
ok = ResponseBuilder statusOK
jsOk :: Request -> Builder -> Response
jsOk req = ok (hsJavascript ++ hsCommon req)
sockjsOk :: Request -> SockjsMessage -> Response
-sockjsOk req = jsOk req . newlineB . renderSockjs
+sockjsOk req = jsOk req . newline . renderSockjs
-notFound :: Headers -> Response
+notFound :: [Header] -> Response
notFound hs = ResponseBuilder statusNotFound hs mempty
notModified :: Response
@@ -236,10 +210,8 @@ serverError msg = ResponseBuilder statusServerError [] (B.fromByteString msg)
optionsResponse :: Request -> Response
optionsResponse req = ResponseBuilder statusNoContent
- ( ("Allow", "OPTIONS, POST")
- : hsCache
- ++ hsCommon req
- ) mempty
+ (("Allow", "OPTIONS, POST") : (hsCache ++ hsCommon req))
+ mempty
chunkingTestResponse :: Request -> Response
chunkingTestResponse req = ResponseEnumerator enum
@@ -247,12 +219,10 @@ chunkingTestResponse req = ResponseEnumerator enum
enum f = run_ $ chunkings $$ iter
where
iter = f statusOK (hsJavascript ++ hsAC req)
- prelude = enumChunks [ B.fromByteString "h\n", B.flush
- , B.fromByteString $ S.replicate 2048 ' '
- , B.fromByteString "h\n", B.flush
- ]
- step = enumChunks [B.fromByteString "h\n", B.flush]
- chunkings = foldl (\e ms -> e >==> ioEnum (threadDelay $ ms*1000) >==> step)
+ prelude = E.enumChunks ["h\n", S.replicate 2048 ' ' `mappend` "h\n"]
+ $= E.chunking
+ step = E.enumSingle "h\n" $= E.chunking
+ chunkings = foldl (\e ms -> e >==> E.ioEnum (threadDelay $ ms*1000) >==> step)
prelude
[5, 25, 125, 625, 3125]
@@ -284,70 +254,81 @@ iframeResponse req =
\</html>"
hashed = toStrict $ Binary.encode $ md5 content
--- | pass data from outChan to client streamlined.
+-- | get messages application.
downstream :: MVar SessionMap
- -> Int64 -- response size limit, when exceeded, close connection, client will reconnect automitically.
-> SessionId
- -> WSApp EmulateProtocol -- the websocket app to run if not exists.
- -> (forall a. Enumerator Builder IO a) -- prelude data to send.
- -> Headers -- extra response headers, e.g. content-type.
- -> (L.ByteString -> L.ByteString) -- wraper for response message.
+ -> WSLite () -- the app to run if not exists.
+ -> Int64 -- response size limit, when exceeded, close connection, client will reconnect automitically.
+ -> (forall a. Enumerator ByteString IO a) -- prelude data to send.
+ -> [Header] -- extra response headers, e.g. content-type.
+ -> (Builder -> Builder) -- wraper for response message.
-> Application
-downstream msm rsplimit sid app prelude headers wrap req = return $ ResponseEnumerator $ \f -> do
- sess <- getOrNewSession msm sid app req
+downstream state sid app rsplimit prelude headers wrap req = return $ ResponseEnumerator $ \f -> do
+ sess <- getOrNewSession state sid app
closed' <- readIORef (closed sess)
liftIO $ putStrLn $ "closed:" ++ show closed'
if closed'
- then run_ $ prelude >==> enumChunks [wrapB $ renderSockjs $ SockjsClose 3000 "Go away!", B.flush]
+ then run_ $ (prelude >==> enumMsg (SockjsClose 3000 "Go away!"))
+ $= E.chunking
$$ header f
else tryModifyMVar (inited sess)
- ( run_ $ prelude >==> enumChunks [wrapB $ renderSockjs $ SockjsClose 2010 "Another connection still open", B.flush]
+ -- fail to hold the lock
+ ( run_ $ (prelude >==> enumMsg (SockjsClose 2010 "Another connection still open"))
+ $= E.chunking
$$ header f
)
+ -- got the lock
(\inited' -> do
- if inited'
- then cancelTimer sess
- else do -- ignore response
- _ <- atomically $ readTChan $ outChan sess
- return ()
- let body = prelude >==> messages (outChan sess)
- r <- run_ (body $$ header f)
- `catch` (\e -> atomically (writeTChan (inChan sess) EOF) >> throw (e::SomeException))
+ cancelTimer sess
+ let open = if inited'
+ then returnI
+ else enumMsg SockjsOpen
+ body = prelude
+ >==> open
+ >==> messages (outChan sess)
+ >==> enumMsg (SockjsClose 3000 "Go away!")
+ iter = body
+ $= E.limit rsplimit
+ $= E.chunking
+ $$ header f
+ -- when write to socket failed, means connection lost, notify inner app.
+ r <- run_ iter `catch` (\e ->
+ atomically (writeTChan (inChan sess) EOF) >>
+ throw (e::SomeException) )
startTimer sess
return (True, r)
)
where
+ renderBS = B.toByteString . wrap . renderSockjs
+ enumMsg = E.enumChunks . (:[]) . renderBS
header f = f statusOK (headers ++ hsCommon req ++ hsNoCache)
- wrapB = B.fromLazyByteString . wrap . B.toLazyByteString
- messages :: StreamChan ByteString -> Enumerator Builder IO b
- messages ch = enumChan ch
- $= EL.map (wrap . parseFramePayload . toLazy)
- $= limit rsplimit
- $= EL.concatMap ((:[B.flush]) . B.fromLazyByteString)
+ messages :: StreamChan ByteString -> Enumerator ByteString IO b
+ messages ch = E.enumStreamChanContents ch
+ $= EL.map (renderBS . SockjsData)
+-- | send message applicatin.
upstream :: MVar SessionMap
-> SessionId
-> (L.ByteString -> L.ByteString) -- parse payload from request body.
- -> (Request -> Response) -- success response.
+ -> Response -- success response.
-> Application
-upstream msm sid parse successResponse req =
- M.lookup sid <$> liftIO (readMVar msm) >>=
+upstream state sid parse successResponse req =
+ M.lookup sid <$> liftIO (readMVar state) >>=
maybe (return $ notFound $ hsCookie req)
- (\sess ->
- ifM (liftIO $ readIORef $ closed sess)
- (return $ sockjsOk req $ SockjsClose 3000 "Go away!")
- ( do
- msg <- parse . L.fromChunks <$> EL.consume
- if L.null msg
- then return $ serverError "Payload expected."
- else -- validate json format.
- case L.parse json msg of
- L.Done _ (Array _) -> do
- liftIO $ passRequest (inChan sess) msg
- return $ successResponse req
- _ -> return $ serverError "Broken JSON encoding."
- )
+ (\sess -> do
+ closed' <- liftIO $ readIORef $ closed sess
+ if closed'
+ then return $ sockjsOk req $ SockjsClose 3000 "Go away!"
+ else do
+ payload <- parse . L.fromChunks <$> EL.consume
+ if L.null payload
+ then return $ serverError "Payload expected."
+ else case JSON.decode payload of
+ Just (SockjsRequest xs) -> do
+ liftIO $ atomically $ writeTChan (inChan sess) (Chunks xs)
+ return successResponse
+ Nothing -> return $ serverError "Broken JSON encoding."
)
-- }}}
@@ -368,8 +349,6 @@ data Session = Session
type SessionMap = Map SessionId Session
-type WSApp p = WS.Request -> WebSockets p ()
-
newSockjsState :: IO (MVar SessionMap)
newSockjsState = newMVar M.empty
@@ -387,10 +366,9 @@ tryModifyMVar m ac f =
-- | create session if not exists.
getOrNewSession :: MVar SessionMap
-> SessionId
- -> WSApp EmulateProtocol
- -> Request
+ -> WSLite ()
-> IO Session
-getOrNewSession msm sid app req = modifyMVar msm $ \sm ->
+getOrNewSession state sid app = modifyMVar state $ \sm ->
case M.lookup sid sm of
Just old -> return (sm, old)
Nothing -> do
@@ -401,11 +379,12 @@ getOrNewSession msm sid app req = modifyMVar msm $ \sm ->
closed' <- newIORef False
timer' <- newIORef Nothing
thread <- forkIO $ do
- let req' = RequestHttpPart (rawPathInfo req) (requestHeaders req)
- runEmulator in_ out req' (\r -> app r `catchWsError` const (return ()))
- M.lookup sid <$> readMVar msm >>=
+ -- TODO handle heart beat correctly
+ -- runWSLite in_ out (startHBThread >> app)
+ runWSLite in_ out app
+ M.lookup sid <$> readMVar state >>=
maybe (return $ error "impossible:session disappeared before close!")
- (closeSession msm)
+ (closeSession state)
let sess = Session
{ sessionId = sid
@@ -420,13 +399,14 @@ getOrNewSession msm sid app req = modifyMVar msm $ \sm ->
return (sm', sess)
closeSession :: MVar SessionMap -> Session -> IO ()
-closeSession msm sess = do
+closeSession state sess = do
putStrLn $ "close session "++show (sessionId sess)
writeIORef (closed sess) True
atomically $ writeTChan (outChan sess) EOF
threadDelay (5*1000*1000)
- modifyMVar_ msm $ return . M.delete (sessionId sess)
+ modifyMVar_ state $ return . M.delete (sessionId sess)
+-- | start a timer, when timeout close the session
startTimer :: Session -> IO ()
startTimer sess = do
putStrLn $ "start timer "++show (sessionId sess)
@@ -437,6 +417,7 @@ startTimer sess = do
t <- setTimeout (5*1000*1000) $ atomically $ writeTChan (inChan sess) EOF
writeIORef (timer sess) (Just t)
+-- | try to cancel the close-session timer.
cancelTimer :: Session -> IO ()
cancelTimer sess = do
putStrLn $ "cancel timer "++show (sessionId sess)
@@ -447,45 +428,41 @@ cancelTimer sess = do
-- }}}
--- transport utils {{{
-
-passRequest :: StreamChan ByteString -> L.ByteString -> IO ()
-passRequest ch =
- atomically
- . writeTChan ch
- . return
- . B.toByteString
- . encodeFrame EmulateProtocol Nothing
- . Frame True BinaryFrame
-
-parseFramePayload :: L.ByteString -> L.ByteString
-parseFramePayload = fromMaybe (error "Internal Encoding Error [downstream]") .
- (framePayload <$>) . L.maybeResult . L.parse (decodeFrame EmulateProtocol)
-
-limit :: Monad m => Int64 -> Enumeratee L.ByteString L.ByteString m b
-limit n step | n <= 0 = return step
-limit n (Continue k) = continue loop where
- loop (Chunks []) = continue loop
- loop (Chunks xs) = iter where
- len = L.length (L.concat xs)
- iter = if len <= n
- then k (Chunks xs) >>== limit (n - len)
- else k (Chunks xs) >>== (`E.yield` Chunks [])
- loop EOF = k EOF >>== (`E.yield` EOF)
-limit _ step = return step
-
--- }}}
-
-websocketApp :: AppRoute Hybi00 -> WSApp Hybi00
-websocketApp apps req = case msum $ map match apps of
- Just ((app, disallows), [_,_,"websocket"])
- | maybe True (notElem "websocket") disallows -> app req
- _ -> WS.rejectRequest req "Forbidden!"
+startHBThread :: WSLite ThreadId
+startHBThread = do
+ sink <- getSink
+ liftIO . forkIO . forever $ do
+ threadDelay (heartBeatInterval*1000*1000)
+ sink $ B.toByteString $ renderSockjs SockjsHeartbeat
+
+sockjsWrapper :: Monad m => Enumeratee ByteString ByteString m a
+sockjsWrapper = E.concatMapMaybe f
+ where f s = if S.null s
+ then Just [] -- ignore empty message.
+ else unSockjsRequest <$> decodeValue s
+
+wsApp :: WSLite () -> WS.Request -> WS.WebSockets WS.Hybi00 ()
+wsApp lite req = do
+ WS.acceptRequest req
+ WS.sendTextData ("o"::ByteString)
+ sink <- WS.getSink
+ let sink' = WS.sendSink sink . WS.textData . B.toLazyByteString . renderSockjs . SockjsData . (:[])
+ iter = sockjsWrapper =$ iterWSLite lite sink'
+ success <- (WS.runIteratee iter >> return True) `WS.catchWsError`
+ ((>> return False) . liftIO . putStrLn . ("uncaught exception: "++) . show)
+ when success $ WS.sendTextData $ B.toByteString $ renderSockjs $ SockjsClose 3000 "Go away!"
+
+wsApps :: WSLiteRoute -> WS.Request -> WS.WebSockets WS.Hybi00 ()
+wsApps apps req =
+ case msum $ map match apps of
+ Just ((app, disallows), [_,_,"websocket"])
+ | maybe True (notElem "websocket") disallows -> wsApp app req
+ _ -> WS.rejectRequest req "Forbidden!"
where path = decodePathSegments $ WS.requestPath req
match (prefix, app) = (app, ) <$> stripPrefix prefix path
-httpRoute :: [([Text], Application)] -> Application -> Application
-httpRoute routes fallback req =
+waiRoute :: [([Text], Application)] -> Application -> Application
+waiRoute routes fallback req =
fromMaybe (fallback req) $ msum $ map match routes
where
path = pathInfo req
View
2  static/client.js
@@ -49,7 +49,7 @@ $(document).ready(function () {
ws.onopen = function() {
log('open');
- ws.send('Hi! I am ' + user);
+ ws.send('join ' + user);
};
ws.onclose = function() {
log('closed');
Please sign in to comment.
Something went wrong with that request. Please try again.