Skip to content
Permalink
Browse files

Merge #750

750: DNS/IP based subscription implementation. r=karknu a=karknu

A subscription service that manages a set of muxbearers with
applications. Configuration can either be in the form of a domain name or
a list of IP addresses.

Co-authored-by: Karl Knutsson <karl.knutsson@iohk.io>
  • Loading branch information...
iohk-bors and karknu committed Jul 12, 2019
2 parents 87d5d4b + b4c2808 commit 76bb5c16aa88aaccecb79c92542ab7f8a882caf4
@@ -16,7 +16,7 @@ import Ouroboros.Byron.Proxy.DB (DB)
import Ouroboros.Byron.Proxy.Network.Protocol (responderVersions)
import Ouroboros.Byron.Proxy.ChainSync.Server (PollT, chainSyncServer)
import Ouroboros.Network.Protocol.Handshake.Type (Accept (..))
import Ouroboros.Network.Socket (AnyMuxResponderApp (..), withServerNode)
import Ouroboros.Network.Socket (AnyMuxResponderApp (..), newConnectionTable, withServerNode)

import Orphans ()

@@ -33,15 +33,17 @@ runServer
-> IO ()
runServer serverOptions epochSlots db = do
addrInfos <- Network.getAddrInfo (Just addrInfoHints) (Just host) (Just port)
tbl <- newConnectionTable
case addrInfos of
[] -> error "no getAddrInfo"
(addrInfo : _) -> withServerNode
tbl
addrInfo
encodeTerm
decodeTerm
(\_ _ _ -> Accept)
(fmap AnyMuxResponderApp (responderVersions epochSlots app))
wait
(\_ -> wait)

where

Some generated files are not rendered by default. Learn more.

@@ -173,13 +173,15 @@ pingPongClientCount 0 = PingPong.SendMsgDone ()
pingPongClientCount n = SendMsgPing (pure (pingPongClientCount (n-1)))

serverPingPong :: IO ()
serverPingPong =
serverPingPong = do
tbl <- newConnectionTable
withSimpleServerNode
tbl
defaultLocalSocketAddrInfo
(\(DictVersion codec)-> encodeTerm codec)
(\(DictVersion codec)-> decodeTerm codec)
(\(DictVersion _) -> acceptEq)
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication) $ \serverAsync ->
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication) $ \_ serverAsync ->
wait serverAsync -- block until async exception
where
muxApplication :: MuxApplication ResponderApp DemoProtocol0 IO LBS.ByteString Void ()
@@ -267,13 +269,15 @@ pingPongClientPipelinedMax c =
(\n' -> go (Right n' : acc) o n)

serverPingPong2 :: IO ()
serverPingPong2 =
serverPingPong2 = do
tbl <- newConnectionTable
withSimpleServerNode
tbl
defaultLocalSocketAddrInfo
(\(DictVersion codec)-> encodeTerm codec)
(\(DictVersion codec)-> decodeTerm codec)
(\(DictVersion _) -> acceptEq)
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication) $ \serverAsync ->
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication) $ \_ serverAsync ->
wait serverAsync -- block until async exception
where
muxApplication :: MuxApplication ResponderApp DemoProtocol1 IO LBS.ByteString Void ()
@@ -335,13 +339,15 @@ clientChainSync sockAddrs =


serverChainSync :: FilePath -> IO ()
serverChainSync sockAddr =
serverChainSync sockAddr = do
tbl <- newConnectionTable
withSimpleServerNode
tbl
(mkLocalSocketAddrInfo sockAddr)
(\(DictVersion codec)-> encodeTerm codec)
(\(DictVersion codec)-> decodeTerm codec)
(\(DictVersion _) -> acceptEq)
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication) $ \serverAsync ->
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication) $ \_ serverAsync ->
wait serverAsync -- block until async exception
where
prng = mkSMGen 0
@@ -514,13 +520,15 @@ clientBlockFetch sockAddrs = do


serverBlockFetch :: FilePath -> IO ()
serverBlockFetch sockAddr =
serverBlockFetch sockAddr = do
tbl <- newConnectionTable
withSimpleServerNode
tbl
(mkLocalSocketAddrInfo sockAddr)
(\(DictVersion codec)-> encodeTerm codec)
(\(DictVersion codec)-> decodeTerm codec)
(\(DictVersion _) -> acceptEq)
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication) $ \serverAsync ->
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication) $ \_ serverAsync ->
wait serverAsync -- block until async exception
where
prng = mkSMGen 0
@@ -55,6 +55,9 @@ library
Ouroboros.Network.Server.Version.CBOR
Ouroboros.Network.Server.Version.Protocol
Ouroboros.Network.Socket
Ouroboros.Network.Subscription.Common
Ouroboros.Network.Subscription.Dns
Ouroboros.Network.Subscription.Subscriber
Ouroboros.Network.Protocol.ChainSync.Client
Ouroboros.Network.Protocol.ChainSync.Codec
Ouroboros.Network.Protocol.ChainSync.Server
@@ -114,7 +117,9 @@ library
bytestring >=0.10 && <0.11,
cborg >=0.2.1 && <0.3,
containers,
dns,
fingertree >=0.1.4.2 && <0.2,
iproute >=1.7.0 && <1.8,
network,
serialise >=0.2 && <0.3,
stm >=2.4 && <2.6,
@@ -190,6 +195,9 @@ test-suite tests
Ouroboros.Network.Protocol.LocalTxSubmission.Test
Ouroboros.Network.Socket
Ouroboros.Network.Server.Socket
Ouroboros.Network.Subscription.Common
Ouroboros.Network.Subscription.Dns
Ouroboros.Network.Subscription.Subscriber
Ouroboros.Network.TxSubmission.Inbound
Ouroboros.Network.TxSubmission.Outbound

@@ -204,6 +212,7 @@ test-suite tests
Test.Mux
Test.Pipe
Test.Socket
Test.Subscription
default-language: Haskell2010
build-depends: base,
typed-protocols,
@@ -221,8 +230,10 @@ test-suite tests
cborg,
containers,
directory,
dns,
fingertree,
hashable,
iproute,
mtl,
network,
pipes,
@@ -240,6 +251,7 @@ test-suite tests
ghc-options: -Wall
-Wno-unticked-promoted-constructors
-fno-ignore-asserts
-threaded
if flag(ipv6)
cpp-options: -DOUROBOROS_NETWORK_IPV6

@@ -119,14 +119,20 @@ connectTo =
-- | A specialised version of @'Ouroboros.Network.Socket.withServerNode'@
--
withServer
:: Socket.AddrInfo
:: ConnectionTable
-> Socket.AddrInfo
-> (forall vData. DictVersion vData -> vData -> vData -> Accept)
-> Versions NodeToClientVersion DictVersion
(AnyMuxResponderApp NodeToClientProtocols IO BL.ByteString)
-> (Async () -> IO t)
-> IO t
withServer addr =
withServer tbl addr acceptVersion versions k =
withServerNode
tbl
addr
(\(DictVersion codec) -> encodeTerm codec)
(\(DictVersion codec) -> decodeTerm codec)
acceptVersion
versions
(\_ -> k)

@@ -133,13 +133,18 @@ connectTo =
-- | A specialised version of @'Ouroboros.Network.Socket.withServerNode'@
--
withServer
:: Socket.AddrInfo
:: ConnectionTable
-> Socket.AddrInfo
-> (forall vData. DictVersion vData -> vData -> vData -> Accept)
-> Versions NodeToNodeVersion DictVersion (AnyMuxResponderApp NodeToNodeProtocols IO BL.ByteString)
-> (Async () -> IO t)
-> IO t
withServer addr =
withServer tbl addr acceptVersion versions k =
withServerNode
addr
tbl
addr
(\(DictVersion codec) -> encodeTerm codec)
(\(DictVersion codec) -> decodeTerm codec)
acceptVersion
versions
(\_ -> k)

0 comments on commit 76bb5c1

Please sign in to comment.
You can’t perform that action at this time.