Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 71 additions & 55 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/sei-protocol/sei-chain/sei-cosmos/codec"
"github.com/sei-protocol/sei-chain/sei-cosmos/codec/types"
cryptotypes "github.com/sei-protocol/sei-chain/sei-cosmos/crypto/types"
"github.com/sei-protocol/sei-chain/sei-cosmos/server"
"github.com/sei-protocol/sei-chain/sei-cosmos/server/api"
"github.com/sei-protocol/sei-chain/sei-cosmos/server/config"
servertypes "github.com/sei-protocol/sei-chain/sei-cosmos/server/types"
Expand Down Expand Up @@ -121,7 +120,6 @@ import (
gigautils "github.com/sei-protocol/sei-chain/giga/executor/utils"
"github.com/sei-protocol/sei-chain/precompiles"
putils "github.com/sei-protocol/sei-chain/precompiles/utils"
ssconfig "github.com/sei-protocol/sei-chain/sei-db/config"
"github.com/sei-protocol/sei-chain/sei-ibc-go/modules/apps/transfer"
ibctransferkeeper "github.com/sei-protocol/sei-chain/sei-ibc-go/modules/apps/transfer/keeper"
ibctransfertypes "github.com/sei-protocol/sei-chain/sei-ibc-go/modules/apps/transfer/types"
Expand Down Expand Up @@ -683,10 +681,10 @@ func New(
wasmOpts...,
)

receiptStorePath := filepath.Join(homePath, "data", "receipt.db")
receiptConfig := ssconfig.DefaultReceiptStoreConfig()
receiptConfig.DBDirectory = receiptStorePath
receiptConfig.KeepRecent = cast.ToInt(appOpts.Get(server.FlagMinRetainBlocks))
receiptConfig, err := readReceiptStoreConfig(homePath, appOpts)
if err != nil {
panic(fmt.Sprintf("error reading receipt store config: %s", err))
}
if app.receiptStore == nil {
receiptStore, err := receipt.NewReceiptStore(receiptConfig, keys[evmtypes.StoreKey])
if err != nil {
Expand Down Expand Up @@ -758,7 +756,7 @@ func New(
if gigaExecutorConfig.OCCEnabled {
logger.Info("benchmark: Giga Executor with OCC is ENABLED - using new EVM execution path with parallel execution")
} else {
logger.Info("benchmark: Giga Executor (evmone-based) is ENABLED - using new EVM execution path (sequential)")
logger.Info("benchmark: Giga Executor is ENABLED - using new EVM execution path (sequential)")
}
} else {
logger.Info("benchmark: Giga Executor is DISABLED - using default GETH interpreter")
Expand Down Expand Up @@ -1560,71 +1558,89 @@ func (app *App) ProcessTXsWithOCCV2(ctx sdk.Context, txs [][]byte, typedTxs []sd
func (app *App) ProcessTXsWithOCCGiga(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx) ([]*abci.ExecTxResult, sdk.Context) {
evmEntries := make([]*sdk.DeliverTxEntry, 0, len(txs))
v2Entries := make([]*sdk.DeliverTxEntry, 0, len(txs))
firstCosmosSeen := false
for txIndex, tx := range txs {
if app.GetEVMMsg(typedTxs[txIndex]) != nil {
if firstCosmosSeen {
logger.Error("Giga OCC cannot execute block due to tx ordering, falling back to V2")
// Oops! This isn't "all EVM txs, then all Cosmos txs" - we need to fallback to V2.
return app.ProcessTXsWithOCCV2(ctx, txs, typedTxs)
}

evmEntries = append(evmEntries, app.GetDeliverTxEntry(ctx, txIndex, tx, typedTxs[txIndex]))
} else {
if !firstCosmosSeen {
firstCosmosSeen = true
}
v2Entries = append(v2Entries, app.GetDeliverTxEntry(ctx, txIndex, tx, typedTxs[txIndex]))
}
}

// Run EVM txs against a cache so we can discard all changes on fallback.
evmCtx, evmCache := app.CacheContext(ctx)
var evmBatchResult []abci.ResponseDeliverTx
fallbackToV2 := false

// Cache block-level constants (identical for all txs in this block).
// Must use evmCtx (not ctx) because giga KV stores are registered in CacheContext.
cache, cacheErr := newGigaBlockCache(evmCtx, &app.GigaEvmKeeper)
if cacheErr != nil {
logger.Error("failed to build giga block cache", "error", cacheErr, "height", ctx.BlockHeight())
return nil, ctx
}
if len(evmEntries) > 0 {
// Run EVM txs against a cache so we can discard all changes on fallback.
evmCtx, evmCache := app.CacheContext(ctx)

// Create OCC scheduler with giga executor deliverTx capturing the cache.
evmScheduler := tasks.NewScheduler(
app.ConcurrencyWorkers(),
app.TracingInfo,
app.makeGigaDeliverTx(cache),
)
// Cache block-level constants (identical for all txs in this block).
// Must use evmCtx (not ctx) because giga KV stores are registered in CacheContext.
cache, cacheErr := newGigaBlockCache(evmCtx, &app.GigaEvmKeeper)
if cacheErr != nil {
logger.Error("failed to build giga block cache", "error", cacheErr, "height", ctx.BlockHeight())
return nil, ctx
}

evmBatchResult, evmSchedErr := evmScheduler.ProcessAll(evmCtx, evmEntries)
if evmSchedErr != nil {
// TODO: DeliverTxBatch panics in this case
// TODO: detect if it was interop, and use v2 if so
logger.Error("benchmark OCC scheduler error (EVM txs)", "error", evmSchedErr, "height", ctx.BlockHeight(), "txCount", len(evmEntries))
return nil, ctx
}
// Create OCC scheduler with giga executor deliverTx capturing the cache.
evmScheduler := tasks.NewScheduler(
app.ConcurrencyWorkers(),
app.TracingInfo,
app.makeGigaDeliverTx(cache),
)

fallbackToV2 := false
for _, r := range evmBatchResult {
if r.Code == gigautils.GigaAbortCode && r.Codespace == gigautils.GigaAbortCodespace {
fallbackToV2 = true
break
var evmSchedErr error
evmBatchResult, evmSchedErr = evmScheduler.ProcessAll(evmCtx, evmEntries)
if evmSchedErr != nil {
logger.Error("benchmark OCC scheduler error (EVM txs)", "error", evmSchedErr, "height", ctx.BlockHeight(), "txCount", len(evmEntries))
return nil, ctx
}
}

if fallbackToV2 {
metrics.IncrGigaFallbackToV2Counter()
// Discard all EVM changes by skipping cache writes, then re-run all txs via DeliverTx.
evmBatchResult = nil
v2Entries = make([]*sdk.DeliverTxEntry, len(txs))
for txIndex, tx := range txs {
v2Entries[txIndex] = app.GetDeliverTxEntry(ctx, txIndex, tx, typedTxs[txIndex])
for _, r := range evmBatchResult {
if r.Code == gigautils.GigaAbortCode && r.Codespace == gigautils.GigaAbortCodespace {
fallbackToV2 = true
break
}
}

if fallbackToV2 {
metrics.IncrGigaFallbackToV2Counter()
// Discard all EVM changes by skipping cache writes, then re-run all txs via DeliverTx.
evmBatchResult = nil
v2Entries = make([]*sdk.DeliverTxEntry, len(txs))
for txIndex, tx := range txs {
v2Entries[txIndex] = app.GetDeliverTxEntry(ctx, txIndex, tx, typedTxs[txIndex])
}
} else {
// Commit EVM cache to main store before processing non-EVM txs.
evmCache.Write()
evmCtx.GigaMultiStore().WriteGiga()
}
} else {
// Commit EVM cache to main store before processing non-EVM txs.
evmCache.Write()
evmCtx.GigaMultiStore().WriteGiga()
}

v2Scheduler := tasks.NewScheduler(
app.ConcurrencyWorkers(),
app.TracingInfo,
app.DeliverTx,
)
v2BatchResult, v2SchedErr := v2Scheduler.ProcessAll(ctx, v2Entries)
if v2SchedErr != nil {
logger.Error("benchmark OCC scheduler error", "error", v2SchedErr, "height", ctx.BlockHeight(), "txCount", len(v2Entries))
return nil, ctx
var v2BatchResult []abci.ResponseDeliverTx

if len(v2Entries) > 0 {
v2Scheduler := tasks.NewScheduler(
app.ConcurrencyWorkers(),
app.TracingInfo,
app.DeliverTx,
)
var v2SchedErr error
v2BatchResult, v2SchedErr = v2Scheduler.ProcessAll(ctx, v2Entries)
if v2SchedErr != nil {
logger.Error("benchmark OCC scheduler error", "error", v2SchedErr, "height", ctx.BlockHeight(), "txCount", len(v2Entries))
return nil, ctx
}
}

execResults := make([]*abci.ExecTxResult, 0, len(evmBatchResult)+len(v2BatchResult))
Expand Down
26 changes: 26 additions & 0 deletions app/receipt_store_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package app

import (
"path/filepath"

seidbconfig "github.com/sei-protocol/sei-chain/sei-db/config"
)

// Viper/app.toml keys for the [receipt-store] configuration section.
// These mirror the fields of seidbconfig.ReceiptStoreConfig read by
// seidbconfig.ReadReceiptConfig.
const (
	receiptStoreBackendKey              = "receipt-store.rs-backend"              // storage backend name (e.g. pebble, parquet)
	receiptStoreDBDirectoryKey          = "receipt-store.db-directory"            // on-disk location of the receipt database
	receiptStoreAsyncWriteBufferKey     = "receipt-store.async-write-buffer"      // async write buffer size
	receiptStoreKeepRecentKey           = "receipt-store.keep-recent"             // number of recent blocks to retain
	receiptStorePruneIntervalSecondsKey = "receipt-store.prune-interval-seconds"  // how often pruning runs
)

// readReceiptStoreConfig builds the receipt store configuration from the
// application options. When no explicit database directory is configured it
// falls back to <homePath>/data/receipt.db.
func readReceiptStoreConfig(homePath string, appOpts seidbconfig.AppOptions) (seidbconfig.ReceiptStoreConfig, error) {
	cfg, err := seidbconfig.ReadReceiptConfig(appOpts)
	if err != nil {
		return cfg, err
	}
	// Default the DB location relative to the node's home directory.
	if cfg.DBDirectory == "" {
		cfg.DBDirectory = filepath.Join(homePath, "data", "receipt.db")
	}
	return cfg, nil
}
3 changes: 3 additions & 0 deletions app/seidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
FlagSCHistoricalProofBurst = "state-commit.sc-historical-proof-burst"
FlagSCWriteMode = "state-commit.sc-write-mode"
FlagSCReadMode = "state-commit.sc-read-mode"
FlagSCEnableLatticeHash = "state-commit.sc-enable-lattice-hash"

// SS Store configs
FlagSSEnable = "state-store.ss-enable"
Expand Down Expand Up @@ -118,6 +119,8 @@ func parseSCConfigs(appOpts servertypes.AppOptions) config.StateCommitConfig {
scConfig.ReadMode = parsedRM
}

scConfig.EnableLatticeHash = cast.ToBool(appOpts.Get(FlagSCEnableLatticeHash))

if v := appOpts.Get(FlagSCHistoricalProofMaxInFlight); v != nil {
scConfig.HistoricalProofMaxInFlight = cast.ToInt(v)
}
Expand Down
83 changes: 83 additions & 0 deletions app/seidb_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package app

import (
"path/filepath"
"testing"

"github.com/sei-protocol/sei-chain/sei-cosmos/server"
"github.com/sei-protocol/sei-chain/sei-db/config"
"github.com/sei-protocol/sei-chain/sei-db/ledger_db/receipt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type TestSeiDBAppOpts struct {
Expand All @@ -13,6 +17,7 @@ type TestSeiDBAppOpts struct {
func (t TestSeiDBAppOpts) Get(s string) interface{} {
defaultSCConfig := config.DefaultStateCommitConfig()
defaultSSConfig := config.DefaultStateStoreConfig()
defaultReceiptConfig := config.DefaultReceiptStoreConfig()
switch s {
case FlagSCEnable:
return defaultSCConfig.Enable
Expand All @@ -32,6 +37,8 @@ func (t TestSeiDBAppOpts) Get(s string) interface{} {
return defaultSCConfig.MemIAVLConfig.SnapshotPrefetchThreshold
case FlagSCSnapshotWriteRateMBps:
return defaultSCConfig.MemIAVLConfig.SnapshotWriteRateMBps
case FlagSCEnableLatticeHash:
return defaultSCConfig.EnableLatticeHash
case FlagSSEnable:
return defaultSSConfig.Enable
case FlagSSBackend:
Expand All @@ -46,6 +53,8 @@ func (t TestSeiDBAppOpts) Get(s string) interface{} {
return defaultSSConfig.PruneIntervalSeconds
case FlagSSImportNumWorkers:
return defaultSSConfig.ImportNumWorkers
case receiptStoreBackendKey:
return defaultReceiptConfig.Backend
case FlagEVMSSDirectory:
return defaultSSConfig.EVMDBDirectory
case FlagEVMSSWriteMode:
Expand All @@ -61,8 +70,11 @@ func TestNewDefaultConfig(t *testing.T) {
appOpts := TestSeiDBAppOpts{}
scConfig := parseSCConfigs(appOpts)
ssConfig := parseSSConfigs(appOpts)
receiptConfig, err := config.ReadReceiptConfig(appOpts)
assert.NoError(t, err)
assert.Equal(t, scConfig, config.DefaultStateCommitConfig())
assert.Equal(t, ssConfig, config.DefaultStateStoreConfig())
assert.Equal(t, receiptConfig, config.DefaultReceiptStoreConfig())
}

type mapAppOpts map[string]interface{}
Expand All @@ -85,3 +97,74 @@ func TestParseSCConfigs_HistoricalProofFlags(t *testing.T) {
assert.Equal(t, 12.5, scConfig.HistoricalProofRateLimit)
assert.Equal(t, 3, scConfig.HistoricalProofBurst)
}

// TestParseReceiptConfigs_DefaultsToPebbleWhenUnset checks that reading the
// receipt config from empty app options yields the package defaults.
func TestParseReceiptConfigs_DefaultsToPebbleWhenUnset(t *testing.T) {
	got, err := config.ReadReceiptConfig(mapAppOpts{})
	assert.NoError(t, err)
	assert.Equal(t, config.DefaultReceiptStoreConfig(), got)
}

func TestParseReceiptConfigs_UsesConfiguredBackend(t *testing.T) {
receiptConfig, err := config.ReadReceiptConfig(mapAppOpts{
receiptStoreBackendKey: "parquet",
})
assert.NoError(t, err)
assert.Equal(t, "parquet", receiptConfig.Backend)
assert.Equal(t, config.DefaultReceiptStoreConfig().AsyncWriteBuffer, receiptConfig.AsyncWriteBuffer)
assert.Equal(t, config.DefaultReceiptStoreConfig().KeepRecent, receiptConfig.KeepRecent)
}

func TestParseReceiptConfigs_UsesConfiguredValues(t *testing.T) {
receiptConfig, err := config.ReadReceiptConfig(mapAppOpts{
receiptStoreDBDirectoryKey: "/tmp/custom-receipt-db",
receiptStoreBackendKey: "parquet",
receiptStoreAsyncWriteBufferKey: 7,
receiptStoreKeepRecentKey: 42,
receiptStorePruneIntervalSecondsKey: 9,
})
assert.NoError(t, err)
assert.Equal(t, "/tmp/custom-receipt-db", receiptConfig.DBDirectory)
assert.Equal(t, "parquet", receiptConfig.Backend)
assert.Equal(t, 7, receiptConfig.AsyncWriteBuffer)
assert.Equal(t, 42, receiptConfig.KeepRecent)
assert.Equal(t, 9, receiptConfig.PruneIntervalSeconds)
}

// TestParseReceiptConfigs_RejectsInvalidBackend checks that an unsupported
// backend name is rejected with a descriptive error.
func TestParseReceiptConfigs_RejectsInvalidBackend(t *testing.T) {
	_, err := config.ReadReceiptConfig(mapAppOpts{receiptStoreBackendKey: "rocksdb"})
	assert.Error(t, err)
	// The error should identify both the failure mode and the offending value.
	assert.Contains(t, err.Error(), "unsupported receipt-store backend")
	assert.Contains(t, err.Error(), "rocksdb")
}

func TestReadReceiptStoreConfigUsesConfiguredValues(t *testing.T) {
homePath := t.TempDir()
receiptConfig, err := readReceiptStoreConfig(homePath, mapAppOpts{
receiptStoreDBDirectoryKey: "/tmp/custom-receipt-db",
receiptStoreKeepRecentKey: 5,
server.FlagMinRetainBlocks: 100,
})
require.NoError(t, err)
assert.Equal(t, "/tmp/custom-receipt-db", receiptConfig.DBDirectory)
assert.Equal(t, 5, receiptConfig.KeepRecent)
}

// TestReadReceiptStoreConfigUsesDefaultDirectoryWhenUnset checks that the DB
// directory falls back to <home>/data/receipt.db when not configured.
func TestReadReceiptStoreConfigUsesDefaultDirectoryWhenUnset(t *testing.T) {
	home := t.TempDir()
	got, err := readReceiptStoreConfig(home, mapAppOpts{})
	require.NoError(t, err)
	assert.Equal(t, filepath.Join(home, "data", "receipt.db"), got.DBDirectory)
}

// TestFullAppPathWithParquetReceiptStore exercises the full app.New path with rs-backend = "parquet"
// and asserts the parquet receipt store is actually instantiated (not pebble).
// TestFullAppPathWithParquetReceiptStore exercises the full app.New path with
// rs-backend = "parquet" and asserts the parquet receipt store is actually
// instantiated (not pebble).
func TestFullAppPathWithParquetReceiptStore(t *testing.T) {
	testApp := SetupWithScReceiptFromOpts(t, false, false, TestAppOpts{
		UseSc:          true,
		ReceiptBackend: "parquet",
	})
	require.NotNil(t, testApp.receiptStore, "receipt store should be created")
	assert.Equal(t, "parquet", receipt.BackendTypeName(testApp.receiptStore), "receipt store backend should be parquet")
}
Loading
Loading