Skip to content

Commit

Permalink
Merge remote-tracking branch 'obsidian/fix-backend-start' into to-ups…
Browse files Browse the repository at this point in the history
…tream/obsidian-systems/dex
  • Loading branch information
Ericson2314 committed Sep 24, 2021
2 parents 862ca6d + aa4a914 commit e7d4852
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 63 deletions.
1 change: 1 addition & 0 deletions use-case-2/backend/backend.cabal
Expand Up @@ -32,6 +32,7 @@ library
, rhyolite-backend
, rhyolite-backend-db
, rhyolite-backend-notification-postgres
, rhyolite-common
, scientific
, statistics
, text
Expand Down
141 changes: 90 additions & 51 deletions use-case-2/backend/src/Backend.hs
Expand Up @@ -17,7 +17,9 @@ import Data.Aeson.Lens
import qualified Data.Aeson as Aeson
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy.Char8 as BS
import Data.Dependent.Sum
import Data.Int (Int32)
import qualified Data.Map as Map
import Data.Maybe
import Data.Pool
import Data.Proxy
Expand All @@ -42,6 +44,7 @@ import Rhyolite.Backend.App
import Rhyolite.Backend.DB
import Rhyolite.Backend.DB.Serializable
import Rhyolite.Backend.Listen
import Rhyolite.Concurrent
import Safe (lastMay)
import Statistics.Regression

Expand All @@ -62,16 +65,16 @@ backend = Backend
httpManager <- newManager defaultManagerSettings
withDb "db" $ \pool -> do
withResource pool runMigrations
getWallets httpManager pool
getPooledTokens httpManager pool
stopSyncUniswapUsers <- worker (1000 * 1000 * 5) $ syncUniswapUsers httpManager pool
stopSyncPooledTokens <- worker (1000 * 1000 * 5) $ syncPooledTokens httpManager pool
(handleListen, finalizeServeDb) <- serveDbOverWebsockets
pool
(requestHandler httpManager pool)
(\(nm :: DbNotification Notification) q -> fmap (fromMaybe emptyV) $ mapDecomposedV (notifyHandler nm) q)
(QueryHandler $ \q -> fmap (fromMaybe emptyV) $ mapDecomposedV (queryHandler pool) q)
vesselFromWire
vesselPipeline -- (tracePipeline "==> " . vesselPipeline)
flip finally finalizeServeDb $ serve $ \case
flip finally (stopSyncPooledTokens >> stopSyncUniswapUsers >> finalizeServeDb) $ serve $ \case
BackendRoute_Listen :/ () -> handleListen
_ -> return ()
, _backend_routeEncoder = fullRouteEncoder
Expand All @@ -90,67 +93,83 @@ requestHandler httpManager pool = RequestHandler $ \case
Api_EstimateTransactionFee action -> estimateTransactionFee pool action

notifyHandler :: DbNotification Notification -> DexV Proxy -> IO (DexV Identity)
notifyHandler _ _ = return mempty
notifyHandler dbNotification _ = case _dbNotification_message dbNotification of
Notification_Contract :=> Identity contract -> do
return $ singletonV Q_ContractList $ IdentityV $ Identity $
Map.singleton (_contract_walletId contract) $ First $ Just $ _contract_id contract
Notification_Pool :=> Identity pool -> do
return $ singletonV Q_Pools $ IdentityV $ Identity $
Map.singleton (_pool_liquiditySymbol pool) $ First $ Just pool

queryHandler :: Pool Pg.Connection -> DexV Proxy -> IO (DexV Identity)
queryHandler pool v = buildV v $ \case
-- Handle View to see list of available wallet contracts
Q_ContractList -> \_ -> runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ do
contracts <- runSelectReturningList $ select $ all_ (_db_contracts db)
return $ IdentityV $ Identity $ First $ Just $ _contract_id <$> contracts
return $ IdentityV $ Identity $ Map.fromList $
fmap (\c -> (_contract_walletId c, First $ Just $ _contract_id c)) contracts
Q_PooledTokens -> \_ -> runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ do
pooledTokens <- runSelectReturningList $ select $ all_ (_db_pooledTokens db)
return $ IdentityV $ Identity $ First $ Just $ pooledTokens
Q_Pools -> \_ -> runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ do
pools <- runSelectReturningList $ select $ all_ (_db_pools db)
return $ IdentityV $ Identity $ Map.fromList $ flip fmap pools $ \p -> (_pool_liquiditySymbol p, First $ Just p)

getWallets :: Manager -> Pool Pg.Connection -> IO ()
getWallets httpManager pool = do
-- Query for active instances from the PAB and upsert new UniswapUser instance ids.
syncUniswapUsers :: Manager -> Pool Pg.Connection -> IO ()
syncUniswapUsers httpManager pool = do
initReq <- parseRequest "http://localhost:8080/api/new/contract/instances"
let req = initReq { method = "GET" }
resp <- httpLbs req httpManager
let val = Aeson.eitherDecode (responseBody resp) :: Either String Aeson.Value
let val = Aeson.eitherDecode (responseBody resp) :: Either String [Aeson.Value]
case val of
Left err -> do
print $ "getWallets: failed to decode response body: " ++ err
return ()
Right obj -> do
let contractInstanceIds = obj ^.. values . key "cicContract". key "unContractInstanceId" . _String
walletIds = obj ^.. values . key "cicWallet". key "getWallet" . _Integer
walletContracts = zipWith (\a b -> Contract a (fromIntegral b)) contractInstanceIds walletIds
putStrLn $ "getWallets: failed to decode response body: " ++ err
Right objs -> do
let walletContracts = flip mapMaybe objs $ \obj -> do
contractInstanceId <- obj ^? key "cicContract". key "unContractInstanceId" . _String
walletId <- obj ^? key "cicWallet". key "getWallet" . _Integer
definition <- obj ^? key "cicDefintion". key "tag" . _String
guard $ definition == "UniswapUser"
return $ Contract contractInstanceId (fromIntegral walletId)
print $ "Wallet Ids persisted: " ++ show walletContracts -- DEBUG: Logging incoming wallets/contract ids
-- Persist participating wallet addresses to Postgresql
runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ do
runInsert $ insertOnConflict (_db_contracts db) (insertValues walletContracts)
(conflictingFields _contract_id)
onConflictDoNothing
return ()
runNoLoggingT $ runDb (Identity pool) $ do
rows <- runBeamSerializable $ runInsertReturningList $ insertOnConflict (_db_contracts db) (insertValues walletContracts)
(conflictingFields primaryKey)
onConflictUpdateAll
mapM_ (notify NotificationType_Insert Notification_Contract) rows

getPooledTokens :: Manager -> Pool Pg.Connection -> IO ()
getPooledTokens httpManager pool = do
syncPooledTokens :: Manager -> Pool Pg.Connection -> IO ()
syncPooledTokens httpManager pool = do
-- use admin wallet id to populate db with current pool tokens available
mAdminWallet <- runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $
-- SELECT _contract_id FROM _db_contracts WHERE _contract_walletId =1;
runSelectReturningOne $ select $ filter_ (\ct -> _contract_walletId ct ==. val_ 1) $ all_ (_db_contracts db)
case mAdminWallet of
Nothing -> do
print ("getPooledTokens: Admin user wallet not found" :: Text)
return ()
Just wid -> do
-- In order to retreive list of pooled tokens, a request must be made to the pools endpoint first and then the response
-- can be found be found in instances within the observable state key
let prString = "http://localhost:8080/api/new/contract/instance/" ++ (T.unpack $ _contract_id wid) ++ "/endpoint/pools"
print $ "prString: " ++ prString -- DEBUG
poolReq <- parseRequest prString
let reqBody = "[]"
pReq = poolReq
{ method = "POST"
, requestHeaders = ("Content-Type","application/json"):(requestHeaders poolReq)
, requestBody = RequestBodyLBS reqBody
}
_ <- httpLbs pReq httpManager
return ()
wid <- case mAdminWallet of
Nothing -> fail "getPooledTokens: Admin user wallet not found"
Just wid -> return wid

-- In order to retreive list of pooled tokens, a request must be made to the pools endpoint first and then the response
-- can be found be found in instances within the observable state key
let contractInstanceId = T.unpack $ _contract_id wid
prString = "http://localhost:8080/api/new/contract/instance/" <> contractInstanceId <> "/endpoint/pools"
print $ "prString: " ++ prString -- DEBUG
poolReq <- parseRequest prString
let reqBody = "[]"
pReq = poolReq
{ method = "POST"
, requestHeaders = ("Content-Type","application/json"):(requestHeaders poolReq)
, requestBody = RequestBodyLBS reqBody
}
_ <- httpLbs pReq httpManager

-- This delay is necessary to give the chain 1 second to process the previous request and update the observable state
threadDelay 1000000
initReq <- parseRequest "http://localhost:8080/api/new/contract/instances"
putStrLn $ "http://localhost:8080/api/new/contract/instance/" <> contractInstanceId <> "/status"
initReq <- parseRequest $ "http://localhost:8080/api/new/contract/instance/" <> contractInstanceId <> "/status"


let req = initReq { method = "GET" }
resp <- httpLbs req httpManager
let val = Aeson.eitherDecode (responseBody resp) :: Either String Aeson.Value
Expand All @@ -160,19 +179,39 @@ getPooledTokens httpManager pool = do
return ()
Right obj -> do
-- aeson-lens happened here in order to get currency symbols and token names from json
let objList = obj ^..
values . key "cicCurrentState". key "observableState" . key "Right" . key "contents" . values . values . _Array
tokenInfo = (V.! 0) <$> objList
currencySymbols = tokenInfo ^.. traverse . key "unAssetClass" . values . key "unCurrencySymbol" . _String
tokenNames = tokenInfo ^.. traverse . key "unAssetClass" . values . key "unTokenName" . _String
pooledTokens = zipWith (\a b -> PooledToken a b) currencySymbols tokenNames
let tokenInfo :: Maybe [((Aeson.Value, Int32), (Aeson.Value, Int32), (Aeson.Value, Int32))]
tokenInfo = obj ^? key "cicCurrentState". key "observableState" . key "Right" . key "contents" . _Value . _JSON
-- currencySymbols = tokenInfo ^.. traverse . key "unAssetClass" . values . key "unCurrencySymbol" . _String
-- tokenNames = tokenInfo ^.. traverse . key "unAssetClass" . values . key "unTokenName" . _String
pooledTokens :: [PooledToken]
pooledTokens = []

pools :: [LPool]
pools = flip mapMaybe (fromMaybe mempty tokenInfo) $ \((tokenA, amountA), (tokenB, amountB), (lp, amountLp)) -> do
let curSymbol = key "unAssetClass" . nth 0 . key "unCurrencySymbol" . _String
tokenASymbol <- tokenA ^? curSymbol
tokenBSymbol <- tokenB ^? curSymbol
lpSymbol <- lp ^? key "unTokenName" . _String
return $ Pool
{ _pool_tokenASymbol = tokenASymbol
, _pool_tokenBSymbol = tokenBSymbol
, _pool_tokenAAmount = amountA
, _pool_tokenBAmount = amountB
, _pool_liquiditySymbol = lpSymbol
, _pool_liquidityAmount = amountLp
}
putStrLn $ "Pools: " <> show pools
print $ "Pool tokens persisted: " ++ show pooledTokens -- DEBUG: Logging incoming pooled tokens
-- Persist current state of pool tokens to Postgresql
runNoLoggingT $ runDb (Identity pool) $ runBeamSerializable $ do
runInsert $ insertOnConflict (_db_pooledTokens db) (insertValues pooledTokens)
(conflictingFields primaryKey)
onConflictDoNothing
return ()
runNoLoggingT $ runDb (Identity pool) $ do
rows <- runBeamSerializable $ do
runInsert $ insertOnConflict (_db_pooledTokens db) (insertValues pooledTokens)
(conflictingFields primaryKey)
onConflictDoNothing
runInsertReturningList $ insertOnConflict (_db_pools db) (insertValues pools)
(conflictingFields primaryKey)
onConflictDoNothing -- FIXME
mapM_ (notify NotificationType_Insert Notification_Pool) rows
return ()

-- This function's is modeled after the following curl that submits a request to perform a swap against PAB.
Expand Down
6 changes: 3 additions & 3 deletions use-case-2/backend/src/Backend/Notification.hs
Expand Up @@ -12,12 +12,12 @@ import Data.Aeson
import Data.Aeson.GADT.TH
import Data.Constraint.Extras.TH
import Data.GADT.Show.TH
import Data.Int

-- import Common.Schema
import Common.Schema

data Notification :: * -> * where
Notification_Counter :: Notification Int32
Notification_Contract :: Notification Contract
Notification_Pool :: Notification LPool

deriveJSONGADT ''Notification
deriveArgDict ''Notification
Expand Down
1 change: 1 addition & 0 deletions use-case-2/common/common.cabal
Expand Up @@ -10,6 +10,7 @@ library
, base
, beam-core
, categories
, containers
, constraints-extras
, dependent-sum-template
, mtl
Expand Down
5 changes: 4 additions & 1 deletion use-case-2/common/src/Common/Api.hs
Expand Up @@ -13,6 +13,8 @@ import Data.Aeson.GADT.TH
import Data.Constraint.Extras.TH
import Data.GADT.Compare.TH
import Data.GADT.Show.TH
import Data.Int
import Data.Map (Map)
import Data.Semigroup (First(..))
import Data.Text (Text)
import Data.Vessel
Expand All @@ -34,8 +36,9 @@ type DexV = Vessel Q

-- Note: This is view
data Q (v :: (* -> *) -> *) where
Q_ContractList :: Q (IdentityV (First (Maybe [Text])))
Q_ContractList :: Q (IdentityV (Map Int32 (First (Maybe Text))))
Q_PooledTokens :: Q (IdentityV (First (Maybe [PooledToken])))
Q_Pools :: Q (IdentityV (Map Text (First (Maybe LPool))))

data Api :: * -> * where
Api_Swap :: ContractInstanceId Text -> Coin AssetClass -> Coin AssetClass -> Amount Integer -> Amount Integer -> Api (Either String Aeson.Value)
Expand Down
29 changes: 27 additions & 2 deletions use-case-2/common/src/Common/Schema.hs
Expand Up @@ -19,6 +19,7 @@ data Db f = Db
{ _db_contracts :: f (TableEntity ContractT)
, _db_pooledTokens :: f (TableEntity PooledTokenT)
, _db_txFeeDataSet :: f (TableEntity TxFeeDataSetT)
, _db_pools :: f (TableEntity PoolT)
}
deriving (Generic, Database be)

Expand Down Expand Up @@ -46,14 +47,26 @@ data TxFeeDataSetT f = TxFeeDataSet
}
deriving (Generic)

-- Liquidity Pool
data PoolT f = Pool
{ _pool_tokenASymbol :: Columnar f Text
, _pool_tokenBSymbol :: Columnar f Text
, _pool_tokenAAmount :: Columnar f Int32
, _pool_tokenBAmount :: Columnar f Int32
, _pool_liquiditySymbol :: Columnar f Text
, _pool_liquidityAmount :: Columnar f Int32
}
deriving (Generic)

instance Beamable ContractT
instance Beamable PooledTokenT
instance Beamable TxFeeDataSetT
instance Beamable PoolT

instance Table ContractT where
newtype PrimaryKey ContractT f = ContractId { _contractId_id :: Columnar f Text }
newtype PrimaryKey ContractT f = ContractId { _contractId_id :: Columnar f Int32 }
deriving (Generic)
primaryKey = ContractId . _contract_id
primaryKey = ContractId . _contract_walletId

instance Table PooledTokenT where
data PrimaryKey PooledTokenT f = PooledTokenId { _pooledTokenId_symbol :: Columnar f Text, _pooledTokenId_name :: Columnar f Text }
Expand All @@ -65,13 +78,20 @@ instance Table TxFeeDataSetT where
deriving (Generic)
primaryKey = TxFeeDataSetId . _txFeeDataSet_id

instance Table PoolT where
newtype PrimaryKey PoolT f = LiquiditySymbol { _liquiditySymbol_id :: Columnar f Text }
deriving (Generic)
primaryKey = LiquiditySymbol . _pool_liquiditySymbol

instance Beamable (PrimaryKey ContractT)
instance Beamable (PrimaryKey PooledTokenT)
instance Beamable (PrimaryKey TxFeeDataSetT)
instance Beamable (PrimaryKey PoolT)

type Contract = ContractT Identity
type PooledToken = PooledTokenT Identity
type TxFeeData = TxFeeDataSetT Identity
type LPool = PoolT Identity

deriving instance Show Contract
deriving instance Eq Contract
Expand All @@ -88,3 +108,8 @@ deriving instance Show TxFeeData
deriving instance Eq TxFeeData
deriving instance FromJSON TxFeeData
deriving instance ToJSON TxFeeData

deriving instance Show LPool
deriving instance Eq LPool
deriving instance FromJSON LPool
deriving instance ToJSON LPool
10 changes: 4 additions & 6 deletions use-case-2/frontend/src/Frontend/ChooseWallet.hs
Expand Up @@ -22,6 +22,7 @@ import Control.Applicative
import Control.Lens
import Control.Monad
import Control.Monad.IO.Class (MonadIO)
import qualified Data.Map as Map
import Data.Semigroup (First(..))
import Data.Text (Text)
import Data.Vessel
Expand Down Expand Up @@ -57,10 +58,7 @@ chooseWallet = do
dyn_ $ ffor dmmWalletIds $ \case
Nothing -> do
text "There are no wallets avaiable"
Just mWalletIds -> case mWalletIds of
Nothing -> do
text "There are no wallets avaiable"
Just walletIds -> do
Just walletIds -> do
elClass "ul" "list-group" $ do
forM_ walletIds $ \wid -> do
(e,_) <- elAttr' "li" ("class" =: "list-group-item list-group-item-dark" <> "style" =: "cursor:pointer") $ text wid
Expand All @@ -70,5 +68,5 @@ viewContracts
:: ( MonadQuery t (Vessel Q (Const SelectedCount)) m
, Reflex t
)
=> m (Dynamic t (Maybe (Maybe [Text])))
viewContracts = (fmap.fmap.fmap) (getFirst . runIdentity) $ queryViewMorphism 1 $ constDyn $ vessel Q_ContractList . identityV
=> m (Dynamic t (Maybe ([Text])))
viewContracts = (fmap.fmap.fmap) (Map.elems . Map.mapMaybe getFirst . runIdentity) $ queryViewMorphism 1 $ constDyn $ vessel Q_ContractList . identityV

0 comments on commit e7d4852

Please sign in to comment.