From 4794f45006aff0287bc6cf4630910a1ec3a01fbd Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Thu, 22 Jun 2023 02:22:20 -0700 Subject: [PATCH] fix(relayer): Relayer reorg (#14033) --- packages/relayer/event.go | 5 +++++ .../indexer/detect_and_handle_reorg.go | 9 +++++++-- ..._events_table_add_msg_hash_event_index.sql | 9 +++++++++ packages/relayer/mock/event_repository.go | 14 +++++++++++++ packages/relayer/repo/event.go | 20 +++++++++++++++++++ 5 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 packages/relayer/migrations/1666650708_alter_events_table_add_msg_hash_event_index.sql diff --git a/packages/relayer/event.go b/packages/relayer/event.go index f61c9f8af3..4c0a701fab 100644 --- a/packages/relayer/event.go +++ b/packages/relayer/event.go @@ -101,5 +101,10 @@ type EventRepository interface { ctx context.Context, msgHash string, ) (*Event, error) + FirstByEventAndMsgHash( + ctx context.Context, + event string, + msgHash string, + ) (*Event, error) Delete(ctx context.Context, id int) error } diff --git a/packages/relayer/indexer/detect_and_handle_reorg.go b/packages/relayer/indexer/detect_and_handle_reorg.go index 3ff30a8107..55cfac8b7e 100644 --- a/packages/relayer/indexer/detect_and_handle_reorg.go +++ b/packages/relayer/indexer/detect_and_handle_reorg.go @@ -4,19 +4,24 @@ import ( "context" "github.com/pkg/errors" + + log "github.com/sirupsen/logrus" ) func (svc *Service) detectAndHandleReorg(ctx context.Context, eventType string, msgHash string) error { - e, err := svc.eventRepo.FirstByMsgHash(ctx, msgHash) + e, err := svc.eventRepo.FirstByEventAndMsgHash(ctx, eventType, msgHash) if err != nil { return errors.Wrap(err, "svc.eventRepo.FirstByMsgHash") } - if e == nil || e.MsgHash == "" { + if e == nil || e.MsgHash == "" || e.Event != eventType { return nil } // reorg detected + + log.Infof("reorg detected for msgHash %v and eventType %v", msgHash, eventType) + err = svc.eventRepo.Delete(ctx, e.ID) if err != nil { return errors.Wrap(err, "svc.eventRepo.Delete") diff --git a/packages/relayer/migrations/1666650708_alter_events_table_add_msg_hash_event_index.sql b/packages/relayer/migrations/1666650708_alter_events_table_add_msg_hash_event_index.sql new file mode 100644 index 0000000000..a384f311ae --- /dev/null +++ b/packages/relayer/migrations/1666650708_alter_events_table_add_msg_hash_event_index.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE `events` ADD INDEX `msg_hash_event_index` (`msg_hash`, `event`); + +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +DROP INDEX msg_hash_event_index on events; +-- +goose StatementEnd \ No newline at end of file diff --git a/packages/relayer/mock/event_repository.go b/packages/relayer/mock/event_repository.go index 039509661e..7e52dcca68 100644 --- a/packages/relayer/mock/event_repository.go +++ b/packages/relayer/mock/event_repository.go @@ -106,6 +106,20 @@ func (r *EventRepository) FirstByMsgHash( return nil, nil } +func (r *EventRepository) FirstByEventAndMsgHash( + ctx context.Context, + event string, + msgHash string, +) (*relayer.Event, error) { + for _, e := range r.events { + if e.MsgHash == msgHash && e.Event == event { + return e, nil + } + } + + return nil, nil +} + func (r *EventRepository) Delete( ctx context.Context, id int, diff --git a/packages/relayer/repo/event.go b/packages/relayer/repo/event.go index 655b76f18c..60ee33b27f 100644 --- a/packages/relayer/repo/event.go +++ b/packages/relayer/repo/event.go @@ -83,6 +83,26 @@ func (r *EventRepository) FirstByMsgHash( return e, nil } +func (r *EventRepository) FirstByEventAndMsgHash( + ctx context.Context, + event string, + msgHash string, +) (*relayer.Event, error) { + e := &relayer.Event{} + // find all message sent events + if err := r.db.GormDB().Where("msg_hash = ?", msgHash). + Where("event = ?", event). + First(&e).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return nil, nil + } + + return nil, errors.Wrap(err, "r.db.First") + } + + return e, nil +} + func (r *EventRepository) FindAllByAddress( ctx context.Context, req *http.Request,