diff --git a/use-case-2/backend/backend.cabal b/use-case-2/backend/backend.cabal index ec5e2bd34..b328e66b1 100644 --- a/use-case-2/backend/backend.cabal +++ b/use-case-2/backend/backend.cabal @@ -32,6 +32,7 @@ library , rhyolite-backend , rhyolite-backend-db , rhyolite-backend-notification-postgres + , rhyolite-common , scientific , statistics , text diff --git a/use-case-2/backend/src/Backend.hs b/use-case-2/backend/src/Backend.hs index 7c5743735..dba873681 100644 --- a/use-case-2/backend/src/Backend.hs +++ b/use-case-2/backend/src/Backend.hs @@ -17,7 +17,9 @@ import Data.Aeson.Lens import qualified Data.Aeson as Aeson import Data.ByteString (ByteString) import qualified Data.ByteString.Lazy.Char8 as BS +import Data.Dependent.Sum import Data.Int (Int32) +import qualified Data.Map as Map import Data.Maybe import Data.Pool import Data.Proxy @@ -42,6 +44,7 @@ import Rhyolite.Backend.App import Rhyolite.Backend.DB import Rhyolite.Backend.DB.Serializable import Rhyolite.Backend.Listen +import Rhyolite.Concurrent import Safe (lastMay) import Statistics.Regression @@ -62,8 +65,8 @@ backend = Backend httpManager <- newManager defaultManagerSettings withDb "db" $ \pool -> do withResource pool runMigrations - getWallets httpManager pool - getPooledTokens httpManager pool + stopSyncUniswapUsers <- worker (1000 * 1000 * 5) $ syncUniswapUsers httpManager pool + stopSyncPooledTokens <- worker (1000 * 1000 * 5) $ syncPooledTokens httpManager pool (handleListen, finalizeServeDb) <- serveDbOverWebsockets pool (requestHandler httpManager pool) @@ -71,7 +74,7 @@ backend = Backend (QueryHandler $ \q -> fmap (fromMaybe emptyV) $ mapDecomposedV (queryHandler pool) q) vesselFromWire vesselPipeline -- (tracePipeline "==> " . vesselPipeline) - flip finally finalizeServeDb $ serve $ \case + flip finally (stopSyncPooledTokens >> stopSyncUniswapUsers >> finalizeServeDb) $ serve $ \case BackendRoute_Listen :/ () -> handleListen _ -> return () , _backend_routeEncoder = fullRouteEncoder @@ -90,67 +93,83 @@ requestHandler httpManager pool = RequestHandler $ \case Api_EstimateTransactionFee action -> estimateTransactionFee pool action notifyHandler :: DbNotification Notification -> DexV Proxy -> IO (DexV Identity) -notifyHandler _ _ = return mempty +notifyHandler dbNotification _ = case _dbNotification_message dbNotification of + Notification_Contract :=> Identity contract -> do + return $ singletonV Q_ContractList $ IdentityV $ Identity $ + Map.singleton (_contract_walletId contract) $ First $ Just $ _contract_id contract + Notification_Pool :=> Identity pool -> do + return $ singletonV Q_Pools $ IdentityV $ Identity $ + Map.singleton (_pool_liquiditySymbol pool) $ First $ Just pool queryHandler :: Pool Pg.Connection -> DexV Proxy -> IO (DexV Identity) queryHandler pool v = buildV v $ \case -- Handle View to see list of available wallet contracts Q_ContractList -> \_ -> runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ do contracts <- runSelectReturningList $ select $ all_ (_db_contracts db) - return $ IdentityV $ Identity $ First $ Just $ _contract_id <$> contracts + return $ IdentityV $ Identity $ Map.fromList $ + fmap (\c -> (_contract_walletId c, First $ Just $ _contract_id c)) contracts Q_PooledTokens -> \_ -> runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ do pooledTokens <- runSelectReturningList $ select $ all_ (_db_pooledTokens db) return $ IdentityV $ Identity $ First $ Just $ pooledTokens + Q_Pools -> \_ -> runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ do + pools <- runSelectReturningList $ select $ all_ (_db_pools db) + return $ IdentityV $ Identity $ Map.fromList $ flip fmap pools $ \p -> (_pool_liquiditySymbol p, First $ Just p) -getWallets :: Manager -> Pool Pg.Connection -> IO () -getWallets httpManager pool = do +-- Query for active instances from the PAB and upsert new UniswapUser instance ids. +syncUniswapUsers :: Manager -> Pool Pg.Connection -> IO () +syncUniswapUsers httpManager pool = do initReq <- parseRequest "http://localhost:8080/api/new/contract/instances" let req = initReq { method = "GET" } resp <- httpLbs req httpManager - let val = Aeson.eitherDecode (responseBody resp) :: Either String Aeson.Value + let val = Aeson.eitherDecode (responseBody resp) :: Either String [Aeson.Value] case val of Left err -> do - print $ "getWallets: failed to decode response body: " ++ err - return () - Right obj -> do - let contractInstanceIds = obj ^.. values . key "cicContract". key "unContractInstanceId" . _String - walletIds = obj ^.. values . key "cicWallet". key "getWallet" . _Integer - walletContracts = zipWith (\a b -> Contract a (fromIntegral b)) contractInstanceIds walletIds + putStrLn $ "getWallets: failed to decode response body: " ++ err + Right objs -> do + let walletContracts = flip mapMaybe objs $ \obj -> do + contractInstanceId <- obj ^? key "cicContract". key "unContractInstanceId" . _String + walletId <- obj ^? key "cicWallet". key "getWallet" . _Integer + definition <- obj ^? key "cicDefintion". key "tag" . _String + guard $ definition == "UniswapUser" + return $ Contract contractInstanceId (fromIntegral walletId) print $ "Wallet Ids persisted: " ++ show walletContracts -- DEBUG: Logging incoming wallets/contract ids -- Persist participating wallet addresses to Postgresql - runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ do - runInsert $ insertOnConflict (_db_contracts db) (insertValues walletContracts) - (conflictingFields _contract_id) - onConflictDoNothing - return () + runNoLoggingT $ runDb (Identity pool) $ do + rows <- runBeamSerializable $ runInsertReturningList $ insertOnConflict (_db_contracts db) (insertValues walletContracts) + (conflictingFields primaryKey) + onConflictUpdateAll + mapM_ (notify NotificationType_Insert Notification_Contract) rows -getPooledTokens :: Manager -> Pool Pg.Connection -> IO () -getPooledTokens httpManager pool = do +syncPooledTokens :: Manager -> Pool Pg.Connection -> IO () +syncPooledTokens httpManager pool = do -- use admin wallet id to populate db with current pool tokens available mAdminWallet <- runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ -- SELECT _contract_id FROM _db_contracts WHERE _contract_walletId =1; runSelectReturningOne $ select $ filter_ (\ct -> _contract_walletId ct ==. val_ 1) $ all_ (_db_contracts db) - case mAdminWallet of - Nothing -> do - print ("getPooledTokens: Admin user wallet not found" :: Text) - return () - Just wid -> do - -- In order to retreive list of pooled tokens, a request must be made to the pools endpoint first and then the response - -- can be found be found in instances within the observable state key - let prString = "http://localhost:8080/api/new/contract/instance/" ++ (T.unpack $ _contract_id wid) ++ "/endpoint/pools" - print $ "prString: " ++ prString -- DEBUG - poolReq <- parseRequest prString - let reqBody = "[]" - pReq = poolReq - { method = "POST" - , requestHeaders = ("Content-Type","application/json"):(requestHeaders poolReq) - , requestBody = RequestBodyLBS reqBody - } - _ <- httpLbs pReq httpManager - return () + wid <- case mAdminWallet of + Nothing -> fail "getPooledTokens: Admin user wallet not found" + Just wid -> return wid + + -- In order to retreive list of pooled tokens, a request must be made to the pools endpoint first and then the response + -- can be found be found in instances within the observable state key + let contractInstanceId = T.unpack $ _contract_id wid + prString = "http://localhost:8080/api/new/contract/instance/" <> contractInstanceId <> "/endpoint/pools" + print $ "prString: " ++ prString -- DEBUG + poolReq <- parseRequest prString + let reqBody = "[]" + pReq = poolReq + { method = "POST" + , requestHeaders = ("Content-Type","application/json"):(requestHeaders poolReq) + , requestBody = RequestBodyLBS reqBody + } + _ <- httpLbs pReq httpManager + -- This delay is necessary to give the chain 1 second to process the previous request and update the observable state threadDelay 1000000 - initReq <- parseRequest "http://localhost:8080/api/new/contract/instances" + putStrLn $ "http://localhost:8080/api/new/contract/instance/" <> contractInstanceId <> "/status" + initReq <- parseRequest $ "http://localhost:8080/api/new/contract/instance/" <> contractInstanceId <> "/status" + + let req = initReq { method = "GET" } resp <- httpLbs req httpManager let val = Aeson.eitherDecode (responseBody resp) :: Either String Aeson.Value @@ -160,19 +179,39 @@ getPooledTokens httpManager pool = do return () Right obj -> do -- aeson-lens happened here in order to get currency symbols and token names from json - let objList = obj ^.. - values . key "cicCurrentState". key "observableState" . key "Right" . key "contents" . values . values . _Array - tokenInfo = (V.! 0) <$> objList - currencySymbols = tokenInfo ^.. traverse . key "unAssetClass" . values . key "unCurrencySymbol" . _String - tokenNames = tokenInfo ^.. traverse . key "unAssetClass" . values . key "unTokenName" . _String - pooledTokens = zipWith (\a b -> PooledToken a b) currencySymbols tokenNames + let tokenInfo :: Maybe [((Aeson.Value, Int32), (Aeson.Value, Int32), (Aeson.Value, Int32))] + tokenInfo = obj ^? key "cicCurrentState". key "observableState" . key "Right" . key "contents" . _Value . _JSON + -- currencySymbols = tokenInfo ^.. traverse . key "unAssetClass" . values . key "unCurrencySymbol" . _String + -- tokenNames = tokenInfo ^.. traverse . key "unAssetClass" . values . key "unTokenName" . _String + pooledTokens :: [PooledToken] + pooledTokens = [] + + pools :: [LPool] + pools = flip mapMaybe (fromMaybe mempty tokenInfo) $ \((tokenA, amountA), (tokenB, amountB), (lp, amountLp)) -> do + let curSymbol = key "unAssetClass" . nth 0 . key "unCurrencySymbol" . _String + tokenASymbol <- tokenA ^? curSymbol + tokenBSymbol <- tokenB ^? curSymbol + lpSymbol <- lp ^? key "unTokenName" . _String + return $ Pool + { _pool_tokenASymbol = tokenASymbol + , _pool_tokenBSymbol = tokenBSymbol + , _pool_tokenAAmount = amountA + , _pool_tokenBAmount = amountB + , _pool_liquiditySymbol = lpSymbol + , _pool_liquidityAmount = amountLp + } + putStrLn $ "Pools: " <> show pools print $ "Pool tokens persisted: " ++ show pooledTokens -- DEBUG: Logging incoming pooled tokens -- Persist current state of pool tokens to Postgresql - runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ do - runInsert $ insertOnConflict (_db_pooledTokens db) (insertValues pooledTokens) - (conflictingFields primaryKey) - onConflictDoNothing - return () + runNoLoggingT $ runDb (Identity pool) $ do + rows <- runBeamSerializable $ do + runInsert $ insertOnConflict (_db_pooledTokens db) (insertValues pooledTokens) + (conflictingFields primaryKey) + onConflictDoNothing + runInsertReturningList $ insertOnConflict (_db_pools db) (insertValues pools) + (conflictingFields primaryKey) + onConflictDoNothing -- FIXME + mapM_ (notify NotificationType_Insert Notification_Pool) rows return () -- This function's is modeled after the following curl that submits a request to perform a swap against PAB. diff --git a/use-case-2/backend/src/Backend/Notification.hs b/use-case-2/backend/src/Backend/Notification.hs index 8918c5fe0..df06b96b6 100644 --- a/use-case-2/backend/src/Backend/Notification.hs +++ b/use-case-2/backend/src/Backend/Notification.hs @@ -12,12 +12,12 @@ import Data.Aeson import Data.Aeson.GADT.TH import Data.Constraint.Extras.TH import Data.GADT.Show.TH -import Data.Int --- import Common.Schema +import Common.Schema data Notification :: * -> * where - Notification_Counter :: Notification Int32 + Notification_Contract :: Notification Contract + Notification_Pool :: Notification LPool deriveJSONGADT ''Notification deriveArgDict ''Notification diff --git a/use-case-2/common/common.cabal b/use-case-2/common/common.cabal index 453f4c4f4..df57341b5 100644 --- a/use-case-2/common/common.cabal +++ b/use-case-2/common/common.cabal @@ -10,6 +10,7 @@ library , base , beam-core , categories + , containers , constraints-extras , dependent-sum-template , mtl diff --git a/use-case-2/common/src/Common/Api.hs b/use-case-2/common/src/Common/Api.hs index 3dbd3f705..36d9c0b73 100644 --- a/use-case-2/common/src/Common/Api.hs +++ b/use-case-2/common/src/Common/Api.hs @@ -13,6 +13,8 @@ import Data.Aeson.GADT.TH import Data.Constraint.Extras.TH import Data.GADT.Compare.TH import Data.GADT.Show.TH +import Data.Int +import Data.Map (Map) import Data.Semigroup (First(..)) import Data.Text (Text) import Data.Vessel @@ -34,8 +36,9 @@ type DexV = Vessel Q -- Note: This is view data Q (v :: (* -> *) -> *) where - Q_ContractList :: Q (IdentityV (First (Maybe [Text]))) + Q_ContractList :: Q (IdentityV (Map Int32 (First (Maybe Text)))) Q_PooledTokens :: Q (IdentityV (First (Maybe [PooledToken]))) + Q_Pools :: Q (IdentityV (Map Text (First (Maybe LPool)))) data Api :: * -> * where Api_Swap :: ContractInstanceId Text -> Coin AssetClass -> Coin AssetClass -> Amount Integer -> Amount Integer -> Api (Either String Aeson.Value) diff --git a/use-case-2/common/src/Common/Schema.hs b/use-case-2/common/src/Common/Schema.hs index 0be062692..c29106f4e 100644 --- a/use-case-2/common/src/Common/Schema.hs +++ b/use-case-2/common/src/Common/Schema.hs @@ -19,6 +19,7 @@ data Db f = Db { _db_contracts :: f (TableEntity ContractT) , _db_pooledTokens :: f (TableEntity PooledTokenT) , _db_txFeeDataSet :: f (TableEntity TxFeeDataSetT) + , _db_pools :: f (TableEntity PoolT) } deriving (Generic, Database be) @@ -46,14 +47,26 @@ data TxFeeDataSetT f = TxFeeDataSet } deriving (Generic) +-- Liquidity Pool +data PoolT f = Pool + { _pool_tokenASymbol :: Columnar f Text + , _pool_tokenBSymbol :: Columnar f Text + , _pool_tokenAAmount :: Columnar f Int32 + , _pool_tokenBAmount :: Columnar f Int32 + , _pool_liquiditySymbol :: Columnar f Text + , _pool_liquidityAmount :: Columnar f Int32 + } + deriving (Generic) + instance Beamable ContractT instance Beamable PooledTokenT instance Beamable TxFeeDataSetT +instance Beamable PoolT instance Table ContractT where - newtype PrimaryKey ContractT f = ContractId { _contractId_id :: Columnar f Text } + newtype PrimaryKey ContractT f = ContractId { _contractId_id :: Columnar f Int32 } deriving (Generic) - primaryKey = ContractId . _contract_id + primaryKey = ContractId . _contract_walletId instance Table PooledTokenT where data PrimaryKey PooledTokenT f = PooledTokenId { _pooledTokenId_symbol :: Columnar f Text, _pooledTokenId_name :: Columnar f Text } @@ -65,13 +78,20 @@ instance Table TxFeeDataSetT where deriving (Generic) primaryKey = TxFeeDataSetId . _txFeeDataSet_id +instance Table PoolT where + newtype PrimaryKey PoolT f = LiquiditySymbol { _liquiditySymbol_id :: Columnar f Text } + deriving (Generic) + primaryKey = LiquiditySymbol . _pool_liquiditySymbol + instance Beamable (PrimaryKey ContractT) instance Beamable (PrimaryKey PooledTokenT) instance Beamable (PrimaryKey TxFeeDataSetT) +instance Beamable (PrimaryKey PoolT) type Contract = ContractT Identity type PooledToken = PooledTokenT Identity type TxFeeData = TxFeeDataSetT Identity +type LPool = PoolT Identity deriving instance Show Contract deriving instance Eq Contract @@ -88,3 +108,8 @@ deriving instance Show TxFeeData deriving instance Eq TxFeeData deriving instance FromJSON TxFeeData deriving instance ToJSON TxFeeData + +deriving instance Show LPool +deriving instance Eq LPool +deriving instance FromJSON LPool +deriving instance ToJSON LPool diff --git a/use-case-2/frontend/src/Frontend/ChooseWallet.hs b/use-case-2/frontend/src/Frontend/ChooseWallet.hs index 1b073f707..6354d7120 100644 --- a/use-case-2/frontend/src/Frontend/ChooseWallet.hs +++ b/use-case-2/frontend/src/Frontend/ChooseWallet.hs @@ -22,6 +22,7 @@ import Control.Applicative import Control.Lens import Control.Monad import Control.Monad.IO.Class (MonadIO) +import qualified Data.Map as Map import Data.Semigroup (First(..)) import Data.Text (Text) import Data.Vessel @@ -57,10 +58,7 @@ chooseWallet = do dyn_ $ ffor dmmWalletIds $ \case Nothing -> do text "There are no wallets avaiable" - Just mWalletIds -> case mWalletIds of - Nothing -> do - text "There are no wallets avaiable" - Just walletIds -> do + Just walletIds -> do elClass "ul" "list-group" $ do forM_ walletIds $ \wid -> do (e,_) <- elAttr' "li" ("class" =: "list-group-item list-group-item-dark" <> "style" =: "cursor:pointer") $ text wid @@ -70,5 +68,5 @@ viewContracts :: ( MonadQuery t (Vessel Q (Const SelectedCount)) m , Reflex t ) - => m (Dynamic t (Maybe (Maybe [Text]))) -viewContracts = (fmap.fmap.fmap) (getFirst . runIdentity) $ queryViewMorphism 1 $ constDyn $ vessel Q_ContractList . identityV + => m (Dynamic t (Maybe ([Text]))) +viewContracts = (fmap.fmap.fmap) (Map.elems . Map.mapMaybe getFirst . runIdentity) $ queryViewMorphism 1 $ constDyn $ vessel Q_ContractList . identityV