Skip to content

Commit

Permalink
Plutus streaming PoC
Browse files Browse the repository at this point in the history
  • Loading branch information
andreabedini authored and raduom committed May 21, 2022
1 parent 37c54aa commit 6b5d107
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 0 deletions.
1 change: 1 addition & 0 deletions cabal.project
Expand Up @@ -16,6 +16,7 @@ packages: doc
plutus-playground-server
plutus-script-utils
plutus-use-cases
plutus-streaming
quickcheck-dynamic
web-ghc

Expand Down
5 changes: 5 additions & 0 deletions plutus-streaming/CHANGELOG.md
@@ -0,0 +1,5 @@
# Revision history for plutus-streaming

## 0.1.0.0 -- YYYY-mm-dd

* First version. Released on an unsuspecting world.
29 changes: 29 additions & 0 deletions plutus-streaming/README.md
@@ -0,0 +1,29 @@
# Plutus Streaming PoC

Just testing ideas on a streaming interface.

# CLI

You can run the CLI in many ways:

From Genesis

```
$ cabal run -- main --socket-path /tmp/node.socket
```

Passing a starting point

```
$ cabal run -- main --socket-path /tmp/node.socket --slot-no 53427524 --block-hash 5e2bde4e504a9888a4f218dafc79a7619083f97d48684fcdba9dc78190df8f99
```

Running different examples

```
$ cabal run -- main --socket-path /tmp/node.socket --example HowManyBlocksBeforeRollback
```

```
$ cabal run -- main --socket-path /tmp/node.socket --example HowManyBlocksBeforeRollbackImpurely
```
155 changes: 155 additions & 0 deletions plutus-streaming/app/Main.hs
@@ -0,0 +1,155 @@
{-# LANGUAGE NamedFieldPuns #-}

module Main where

import Cardano.Api
import Cardano.Api.Extras ()
import Data.Maybe qualified as Maybe
import Options.Applicative
import Plutus.ChainIndex.Tx
import Plutus.Contract.CardanoAPI
import Plutus.Streaming
import Streaming
import Streaming.Prelude qualified as S

--
-- Options parsing
--

data Example
= JustBlocks
| HowManyBlocksBeforeRollback
| HowManyBlocksBeforeRollbackImpure
| ComposePureAndImpure
deriving (Show, Read)

data Options = Options
{ optionsSocketPath :: String,
optionsChainPoint :: ChainPoint,
optionsExample :: Example
}
deriving (Show)

optionsParser :: Parser Options
optionsParser =
Options
<$> strOption (long "socket-path" <> help "Node socket path")
<*> chainPointParser
<*> option auto (long "example" <> value JustBlocks)

chainPointParser :: Parser ChainPoint
chainPointParser =
pure ChainPointAtGenesis
<|> ( ChainPoint
<$> option (SlotNo <$> auto) (long "slot-no" <> metavar "SLOT-NO")
<*> option str (long "block-hash" <> metavar "BLOCK-HASH")
)

--
-- Example consumers
--

justBlocks ::
Stream (Of ChainSyncEvent) IO () ->
Stream (Of (Either FromCardanoError [ChainIndexTx])) IO ()
justBlocks =
--
-- read the following back to front
--

-- format
S.map fromCardanoBlock
-- filter out rollbacks (Do we have optics?)
. S.catMaybes
. S.map (\case RollForward bim _ -> Just bim; _ -> Nothing)
-- take 10 blocks
. S.take 10

howManyBlocksBeforeRollback ::
Stream (Of ChainSyncEvent) IO () ->
Stream (Of Int) IO ()
howManyBlocksBeforeRollback =
S.scan
( \acc ->
\case
RollForward _ _ -> acc + 1
RollBackward _ _ -> acc
)
0
id
. S.take 100

howManyBlocksBeforeRollbackImpure ::
Stream (Of ChainSyncEvent) IO () ->
Stream (Of Int) IO ()
howManyBlocksBeforeRollbackImpure =
S.scanM
( \acc ->
\case
RollForward _ _ ->
pure $ acc + 1
RollBackward _ _ -> do
putStrLn $ "Rollback after " ++ show acc ++ " blocks"
pure acc
)
(pure 0)
pure
. S.take 100

--
-- Ah! This doesn't do what I meant and I think for good reasons
--
composePureAndImpure ::
Stream (Of ChainSyncEvent) IO () ->
Stream (Of (Int, Int)) IO ()
composePureAndImpure s =
S.zip
(howManyBlocksBeforeRollback s)
(howManyBlocksBeforeRollbackImpure s)

--
-- Main
--

main :: IO ()
main = do
Options {optionsSocketPath, optionsChainPoint, optionsExample} <-
execParser $
info
(optionsParser <**> helper)
( fullDesc
<> progDesc "Print a greeting for TARGET"
<> header "hello - a test for optparse-applicative"
)

withChainSyncEventStream
optionsSocketPath
Mainnet
optionsChainPoint
$ case optionsExample of
JustBlocks -> S.print . justBlocks
HowManyBlocksBeforeRollback -> S.print . howManyBlocksBeforeRollback
HowManyBlocksBeforeRollbackImpure -> S.print . howManyBlocksBeforeRollbackImpure
ComposePureAndImpure -> S.print . composePureAndImpure

--
-- Utilities for development
--

nthBlock :: Int -> IO (BlockInMode CardanoMode)
nthBlock = nthBlockAt ChainPointAtGenesis

nthBlockAt :: ChainPoint -> Int -> IO (BlockInMode CardanoMode)
nthBlockAt point n = do
withChainSyncEventStream
"/tmp/node.socket"
Mainnet
point
( fmap Maybe.fromJust
. S.head_
. S.drop n
. S.catMaybes
. S.drop n
. S.map (\case RollForward bim _ -> Just bim; _ -> Nothing)
)

56 changes: 56 additions & 0 deletions plutus-streaming/plutus-streaming.cabal
@@ -0,0 +1,56 @@
cabal-version: 2.4
name: plutus-streaming
version: 0.1.0.0
author: Andrea Bedini
maintainer: andrea@andreabedini.com
extra-source-files: CHANGELOG.md

common lang
default-language: Haskell2010
default-extensions:
DeriveFoldable
DeriveFunctor
DeriveGeneric
DeriveLift
DeriveTraversable
ExplicitForAll
GeneralizedNewtypeDeriving
ImportQualifiedPost
LambdaCase
ScopedTypeVariables
StandaloneDeriving
ghc-options: -Wall -Wnoncanonical-monad-instances -Wunused-packages
-Wincomplete-uni-patterns -Wincomplete-record-updates
-Wredundant-constraints -Widentities

library
import: lang
exposed-modules:
Cardano.Api.Extras
Plutus.Streaming
build-depends:
base >=4.9 && <5,
base16-bytestring,
bytestring,
async,
cardano-api,
streaming,
hs-source-dirs: src
default-language: Haskell2010

executable main
import: lang
main-is: Main.hs
build-depends:
plutus-streaming,
base >=4.9 && <5,
aeson,
bytestring,
cardano-api,
optparse-applicative,
plutus-chain-index-core,
-- pretty-simple,
streaming,

hs-source-dirs: app
default-language: Haskell2010
23 changes: 23 additions & 0 deletions plutus-streaming/src/Cardano/Api/Extras.hs
@@ -0,0 +1,23 @@
{-# LANGUAGE FlexibleInstances #-}

module Cardano.Api.Extras where

import Cardano.Api
import Data.ByteString.Base16 qualified as Base16
import Data.ByteString.Char8 qualified as C8
import Data.Proxy
import Data.String

-- FIXME orphan instance
-- https://github.com/input-output-hk/cardano-node/pull/3608
instance IsString (Hash BlockHeader) where
fromString = either error id . deserialiseFromRawBytesBase16 . C8.pack
where
deserialiseFromRawBytesBase16 str =
case Base16.decode str of
Right raw -> case deserialiseFromRawBytes ttoken raw of
Just x -> Right x
Nothing -> Left ("cannot deserialise " ++ show str)
Left msg -> Left ("invalid hex " ++ show str ++ ", " ++ msg)
where
ttoken = proxyToAsType (Proxy :: Proxy a)

0 comments on commit 6b5d107

Please sign in to comment.