Skip to content
This repository has been archived by the owner on Dec 8, 2022. It is now read-only.

Commit

Permalink
Merge #40
Browse files Browse the repository at this point in the history
40: Add asynchronous updating of offline metadata r=ksaric a=erikd

I would prefer not to merge this before QA has looked at it.

Co-authored-by: Erik de Castro Lopo <erikd@mega-nerd.com>
  • Loading branch information
iohk-bors[bot] and erikd committed Aug 6, 2020
2 parents 8d60b4d + 5a1cdc5 commit 49780cb
Show file tree
Hide file tree
Showing 11 changed files with 407 additions and 135 deletions.
25 changes: 25 additions & 0 deletions schema/migration-2-0002-20200806.sql
@@ -0,0 +1,25 @@
-- Persistent generated migration.

CREATE FUNCTION migrate() RETURNS void AS $$
DECLARE
  -- Version the schema would move to if this migration applies.
  next_version int ;
BEGIN
  SELECT stage_two + 1 INTO next_version FROM schema_version ;
  -- Only run when the DB is currently at stage_two = 1, making this
  -- migration idempotent: re-running it is a no-op.
  IF next_version = 2 THEN
    -- Widen identifier/metadata columns to unbounded "text".
    ALTER TABLE "pool_metadata_reference" ALTER COLUMN "pool_id" TYPE text;
    ALTER TABLE "pool_metadata_reference" ALTER COLUMN "url" TYPE text;
    ALTER TABLE "pool_metadata" ALTER COLUMN "pool_id" TYPE text;
    ALTER TABLE "pool_metadata" ALTER COLUMN "ticker_name" TYPE text;
    ALTER TABLE "pool_metadata" ALTER COLUMN "metadata" TYPE text;
    -- Nullable link from pool_metadata to pool_metadata_reference
    -- (NOTE(review): no FK constraint is added here — presumably enforced
    -- at the application layer; confirm).
    ALTER TABLE "pool_metadata" ADD COLUMN "pmr_id" INT8 NULL;
    ALTER TABLE "reserved_ticker" ALTER COLUMN "name" TYPE text;
    -- Hand written SQL statements can be added here.
    UPDATE schema_version SET stage_two = 2 ;
    RAISE NOTICE 'DB has been migrated to stage_two version %', next_version ;
  END IF ;
END ;
$$ LANGUAGE plpgsql ;

SELECT migrate() ;

DROP FUNCTION migrate() ;
5 changes: 4 additions & 1 deletion smash.cabal
Expand Up @@ -29,6 +29,9 @@ library

exposed-modules:
Lib
Offline
FetchQueue
FetchQueue.Retry
, Types
, DB
, DbSyncPlugin
Expand All @@ -49,7 +52,6 @@ library
, Cardano.Db.Query
, Cardano.Db.Types


other-modules:
Paths_smash
hs-source-dirs:
Expand All @@ -58,6 +60,7 @@ library
base >=4.7 && <5
, cardano-prelude
, containers
, quiet
, servant
, servant-server
, servant-swagger
Expand Down
17 changes: 9 additions & 8 deletions src/Cardano/Db/Schema.hs
Expand Up @@ -55,23 +55,24 @@ share
stageTwo Int
stageThree Int

-- The table containing pools' on-chain reference to its off-chain metadata.

PoolMetadataReference
poolId Types.PoolId sqltype=text
url Types.PoolUrl sqltype=text
hash Types.PoolMetadataHash sqltype=base16type
UniquePoolMetadataReference poolId hash

-- The table containing the metadata.

PoolMetadata
poolId Types.PoolId sqltype=text
tickerName Types.TickerName sqltype=text
hash Types.PoolMetadataHash sqltype=base16type
metadata Types.PoolMetadataRaw sqltype=text
pmrId PoolMetadataReferenceId Maybe
UniquePoolMetadata poolId hash

-- The table containing pools' on-chain references to their off-chain metadata.

PoolMetadataReference
poolId Types.PoolId sqltype=text
url Types.PoolUrl sqltype=text
hash Types.PoolMetadataHash sqltype=base16type
UniquePoolMetadataReference poolId hash

-- The pools themselves (identified by the owner vkey hash)

Pool
Expand Down
9 changes: 7 additions & 2 deletions src/Cardano/SmashDbSync.hs
Expand Up @@ -32,7 +32,7 @@ import Control.Tracer (Tracer)
import Cardano.BM.Data.Tracer (ToLogObject (..))
import qualified Cardano.BM.Setup as Logging
import Cardano.BM.Trace (Trace, appendName,
logInfo)
modifyName, logInfo)
import qualified Cardano.BM.Trace as Logging

import Cardano.Client.Subscription (subscribe)
Expand Down Expand Up @@ -84,6 +84,8 @@ import Network.Socket (SockAddr

import Network.TypedProtocol.Pipelined (Nat (Succ, Zero))

import Offline (runOfflineFetchThread)

import Ouroboros.Network.Driver.Simple (runPipelinedPeer)

import Ouroboros.Consensus.Block.Abstract (ConvertRawHash (..))
Expand Down Expand Up @@ -403,7 +405,10 @@ dbSyncProtocols trce env plugin _version codecs _connectionId =
actionQueue <- newDbActionQueue
(metrics, server) <- registerMetricsServer
race_
(runDbThread trce env plugin metrics actionQueue)
(race_
(runDbThread trce env plugin metrics actionQueue)
(runOfflineFetchThread $ modifyName (const "fetch") trce)
)
(runPipelinedPeer
localChainSyncTracer
(cChainSyncCodec codecs)
Expand Down
12 changes: 6 additions & 6 deletions src/DB.hs
Expand Up @@ -51,8 +51,8 @@ import qualified Cardano.Db.Types as Types
-- TODO(KS): Newtype wrapper around @Text@ for the metadata.
data DataLayer = DataLayer
{ dlGetPoolMetadata :: PoolId -> PoolMetadataHash -> IO (Either DBFail (Text, Text))
, dlAddPoolMetadata :: PoolId -> PoolMetadataHash -> Text -> PoolTicker -> IO (Either DBFail Text)
, dlAddMetaDataReference :: PoolId -> PoolUrl -> PoolMetadataHash -> IO (Either DBFail PoolMetadataReferenceId)
, dlAddPoolMetadata :: Maybe PoolMetadataReferenceId -> PoolId -> PoolMetadataHash -> Text -> PoolTicker -> IO (Either DBFail Text)
, dlAddMetaDataReference :: PoolId -> PoolUrl -> PoolMetadataHash -> IO PoolMetadataReferenceId
, dlAddReservedTicker :: Text -> PoolMetadataHash -> IO (Either DBFail ReservedTickerId)
, dlCheckReservedTicker :: Text -> IO (Maybe ReservedTicker)
, dlCheckBlacklistedPool :: PoolId -> IO Bool
Expand All @@ -74,7 +74,7 @@ stubbedDataLayer ioDataMap ioBlacklistedPool = DataLayer
Just poolOfflineMetadata' -> return . Right $ ("Test", poolOfflineMetadata')
Nothing -> return $ Left (DbLookupPoolMetadataHash poolId poolmdHash)

, dlAddPoolMetadata = \poolId poolmdHash poolMetadata poolTicker -> do
, dlAddPoolMetadata = \ _ poolId poolmdHash poolMetadata poolTicker -> do
-- TODO(KS): What if the pool metadata already exists?
_ <- modifyIORef ioDataMap (Map.insert (poolId, poolmdHash) poolMetadata)
return . Right $ poolMetadata
Expand Down Expand Up @@ -117,9 +117,9 @@ postgresqlDataLayer = DataLayer
-- Ugh. Very sorry about this.
return $ (,) <$> poolTickerName <*> poolMetadata'

, dlAddPoolMetadata = \poolId poolHash poolMetadata poolTicker -> do
, dlAddPoolMetadata = \ mRefId poolId poolHash poolMetadata poolTicker -> do
let poolTickerName = Types.TickerName $ getPoolTicker poolTicker
_ <- runDbAction Nothing $ insertPoolMetadata $ PoolMetadata poolId poolTickerName poolHash (Types.PoolMetadataRaw poolMetadata)
_ <- runDbAction Nothing $ insertPoolMetadata $ PoolMetadata poolId poolTickerName poolHash (Types.PoolMetadataRaw poolMetadata) mRefId
return $ Right poolMetadata

, dlAddMetaDataReference = \poolId poolUrl poolMetadataHash -> do
Expand All @@ -129,7 +129,7 @@ postgresqlDataLayer = DataLayer
, poolMetadataReferenceHash = poolMetadataHash
, poolMetadataReferencePoolId = poolId
}
return $ Right poolMetadataRefId
return poolMetadataRefId

, dlAddReservedTicker = \tickerName poolMetadataHash -> do
reservedTickerId <- runDbAction Nothing $ insertReservedTicker $ ReservedTicker tickerName poolMetadataHash
Expand Down
127 changes: 11 additions & 116 deletions src/DbSyncPlugin.hs
Expand Up @@ -7,39 +7,24 @@ module DbSyncPlugin

import Cardano.Prelude

import Cardano.BM.Trace (Trace, logError,
import Cardano.BM.Trace (Trace,
logInfo)

import Control.Monad.Logger (LoggingT)
import Control.Monad.Trans.Except.Extra (firstExceptT,
handleExceptT,
left, newExceptT,
runExceptT)
import Control.Monad.Trans.Except.Extra (firstExceptT, newExceptT, runExceptT)
import Control.Monad.Trans.Reader (ReaderT)

import DB (DBFail (..),
DataLayer (..),
postgresqlDataLayer)
import Offline (fetchInsertNewPoolMetadata)
import Types (PoolId (..), PoolMetadataHash (..),
PoolOfflineMetadata (..),
PoolUrl (..))

import Data.Aeson (eitherDecode')
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Text.Encoding as Text

import qualified Cardano.Chain.Block as Byron

import qualified Cardano.Crypto.Hash.Class as Crypto
import qualified Cardano.Crypto.Hash.Blake2b as Crypto

import qualified Data.ByteString.Base16 as B16

import Network.HTTP.Client (HttpExceptionContent (..), HttpException (..))
import qualified Network.HTTP.Client as Http
import Network.HTTP.Client.TLS (tlsManagerSettings)
import qualified Network.HTTP.Types.Status as Http

import Database.Persist.Sql (IsolationLevel (..), SqlBackend, transactionSaveWithIsolation)

import qualified Cardano.Db.Insert as DB
Expand Down Expand Up @@ -130,7 +115,7 @@ insertPoolCert
-> ExceptT DbSyncNodeError (ReaderT SqlBackend m) ()
insertPoolCert tracer pCert =
case pCert of
Shelley.RegPool pParams -> void $ insertPoolRegister tracer pParams
Shelley.RegPool pParams -> insertPoolRegister tracer pParams
Shelley.RetirePool _keyHash _epochNum -> pure ()
-- Currently we just maintain the data for the pool, we might not want to
-- know whether it's registered
Expand All @@ -139,117 +124,27 @@ insertPoolRegister
:: forall m. (MonadIO m)
=> Trace IO Text
-> ShelleyPoolParams
-> ExceptT DbSyncNodeError (ReaderT SqlBackend m) (Maybe DB.PoolMetadataReferenceId)
-> ExceptT DbSyncNodeError (ReaderT SqlBackend m) ()
insertPoolRegister tracer params = do
let poolIdHash = B16.encode . Shelley.unKeyHashBS $ Shelley._poolPubKey params
let poolId = PoolId poolIdHash


liftIO . logInfo tracer $ "Inserting pool register with pool id: " <> decodeUtf8 poolIdHash
poolMetadataId <- case strictMaybeToMaybe $ Shelley._poolMD params of
case strictMaybeToMaybe $ Shelley._poolMD params of
Just md -> do

liftIO $ fetchInsertPoolMetadataWrap tracer poolId md

liftIO . logInfo tracer $ "Inserting metadata."
let metadataUrl = PoolUrl . Shelley.urlToText $ Shelley._poolMDUrl md
let metadataHash = PoolMetadataHash . B16.encode $ Shelley._poolMDHash md

-- Move this upward, this doesn't make sense here. Kills any testing efforts here.
let dataLayer :: DataLayer
dataLayer = postgresqlDataLayer

let addMetaDataReference = dlAddMetaDataReference dataLayer

-- Ah. We can see there is garbage all over the code. Needs refactoring.
pmId <- lift . liftIO $ rightToMaybe <$> addMetaDataReference poolId metadataUrl metadataHash
refId <- lift . liftIO $ (dlAddMetaDataReference postgresqlDataLayer) poolId metadataUrl metadataHash

liftIO . logInfo tracer $ "Metadata inserted."
liftIO $ fetchInsertNewPoolMetadata tracer refId poolId md

return pmId
liftIO . logInfo tracer $ "Metadata inserted."

Nothing -> pure Nothing
Nothing -> pure ()

liftIO . logInfo tracer $ "Inserted pool register."
return poolMetadataId

fetchInsertPoolMetadataWrap
:: Trace IO Text
-> PoolId
-> Shelley.PoolMetaData
-> IO ()
fetchInsertPoolMetadataWrap tracer poolId md = do
res <- runExceptT $ fetchInsertPoolMetadata tracer poolId md
case res of
Left err -> logError tracer $ renderDbSyncNodeError err
Right response -> logInfo tracer (decodeUtf8 response)


fetchInsertPoolMetadata
:: Trace IO Text
-> PoolId
-> Shelley.PoolMetaData
-> ExceptT DbSyncNodeError IO ByteString
fetchInsertPoolMetadata tracer poolId md = do
-- Fetch the JSON info!
liftIO . logInfo tracer $ "Fetching JSON metadata."

let poolUrl = Shelley.urlToText (Shelley._poolMDUrl md)

-- This is a bit bad to do each time, but good enough for now.
manager <- liftIO $ Http.newManager tlsManagerSettings

liftIO . logInfo tracer $ "Request created with URL '" <> poolUrl <> "'."

request <- handleExceptT (\(e :: HttpException) -> NEError $ show e) (Http.parseRequest $ toS poolUrl)

liftIO . logInfo tracer $ "HTTP Client GET request."

(respBS, status) <- liftIO $ httpGetMax512Bytes request manager

liftIO . logInfo tracer $ "HTTP GET request response: " <> show status

liftIO . logInfo tracer $ "Inserting pool with hash: " <> renderByteStringHex (Shelley._poolMDHash md)

-- Let us try to decode the contents to JSON.
decodedMetadata <- case eitherDecode' (LBS.fromStrict respBS) of
Left err -> left $ NEError (show $ UnableToEncodePoolMetadataToJSON (toS err))
Right result -> pure result

-- Let's check the hash
let hashFromMetadata = Crypto.digest (Proxy :: Proxy Crypto.Blake2b_256) respBS

when (hashFromMetadata /= Shelley._poolMDHash md) $
left . NEError $
mconcat
[ "Pool hash mismatch. Expected ", renderByteStringHex (Shelley._poolMDHash md)
, " but got ", renderByteStringHex hashFromMetadata
]

liftIO . logInfo tracer $ "Inserting JSON offline metadata."

let addPoolMetadata = dlAddPoolMetadata postgresqlDataLayer
_ <- liftIO $ addPoolMetadata
poolId
(PoolMetadataHash . B16.encode $ Shelley._poolMDHash md)
(decodeUtf8 respBS)
(pomTicker decodedMetadata)

pure respBS


httpGetMax512Bytes :: Http.Request -> Http.Manager -> IO (ByteString, Http.Status)
httpGetMax512Bytes request manager =
Http.withResponse request manager $ \responseBR -> do
-- We read the first chunk that should contain all the bytes from the response.
responseBSFirstChunk <- Http.brReadSome (Http.responseBody responseBR) 512
-- If there are more bytes in the second chunk, we don't go any further since that
-- violates the size constraint.
responseBSSecondChunk <- Http.brReadSome (Http.responseBody responseBR) 1
if LBS.null responseBSSecondChunk
then pure $ (LBS.toStrict responseBSFirstChunk, Http.responseStatus responseBR)
-- TODO: this is just WRONG.
else throwIO $ HttpExceptionRequest request NoResponseDataReceived

renderByteStringHex :: ByteString -> Text
renderByteStringHex = Text.decodeUtf8 . B16.encode
pure ()
63 changes: 63 additions & 0 deletions src/FetchQueue.hs
@@ -0,0 +1,63 @@
-- | An in-memory retry queue for off-chain pool metadata fetches, keyed by
-- pool URL.  'FetchQueue' and 'Retry' are exported abstractly so callers
-- must go through the functions below to manipulate the queue.
module FetchQueue
  ( FetchQueue -- opaque
  , PoolFetchRetry (..)
  , Retry -- opaque
  , emptyFetchQueue
  , lenFetchQueue
  , nullFetchQueue
  , insertFetchQueue
  , partitionFetchQueue
  , newRetry
  , nextRetry
  ) where


import Cardano.Prelude

import Data.Time.Clock.POSIX (POSIXTime)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

import Cardano.Db.Schema (PoolMetadataReferenceId)
import Cardano.Db.Types (PoolId)

import FetchQueue.Retry


-- Unfortunately I am way too pressed for time and way too tired to make this less savage.
-- Figuring out how to use an existing priority queue for this task would be more
-- time-consuming than writing this from scratch.

-- | The queue itself: a map keyed by pool URL ('pfrPoolUrl'), so at most
-- one pending retry exists per URL.
newtype FetchQueue = FetchQueue (Map Text PoolFetchRetry)

-- | One pending (re)fetch of a pool's off-chain metadata.
data PoolFetchRetry = PoolFetchRetry
  { pfrReferenceId :: !PoolMetadataReferenceId  -- DB row of the on-chain metadata reference
  , pfrPoolIdWtf :: !PoolId                     -- the pool this metadata belongs to
  , pfrPoolUrl :: !Text                         -- URL to fetch; also the queue key
  , pfrPoolMDHash :: !ByteString                -- expected metadata hash (presumably used to verify the fetched bytes — confirm in caller)
  , pfrRetry :: !Retry                          -- retry/backoff state (opaque; see FetchQueue.Retry)
  }

-- | A queue with no pending fetches.
emptyFetchQueue :: FetchQueue
emptyFetchQueue = FetchQueue Map.empty

-- | Number of entries currently held in the queue.
lenFetchQueue :: FetchQueue -> Int
lenFetchQueue (FetchQueue entries) = Map.size entries

-- | True when the queue holds no entries at all.
nullFetchQueue :: FetchQueue -> Bool
nullFetchQueue (FetchQueue entries) = Map.null entries

-- | Add the given retries to the queue, keyed by pool URL.  On a URL
-- collision the entry already in the queue wins ('Map.union' is
-- left-biased), so an existing retry schedule is not clobbered.
insertFetchQueue :: [PoolFetchRetry] -> FetchQueue -> FetchQueue
insertFetchQueue retries (FetchQueue existing) =
    FetchQueue (Map.union existing incoming)
  where
    incoming :: Map Text PoolFetchRetry
    incoming = Map.fromList [ (pfrPoolUrl pfr, pfr) | pfr <- retries ]

-- | Split the queue at the given time: entries whose scheduled retry time
-- has been reached (@retryWhen <= now@) are returned as a list to run now;
-- the rest remain queued.
partitionFetchQueue :: FetchQueue -> POSIXTime -> ([PoolFetchRetry], FetchQueue)
partitionFetchQueue (FetchQueue entries) now =
    (Map.elems due, FetchQueue notDue)
  where
    (due, notDue) = Map.partition isDue entries

    isDue :: PoolFetchRetry -> Bool
    isDue pfr = retryWhen (pfrRetry pfr) <= now

0 comments on commit 49780cb

Please sign in to comment.