Skip to content

Commit

Permalink
Merge pull request #267 from multiversx/events-execution-order
Browse files Browse the repository at this point in the history
Execution order events index
  • Loading branch information
miiu96 committed Apr 8, 2024
2 parents 3cda5ea + 7db8387 commit 56519cb
Show file tree
Hide file tree
Showing 17 changed files with 116 additions and 63 deletions.
1 change: 1 addition & 0 deletions data/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type LogEvent struct {
AdditionalData []string `json:"additionalData,omitempty"`
Topics []string `json:"topics"`
Order int `json:"order"`
TxOrder int `json:"txOrder"`
ShardID uint32 `json:"shardID"`
Timestamp time.Duration `json:"timestamp,omitempty"`
}
1 change: 1 addition & 0 deletions data/scresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type ScResult struct {
CanBeIgnored bool `json:"canBeIgnored,omitempty"`
OriginalSender string `json:"originalSender,omitempty"`
HasLogs bool `json:"hasLogs,omitempty"`
ExecutionOrder int `json:"-"`
SenderAddressBytes []byte `json:"-"`
InitialTxGasUsed uint64 `json:"-"`
InitialTxFee string `json:"-"`
Expand Down
5 changes: 3 additions & 2 deletions data/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
)

// Transaction is a structure containing all the fields that need
// to be saved for a transaction. It has all the default fields
// plus some extra information for ease of search and filter
// to be saved for a transaction. It has all the default fields
// plus some extra information for ease of search and filter
type Transaction struct {
MBHash string `json:"miniBlockHash"`
Nonce uint64 `json:"nonce"`
Expand Down Expand Up @@ -48,6 +48,7 @@ type Transaction struct {
GuardianSignature string `json:"guardianSignature,omitempty"`
ErrorEvent bool `json:"errorEvent,omitempty"`
CompletedEvent bool `json:"completedEvent,omitempty"`
ExecutionOrder int `json:"-"`
SmartContractResults []*ScResult `json:"-"`
Hash string `json:"-"`
BlockHash string `json:"-"`
Expand Down
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3.0"
services:
elasticsearch:
container_name: es-container
Expand Down
36 changes: 31 additions & 5 deletions integrationtests/logsCrossShard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,21 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
Round: 50,
TimeStamp: 5040,
}
body := &dataBlock.Body{}

txHash := []byte("cross-log")
logID := hex.EncodeToString(txHash)

body := &dataBlock.Body{
MiniBlocks: []*dataBlock.MiniBlock{
{
TxHashes: [][]byte{txHash},
},
},
}

address1 := "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99"
address2 := "erd1w7jyzuj6cv4ngw8luhlkakatjpmjh3ql95lmxphd3vssc4vpymks6k5th7"

logID := hex.EncodeToString([]byte("cross-log"))

// index on source
pool := &outport.TransactionPool{
Logs: []*outport.LogData{
Expand All @@ -55,6 +63,12 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
},
},
},
Transactions: map[string]*outport.TxInfo{
logID: {
Transaction: &transaction.Transaction{},
ExecutionOrder: 0,
},
},
}
err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards))
require.Nil(t, err)
Expand All @@ -68,7 +82,7 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
string(genericResponse.Docs[0].Source),
)

event1ID := "75dcc2d7542c8a8be1006dd2d0f8e847c00cea5e55b6b8a53e0a5483e73f4431"
event1ID := logID + "-0-0"
ids = []string{event1ID}
err = esClient.DoMultiGet(context.Background(), ids, indexerdata.EventsIndex, true, genericResponse)
require.Nil(t, err)
Expand Down Expand Up @@ -106,6 +120,12 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
},
},
},
Transactions: map[string]*outport.TxInfo{
logID: {
Transaction: &transaction.Transaction{},
ExecutionOrder: 0,
},
},
}
err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards))
require.Nil(t, err)
Expand All @@ -118,7 +138,7 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
string(genericResponse.Docs[0].Source),
)

event2ID, event3ID := "c7d0e7abaaf188655537da1ed642b151182aa64bbe3fed316198208bf089713a", "3a6f93093be7b045938a2a03e45a059af602331602f63a45e5aec3866d3df126"
event2ID, event3ID := logID+"-1-0", logID+"-1-1"
ids = []string{event2ID, event3ID}
err = esClient.DoMultiGet(context.Background(), ids, indexerdata.EventsIndex, true, genericResponse)
require.Nil(t, err)
Expand Down Expand Up @@ -153,6 +173,12 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
},
},
},
Transactions: map[string]*outport.TxInfo{
logID: {
Transaction: &transaction.Transaction{},
ExecutionOrder: 0,
},
},
}
err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards))
require.Nil(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
"shardID": 1,
"txHash": "63726f73732d6c6f67",
"order": 1,
"timestamp": 6040
"timestamp": 6040,
"txOrder": 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
"shardID": 1,
"txHash": "63726f73732d6c6f67",
"order": 0,
"timestamp": 6040
"timestamp": 6040,
"txOrder": 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"01"
],
"shardID": 0,
"txOrder": 0,
"txHash": "63726f73732d6c6f67",
"order": 0,
"timestamp": 5040
Expand Down
2 changes: 1 addition & 1 deletion process/elasticproc/converters/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TruncateFieldIfExceedsMaxLengthBase64(field string) string {
return field
}

//TruncateSliceElementsIfExceedsMaxLength will truncate the provided slice of the field if the max length is exceeded
// TruncateSliceElementsIfExceedsMaxLength will truncate the provided slice of the field if the max length is exceeded
func TruncateSliceElementsIfExceedsMaxLength(fields []string) []string {
var localFields []string
for _, field := range fields {
Expand Down
17 changes: 12 additions & 5 deletions process/elasticproc/elasticProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,18 +518,25 @@ func (ei *elasticProcessor) indexTransactionsFeeData(txsHashFeeData map[string]*
}

func (ei *elasticProcessor) prepareAndIndexLogs(logsAndEvents []*outport.LogData, timestamp uint64, buffSlice *data.BufferSlice, shardID uint32) error {
logsDB, eventsDB := ei.logsAndEventsProc.PrepareLogsForDB(logsAndEvents, timestamp, shardID)
err := ei.indexEvents(eventsDB, buffSlice)
if err != nil {
return err
}

if !ei.isIndexEnabled(elasticIndexer.LogsIndex) {
return nil
}

logsDB, eventsDB := ei.logsAndEventsProc.PrepareLogsForDB(logsAndEvents, timestamp, shardID)
return ei.logsAndEventsProc.SerializeLogs(logsDB, buffSlice, elasticIndexer.LogsIndex)
}

err := ei.logsAndEventsProc.SerializeEvents(eventsDB, buffSlice, elasticIndexer.EventsIndex)
if err != nil {
return err
func (ei *elasticProcessor) indexEvents(eventsDB []*data.LogEvent, buffSlice *data.BufferSlice) error {
if !ei.isIndexEnabled(elasticIndexer.EventsIndex) {
return nil
}

return ei.logsAndEventsProc.SerializeLogs(logsDB, buffSlice, elasticIndexer.LogsIndex)
return ei.logsAndEventsProc.SerializeEvents(eventsDB, buffSlice, elasticIndexer.EventsIndex)
}

func (ei *elasticProcessor) indexScDeploys(deployData map[string]*data.ScDeployInfo, changeOwnerOperation map[string]*data.OwnerData, buffSlice *data.BufferSlice) error {
Expand Down
50 changes: 27 additions & 23 deletions process/elasticproc/logsevents/logsAndEventsProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package logsevents

import (
"encoding/hex"
"encoding/json"
"fmt"
"time"

"github.com/multiversx/mx-chain-core-go/core"
Expand All @@ -16,6 +16,8 @@ import (
"github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer"
)

const eventIDFormat = "%s-%d-%d"

// ArgsLogsAndEventsProcessor holds all dependencies required to create new instances of logsAndEventsProcessor
type ArgsLogsAndEventsProcessor struct {
PubKeyConverter core.PubkeyConverter
Expand Down Expand Up @@ -46,7 +48,6 @@ func NewLogsAndEventsProcessor(args ArgsLogsAndEventsProcessor) (*logsAndEventsP
pubKeyConverter: args.PubKeyConverter,
eventsProcessors: eventsProcessors,
hasher: args.Hasher,
marshaller: args.Marshalizer,
}, nil
}

Expand Down Expand Up @@ -243,18 +244,14 @@ func (lep *logsAndEventsProcessor) prepareLogsForDB(
}
logsDB.Events = append(logsDB.Events, logEvent)

dbEvent, ok := lep.prepareLogEvent(logsDB, logEvent, shardID)
if !ok {
continue
}

dbEvents = append(dbEvents, dbEvent)
executionOrder := lep.getExecutionOrder(logHashHex)
dbEvents = append(dbEvents, lep.prepareLogEvent(logsDB, logEvent, shardID, executionOrder))
}

return logsDB, dbEvents
}

func (lep *logsAndEventsProcessor) prepareLogEvent(dbLog *data.Logs, event *data.Event, shardID uint32) (*data.LogEvent, bool) {
func (lep *logsAndEventsProcessor) prepareLogEvent(dbLog *data.Logs, event *data.Event, shardID uint32, execOrder int) *data.LogEvent {
dbEvent := &data.LogEvent{
TxHash: dbLog.ID,
LogAddress: dbLog.Address,
Expand All @@ -265,22 +262,13 @@ func (lep *logsAndEventsProcessor) prepareLogEvent(dbLog *data.Logs, event *data
Topics: hexEncodeSlice(event.Topics),
Order: event.Order,
ShardID: shardID,
TxOrder: execOrder,
OriginalTxHash: dbLog.OriginalTxHash,
Timestamp: dbLog.Timestamp,
ID: fmt.Sprintf(eventIDFormat, dbLog.ID, shardID, event.Order),
}

dbEventBytes, err := json.Marshal(dbEvent)
if err != nil {
log.Warn("cannot marshal event",
"txHash", dbLog.ID,
"order", event.Order,
"error", err,
)
}

dbEvent.OriginalTxHash = dbLog.OriginalTxHash
dbEvent.Timestamp = dbLog.Timestamp
dbEvent.ID = hex.EncodeToString(lep.hasher.Compute(string(dbEventBytes)))

return dbEvent, true
return dbEvent
}

func (lep *logsAndEventsProcessor) getOriginalTxHash(logHashHex string) string {
Expand All @@ -296,6 +284,22 @@ func (lep *logsAndEventsProcessor) getOriginalTxHash(logHashHex string) string {
return ""
}

func (lep *logsAndEventsProcessor) getExecutionOrder(logHashHex string) int {
tx, ok := lep.logsData.txsMap[logHashHex]
if ok {
return tx.ExecutionOrder
}

scr, ok := lep.logsData.scrsMap[logHashHex]
if ok {
return scr.ExecutionOrder
}

log.Warn("cannot find hash in the txs map or scrs map", "hash", logHashHex)

return -1
}

func hexEncodeSlice(input [][]byte) []string {
hexEncoded := make([]string, 0, len(input))
for idx := 0; idx < len(input); idx++ {
Expand Down
6 changes: 4 additions & 2 deletions process/elasticproc/logsevents/logsAndEventsProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func TestPrepareLogsAndEvents_LogEvents(t *testing.T) {
_, eventsDB := proc.PrepareLogsForDB(logsAndEvents, 1234, 1)
require.Equal(t, []*data.LogEvent{
{
ID: "df358a19d2ed48f29c7fdba5132da589176cc6bb698cd8ee084b4efd6e8a86b3",
ID: "747848617368-1-0",
TxHash: "747848617368",
OriginalTxHash: "originalHash",
LogAddress: "61646472657373",
Expand All @@ -349,9 +349,10 @@ func TestPrepareLogsAndEvents_LogEvents(t *testing.T) {
Order: 0,
ShardID: 1,
Timestamp: 1234,
TxOrder: 0,
},
{
ID: "cd4f37eff9d15471034bbaf0886fcf62fa00eecf59410be9bdd2be8d36bab42a",
ID: "747848617368-1-1",
TxHash: "747848617368",
OriginalTxHash: "originalHash",
LogAddress: "61646472657373",
Expand All @@ -363,6 +364,7 @@ func TestPrepareLogsAndEvents_LogEvents(t *testing.T) {
Order: 1,
ShardID: 1,
Timestamp: 1234,
TxOrder: 0,
},
}, eventsDB)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (proc *smartContractResultsProcessor) prepareSmartContractResult(
OriginalSender: originalSenderAddr,
InitialTxFee: feeInfo.Fee.String(),
InitialTxGasUsed: feeInfo.GasUsed,
ExecutionOrder: int(scrInfo.ExecutionOrder),
}
}

Expand Down
40 changes: 21 additions & 19 deletions process/elasticproc/transactions/transactionDBBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-core-go/data/receipt"
"github.com/multiversx/mx-chain-core-go/data/rewardTx"
"github.com/multiversx/mx-chain-es-indexer-go/data"
"github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/converters"
Expand Down Expand Up @@ -122,17 +121,19 @@ func (dtb *dbTransactionBuilder) prepareTransaction(
Version: tx.Version,
GuardianAddress: guardianAddress,
GuardianSignature: hex.EncodeToString(tx.GuardianSignature),
ExecutionOrder: int(txInfo.ExecutionOrder),
}
}

func (dtb *dbTransactionBuilder) prepareRewardTransaction(
rTx *rewardTx.RewardTx,
rTxInfo *outport.RewardInfo,
txHash []byte,
mbHash []byte,
mb *block.MiniBlock,
header coreData.HeaderHandler,
txStatus string,
) *data.Transaction {
rTx := rTxInfo.Reward
valueNum, err := dtb.balanceConverter.ConvertBigValueToFloat(rTx.Value)
if err != nil {
log.Warn("dbTransactionBuilder.prepareRewardTransaction cannot compute value as num", "value", rTx.Value,
Expand All @@ -142,23 +143,24 @@ func (dtb *dbTransactionBuilder) prepareRewardTransaction(
receiverAddr := dtb.addressPubkeyConverter.SilentEncode(rTx.RcvAddr, log)

return &data.Transaction{
Hash: hex.EncodeToString(txHash),
MBHash: hex.EncodeToString(mbHash),
Nonce: 0,
Round: rTx.Round,
Value: rTx.Value.String(),
ValueNum: valueNum,
Receiver: receiverAddr,
Sender: fmt.Sprintf("%d", core.MetachainShardId),
ReceiverShard: mb.ReceiverShardID,
SenderShard: mb.SenderShardID,
GasPrice: 0,
GasLimit: 0,
Data: make([]byte, 0),
Signature: "",
Timestamp: time.Duration(header.GetTimeStamp()),
Status: txStatus,
Operation: rewardsOperation,
Hash: hex.EncodeToString(txHash),
MBHash: hex.EncodeToString(mbHash),
Nonce: 0,
Round: rTx.Round,
Value: rTx.Value.String(),
ValueNum: valueNum,
Receiver: receiverAddr,
Sender: fmt.Sprintf("%d", core.MetachainShardId),
ReceiverShard: mb.ReceiverShardID,
SenderShard: mb.SenderShardID,
GasPrice: 0,
GasLimit: 0,
Data: make([]byte, 0),
Signature: "",
Timestamp: time.Duration(header.GetTimeStamp()),
Status: txStatus,
Operation: rewardsOperation,
ExecutionOrder: int(rTxInfo.ExecutionOrder),
}
}

Expand Down

0 comments on commit 56519cb

Please sign in to comment.