Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(VDB-955) Associate diffs with headers #6

Merged
merged 5 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/statediff"
"github.com/makerdao/vulcanizedb/libraries/shared/constants"
"github.com/makerdao/vulcanizedb/libraries/shared/fetcher"
storageUtils "github.com/makerdao/vulcanizedb/libraries/shared/storage/utils"
"github.com/makerdao/vulcanizedb/libraries/shared/streamer"
"github.com/makerdao/vulcanizedb/libraries/shared/transformer"
"github.com/makerdao/vulcanizedb/libraries/shared/watcher"
Expand Down Expand Up @@ -186,10 +185,9 @@ func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) {
LogWithCommand.Info("executing storage transformers")
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for range ticker.C {
errs := make(chan error)
diffs := make(chan storageUtils.StorageDiff)
w.Execute(diffs, errs, queueRecheckInterval)
err := w.Execute(queueRecheckInterval)
if err != nil {
LogWithCommand.Fatalf("error executing storage watcher: %s", err.Error())
}
}

Expand Down
2 changes: 1 addition & 1 deletion libraries/shared/factories/storage/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ import (
)

type Repository interface {
Create(blockNumber int, blockHash string, metadata utils.StorageValueMetadata, value interface{}) error
Create(headerID int64, metadata utils.StorageValueMetadata, value interface{}) error
SetDB(db *postgres.DB)
}
2 changes: 1 addition & 1 deletion libraries/shared/factories/storage/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ func (transformer Transformer) Execute(diff utils.StorageDiff) error {
if decodeErr != nil {
return decodeErr
}
return transformer.Repository.Create(diff.BlockHeight, diff.BlockHash.Hex(), metadata, value)
return transformer.Repository.Create(diff.HeaderID, metadata, value)
}
24 changes: 10 additions & 14 deletions libraries/shared/factories/storage/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package storage_test

import (
"math/rand"

"github.com/ethereum/go-ethereum/common"
"github.com/makerdao/vulcanizedb/libraries/shared/factories/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/mocks"
Expand Down Expand Up @@ -69,21 +71,18 @@ var _ = Describe("Storage transformer", func() {
fakeMetadata := utils.StorageValueMetadata{Type: utils.Address}
storageKeysLookup.Metadata = fakeMetadata
rawValue := common.HexToAddress("0x12345")
fakeBlockNumber := 123
fakeBlockHash := "0x67890"
fakeHeaderID := rand.Int63()
fakeRow := utils.StorageDiff{
HashedAddress: common.Hash{},
BlockHash: common.HexToHash(fakeBlockHash),
BlockHeight: fakeBlockNumber,
StorageKey: common.Hash{},
StorageValue: rawValue.Hash(),
HeaderID: fakeHeaderID,
}

err := t.Execute(fakeRow)

Expect(err).NotTo(HaveOccurred())
Expect(repository.PassedBlockNumber).To(Equal(fakeBlockNumber))
Expect(repository.PassedBlockHash).To(Equal(common.HexToHash(fakeBlockHash).Hex()))
Expect(repository.PassedHeaderID).To(Equal(fakeHeaderID))
Expect(repository.PassedMetadata).To(Equal(fakeMetadata))
Expect(repository.PassedValue.(string)).To(Equal(rawValue.Hex()))
})
Expand All @@ -102,10 +101,9 @@ var _ = Describe("Storage transformer", func() {

Describe("when a storage row contains more than one item packed in storage", func() {
var (
rawValue = common.HexToAddress("000000000000000000000000000000000000000000000002a300000000002a30")
fakeBlockNumber = 123
fakeBlockHash = "0x67890"
packedTypes = make(map[int]utils.ValueType)
rawValue = common.HexToAddress("000000000000000000000000000000000000000000000002a300000000002a30")
fakeHeaderID = rand.Int63()
packedTypes = make(map[int]utils.ValueType)
)
packedTypes[0] = utils.Uint48
packedTypes[1] = utils.Uint48
Expand All @@ -121,17 +119,15 @@ var _ = Describe("Storage transformer", func() {
storageKeysLookup.Metadata = fakeMetadata
fakeRow := utils.StorageDiff{
HashedAddress: common.Hash{},
BlockHash: common.HexToHash(fakeBlockHash),
BlockHeight: fakeBlockNumber,
StorageKey: common.Hash{},
StorageValue: rawValue.Hash(),
HeaderID: fakeHeaderID,
}

err := t.Execute(fakeRow)

Expect(err).NotTo(HaveOccurred())
Expect(repository.PassedBlockNumber).To(Equal(fakeBlockNumber))
Expect(repository.PassedBlockHash).To(Equal(common.HexToHash(fakeBlockHash).Hex()))
Expect(repository.PassedHeaderID).To(Equal(fakeHeaderID))
Expect(repository.PassedMetadata).To(Equal(fakeMetadata))
expectedPassedValue := make(map[int]string)
expectedPassedValue[0] = "10800"
Expand Down
8 changes: 3 additions & 5 deletions libraries/shared/mocks/storage_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ func NewMockStorageFetcher() *MockStorageFetcher {
}

func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) {
defer close(out)
defer close(errs)
for _, err := range fetcher.ErrsToReturn {
errs <- err
}
for _, diff := range fetcher.DiffsToReturn {
out <- diff
}
for _, err := range fetcher.ErrsToReturn {
errs <- err
}
}
14 changes: 6 additions & 8 deletions libraries/shared/mocks/storage_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@ import (
)

type MockStorageRepository struct {
CreateErr error
PassedBlockNumber int
PassedBlockHash string
PassedMetadata utils.StorageValueMetadata
PassedValue interface{}
CreateErr error
PassedHeaderID int64
PassedMetadata utils.StorageValueMetadata
PassedValue interface{}
}

func (repository *MockStorageRepository) Create(blockNumber int, blockHash string, metadata utils.StorageValueMetadata, value interface{}) error {
repository.PassedBlockNumber = blockNumber
repository.PassedBlockHash = blockHash
func (repository *MockStorageRepository) Create(headerID int64, metadata utils.StorageValueMetadata, value interface{}) error {
repository.PassedHeaderID = headerID
repository.PassedMetadata = metadata
repository.PassedValue = value
return repository.CreateErr
Expand Down
4 changes: 3 additions & 1 deletion libraries/shared/storage/utils/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package utils

import (
"strconv"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"strconv"
)

const ExpectedRowLength = 5
Expand All @@ -33,6 +34,7 @@ type StorageDiff struct {
BlockHeight int `db:"block_height"`
StorageKey common.Hash `db:"storage_key"`
StorageValue common.Hash `db:"storage_value"`
HeaderID int64 `db:"header_id"`
}

func FromParityCsvRow(csvRow []string) (StorageDiff, error) {
Expand Down
100 changes: 81 additions & 19 deletions libraries/shared/watcher/storage_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,54 @@ package watcher

import (
"fmt"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/makerdao/vulcanizedb/libraries/shared/fetcher"
"github.com/makerdao/vulcanizedb/libraries/shared/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/storage/utils"
"github.com/makerdao/vulcanizedb/libraries/shared/transformer"
"github.com/makerdao/vulcanizedb/pkg/datastore"
"github.com/makerdao/vulcanizedb/pkg/datastore/postgres"
"github.com/makerdao/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/sirupsen/logrus"
"time"
)

type ErrHeaderMismatch struct {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

dbHash string
diffHash string
}

func NewErrHeaderMismatch(DBHash, diffHash string) *ErrHeaderMismatch {
return &ErrHeaderMismatch{dbHash: DBHash, diffHash: diffHash}
}

func (e ErrHeaderMismatch) Error() string {
return fmt.Sprintf("db header hash (%s) doesn't match diff header hash (%s)", e.dbHash, e.diffHash)
}

type IStorageWatcher interface {
AddTransformers(initializers []transformer.StorageTransformerInitializer)
Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration)
Execute(queueRecheckInterval time.Duration) error
}

type StorageWatcher struct {
db *postgres.DB
StorageFetcher fetcher.IStorageFetcher
Queue storage.IStorageQueue
HeaderRepository datastore.HeaderRepository
KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer
}

func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher {
queue := storage.NewStorageQueue(db)
headerRepository := repositories.NewHeaderRepository(db)
transformers := make(map[common.Hash]transformer.StorageTransformer)
return StorageWatcher{
db: db,
StorageFetcher: fetcher,
Queue: queue,
HeaderRepository: headerRepository,
KeccakAddressTransformers: transformers,
}
}
Expand All @@ -58,13 +77,21 @@ func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.
}
}

func (storageWatcher StorageWatcher) Execute(diffsChan chan utils.StorageDiff, errsChan chan error, queueRecheckInterval time.Duration) {
func (storageWatcher StorageWatcher) Execute(queueRecheckInterval time.Duration) error {
ticker := time.NewTicker(queueRecheckInterval)
diffsChan := make(chan utils.StorageDiff)
errsChan := make(chan error)

defer close(diffsChan)
defer close(errsChan)

go storageWatcher.StorageFetcher.FetchStorageDiffs(diffsChan, errsChan)

for {
select {
case fetchErr := <-errsChan:
logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr))
logrus.Warnf("error fetching storage diffs: %s", fetchErr.Error())
return fetchErr
case diff := <-diffsChan:
storageWatcher.processRow(diff)
case <-ticker.C:
Expand All @@ -79,43 +106,78 @@ func (storageWatcher StorageWatcher) getTransformer(diff utils.StorageDiff) (tra
}

func (storageWatcher StorageWatcher) processRow(diff utils.StorageDiff) {
storageTransformer, ok := storageWatcher.getTransformer(diff)
if !ok {
logrus.Debug("ignoring a diff from an unwatched contract")
storageTransformer, isTransformerWatchingAddress := storageWatcher.getTransformer(diff)
if !isTransformerWatchingAddress {
logrus.Debug("ignoring diff from an unwatched contract")
return
}

headerID, err := storageWatcher.getHeaderID(diff)
if err != nil {
logrus.Infof("error getting header for diff: %s", err.Error())
storageWatcher.queueDiff(diff)
return
}
diff.HeaderID = headerID

executeErr := storageTransformer.Execute(diff)
if executeErr != nil {
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
queueErr := storageWatcher.Queue.Add(diff)
if queueErr != nil {
logrus.Warn(fmt.Sprintf("error queueing storage diff: %s", queueErr))
}
logrus.Infof("error executing storage transformer: %s", executeErr.Error())
storageWatcher.queueDiff(diff)
}
}

func (storageWatcher StorageWatcher) processQueue() {
diffs, fetchErr := storageWatcher.Queue.GetAll()
if fetchErr != nil {
logrus.Warn(fmt.Sprintf("error getting queued storage: %s", fetchErr))
logrus.Infof("error getting queued storage: %s", fetchErr.Error())
}

for _, diff := range diffs {
storageTransformer, ok := storageWatcher.getTransformer(diff)
if !ok {
// delete diff from queue if address no longer watched
headerID, getHeaderErr := storageWatcher.getHeaderID(diff)
if getHeaderErr != nil {
logrus.Infof("error getting header for diff: %s", getHeaderErr.Error())
continue
}
diff.HeaderID = headerID

storageTransformer, isTransformerWatchingAddress := storageWatcher.getTransformer(diff)
if !isTransformerWatchingAddress {
storageWatcher.deleteRow(diff.Id)
continue
}

executeErr := storageTransformer.Execute(diff)
if executeErr == nil {
storageWatcher.deleteRow(diff.Id)
if executeErr != nil {
logrus.Infof("error executing storage transformer: %s", executeErr.Error())
continue
}

storageWatcher.deleteRow(diff.Id)
}
}

func (storageWatcher StorageWatcher) deleteRow(id int) {
deleteErr := storageWatcher.Queue.Delete(id)
if deleteErr != nil {
logrus.Warn(fmt.Sprintf("error deleting persisted diff from queue: %s", deleteErr))
logrus.Infof("error deleting persisted diff from queue: %s", deleteErr.Error())
}
}

func (storageWatcher StorageWatcher) queueDiff(diff utils.StorageDiff) {
queueErr := storageWatcher.Queue.Add(diff)
if queueErr != nil {
logrus.Infof("error queueing storage diff: %s", queueErr.Error())
}
}

func (storageWatcher StorageWatcher) getHeaderID(diff utils.StorageDiff) (int64, error) {
header, getHeaderErr := storageWatcher.HeaderRepository.GetHeader(int64(diff.BlockHeight))
if getHeaderErr != nil {
return 0, getHeaderErr
}
if diff.BlockHash.Hex() != header.Hash {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking maybe this should convert header.Hash to a common.Hash and do this comparison with common.Hash instead of hex strings so that we don't run into pesky issues around the presence/absence of the 0x prefix

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done d73a988

return 0, NewErrHeaderMismatch(header.Hash, diff.BlockHash.Hex())
}
return header.Id, nil
}
Loading