diff --git a/schema/migration-2-0002-20200806.sql b/schema/migration-2-0002-20200806.sql
new file mode 100644
index 0000000..c504ad4
--- /dev/null
+++ b/schema/migration-2-0002-20200806.sql
@@ -0,0 +1,25 @@
+-- Persistent generated migration.
+
+CREATE FUNCTION migrate() RETURNS void AS $$
+DECLARE
+  next_version int ;
+BEGIN
+  SELECT stage_two + 1 INTO next_version FROM schema_version ;
+  IF next_version = 2 THEN
+    ALTER TABLE "pool_metadata_reference" ALTER COLUMN "pool_id" TYPE text;
+    ALTER TABLE "pool_metadata_reference" ALTER COLUMN "url" TYPE text;
+    ALTER TABLE "pool_metadata" ALTER COLUMN "pool_id" TYPE text;
+    ALTER TABLE "pool_metadata" ALTER COLUMN "ticker_name" TYPE text;
+    ALTER TABLE "pool_metadata" ALTER COLUMN "metadata" TYPE text;
+    ALTER TABLE "pool_metadata" ADD COLUMN "pmr_id" INT8 NULL;
+    ALTER TABLE "reserved_ticker" ALTER COLUMN "name" TYPE text;
+    -- Hand-written SQL statements can be added here.
+    UPDATE schema_version SET stage_two = 2 ;
+    RAISE NOTICE 'DB has been migrated to stage_two version %', next_version ;
+  END IF ;
+END ;
+$$ LANGUAGE plpgsql ;
+
+SELECT migrate() ;
+
+DROP FUNCTION migrate() ;
diff --git a/smash.cabal b/smash.cabal
index 8824b2b..0da3fea 100644
--- a/smash.cabal
+++ b/smash.cabal
@@ -29,6 +29,9 @@ library
   exposed-modules:
       Lib
+      Offline
+      FetchQueue
+      FetchQueue.Retry
     , Types
     , DB
     , DbSyncPlugin
@@ -49,7 +52,6 @@ library
     , Cardano.Db.Query
     , Cardano.Db.Types
 
-  other-modules:
       Paths_smash
 
   hs-source-dirs:
@@ -58,6 +60,7 @@ library
       base >=4.7 && <5
     , cardano-prelude
    , containers
+    , quiet
    , servant
    , servant-server
    , servant-swagger
diff --git a/src/Cardano/Db/Schema.hs b/src/Cardano/Db/Schema.hs
index 953ed8f..edd76da 100644
--- a/src/Cardano/Db/Schema.hs
+++ b/src/Cardano/Db/Schema.hs
@@ -55,6 +55,14 @@ share
     stageTwo Int
     stageThree Int
 
+  -- The table containing a pool's on-chain reference to its off-chain metadata.
+
+  PoolMetadataReference
+    poolId Types.PoolId sqltype=text
+    url Types.PoolUrl sqltype=text
+    hash Types.PoolMetadataHash sqltype=base16type
+    UniquePoolMetadataReference poolId hash
+
   -- The table containing the metadata.
 
   PoolMetadata
@@ -62,16 +70,9 @@ share
     tickerName Types.TickerName sqltype=text
     hash Types.PoolMetadataHash sqltype=base16type
    metadata Types.PoolMetadataRaw sqltype=text
+    pmrId PoolMetadataReferenceId Maybe
     UniquePoolMetadata poolId hash
 
-  -- The table containing pools' on-chain reference to its off-chain metadata.
-
-  PoolMetadataReference
-    poolId Types.PoolId sqltype=text
-    url Types.PoolUrl sqltype=text
-    hash Types.PoolMetadataHash sqltype=base16type
-    UniquePoolMetadataReference poolId hash
-
   -- The pools themselves (identified by the owner vkey hash)
 
   Pool
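The two schema hunks above move PoolMetadataReference ahead of PoolMetadata so that the new optional pmrId field can refer to it, and the SQL migration adds the matching nullable pmr_id column. For orientation only, this is roughly the Haskell-side shape persistent generates for the new field, assuming its usual camel-case accessor naming (this sketch is not part of the patch):

-- Sketch only: 'pmrId PoolMetadataReferenceId Maybe' becomes a
-- 'Maybe PoolMetadataReferenceId' record field, so existing rows (and the
-- stubbed data layer) can simply leave it unset.
import           Cardano.Db.Schema (PoolMetadata (..), PoolMetadataReferenceId)

linkToReference :: PoolMetadataReferenceId -> PoolMetadata -> PoolMetadata
linkToReference refId md = md { poolMetadataPmrId = Just refId }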
diff --git a/src/Cardano/SmashDbSync.hs b/src/Cardano/SmashDbSync.hs
index 6b998ba..40dc377 100644
--- a/src/Cardano/SmashDbSync.hs
+++ b/src/Cardano/SmashDbSync.hs
@@ -32,7 +32,7 @@ import           Control.Tracer (Tracer)
 import           Cardano.BM.Data.Tracer (ToLogObject (..))
 import qualified Cardano.BM.Setup as Logging
 import           Cardano.BM.Trace (Trace, appendName,
-                                   logInfo)
+                                   modifyName, logInfo)
 import qualified Cardano.BM.Trace as Logging
 
 import           Cardano.Client.Subscription (subscribe)
@@ -84,6 +84,8 @@ import           Network.Socket (SockAddr
 
 import           Network.TypedProtocol.Pipelined (Nat (Succ, Zero))
 
+import           Offline (runOfflineFetchThread)
+
 import           Ouroboros.Network.Driver.Simple (runPipelinedPeer)
 
 import           Ouroboros.Consensus.Block.Abstract (ConvertRawHash (..))
@@ -403,7 +405,10 @@ dbSyncProtocols trce env plugin _version codecs _connectionId =
     actionQueue <- newDbActionQueue
     (metrics, server) <- registerMetricsServer
     race_
-      (runDbThread trce env plugin metrics actionQueue)
+      (race_
+        (runDbThread trce env plugin metrics actionQueue)
+        (runOfflineFetchThread $ modifyName (const "fetch") trce)
+      )
      (runPipelinedPeer
         localChainSyncTracer
         (cChainSyncCodec codecs)
diff --git a/src/DB.hs b/src/DB.hs
index 4d1d1d0..cd706b2 100644
--- a/src/DB.hs
+++ b/src/DB.hs
@@ -51,8 +51,8 @@ import qualified Cardano.Db.Types as Types
 -- TODO(KS): Newtype wrapper around @Text@ for the metadata.
 data DataLayer = DataLayer
     { dlGetPoolMetadata     :: PoolId -> PoolMetadataHash -> IO (Either DBFail (Text, Text))
-    , dlAddPoolMetadata     :: PoolId -> PoolMetadataHash -> Text -> PoolTicker -> IO (Either DBFail Text)
-    , dlAddMetaDataReference :: PoolId -> PoolUrl -> PoolMetadataHash -> IO (Either DBFail PoolMetadataReferenceId)
+    , dlAddPoolMetadata     :: Maybe PoolMetadataReferenceId -> PoolId -> PoolMetadataHash -> Text -> PoolTicker -> IO (Either DBFail Text)
+    , dlAddMetaDataReference :: PoolId -> PoolUrl -> PoolMetadataHash -> IO PoolMetadataReferenceId
     , dlAddReservedTicker   :: Text -> PoolMetadataHash -> IO (Either DBFail ReservedTickerId)
     , dlCheckReservedTicker :: Text -> IO (Maybe ReservedTicker)
     , dlCheckBlacklistedPool :: PoolId -> IO Bool
@@ -74,7 +74,7 @@ stubbedDataLayer ioDataMap ioBlacklistedPool = DataLayer
             Just poolOfflineMetadata' -> return . Right $ ("Test", poolOfflineMetadata')
             Nothing -> return $ Left (DbLookupPoolMetadataHash poolId poolmdHash)
 
-    , dlAddPoolMetadata     = \poolId poolmdHash poolMetadata poolTicker -> do
+    , dlAddPoolMetadata     = \ _ poolId poolmdHash poolMetadata poolTicker -> do
         -- TODO(KS): What if the pool metadata already exists?
         _ <- modifyIORef ioDataMap (Map.insert (poolId, poolmdHash) poolMetadata)
         return . Right $ poolMetadata
@@ -117,9 +117,9 @@ postgresqlDataLayer = DataLayer
             -- Ugh. Very sorry about this.
             return $ (,) <$> poolTickerName <*> poolMetadata'
 
-    , dlAddPoolMetadata     = \poolId poolHash poolMetadata poolTicker -> do
+    , dlAddPoolMetadata     = \ mRefId poolId poolHash poolMetadata poolTicker -> do
         let poolTickerName = Types.TickerName $ getPoolTicker poolTicker
-        _ <- runDbAction Nothing $ insertPoolMetadata $ PoolMetadata poolId poolTickerName poolHash (Types.PoolMetadataRaw poolMetadata)
+        _ <- runDbAction Nothing $ insertPoolMetadata $ PoolMetadata poolId poolTickerName poolHash (Types.PoolMetadataRaw poolMetadata) mRefId
         return $ Right poolMetadata
 
     , dlAddMetaDataReference = \poolId poolUrl poolMetadataHash -> do
@@ -129,7 +129,7 @@ postgresqlDataLayer = DataLayer
                 , poolMetadataReferenceHash = poolMetadataHash
                 , poolMetadataReferencePoolId = poolId
                 }
-        return $ Right poolMetadataRefId
+        return poolMetadataRefId
 
     , dlAddReservedTicker = \tickerName poolMetadataHash -> do
         reservedTickerId <- runDbAction Nothing $ insertReservedTicker $ ReservedTicker tickerName poolMetadataHash
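With these signature changes, dlAddMetaDataReference hands back the new row's key directly, and dlAddPoolMetadata accepts that key as an optional first argument so stored metadata can be tied back to its on-chain reference. A minimal sketch of the intended calling sequence, assuming PoolTicker is exported from Types alongside the other types used in the signatures above (the ticker value is illustrative):

import           Cardano.Prelude

import           DB (DataLayer (..), postgresqlDataLayer)
import           Types (PoolId, PoolMetadataHash, PoolTicker (..), PoolUrl)

-- Sketch only: first record where the metadata lives (the on-chain reference),
-- then store the fetched metadata, linking it back with 'Just refId'.
registerAndStore :: PoolId -> PoolUrl -> PoolMetadataHash -> Text -> IO ()
registerAndStore poolId url hash metadataJson = do
    let dataLayer = postgresqlDataLayer
    refId <- dlAddMetaDataReference dataLayer poolId url hash
    void $ dlAddPoolMetadata dataLayer (Just refId) poolId hash metadataJson (PoolTicker "TICK")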
diff --git a/src/DbSyncPlugin.hs b/src/DbSyncPlugin.hs
index 339eb5f..f72fd4b 100644
--- a/src/DbSyncPlugin.hs
+++ b/src/DbSyncPlugin.hs
@@ -7,39 +7,24 @@ module DbSyncPlugin
 
 import           Cardano.Prelude
 
-import           Cardano.BM.Trace (Trace, logError,
+import           Cardano.BM.Trace (Trace,
                                    logInfo)
 
 import           Control.Monad.Logger (LoggingT)
-import           Control.Monad.Trans.Except.Extra (firstExceptT,
-                                                   handleExceptT,
-                                                   left, newExceptT,
-                                                   runExceptT)
+import           Control.Monad.Trans.Except.Extra (firstExceptT, newExceptT, runExceptT)
 import           Control.Monad.Trans.Reader (ReaderT)
 
 import           DB (DBFail (..), DataLayer (..),
                      postgresqlDataLayer)
+import           Offline (fetchInsertNewPoolMetadata)
 import           Types (PoolId (..), PoolMetadataHash (..),
-                        PoolOfflineMetadata (..),
                         PoolUrl (..))
 
-import           Data.Aeson (eitherDecode')
-import qualified Data.ByteString.Lazy as LBS
-import qualified Data.Text.Encoding as Text
-
 import qualified Cardano.Chain.Block as Byron
 
-import qualified Cardano.Crypto.Hash.Class as Crypto
-import qualified Cardano.Crypto.Hash.Blake2b as Crypto
-
 import qualified Data.ByteString.Base16 as B16
 
-import           Network.HTTP.Client (HttpExceptionContent (..), HttpException (..))
-import qualified Network.HTTP.Client as Http
-import           Network.HTTP.Client.TLS (tlsManagerSettings)
-import qualified Network.HTTP.Types.Status as Http
-
 import           Database.Persist.Sql (IsolationLevel (..), SqlBackend,
                                        transactionSaveWithIsolation)
 
 import qualified Cardano.Db.Insert as DB
@@ -130,7 +115,7 @@ insertPoolCert
     -> ExceptT DbSyncNodeError (ReaderT SqlBackend m) ()
 insertPoolCert tracer pCert =
   case pCert of
-    Shelley.RegPool pParams -> void $ insertPoolRegister tracer pParams
+    Shelley.RegPool pParams -> insertPoolRegister tracer pParams
     Shelley.RetirePool _keyHash _epochNum -> pure ()
         -- Currently we just maintain the data for the pool, we might not want to
         -- know whether it's registered
@@ -139,117 +124,27 @@
 insertPoolRegister
     :: forall m. (MonadIO m)
     => Trace IO Text
     -> ShelleyPoolParams
-    -> ExceptT DbSyncNodeError (ReaderT SqlBackend m) (Maybe DB.PoolMetadataReferenceId)
+    -> ExceptT DbSyncNodeError (ReaderT SqlBackend m) ()
 insertPoolRegister tracer params = do
   let poolIdHash = B16.encode . Shelley.unKeyHashBS $ Shelley._poolPubKey params
   let poolId = PoolId poolIdHash
 
-  liftIO . logInfo tracer $ "Inserting pool register with pool id: " <> decodeUtf8 poolIdHash
-  poolMetadataId <- case strictMaybeToMaybe $ Shelley._poolMD params of
+  case strictMaybeToMaybe $ Shelley._poolMD params of
     Just md -> do
-        liftIO $ fetchInsertPoolMetadataWrap tracer poolId md
-        liftIO . logInfo tracer $ "Inserting metadata."
         let metadataUrl = PoolUrl . Shelley.urlToText $ Shelley._poolMDUrl md
         let metadataHash = PoolMetadataHash . B16.encode $ Shelley._poolMDHash md
 
-        -- Move this upward, this doesn't make sense here. Kills any testing efforts here.
-        let dataLayer :: DataLayer
-            dataLayer = postgresqlDataLayer
-
-        let addMetaDataReference = dlAddMetaDataReference dataLayer
-
-        -- Ah. We can see there is garbage all over the code. Needs refactoring.
-        pmId <- lift . liftIO $ rightToMaybe <$> addMetaDataReference poolId metadataUrl metadataHash
+        refId <- lift . liftIO $ (dlAddMetaDataReference postgresqlDataLayer) poolId metadataUrl metadataHash
 
-        liftIO . logInfo tracer $ "Metadata inserted."
+        liftIO $ fetchInsertNewPoolMetadata tracer refId poolId md
 
-        return pmId
+        liftIO . logInfo tracer $ "Metadata inserted."
 
-    Nothing -> pure Nothing
+    Nothing -> pure ()
 
   liftIO . logInfo tracer $ "Inserted pool register."
-  return poolMetadataId
-
-fetchInsertPoolMetadataWrap
-  :: Trace IO Text
-  -> PoolId
-  -> Shelley.PoolMetaData
-  -> IO ()
-fetchInsertPoolMetadataWrap tracer poolId md = do
-  res <- runExceptT $ fetchInsertPoolMetadata tracer poolId md
-  case res of
-    Left err -> logError tracer $ renderDbSyncNodeError err
-    Right response -> logInfo tracer (decodeUtf8 response)
-
-
-fetchInsertPoolMetadata
-  :: Trace IO Text
-  -> PoolId
-  -> Shelley.PoolMetaData
-  -> ExceptT DbSyncNodeError IO ByteString
-fetchInsertPoolMetadata tracer poolId md = do
-  -- Fetch the JSON info!
-  liftIO . logInfo tracer $ "Fetching JSON metadata."
-
-  let poolUrl = Shelley.urlToText (Shelley._poolMDUrl md)
-
-  -- This is a bit bad to do each time, but good enough for now.
-  manager <- liftIO $ Http.newManager tlsManagerSettings
-
-  liftIO . logInfo tracer $ "Request created with URL '" <> poolUrl <> "'."
-
-  request <- handleExceptT (\(e :: HttpException) -> NEError $ show e) (Http.parseRequest $ toS poolUrl)
-
-  liftIO . logInfo tracer $ "HTTP Client GET request."
-
-  (respBS, status) <- liftIO $ httpGetMax512Bytes request manager
-
-  liftIO . logInfo tracer $ "HTTP GET request response: " <> show status
-
-  liftIO . logInfo tracer $ "Inserting pool with hash: " <> renderByteStringHex (Shelley._poolMDHash md)
-
-  -- Let us try to decode the contents to JSON.
-  decodedMetadata <- case eitherDecode' (LBS.fromStrict respBS) of
-    Left err -> left $ NEError (show $ UnableToEncodePoolMetadataToJSON (toS err))
-    Right result -> pure result
-
-  -- Let's check the hash
-  let hashFromMetadata = Crypto.digest (Proxy :: Proxy Crypto.Blake2b_256) respBS
-
-  when (hashFromMetadata /= Shelley._poolMDHash md) $
-    left . NEError $
-      mconcat
-        [ "Pool hash mismatch. Expected ", renderByteStringHex (Shelley._poolMDHash md)
-        , " but got ", renderByteStringHex hashFromMetadata
-        ]
-
-  liftIO . logInfo tracer $ "Inserting JSON offline metadata."
-
-  let addPoolMetadata = dlAddPoolMetadata postgresqlDataLayer
-  _ <- liftIO $ addPoolMetadata
-          poolId
-          (PoolMetadataHash . B16.encode $ Shelley._poolMDHash md)
-          (decodeUtf8 respBS)
-          (pomTicker decodedMetadata)
-
-  pure respBS
-
-
-httpGetMax512Bytes :: Http.Request -> Http.Manager -> IO (ByteString, Http.Status)
-httpGetMax512Bytes request manager =
-  Http.withResponse request manager $ \responseBR -> do
-    -- We read the first chunk that should contain all the bytes from the reponse.
-    responseBSFirstChunk <- Http.brReadSome (Http.responseBody responseBR) 512
-    -- If there are more bytes in the second chunk, we don't go any further since that
-    -- violates the size constraint.
-    responseBSSecondChunk <- Http.brReadSome (Http.responseBody responseBR) 1
-    if LBS.null responseBSSecondChunk
-      then pure $ (LBS.toStrict responseBSFirstChunk, Http.responseStatus responseBR)
-      -- TODO: this is just WRONG.
-      else throwIO $ HttpExceptionRequest request NoResponseDataReceived
-
-renderByteStringHex :: ByteString -> Text
-renderByteStringHex = Text.decodeUtf8 . B16.encode
+  pure ()
diff --git a/src/FetchQueue.hs b/src/FetchQueue.hs
new file mode 100644
index 0000000..9ada9ec
--- /dev/null
+++ b/src/FetchQueue.hs
@@ -0,0 +1,63 @@
+module FetchQueue
+  ( FetchQueue -- opaque
+  , PoolFetchRetry (..)
+  , Retry -- opaque
+  , emptyFetchQueue
+  , lenFetchQueue
+  , nullFetchQueue
+  , insertFetchQueue
+  , partitionFetchQueue
+  , newRetry
+  , nextRetry
+  ) where
+
+
+import           Cardano.Prelude
+
+import           Data.Time.Clock.POSIX (POSIXTime)
+import           Data.Map.Strict (Map)
+import qualified Data.Map.Strict as Map
+
+import           Cardano.Db.Schema (PoolMetadataReferenceId)
+import           Cardano.Db.Types (PoolId)
+
+import           FetchQueue.Retry
+
+
+-- Unfortunately I am way too pressed for time and way too tired to make this less savage.
+-- Figuring out how to use an existing priority queue for this task would be more time
+-- consuming than writing this from scratch.
+
+newtype FetchQueue = FetchQueue (Map Text PoolFetchRetry)
+
+data PoolFetchRetry = PoolFetchRetry
+  { pfrReferenceId :: !PoolMetadataReferenceId
+  , pfrPoolIdWtf   :: !PoolId
+  , pfrPoolUrl     :: !Text
+  , pfrPoolMDHash  :: !ByteString
+  , pfrRetry       :: !Retry
+  }
+
+emptyFetchQueue :: FetchQueue
+emptyFetchQueue = FetchQueue mempty
+
+lenFetchQueue :: FetchQueue -> Int
+lenFetchQueue (FetchQueue m) = Map.size m
+
+nullFetchQueue :: FetchQueue -> Bool
+nullFetchQueue (FetchQueue m) = Map.null m
+
+insertFetchQueue :: [PoolFetchRetry] -> FetchQueue -> FetchQueue
+insertFetchQueue xs (FetchQueue mp) =
+    FetchQueue $ Map.union mp (Map.fromList $ map build xs)
+  where
+    build :: PoolFetchRetry -> (Text, PoolFetchRetry)
+    build pfr = (pfrPoolUrl pfr, pfr)
+
+partitionFetchQueue :: FetchQueue -> POSIXTime -> ([PoolFetchRetry], FetchQueue)
+partitionFetchQueue (FetchQueue mp) now =
+    case Map.partition isRunnable mp of
+      (runnable, unrunnable) -> (Map.elems runnable, FetchQueue unrunnable)
+  where
+    isRunnable :: PoolFetchRetry -> Bool
+    isRunnable pfr = retryWhen (pfrRetry pfr) <= now
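Since the map is keyed by pfrPoolUrl and Data.Map's union is left-biased, an entry already queued for a URL wins over a freshly queried copy of the same URL, preserving its accumulated retry state; partitionFetchQueue then splits out the entries whose retryWhen has passed. A small sketch of that behaviour, using only the exports above (the PoolFetchRetry argument stands in for a real queue entry):

import           Cardano.Prelude

import qualified Data.Time.Clock.POSIX as Time

import           FetchQueue

-- Sketch only: an entry created with 'newRetry now' has retryWhen == now,
-- so it is immediately runnable; re-inserting the same URL later does not
-- reset the retry state already sitting in the queue.
demoPartition :: PoolFetchRetry -> IO ()
demoPartition pfr = do
    now <- Time.getPOSIXTime
    let entry = pfr { pfrRetry = newRetry now }
        fq = insertFetchQueue [entry] emptyFetchQueue
        (runnable, pending) = partitionFetchQueue fq now
    print (length runnable, lenFetchQueue pending) -- expected: (1, 0)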
diff --git a/src/FetchQueue/Retry.hs b/src/FetchQueue/Retry.hs
new file mode 100644
index 0000000..fc01275
--- /dev/null
+++ b/src/FetchQueue/Retry.hs
@@ -0,0 +1,44 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE DerivingStrategies #-}
+{-# LANGUAGE DerivingVia #-}
+module FetchQueue.Retry
+  ( Retry (..)
+  , newRetry
+  , nextRetry
+  ) where
+
+
+import           Cardano.Prelude
+
+import qualified Data.Time.Clock as Time
+import           Data.Time.Clock.POSIX (POSIXTime)
+
+import           GHC.Generics (Generic (..))
+
+import           Quiet (Quiet (..))
+
+data Retry = Retry
+  { retryWhen :: !POSIXTime
+  , retryNext :: !POSIXTime
+  , retryCount :: !Word
+  } deriving (Eq, Generic)
+    deriving Show via (Quiet Retry)
+
+newRetry :: POSIXTime -> Retry
+newRetry now =
+  Retry
+    { retryWhen = now
+    , retryNext = now + 60 -- 60 seconds from now
+    , retryCount = 0
+    }
+
+-- Update a Retry with an exponential (* 3) backoff.
+nextRetry :: POSIXTime -> Retry -> Retry
+nextRetry now r =
+  -- Assuming 'now' is correct, 'retryWhen' and 'retryNext' should always be in the future.
+  let udiff = min Time.nominalDay (max 30 (3 * (retryNext r - retryWhen r))) in
+    Retry
+      { retryWhen = now + udiff
+      , retryNext = now + 3 * udiff
+      , retryCount = 1 + retryCount r
+      }
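Concretely, the schedule this produces: newRetry starts with a 60 second gap between retryWhen and retryNext, and each failure sets the new delay to three times the previous gap, clamped between 30 seconds and nominalDay. Successive failing fetches are therefore retried roughly 180 s, 1080 s, 6480 s, 38880 s after each attempt, flattening out at one day. A sketch that prints the schedule (pure arithmetic over the two functions above, not part of the patch):

import           Cardano.Prelude

import           FetchQueue.Retry

-- Sketch only: print the delay applied after each consecutive failure,
-- assuming every attempt fails at the exact moment it becomes due.
printBackoffSchedule :: IO ()
printBackoffSchedule =
    go (newRetry 0) (8 :: Int)
  where
    go _ 0 = pure ()
    go r n = do
      let r' = nextRetry (retryWhen r) r  -- fail exactly when the retry is due
      print (retryWhen r' - retryWhen r)  -- 180s, 1080s, 6480s, 38880s, 86400s, ...
      go r' (n - 1)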
diff --git a/src/Lib.hs b/src/Lib.hs
index 22b6ff6..88d9805 100644
--- a/src/Lib.hs
+++ b/src/Lib.hs
@@ -166,7 +166,7 @@ runPoolInsertion poolMetadataJsonPath poolId poolHash = do
 
     let addPoolMetadata = dlAddPoolMetadata dataLayer
 
-    addPoolMetadata poolId poolHash poolMetadataJson (pomTicker decodedMetadata)
+    addPoolMetadata Nothing poolId poolHash poolMetadataJson (pomTicker decodedMetadata)
 
 runTickerNameInsertion :: Text -> PoolMetadataHash -> IO (Either DBFail ReservedTickerId)
 runTickerNameInsertion tickerName poolMetadataHash = do
diff --git a/src/Offline.hs b/src/Offline.hs
new file mode 100644
index 0000000..d2fd9ee
--- /dev/null
+++ b/src/Offline.hs
@@ -0,0 +1,236 @@
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE LambdaCase #-}
+
+module Offline
+  ( fetchInsertNewPoolMetadata
+  , runOfflineFetchThread
+  ) where
+
+import           Cardano.Prelude hiding (from, groupBy, retry)
+
+import           Cardano.BM.Trace (Trace, logWarning, logInfo)
+
+import           Control.Concurrent (threadDelay)
+import           Control.Monad.Trans.Except.Extra (handleExceptT, hoistEither, left)
+
+import           DB (DataLayer (..), PoolMetadataReference (..), PoolMetadataReferenceId,
+                     postgresqlDataLayer, runDbAction)
+import           FetchQueue
+import           Types (PoolId, PoolMetadataHash (..), getPoolMetadataHash, getPoolUrl, pomTicker)
+
+import           Data.Aeson (eitherDecode')
+import qualified Data.ByteString.Lazy as LBS
+import qualified Data.Text as Text
+import qualified Data.Text.Encoding as Text
+import qualified Data.Time.Clock.POSIX as Time
+
+import qualified Cardano.Crypto.Hash.Class as Crypto
+import qualified Cardano.Crypto.Hash.Blake2b as Crypto
+import qualified Cardano.Db.Schema as DB
+
+import qualified Data.ByteString.Base16 as B16
+
+import           Database.Esqueleto (Entity (..), SqlExpr, ValueList, (^.), (==.),
+                                     entityKey, entityVal, from, groupBy, in_, just, max_, notExists,
+                                     select, subList_select, where_)
+import           Database.Persist.Sql (SqlBackend)
+
+import           Network.HTTP.Client (HttpException (..))
+import qualified Network.HTTP.Client as Http
+import           Network.HTTP.Client.TLS (tlsManagerSettings)
+import qualified Network.HTTP.Types.Status as Http
+
+import qualified Shelley.Spec.Ledger.BaseTypes as Shelley
+import qualified Shelley.Spec.Ledger.TxData as Shelley
+
+-- This is an incredibly rough hack that adds asynchronous fetching of offline metadata.
+-- This is not my best work.
+
+data FetchError
+  = FEHashMismatch !Text !Text
+  | FEDataTooLong
+  | FEUrlParseFail !Text
+  | FEJsonDecodeFail !Text
+  | FEHttpException !Text
+  | FEHttpResponse !Int
+  | FEIOException !Text
+  | FETimeout !Text
+  | FEConnectionFailure
+
+fetchInsertNewPoolMetadata
+    :: Trace IO Text
+    -> DB.PoolMetadataReferenceId
+    -> PoolId
+    -> Shelley.PoolMetaData
+    -> IO ()
+fetchInsertNewPoolMetadata tracer refId poolId md = do
+    now <- Time.getPOSIXTime
+    void . fetchInsertNewPoolMetadataOld tracer $
+      PoolFetchRetry
+        { pfrReferenceId = refId
+        , pfrPoolIdWtf = poolId
+        , pfrPoolUrl = Shelley.urlToText (Shelley._poolMDUrl md)
+        , pfrPoolMDHash = Shelley._poolMDHash md
+        , pfrRetry = newRetry now
+        }
+
+fetchInsertNewPoolMetadataOld
+    :: Trace IO Text
+    -> PoolFetchRetry
+    -> IO (Maybe PoolFetchRetry)
+fetchInsertNewPoolMetadataOld tracer pfr = do
+    res <- runExceptT fetchInsert
+    case res of
+      Right () -> pure Nothing
+      Left err -> do
+        logWarning tracer $ renderFetchError err
+        -- Update retry timeout here as a pseudo-randomisation of retry.
+        now <- Time.getPOSIXTime
+        pure . Just $ pfr { pfrRetry = nextRetry now (pfrRetry pfr) }
+  where
+    fetchInsert :: ExceptT FetchError IO ()
+    fetchInsert = do
+      -- This is a bit bad to do each time, but good enough for now.
+      manager <- liftIO $ Http.newManager tlsManagerSettings
+
+      liftIO . logInfo tracer $ "Request: " <> pfrPoolUrl pfr
+
+      request <- handleExceptT (\(_ :: HttpException) -> FEUrlParseFail $ pfrPoolUrl pfr)
+                   $ Http.parseRequest (toS $ pfrPoolUrl pfr)
+
+      (respBS, status) <- httpGetMax512Bytes request manager
+
+      when (Http.statusCode status /= 200) .
+        left $ FEHttpResponse (Http.statusCode status)
+
+      liftIO . logInfo tracer $ "Response: " <> show (Http.statusCode status)
+
+      decodedMetadata <- case eitherDecode' (LBS.fromStrict respBS) of
+        Left err -> left $ FEJsonDecodeFail (toS err)
+        Right result -> pure result
+
+      -- Let's check the hash
+      let hashFromMetadata = Crypto.digest (Proxy :: Proxy Crypto.Blake2b_256) respBS
+          expectedHash = renderByteStringHex (pfrPoolMDHash pfr)
+
+      if hashFromMetadata /= pfrPoolMDHash pfr
+        then left $ FEHashMismatch expectedHash (renderByteStringHex hashFromMetadata)
+        else liftIO . logInfo tracer $ "Inserting pool data with hash: " <> expectedHash
+
+      _ <- liftIO $
+             (dlAddPoolMetadata postgresqlDataLayer)
+               (Just $ pfrReferenceId pfr)
+               (pfrPoolIdWtf pfr)
+               (PoolMetadataHash . B16.encode $ pfrPoolMDHash pfr)
+               (decodeUtf8 respBS)
+               (pomTicker decodedMetadata)
+
+      liftIO $ logInfo tracer (decodeUtf8 respBS)
+
+runOfflineFetchThread :: Trace IO Text -> IO ()
+runOfflineFetchThread trce = do
+    liftIO $ logInfo trce "Running offline fetch thread"
+    fetchLoop trce emptyFetchQueue
+
+-- -------------------------------------------------------------------------------------------------
+
+fetchLoop :: Trace IO Text -> FetchQueue -> IO ()
+fetchLoop trce =
+    loop
+  where
+    loop :: FetchQueue -> IO ()
+    loop fq = do
+      now <- Time.getPOSIXTime
+      pools <- runDbAction Nothing $ queryPoolFetchRetry (newRetry now)
+      let newFq = insertFetchQueue pools fq
+          (runnable, unrunnable) = partitionFetchQueue newFq now
+      logInfo trce $
+        mconcat
+          [ "fetchLoop: ", show (length runnable), " runnable, "
+          , show (lenFetchQueue unrunnable), " pending"
+          ]
+      if null runnable
+        then do
+          threadDelay (20 * 1000 * 1000) -- 20 seconds
+          loop unrunnable
+        else do
+          liftIO $ logInfo trce $ "Pools without offline metadata: " <> show (length runnable)
+          rs <- catMaybes <$> mapM (fetchInsertNewPoolMetadataOld trce) runnable
+          loop $ insertFetchQueue rs unrunnable
+
+httpGetMax512Bytes :: Http.Request -> Http.Manager -> ExceptT FetchError IO (ByteString, Http.Status)
+httpGetMax512Bytes request manager = do
+    res <- handleExceptT convertHttpException $
+            Http.withResponse request manager $ \responseBR -> do
+              -- We read the first chunk that should contain all the bytes from the response.
+              responseBSFirstChunk <- Http.brReadSome (Http.responseBody responseBR) 512
+              -- If there are more bytes in the second chunk, we don't go any further since that
+              -- violates the size constraint.
+              responseBSSecondChunk <- Http.brReadSome (Http.responseBody responseBR) 1
+              if LBS.null responseBSSecondChunk
+                then pure $ Right (LBS.toStrict responseBSFirstChunk, Http.responseStatus responseBR)
+                else pure $ Left FEDataTooLong
+    hoistEither res
+
+convertHttpException :: HttpException -> FetchError
+convertHttpException he =
+    case he of
+      HttpExceptionRequest _req hec ->
+        case hec of
+          Http.ResponseTimeout -> FETimeout "Response"
+          Http.ConnectionTimeout -> FETimeout "Connection"
+          Http.ConnectionFailure {} -> FEConnectionFailure
+          other -> FEHttpException (show other)
+      InvalidUrlException url _ -> FEUrlParseFail (Text.pack url)
+
+
+-- select * from pool_metadata_reference
+--     where id in (select max(id) from pool_metadata_reference group by pool_id)
+--     and not exists (select * from pool_metadata where pmr_id = pool_metadata_reference.id) ;
+
+-- Get a list of the pools for which there is a PoolMetadataReference entry but there is
+-- no PoolMetadata entry.
+-- This is a bit questionable because it assumes that the autogenerated 'id' primary key
+-- is a reliable proxy for time, i.e. a higher 'id' was added later in time.
+queryPoolFetchRetry :: MonadIO m => Retry -> ReaderT SqlBackend m [PoolFetchRetry]
+queryPoolFetchRetry retry = do
+    res <- select . from $ \ pmr -> do
+              where_ (just (pmr ^. DB.PoolMetadataReferenceId) `in_` latestReferences)
+              where_ (notExists . from $ \ pod -> where_ (pod ^. DB.PoolMetadataPmrId ==. just (pmr ^. DB.PoolMetadataReferenceId)))
+              pure pmr
+    pure $ map convert res
+  where
+    latestReferences :: SqlExpr (ValueList (Maybe PoolMetadataReferenceId))
+    latestReferences =
+      subList_select . from $ \ pfr -> do
+        groupBy (pfr ^. DB.PoolMetadataReferencePoolId)
+        pure $ max_ (pfr ^. DB.PoolMetadataReferenceId)
+
+    convert :: Entity PoolMetadataReference -> PoolFetchRetry
+    convert entity =
+      let pmr = entityVal entity in
+        PoolFetchRetry
+          { pfrReferenceId = entityKey entity
+          , pfrPoolIdWtf = DB.poolMetadataReferencePoolId pmr
+          , pfrPoolUrl = getPoolUrl $ poolMetadataReferenceUrl pmr
+          , pfrPoolMDHash = fst . B16.decode $ getPoolMetadataHash (poolMetadataReferenceHash pmr)
+          , pfrRetry = retry
+          }
+
+
+renderByteStringHex :: ByteString -> Text
+renderByteStringHex = Text.decodeUtf8 . B16.encode
+
+renderFetchError :: FetchError -> Text
+renderFetchError fe =
+    case fe of
+      FEHashMismatch xpt act -> mconcat [ "Hash mismatch. Expected ", xpt, " but got ", act, "." ]
+      FEDataTooLong -> "Offline pool data exceeded 512 bytes."
+      FEUrlParseFail err -> "URL parse error: " <> err
+      FEJsonDecodeFail err -> "JSON decode error: " <> err
+      FEHttpException err -> "HTTP Exception: " <> err
+      FEHttpResponse sc -> "HTTP Response: " <> show sc
+      FEIOException err -> "IO Exception: " <> err
+      FETimeout ctx -> ctx <> " timeout"
+      FEConnectionFailure -> "Connection failure"
diff --git a/test/SmashSpecSM.hs b/test/SmashSpecSM.hs
index 80b6baf..78d8f83 100644
--- a/test/SmashSpecSM.hs
+++ b/test/SmashSpecSM.hs
@@ -179,7 +179,7 @@ smashSM dataLayer = StateMachine
     mSemantics (InsertPool poolId poolHash poolOfflineMeta) = do
         let addPoolMetadata = dlAddPoolMetadata dataLayer
         -- TODO(KS): Fix this.
-        result <- addPoolMetadata poolId poolHash poolOfflineMeta (PoolTicker "tickerName")
+        result <- addPoolMetadata Nothing poolId poolHash poolOfflineMeta (PoolTicker "tickerName")
         case result of
            Left err -> return $ MissingPoolHash poolId poolHash
            Right poolOfflineMeta' -> return $ PoolInserted poolId poolHash poolOfflineMeta'
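For context on the thread wiring: the nested race_ in dbSyncProtocols (see the SmashDbSync.hs hunk above) ties the offline fetch thread's lifetime to the DB thread and the chain-sync peer, so if any of the three exits or throws, the others are torn down with it. A minimal standalone sketch of that supervision pattern, assuming race_ from the async package (Control.Concurrent.Async), which is what the code above appears to use:

import           Control.Concurrent (threadDelay)
import           Control.Concurrent.Async (race_)
import           Control.Monad (forever)

-- Sketch only: 'race_ a b' runs both actions concurrently and cancels the
-- survivor as soon as either finishes, so nesting it as in dbSyncProtocols
-- makes the DB thread, the fetch thread and the protocol driver live and
-- die together.
main :: IO ()
main =
    race_
      (race_ dbThread fetchThread)
      protocolDriver
  where
    dbThread, fetchThread, protocolDriver :: IO ()
    dbThread = forever $ threadDelay 1000000
    fetchThread = forever $ threadDelay 1000000
    protocolDriver = threadDelay 3000000 -- when this returns, the others are cancelled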