Skip to content

Commit

Permalink
Fix bundle import and streaming client
Browse files Browse the repository at this point in the history
  • Loading branch information
jhbertra committed Jul 28, 2023
1 parent 8ce02ed commit d568002
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 74 deletions.
32 changes: 23 additions & 9 deletions marlowe-client/src/Language/Marlowe/Runtime/Client/Transfer.hs
Expand Up @@ -4,6 +4,7 @@ module Language.Marlowe.Runtime.Client.Transfer (
importIncremental,
exportContract,
exportIncremental,
BundlePart (..),
) where

import Data.Map (Map)
Expand All @@ -28,24 +29,37 @@ importBundle bundle =
, recvMsgUploaded = pure . SendMsgImported . SendMsgDone . Right
}

-- | A data structure that carries an object bundle and an indication of whether to expect more bundles.
data BundlePart
= IntermediatePart ObjectBundle
| FinalPart ObjectBundle

-- | Streams a multi-part object bundle into the Runtime. It will link the bundle, merkleize the contracts, and
-- save them to the store. Yields mappings of the original contract labels to their store hashes.
-- save them to the store. Yields mappings of the original contract labels to their store hashes. sending it a FinalPart
-- finalizes the import, and is necessary to actually import the bundle. If no FinalPart is sent, nothing will be
-- imported. The final mapping will be returned in the result, and not yielded.
importIncremental
:: (Functor m) => MarloweTransferClient (Pipe ObjectBundle (Map Label DatumHash) m) (Maybe ImportError)
:: (Functor m)
=> MarloweTransferClient (Pipe BundlePart (Map Label DatumHash) m) (Either ImportError (Map Label DatumHash))
importIncremental = MarloweTransferClient $ SendMsgStartImport <$> upload
where
upload = do
bundle <- await
case bundle of
ObjectBundle [] -> do
yield mempty
pure $ SendMsgImported $ SendMsgDone Nothing
_ ->
part <- await
case part of
FinalPart bundle ->
pure $
SendMsgUpload
bundle
ClientStUpload
{ recvMsgUploadFailed = pure . SendMsgDone . Left
, recvMsgUploaded = \hashes -> pure $ SendMsgImported $ SendMsgDone $ Right hashes
}
IntermediatePart bundle ->
pure $
SendMsgUpload
bundle
ClientStUpload
{ recvMsgUploadFailed = pure . SendMsgDone . Just
{ recvMsgUploadFailed = pure . SendMsgDone . Left
, recvMsgUploaded = \hashes -> do
yield hashes
upload
Expand Down
40 changes: 38 additions & 2 deletions marlowe-integration-tests/test/Language/Marlowe/Runtime/CliSpec.hs
Expand Up @@ -28,6 +28,8 @@ import qualified Data.Time.Clock.POSIX as POSIX
import Data.Void (Void)
import GHC.IO.Exception (ExitCode (ExitSuccess))
import qualified Language.Marlowe.Core.V1.Semantics.Types as V1
import Language.Marlowe.Object.Archive (packArchive)
import Language.Marlowe.Object.Types (LabelledObject (LabelledObject), ObjectType (ContractType), fromCoreContract)
import Language.Marlowe.Runtime.Cardano.Api (cardanoEraToAsType)
import qualified Language.Marlowe.Runtime.ChainSync.Api as ChainSync.Api
import Language.Marlowe.Runtime.Client (runMarloweTxClient)
Expand Down Expand Up @@ -59,7 +61,6 @@ import qualified Plutus.V2.Ledger.Api
import Test.Hspec (
Spec,
describe,
focus,
it,
shouldBe,
)
Expand Down Expand Up @@ -609,7 +610,42 @@ bugPLT6773 = do
stderr `shouldBe` ""
(code, actualContractJSON) `shouldBe` (ExitSuccess, Just expectedContract)

focus $ it "Marlowe runtime can load an exported contract" \CLISpecTestData{..} -> flip runIntegrationTest runtime do
it "Marlowe runtime can load a bundle archive" \CLISpecTestData{..} -> flip runIntegrationTest runtime do
workspace <- Reader.asks $ workspace . testnet
let contractHashRelation :: [(String, V1.Contract, Aeson.Value)]
contractHashRelation =
[
( "a5a461145b2621873bd8f23d6b1b2d511d07b5afabfff8cc24134a657c9fb23b"
, V1.Assert V1.TrueObs $ V1.Assert V1.TrueObs V1.Close
, Aeson.object
[ ("assert", Aeson.Bool True)
, ("then", Aeson.object [("assert", Aeson.Bool True), ("then", Aeson.String "close")])
]
)
]

for_ contractHashRelation \(expectedHash :: String, contract :: V1.Contract, expectedContract :: Aeson.Value) -> do
let archivePath = resolveWorkspacePath workspace "archive.zip"
packArchive archivePath "main" \writeObject -> do
writeObject $ LabelledObject "main" ContractType $ fromCoreContract contract

do
(code, stdout, stderr) <- Runtime.Integration.Common.execMarlowe' ["load", archivePath]

liftIO do
stderr `shouldBe` ""
(code, stdout) `shouldBe` (ExitSuccess, expectedHash ++ "\n")

do
(code, stdout, stderr) <- Runtime.Integration.Common.execMarlowe' ["query", "store", "contract", expectedHash]

let actualContractJSON :: Maybe Aeson.Value = Aeson.decode $ fromString stdout

liftIO do
stderr `shouldBe` ""
(code, actualContractJSON) `shouldBe` (ExitSuccess, Just expectedContract)

it "Marlowe runtime can load an exported contract" \CLISpecTestData{..} -> flip runIntegrationTest runtime do
workspace <- Reader.asks $ workspace . testnet
let contractHashRelation :: [(String, V1.Contract)]
contractHashRelation =
Expand Down
Expand Up @@ -49,6 +49,7 @@ import Language.Marlowe.Runtime.Client (
importIncremental,
runClientStreaming,
)
import Language.Marlowe.Runtime.Client.Transfer (BundlePart (..))
import qualified Language.Marlowe.Runtime.Contract as Contract
import Language.Marlowe.Runtime.Contract.Api (ContractWithAdjacency (adjacency), merkleizeInputs)
import qualified Language.Marlowe.Runtime.Contract.Api as Api
Expand All @@ -58,7 +59,7 @@ import Network.Protocol.Driver.Trace (HasSpanContext (..))
import Network.Protocol.Peer.Trace (defaultSpanContext)
import Network.Protocol.Query.Client (QueryClient, serveQueryClient)
import Network.TypedProtocol (unsafeIntToNat)
import Pipes (each, (>->))
import Pipes (each, yield, (>->))
import qualified Pipes.Internal as PI
import qualified Pipes.Prelude as P
import qualified Plutus.V2.Ledger.Api as PV2
Expand Down Expand Up @@ -233,11 +234,11 @@ transferSpec = do
actual <-
runContractTest do
P.fold (<>) mempty id $
each (ObjectBundle <$> chunks) >-> do
each (toParts $ ObjectBundle <$> chunks) >-> do
result <- runTransferIncremental importIncremental
case result of
Nothing -> pure ()
Just err -> throwIO $ userError $ "Failed to import contract incrementally: " <> show err
Left err -> throwIO $ userError $ "Failed to import contract incrementally: " <> show err
Right ids -> yield ids
expected `shouldBe` actual

describe "Export" do
Expand Down Expand Up @@ -274,6 +275,13 @@ transferSpec = do
unless success $ throwIO $ userError "Failed to export contract incrementally"
liftIO $ expected `shouldBe` actual

toParts :: [ObjectBundle] -> [BundlePart]
toParts = reverse . go . reverse
where
go :: [ObjectBundle] -> [BundlePart]
go [] = []
go (x : xs) = FinalPart x : (IntermediatePart <$> xs)

genChunks :: [a] -> Gen [[a]]
genChunks [] = pure []
genChunks as = do
Expand Down
Expand Up @@ -8,13 +8,12 @@ module Language.Marlowe.Runtime.CLI.Command.Load (
runLoadCommand,
) where

import Control.Monad (replicateM, unless)
import Control.Monad (replicateM, unless, (<=<))
import Control.Monad.Trans (lift)
import Control.Monad.Trans.Marlowe.Class (runMarloweTransferClient)
import Data.Aeson (eitherDecodeFileStrict)
import Data.Bifunctor (Bifunctor (..))
import Data.ByteString.Base16 (encodeBase16)
import Data.Foldable (traverse_)
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Maybe (catMaybes)
import qualified Data.Text as T
Expand All @@ -27,13 +26,13 @@ import Language.Marlowe.Object.Types (
SomeObjectType (..),
fromCoreContract,
)
import Language.Marlowe.Protocol.Transfer.Types (ImportError)
import Language.Marlowe.Protocol.Transfer.Types (ImportError (..))
import Language.Marlowe.Runtime.CLI.Monad (CLI)
import Language.Marlowe.Runtime.ChainSync.Api (DatumHash (..))
import Language.Marlowe.Runtime.Client (importBundle, importIncremental)
import Language.Marlowe.Runtime.Client.Transfer (BundlePart (..))
import Options.Applicative (ParserInfo, flag, help, info, long, metavar, progDesc, short, strArgument)
import Pipes (Pipe, Producer, await, yield, (>->))
import qualified Pipes.Prelude as P
import Pipes (Producer, next, yield, (>->))
import System.Exit (die)
import UnliftIO (liftIO)
import UnliftIO.Directory (doesFileExist)
Expand All @@ -43,6 +42,10 @@ data LoadCommand = LoadCommand
, archivePath :: FilePath
}

data LoadError
= LoadReadArchiveError ReadArchiveError
| LoadImportError ImportError

loadCommandParser :: ParserInfo LoadCommand
loadCommandParser = info parser $ progDesc "Load a contract into the runtime"
where
Expand Down Expand Up @@ -74,18 +77,27 @@ runLoadCommand LoadCommand{..} = do
bundle <- readJSONFile archivePath
result <- runMarloweTransferClient $ importBundle bundle
case result of
Left err -> liftIO $ die $ "Failed to import bundle: " <> show err
Left err -> pure $ Left $ LoadImportError err
Right hashes -> pure $ pure $ Map.lookup "main" hashes
else unpackArchive archivePath \mainIs readObject ->
P.head $
bundles readObject
>-> (lift . handleError =<< runMarloweTransferClient importIncremental)
>-> collectMain mainIs
else
nestArchiveError <$> unpackArchive archivePath \mainIs readObject -> do
result <-
runProducer $
parts mainIs readObject >-> (first LoadImportError <$> runMarloweTransferClient importIncremental)
case result of
Left err -> pure $ Left err
Right hashes -> pure $ pure $ Map.lookup mainIs hashes
liftIO case result of
Left err -> die $ formatReadArchiveError err
Left err -> die $ formatLoadError err
Right Nothing -> die "Error: main not linked. This is a bug, please report it with the archive you were trying to load attached."
Right (Just mainHash) -> putStrLn $ T.unpack $ encodeBase16 $ unDatumHash mainHash

nestArchiveError :: Either ReadArchiveError (Either LoadError a) -> Either LoadError a
nestArchiveError = either (Left . LoadReadArchiveError) id

runProducer :: (Monad m) => Producer a m r -> m r
runProducer = either pure (runProducer . snd) <=< next

readJSONFile :: FilePath -> CLI ObjectBundle
readJSONFile path = do
result <- liftIO $ eitherDecodeFileStrict path
Expand All @@ -94,26 +106,27 @@ readJSONFile path = do
Right contract -> do
pure $ ObjectBundle $ pure $ LabelledObject "main" ContractType $ fromCoreContract contract

bundles :: CLI (Maybe LabelledObject) -> Producer ObjectBundle CLI ()
bundles readObject = do
objects <- lift $ catMaybes <$> replicateM 50 readObject
yield $ ObjectBundle objects
parts :: Label -> CLI (Maybe LabelledObject) -> Producer BundlePart CLI (Either LoadError a)
parts mainIs readObject = do
objects <- lift $ catMaybes <$> replicateM 512 readObject
case objects of
-- End if empty
[] -> pure ()
_ -> bundles readObject
[] -> pure $ Left $ LoadReadArchiveError MissingMain
_ -> do
yield
if any ((== mainIs) . _label) objects
then FinalPart $ ObjectBundle objects
else IntermediatePart $ ObjectBundle objects
parts mainIs readObject

handleError :: Maybe ImportError -> CLI ()
handleError = maybe (pure ()) (liftIO . die . ("Failed to import bundle: " <>) . show)
formatLoadError :: LoadError -> String
formatLoadError = \case
LoadReadArchiveError err -> formatReadArchiveError err
LoadImportError err -> formatImportError err

collectMain :: Label -> Pipe (Map Label DatumHash) DatumHash CLI ()
collectMain mainIs = readUntilEmpty Nothing
where
readUntilEmpty mMain = do
hashes <- await
if Map.null hashes
then traverse_ yield mMain
else readUntilEmpty $ Map.lookup mainIs hashes
formatImportError :: ImportError -> String
formatImportError = \case
ContinuationNotInStore hash -> "Referenced continuation hash not found in store: " <> show hash
LinkError err -> "Linking failed: " <> show err

formatReadArchiveError :: ReadArchiveError -> String
formatReadArchiveError = \case
Expand Down
Expand Up @@ -6,9 +6,7 @@ module Language.Marlowe.Runtime.Web.Server.ContractClient where

import Control.Arrow (arr)
import Control.Concurrent.Component
import Control.Monad.Except (ExceptT, MonadError (throwError))
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Marlowe.Class (runClientStreaming)
import Data.List (find)
import Data.Map (Map)
Expand All @@ -24,18 +22,19 @@ import Language.Marlowe.Protocol.Client (MarloweRuntimeClient (..), hoistMarlowe
import Language.Marlowe.Protocol.Transfer.Types (ImportError (..))
import Language.Marlowe.Runtime.ChainSync.Api (DatumHash)
import Language.Marlowe.Runtime.Client (importIncremental)
import Language.Marlowe.Runtime.Client.Transfer (BundlePart (..))
import Language.Marlowe.Runtime.Contract.Api (ContractWithAdjacency)
import qualified Language.Marlowe.Runtime.Contract.Api as Contract
import Network.Protocol.Connection (Connector, runConnector)
import Pipes (MFunctor (..), Pipe, await, yield, (>->))
import Pipes (Pipe, await, yield, (>->))
import Unsafe.Coerce (unsafeCoerce)

newtype ContractClientDependencies m = ContractClientDependencies
{ connector :: Connector MarloweRuntimeClient m
}

-- | Signature for a delegate that imports a bundle into the runtime.
type ImportBundle m = Label -> Pipe ObjectBundle (Map Label DatumHash) (ExceptT ImportError m) ()
type ImportBundle m = Label -> Pipe ObjectBundle (Map Label DatumHash) m (Either ImportError (Map Label DatumHash))

-- | Signature for a delegate that gets a contract from the runtime.
type GetContract m = DatumHash -> m (Maybe ContractWithAdjacency)
Expand All @@ -50,30 +49,28 @@ contractClient :: (MonadUnliftIO m) => Component m (ContractClientDependencies m
contractClient = arr \ContractClientDependencies{..} ->
ContractClient
{ importBundle = \main ->
watchForMain main >-> do
result <-
hoist lift $
runClientStreaming
hoistMarloweRuntimeClient
(runConnector connector)
(RunMarloweTransferClient importIncremental)
case result of
Nothing -> pure ()
Just err -> throwError err
watchForMain main
>-> runClientStreaming
hoistMarloweRuntimeClient
(runConnector connector)
(RunMarloweTransferClient importIncremental)
, getContract = runConnector connector . RunContractQueryClient . Contract.getContract
}

watchForMain :: (Monad m) => Label -> Pipe ObjectBundle ObjectBundle (ExceptT ImportError m) ()
watchForMain :: (Monad m) => Label -> Pipe ObjectBundle BundlePart m (Either ImportError (Map Label DatumHash))
watchForMain main = do
ObjectBundle bundle <- await
yield $ ObjectBundle bundle
case find ((main ==) . _label) bundle of
Nothing -> watchForMain main
Just (LabelledObject _ ContractType _) ->
yield $ ObjectBundle [] -- send an empty bundle to indicate the end.
Nothing -> do
yield $ IntermediatePart $ ObjectBundle bundle
watchForMain main
Just (LabelledObject _ ContractType _) -> do
yield $ FinalPart $ ObjectBundle bundle
pure $ Right mempty
Just (LabelledObject _ t _) ->
throwError $
LinkError $
TypeMismatch
(UnsafeSomeObjectType $ unsafeCoerce ContractType)
(UnsafeSomeObjectType $ unsafeCoerce t)
pure $
Left $
LinkError $
TypeMismatch
(UnsafeSomeObjectType $ unsafeCoerce ContractType)
(UnsafeSomeObjectType $ unsafeCoerce t)
Expand Up @@ -18,7 +18,6 @@ import Control.Monad.Except (MonadError)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader (..), ReaderT (..), asks)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Except (mapExceptT)
import Language.Marlowe.Runtime.ChainSync.Api (TxId)
import Language.Marlowe.Runtime.Core.Api (ContractId)
import Language.Marlowe.Runtime.Web.Server.ContractClient (GetContract, ImportBundle)
Expand Down Expand Up @@ -76,7 +75,7 @@ data AppEnv = forall r s.
importBundle :: ImportBundle ServerM
importBundle label = do
AppEnv{_eventBackend = backend, _importBundle = ib} <- ask
hoist (mapExceptT $ liftBackendM backend) $ ib label
hoist (liftBackendM backend) $ ib label

-- | Load a list of contract headers.
loadContractHeaders :: LoadContractHeaders ServerM
Expand Down

0 comments on commit d568002

Please sign in to comment.