Skip to content

Commit ffda946

Browse files
author
Alberto
committed
self node detection, HTTP errors, cleaning documentation
1 parent 9f923b8 commit ffda946

File tree

1 file changed

+71
-65
lines changed

1 file changed

+71
-65
lines changed

transient-universe/src/Transient/Move/Internals.hs

Lines changed: 71 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ import Control.Concurrent
115115
import System.Mem.StableName
116116
import Unsafe.Coerce
117117
import System.Environment
118-
118+
import Data.Default
119119

120120
--import System.Random
121121
pk = BS.pack
@@ -546,9 +546,15 @@ wormhole' node (Cloud comp) = local $
546546
log <- getLog
547547

548548
if not $ recover log
549-
then
550-
runTrans $
551-
( do
549+
then runTrans $ (do
550+
my <- getMyNode
551+
if node== my then do
552+
let conn = fromMaybe (error "wormhole: no connection") moldconn
553+
rself <- liftIO $ newIORef $ Just Self
554+
liftIO $ writeIORef (remoteNode conn) $ Just node
555+
setData conn{connData= rself}
556+
comp
557+
else do
552558
tr "befor mconnect"
553559
conn <- mconnect node
554560
tr "wormhole after connect"
@@ -561,20 +567,16 @@ wormhole' node (Cloud comp) = local $
561567
)
562568
<*** do
563569
when (isJust moldconn) . setData $ fromJust moldconn
564-
else -- when (isJust mclosure) . setState $ fromJust mclosure
570+
else
565571

566572
do
567573
-- tr "YES REC"
568574
let conn = fromMaybe (error "wormhole: no connection in remote node") moldconn
569575
setData $ conn {calling = False}
570576
runTrans $ comp
571577

572-
-- <*** do when (isJust mclosure) . setData $ fromJust mclosure
573578

574-
-- #ifndef ghcjs_HOST_OS
575-
-- type JSString= String
576-
-- pack= id
577-
-- #endif
579+
578580

579581
data CloudException = CloudException Node SessionId IdClosure String deriving (Typeable, Show, Read)
580582

@@ -707,49 +709,13 @@ teleport = do
707709

708710
newtype PrevClos = PrevClos {unPrevClos :: DBRef LocalClosure}
709711

710-
{-
711-
receive conn clos idSession = do
712-
tr ("RECEIVE",clos, idSession)
713-
(lc, log) <- setCont clos idSession
714-
local $ do
715-
s <- giveParseString
716-
-- tr ("PARSESTRING",s,"LOG",toPath $ fulLog log)
717-
if recover log && not (BS.null s)
718-
then return ()
719-
else do
720-
when (synchronous conn) $ liftIO $ takeMVar $ localMvar lc
721-
tr ("EVAR waiting in", localCon lc, localClos lc)
722-
mr@(Right (a, b, c, _)) <- readEVar $ fromJust $ localEvar lc
723-
724-
tr ("RECEIVED", (a, b, c))
725-
726-
case mr of
727-
Right (SDone, _, _, _) -> empty
728-
Right (SError _, _, _, _) -> error "receive: SERROR"
729-
Right (SLast log, s2, closr, conn') -> do
730-
cdata <- liftIO $ readIORef $ connData conn' -- connection may have been changed
731-
liftIO $ writeIORef (connData conn) cdata
732-
case fromMaybe (error "Transient.Move:666: no connection") cdata of
733-
HTTP2Node _ _ _ httpheaders -> setState httpheaders
734-
tr ("RECEIVEDDDDDDDDDDDDDDDDDDDDDDD SLAST", log)
735-
setLog (idConn conn) log s2 closr
736-
Right (SMore log, s2, closr, conn') -> do
737-
cdata <- liftIO $ readIORef $ connData conn'
738-
liftIO $ writeIORef (connData conn) cdata
739-
case fromMaybe (error "Transient.Move:671: no connection") cdata of
740-
HTTP2Node _ _ _ httpheaders -> setState httpheaders
741-
tr ("RECEIVEDDDDDDDDDDDDDDDDDDDDDDD", log, closr)
742-
setLog (idConn conn) log s2 closr
743-
Left except -> do
744-
throwt except
745-
empty
746-
-}
712+
747713

748714
receive conn clos idSession = do
749715
tr ("RECEIVE",clos, idSession)
750716
(lc, log) <- setCont clos idSession
751717
s <- giveParseString
752-
-- tr ("PARSESTRING",s,"LOG",toPath $ fulLog log)
718+
-- tr ("receive PARSESTRING",s,"LOG",toPath $ fulLog log)
753719
if recover log && not (BS.null s)
754720
then (abduce >> receive1 lc) <|> return() -- watch this event var and continue restoring
755721
else receive1 lc
@@ -764,16 +730,18 @@ receive conn clos idSession = do
764730

765731
case mr of
766732
Right (SDone, _, _, _) -> empty
767-
Right (SError _, _, _, _) -> error "receive: SERROR"
733+
Right (SError e, _, _, _) -> error $ show("receive:",e)
768734
Right (SLast log, s2, closr, conn') -> do
769735
cdata <- liftIO $ readIORef $ connData conn' -- connection may have been changed
770736
liftIO $ writeIORef (connData conn) cdata
771-
tr ("RECEIVEDDDDDDDDDDDDDDDDDDDDDDD SLAST", log)
737+
-- setData conn'
738+
tr ("RECEIVED -------> SLAST", log)
772739
setLog (idConn conn) log s2 closr
773740
Right (SMore log, s2, closr, conn') -> do
774741
cdata <- liftIO $ readIORef $ connData conn'
775742
liftIO $ writeIORef (connData conn) cdata
776-
tr ("RECEIVEDDDDDDDDDDDDDDDDDDDDDDD", log, closr)
743+
-- setData conn'
744+
tr ("RECEIVED -------> SMORE", log, closr)
777745
setLog (idConn conn) log s2 closr
778746
Left except -> do
779747
throwt except
@@ -1025,7 +993,7 @@ msend :: Connection -> BL.ByteString -> TransIO ()
1025993
#ifndef ghcjs_HOST_OS
1026994

1027995
msend con bs = do
1028-
tr ("MSEND", unsafePerformIO $ readIORef $ remoteNode con, idConn con, "--------->------>", bs)
996+
ttr ("MSEND", unsafePerformIO $ readIORef $ remoteNode con, idConn con, "--------->------>", bs)
1029997
c <- liftIO $ readIORef $ connData con
1030998
con' <- case c of
1031999
Nothing -> do
@@ -1341,6 +1309,7 @@ mread (Connection _ _ _ (Just (Node2Web sconn )) _ _ _ _ _ _ _)= do
13411309
-- parallel many, used for parsing
13421310
parMany p = do
13431311
d <- isDone
1312+
-- tr("parmany",d)
13441313
if d then empty else p <|> parMany p
13451314

13461315
parallelReadHandler :: Loggable a => Connection -> TransIO (StreamData a)
@@ -1351,6 +1320,7 @@ parallelReadHandler conn = do
13511320
nomore = (do s <- getParseBuffer; if BS.null s then empty else error $ show $ ("malformed data received: expected Int, received: ", BS.take 10 s))
13521321

13531322
extractPacket = do
1323+
-- tr "extractPacket"
13541324
len <- integer <|> nomore
13551325
str <- tTake (fromIntegral len)
13561326
tr ("MREAD <-------<-------", str)
@@ -2104,6 +2074,16 @@ listen (node@(Node _ port _ _)) = onAll $ do
21042074
liftIO $ writeIORef (myNode conn) node'
21052075
setData conn
21062076

2077+
-- onException $ \(e :: SomeException) -> do -- msend conn $ BS.pack $ show e ; empty
2078+
-- -- cdata <- liftIO $ readIORef $ connData conn
2079+
-- ttr "ONEXcepTION1"
2080+
-- msend conn $ chunked "404: not found" <> endChunk
2081+
-- -- case cdata of
2082+
-- -- Just(HTTP2Node _ _ _ _) -> msend conn $ "\r\n" <> chunked "404: not found" <> endChunk
2083+
-- -- Just (HTTPS2Node _) -> msend conn $ "\r\n" <> chunked "404: not found" <> endChunk
2084+
-- -- _ -> msend conn $ BS.pack $ show e
2085+
-- empty
2086+
21072087
liftIO $ modifyMVar_ (fromJust $ connection node') $ const $ return [conn]
21082088

21092089
addNodes [node']
@@ -2113,6 +2093,17 @@ listen (node@(Node _ port _ _)) = onAll $ do
21132093

21142094
mlog <- listenNew (show port) conn <|> listenResponses :: TransIO (StreamData NodeMSG)
21152095
execLog mlog
2096+
2097+
chunked tosend= do
2098+
let l= fromIntegral $ BS.length tosend
2099+
toHex l <> "\r\n" <> tosend <> "\r\n"
2100+
2101+
endChunk= "0\r\n\r\n"
2102+
2103+
toHex 0 = mempty
2104+
toHex l =
2105+
let (q, r) = quotRem l 16
2106+
in toHex q <> (BS.singleton $ if r <= 9 then toEnum (fromEnum '0' + r) else toEnum (fromEnum 'A' + r -10))
21162107

21172108
-- onFinish $ const $ do
21182109
-- liftIO $ print "FINISH IN CLOSURE 0"
@@ -2187,9 +2178,11 @@ listenNew port conn' = do
21872178
ParseContext
21882179
)
21892180
}
2181+
2182+
21902183
cdata <- liftIO $ newIORef $ Just (Node2Node port sock addr)
21912184
let conn' = conn {connData = cdata}
2192-
-- TOD t _ <- liftIO $ getClockTime
2185+
21932186
liftIO $ modifyMVar_ (isBlocked conn) $ const $ return Nothing -- Just <$> return t
21942187
setState conn'
21952188
liftIO $ atomicModifyIORef connectionList $ \m -> (conn' : m, ()) -- TODO
@@ -2199,11 +2192,6 @@ listenNew port conn' = do
21992192
firstLine@(method, uri, vers) <- getFirstLine
22002193
headers <- getHeaders
22012194

2202-
-- setState $ HTTPHeaders firstLine headers
2203-
-- let httpHeaders= HTTPHeaders firstLine headers
2204-
-- tr ("HEADERS", headers)
2205-
-- string "\r\n\r\n"
2206-
-- tr (method, uri,vers)
22072195
case (method, uri) of
22082196
("CLOS", hisCookie) -> do
22092197
conn <- getSData
@@ -2230,7 +2218,7 @@ listenNew port conn' = do
22302218
-- it is a HTTP request
22312219
-- processMessage the current request in his own thread and then (<|>) any other request that arrive in the same connection
22322220
-- first@(method,uri, vers) <-cutBody firstLine headers <|> parMany cutHTTPRequest
2233-
httpHeaders@(HTTPHeaders (first@(method,uri, vers)) headers) <-cutBody firstLine headers <|> parMany cutHTTPRequest
2221+
httpHeaders@(HTTPHeaders (first@(method,uri, vers)) headers) <- cutBody firstLine headers <|> parMany cutHTTPRequest
22342222

22352223

22362224
let uri' = BC.tail $ uriPath uri -- !> uriPath uri
@@ -2381,7 +2369,7 @@ listenNew port conn' = do
23812369

23822370
liftIO $ SBSL.sendAll sock $ "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nTransfer-Encoding: chunked\r\n"
23832371
-- httpreq :: HTTPHeaders <- getState <|> error "no state"
2384-
ttr headers
2372+
tr headers
23852373
return $ SLast $ ClosureData remoteClosure s1 thisClosure s2 $ lazyByteString s
23862374
where
23872375

@@ -2410,15 +2398,18 @@ listenNew port conn' = do
24102398

24112399
u = unsafePerformIO
24122400
cutHTTPRequest = do
2401+
-- tr "cutHTTPRequest"
24132402
first@(method, _, _) <- getFirstLine
2414-
ttr first
2403+
-- tr ("first",first)
24152404
-- tr ("after getfirstLine", method, uri, vers)
24162405
headers <- getHeaders
24172406
-- setState $ HTTPHeaders first headers
24182407
cutBody first headers
24192408

24202409
cutBody (first@(method, _, _)) headers = do
2410+
-- tr "cutBody"
24212411
let ret= HTTPHeaders first headers
2412+
-- tr("headers",headers)
24222413
if method == "POST"
24232414
then case fmap (read . BC.unpack) $ lookup "Content-Length" headers of
24242415
Nothing -> return ret -- most likely chunked encoding, processed in the same thread
@@ -2714,11 +2705,17 @@ processMessage s1 closl s2 closr mlog deleteClosure = do
27142705
-- when deleteClosure $ liftIO $ atomically $ flushDBRef dbref
27152706
case mcont of
27162707
Nothing -> do
2717-
node <- liftIO $ readIORef (remoteNode conn) `onNothing` error "mconnect: no remote node?"
2718-
let e = "request received for non existent closure: " ++ show dbref
2719-
let err = CloudException node s1 closl $ show e
2708+
cdata <- liftIO $ readIORef $ connData conn
2709+
case cdata of
2710+
Just(HTTP2Node _ _ _ _) -> runTrans $ msend conn $ "\r\n" <> chunked "404: not found" <> endChunk
2711+
Just (HTTPS2Node _) -> runTrans $ msend conn $ "\r\n" <> chunked "404: not found" <> endChunk
2712+
_ -> do
2713+
node <- liftIO $ readIORef (remoteNode conn) `onNothing` error "mconnect: no remote node?"
2714+
let e = "request received for non existent closure: " ++ show dbref
2715+
let err = CloudException node s1 closl $ show e
27202716

2721-
throw err
2717+
throw err
2718+
return Nothing
27222719
Just LocalClosure {localCont = Nothing} -> do
27232720
tr "RESTORECLOSuRE"
27242721
restoreClosure s1 closl
@@ -2831,6 +2828,15 @@ type SKey = String
28312828
type SValue = String
28322829
type Service = [(SKey, SValue)]
28332830

2831+
instance Default Service where
2832+
def= [("service","$serviceName")
2833+
,("executable", "$execName")
2834+
,("package","$gitRepo")]
2835+
2836+
instance Default [Service] where
2837+
def= [def]
2838+
2839+
28342840
lookup2 key doubleList =
28352841
let r = mapMaybe (lookup key) doubleList
28362842
in if null r then Nothing else Just $ head r

0 commit comments

Comments
 (0)