/
repository.go
105 lines (93 loc) · 3.26 KB
/
repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package storage
import (
"context"
"errors"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
cwAlert "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// repository errors
var ErrDocNotFound = errors.New("NOT FOUND")
// Repository definitions.
type Repository struct {
db *mongo.Database
log *zap.Logger
metrics metrics.Metrics
alerts alert.AlertClient
collections struct {
watcherBlock *mongo.Collection
globalTransactions *mongo.Collection
}
}
// NewRepository create a new respository instance.
func NewRepository(db *mongo.Database, metrics metrics.Metrics, alerts alert.AlertClient, log *zap.Logger) *Repository {
return &Repository{db, log, metrics, alerts, struct {
watcherBlock *mongo.Collection
globalTransactions *mongo.Collection
}{
watcherBlock: db.Collection("watcherBlock"),
globalTransactions: db.Collection("globalTransactions"),
}}
}
func (s *Repository) UpsertGlobalTransaction(ctx context.Context, chainID sdk.ChainID, globalTx TransactionUpdate) error {
update := bson.M{
"$set": globalTx,
"$setOnInsert": repository.IndexedAt(time.Now()),
"$inc": bson.D{{Key: "revision", Value: 1}},
}
_, err := s.collections.globalTransactions.UpdateByID(ctx, globalTx.ID, update, options.Update().SetUpsert(true))
if err != nil {
s.log.Error("Error inserting global transaction", zap.Error(err))
// send alert when exists an error saving ptth vaa.
alertContext := alert.AlertContext{
Details: globalTx.ToMap(),
Error: err,
}
s.alerts.CreateAndSend(ctx, cwAlert.ErrorSaveDestinationTx, alertContext)
return err
}
s.metrics.IncDestinationTrxSaved(chainID)
return err
}
func (s *Repository) GetGlobalTransactionByID(ctx context.Context, id string) (TransactionUpdate, error) {
var tx TransactionUpdate
err := s.collections.globalTransactions.FindOne(ctx, bson.M{"_id": id}).Decode(&tx)
if err != nil {
if err == mongo.ErrNoDocuments {
return tx, ErrDocNotFound
}
return tx, err
}
return tx, nil
}
func (s *Repository) UpdateWatcherBlock(ctx context.Context, chainID sdk.ChainID, watcherBlock WatcherBlock) error {
update := bson.M{
"$set": watcherBlock,
"$setOnInsert": repository.IndexedAt(time.Now()),
}
s.metrics.SetCurrentBlock(chainID, uint64(watcherBlock.BlockNumber))
_, err := s.collections.watcherBlock.UpdateByID(ctx, watcherBlock.ID, update, options.Update().SetUpsert(true))
if err != nil {
s.log.Error("Error inserting watcher block", zap.Error(err))
return err
}
return err
}
func (s *Repository) GetCurrentBlock(ctx context.Context, blockchain string, defaultBlock int64) (int64, error) {
var block WatcherBlock
err := s.collections.watcherBlock.FindOne(ctx, bson.M{"_id": blockchain}).Decode(&block)
if err != nil {
if err == mongo.ErrNoDocuments {
return defaultBlock, nil
}
return 0, err
}
return block.BlockNumber, nil
}