Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated master #17

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
304 changes: 162 additions & 142 deletions euler-hs.cabal

Large diffs are not rendered by default.

295 changes: 271 additions & 24 deletions flake.lock

Large diffs are not rendered by default.

52 changes: 47 additions & 5 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,27 @@
haskell-flake.follows = "common/haskell-flake";

# Haskell dependencies
sequelize.url = "github:juspay/haskell-sequelize/beckn-compatible";
cereal.url = "github:juspay/cereal/213f145ccbd99e630ee832d2f5b22894c810d3cc";
cereal.flake = false;

juspay-extra.url = "github:juspay/euler-haskell-common";
juspay-extra.inputs.haskell-flake.follows = "common/haskell-flake";

euler-events-hs.url = "github:juspay/euler-events-hs/main";
euler-events-hs.inputs.haskell-flake.follows = "common/haskell-flake";

sequelize.url = "github:juspay/haskell-sequelize/dc01b0f9e6ba5a51dd8f9d0744a549dc9c0ba244";
sequelize.flake = false;
beam.url = "github:srid/beam/ghc810"; # https://github.com/juspay/beam/pull/14

beam.url = "github:srid/beam/ghc810";
beam.flake = false;
beam-mysql.url = "github:juspay/beam-mysql/4c876ea2eae60bf3402d6f5c1ecb60a386fe3ace";

beam-mysql.url = "github:juspay/beam-mysql/b4dbc91276f6a8b5356633492f89bdac34ccd9a1";
beam-mysql.flake = false;

mysql-haskell.url = "github:juspay/mysql-haskell/788022d65538db422b02ecc0be138b862d2e5cee"; # https://github.com/winterland1989/mysql-haskell/pull/38
mysql-haskell.flake = false;
hedis.url = "github:juspay/hedis/22d814672d8476a6f8fb43047af2897afbf77ac6";
hedis.url = "git+https://github.com/juspay/hedis?rev=92a3d5ab73dcb0ea11139a01d6f2950a8b8e7e0e";
hedis.flake = false;
};
outputs = inputs@{ nixpkgs, flake-parts, ... }:
Expand All @@ -29,6 +41,10 @@
packages.default = self'.packages.euler-hs;
haskellProjects.default = {
projectFlakeName = "euler-hs";
imports = [
inputs.euler-events-hs.haskellFlakeProjectModules.output
inputs.juspay-extra.haskellFlakeProjectModules.output
];
basePackages = config.haskellProjects.ghc810.outputs.finalPackages;
packages = {
beam-core.source = inputs.beam + /beam-core;
Expand All @@ -39,6 +55,7 @@
hedis.source = inputs.hedis;
mysql-haskell.source = inputs.mysql-haskell;
sequelize.source = inputs.sequelize;
cereal.source = inputs.cereal;
};
settings = {
beam-core.jailbreak = true;
Expand All @@ -61,8 +78,33 @@
jailbreak = true;
};
sequelize.check = false;

cereal = {
check = false;
jailbreak = true;
};
euler-events-hs = {
check = false;
jailbreak = true;
};
juspay-extra = {
check = false;
jailbreak = true;
};
nonempty-containers = {
jailbreak = true;
};
servant-client = {
jailbreak = true;
};
servant-client-core = {
jailbreak = true;
};
servant-server = {
jailbreak = true;
};
};
};
};
};
}
}
281 changes: 281 additions & 0 deletions src/EulerHS/ART/DBReplay.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
{- |
Module : EulerHS.ART.DBReplay
Copyright : (C) Juspay Technologies Pvt Ltd 2019-2022
License : Apache 2.0 (see the file LICENSE)
Maintainer : opensource@juspay.in
Stability : experimental
Portability : non-portable

This module contains interpreters and methods for running `Flow` scenarios.
-}

{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module EulerHS.ART.DBReplay where

import qualified Data.Aeson as A
import Data.Either.Extra (mapLeft)
import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified Database.Beam as B
import qualified EulerHS.Language as L
import EulerHS.Prelude
import qualified EulerHS.SqlDB.Language as DB
import EulerHS.Types (DBConfig)
import qualified EulerHS.Types as T
import EulerHS.KVConnector.InMemConfig.Flow (searchInMemoryCache)
import Sequelize (Model, Set (..), Where)
import qualified Servant as S
import qualified Data.Serialize as Serialize
import EulerHS.ART.FlowUtils (addRecToState)
import qualified EulerHS.ART.EnvVars as Env
import EulerHS.KVConnector.Types (KVConnector(..), MeshResult, MeshMeta(..), tableName, Source(..))
import EulerHS.ART.Types (RunDBEntry(..), RecordingEntry(..),RunInMemEntry(..))
import EulerHS.KVConnector.Utils
import EulerHS.KVConnector.DBSync (whereClauseToJson)
import EulerHS.SqlDB.Types (BeamRunner, BeamRuntime)
import qualified EulerHS.ART.ReplayFunctions as ER
import EulerHS.KVDB.Types (MeshError(..))
import EulerHS.PIIEncryption (PII(..))
import qualified Data.ByteString.Lazy as BS

getCurrentDateInMillis :: (L.MonadFlow m) => m Int
getCurrentDateInMillis = L.runIO $ do
t <- (* 1000) <$> getPOSIXTime
pure . floor $ t

getLatencyInMicroSeconds :: Integer -> Integer
getLatencyInMicroSeconds execTime = execTime `div` 1000000

parseDataReplayList ::(FromJSON b,L.MonadFlow m) => BS.ByteString -> m (Either T.DBError [b])
parseDataReplayList res = do
let eReply = A.eitherDecode res :: (FromJSON b) => Either String (Either T.DBError [b])
case eReply of
Left err -> do
let errorMessage = "Failed to decode response: " <> encodeUtf8 err
L.throwException $ S.err400 {S.errBody = errorMessage}
Right reply -> pure reply

parseDataReplay ::(FromJSON b, L.MonadFlow m) => BS.ByteString -> m (Either MeshError b)
parseDataReplay res = do
let eReply = A.eitherDecode res :: (FromJSON b) => Either String (Either MeshError b)
case eReply of
Left err -> do
let errorMessage = "Failed to decode response: " <> encodeUtf8 err
L.throwException $ S.err400 {S.errBody = errorMessage}
Right reply -> pure reply

runWithArtFindALL ::
forall be beM table m.
(Model be table
, FromJSON (table Identity)
, ToJSON (table Identity)
, KVConnector (table Identity)
, MeshMeta be table
, L.MonadFlow m
) =>
DBConfig beM ->
Where be table ->
Text ->
m (Either T.DBError [table Identity]) ->
m (Either T.DBError [table Identity])
runWithArtFindALL _dbConf whereClause method hsDbFunc = do
do
if Env.isArtReplayEnabled
then do
recTimestamp <- L.getCurrentTimeUTC
msessionId <- L.getLoggerContext "x-request-id"
resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId
parseDataReplayList resp
else do
tmp_res <- hsDbFunc
when Env.isArtRecEnabled $ do
recTimestamp <- L.getCurrentTimeUTC
addRecToState $ RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (toJSON tmp_res) recTimestamp)
pure tmp_res

runWithArtFindAllExtended ::
forall be beM table m.
(Model be table
, FromJSON (table Identity)
, ToJSON (table Identity)
, KVConnector (table Identity)
, MeshMeta be table
, L.MonadFlow m
) =>
DBConfig beM ->
DB.SqlDB beM [table Identity] ->
Where be table ->
Text ->
m (Either T.DBError [table Identity]) ->
m (Either T.DBError [table Identity])
runWithArtFindAllExtended _dbConf _query whereClause method hsDbFunc = do
do
if Env.isArtReplayEnabled
then do
recTimestamp <- L.getCurrentTimeUTC
msessionId <- L.getLoggerContext "x-request-id"
resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId
parseDataReplayList resp
else do
tmp_res <- hsDbFunc
when Env.isArtRecEnabled $ do
recTimestamp <- L.getCurrentTimeUTC
addRecToState $ RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (toJSON tmp_res) recTimestamp)
pure tmp_res

runWithArtFind ::
forall be beM table m.
(Model be table
, KVConnector (table Identity)
, FromJSON (table Identity)
, ToJSON (table Identity)
, MeshMeta be table
, L.MonadFlow m
) =>
DBConfig beM ->
Where be table ->
Text ->
m (Either T.DBError (Maybe (table Identity))) ->
m (MeshResult (Maybe (table Identity)))
runWithArtFind _dbConf whereClause method hsDbFunc = do
do
if Env.isArtReplayEnabled
then do
recTimestamp <- L.getCurrentTimeUTC
msessionId <- L.getLoggerContext "x-request-id"
resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId
pure $
case A.decode resp of
Just val -> val
Nothing -> Right Nothing
else do
res <- hsDbFunc
when Env.isArtRecEnabled $ do
recTimestamp <- L.getCurrentTimeUTC
addRecToState $ RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (toJSON res) recTimestamp)
pure $ mapLeft MDBError $ res

runWithArtUpdate ::
forall be beM a table m.
(Model be table
, FromJSON a
, ToJSON a
, KVConnector (table Identity)
, MeshMeta be table
, L.MonadFlow m
) =>
DBConfig beM ->
[Set be table] ->
Where be table ->
Text ->
m (T.DBResult a) ->
m (MeshResult a)
runWithArtUpdate _ setClause whereClause method hsDbFunc = do
do
if Env.isArtReplayEnabled
then do
recTimestamp <- L.getCurrentTimeUTC
msessionId <- L.getLoggerContext "x-request-id"
resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method (toJSON (jsonKeyValueUpdates setClause)) (whereClauseToJson whereClause) (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId
parseDataReplay resp
else do
tmp_res <- hsDbFunc
when Env.isArtRecEnabled $ do
recTimestamp <- L.getCurrentTimeUTC
addRecToState $ RunDBEntryT (RunDBEntry method (toJSON (jsonKeyValueUpdates setClause)) (whereClauseToJson whereClause) (tableName @(table Identity)) (toJSON tmp_res) recTimestamp)
pure $ mapLeft MDBError $ tmp_res

runWithArtCreatemSQl ::
forall beM a table m.
( ToJSON (table Identity)
, FromJSON a
, ToJSON a
, KVConnector (table Identity)
, L.MonadFlow m
) =>
DBConfig beM ->
table Identity ->
Text ->
m (T.DBResult a) ->
m (MeshResult a)
runWithArtCreatemSQl _ value method hsDbFunc = do
do
if Env.isArtReplayEnabled
then do
recTimestamp <- L.getCurrentTimeUTC
msessionId <- L.getLoggerContext "x-request-id"
resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method (toJSON value) A.Null (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId
parseDataReplay resp
else do
tmp_res <- hsDbFunc
when Env.isArtRecEnabled $ do
recTimestamp <- L.getCurrentTimeUTC
addRecToState $ RunDBEntryT (RunDBEntry method (toJSON value) A.Null (tableName @(table Identity)) (toJSON tmp_res) recTimestamp)
pure $ mapLeft MDBError $ tmp_res

runWithArtDelete ::
forall be beM a table m.
(Model be table
, FromJSON a
, ToJSON a
, KVConnector (table Identity)
, MeshMeta be table
, L.MonadFlow m
) =>
DBConfig beM ->
Where be table ->
Text ->
m (T.DBResult a) ->
m (MeshResult a)
runWithArtDelete _ whereClause method hsDbFunc = do
do
if Env.isArtReplayEnabled
then do
recTimestamp <- L.getCurrentTimeUTC
msessionId <- L.getLoggerContext "x-request-id"
resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId
parseDataReplay resp
else do
tmp_res <- hsDbFunc
when Env.isArtRecEnabled $ do
recTimestamp <- L.getCurrentTimeUTC
addRecToState $ RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (toJSON tmp_res) recTimestamp)
pure $ mapLeft MDBError $ tmp_res

searchInMemoryCacheRecRepWrapper :: forall be beM table m.
(
BeamRuntime be beM,
BeamRunner beM,
B.HasQBuilder be,
HasCallStack,
KVConnector (table Identity),
ToJSON (table Identity),
Show (table Identity),
Serialize.Serialize (table Identity),
FromJSON (table Identity),
Model be table,
MeshMeta be table,
PII table,
L.MonadFlow m
) => Text ->
DBConfig beM ->
Where be table ->
m (Source, MeshResult (Maybe (table Identity)))
searchInMemoryCacheRecRepWrapper method dbConf whereClause = do
if Env.isArtReplayEnabled
then do
recTimestamp <- L.getCurrentTimeUTC
let recInmem = RunInMemEntryT (RunInMemEntry method A.Null (whereClauseToJson whereClause) (toJSON $ tableName @(table Identity)) (Left A.Null) recTimestamp)
msessionId <- L.getLoggerContext "x-request-id"
resp <- L.runIO $ ER.callBrahmaReplayDB recInmem msessionId
meshRes <- parseDataReplay resp
pure (IN_MEM,meshRes)
else do
(src,meshResult) <- searchInMemoryCache dbConf whereClause
when Env.isArtRecEnabled $ do
recTimestamp <- L.getCurrentTimeUTC
addRecToState $ RunInMemEntryT (RunInMemEntry method A.Null (whereClauseToJson whereClause) (toJSON $ tableName @(table Identity)) (either (Left . toJSON) (Right . toJSON) meshResult) recTimestamp)
pure (src,meshResult)