Skip to content

Commit

Permalink
(VDB-1046) Add command to extract diffs
Browse files Browse the repository at this point in the history
- Isolate diff extraction from transformers etc
  • Loading branch information
rmulhol committed Jan 17, 2020
1 parent 851c288 commit bc2ea7d
Show file tree
Hide file tree
Showing 45 changed files with 557 additions and 333 deletions.
2 changes: 1 addition & 1 deletion cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/ethereum/go-ethereum/statediff"
"github.com/makerdao/vulcanizedb/libraries/shared/constants"
"github.com/makerdao/vulcanizedb/libraries/shared/fetcher"
"github.com/makerdao/vulcanizedb/libraries/shared/storage/fetcher"
"github.com/makerdao/vulcanizedb/libraries/shared/streamer"
"github.com/makerdao/vulcanizedb/libraries/shared/transformer"
"github.com/makerdao/vulcanizedb/libraries/shared/watcher"
Expand Down
56 changes: 56 additions & 0 deletions cmd/extractDiffs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package cmd

import (
"github.com/ethereum/go-ethereum/statediff"
"github.com/makerdao/vulcanizedb/libraries/shared/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/storage/fetcher"
"github.com/makerdao/vulcanizedb/libraries/shared/streamer"
"github.com/makerdao/vulcanizedb/pkg/fs"
"github.com/makerdao/vulcanizedb/utils"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

// extractDiffsCmd represents the extractDiffs command
var extractDiffsCmd = &cobra.Command{
Use: "extractDiffs",
Short: "Extract storage diffs from a node and write them to postgres",
Long: `Reads storage diffs from either a CSV or JSON RPC subscription.
Configure which with the STORAGEDIFFS_SOURCE flag. Received diffs are
written to public.storage_diff.`,
Run: func(cmd *cobra.Command, args []string) {
extractDiffs()
},
}

func init() {
rootCmd.AddCommand(extractDiffsCmd)
}

func extractDiffs() {
// Setup bc and db objects
blockChain := getBlockChain()
db := utils.LoadPostgres(databaseConfig, blockChain.Node())

// initialize fetcher
var storageFetcher fetcher.IStorageFetcher
switch storageDiffsSource {
case "geth":
logrus.Debug("fetching storage diffs from geth pub sub")
rpcClient, _ := getClients()
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
payloadChan := make(chan statediff.Payload)
storageFetcher = fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan)
default:
logrus.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath}
storageFetcher = fetcher.NewCsvTailStorageFetcher(tailer)
}

// extract diffs
extractor := storage.NewDiffExtractor(storageFetcher, &db)
err := extractor.ExtractDiffs()
if err != nil {
LogWithCommand.Fatalf("extracting diffs failed: %s", err.Error())
}
}
4 changes: 2 additions & 2 deletions libraries/shared/factories/storage/keys_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package storage

import (
"github.com/ethereum/go-ethereum/common"
"github.com/makerdao/vulcanizedb/libraries/shared/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/storage/types"
"github.com/makerdao/vulcanizedb/pkg/datastore/postgres"
)

type KeysLoader interface {
LoadMappings() (map[common.Hash]storage.ValueMetadata, error)
LoadMappings() (map[common.Hash]types.ValueMetadata, error)
SetDB(db *postgres.DB)
}
11 changes: 6 additions & 5 deletions libraries/shared/factories/storage/keys_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,25 @@ package storage
import (
"github.com/ethereum/go-ethereum/common"
"github.com/makerdao/vulcanizedb/libraries/shared/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/storage/types"
"github.com/makerdao/vulcanizedb/pkg/datastore/postgres"
)

type KeysLookup interface {
Lookup(key common.Hash) (storage.ValueMetadata, error)
Lookup(key common.Hash) (types.ValueMetadata, error)
SetDB(db *postgres.DB)
}

type keysLookup struct {
loader KeysLoader
mappings map[common.Hash]storage.ValueMetadata
mappings map[common.Hash]types.ValueMetadata
}

func NewKeysLookup(loader KeysLoader) KeysLookup {
return &keysLookup{loader: loader, mappings: make(map[common.Hash]storage.ValueMetadata)}
return &keysLookup{loader: loader, mappings: make(map[common.Hash]types.ValueMetadata)}
}

func (lookup *keysLookup) Lookup(key common.Hash) (storage.ValueMetadata, error) {
func (lookup *keysLookup) Lookup(key common.Hash) (types.ValueMetadata, error) {
metadata, ok := lookup.mappings[key]
if !ok {
refreshErr := lookup.refreshMappings()
Expand All @@ -45,7 +46,7 @@ func (lookup *keysLookup) Lookup(key common.Hash) (storage.ValueMetadata, error)
}
metadata, ok = lookup.mappings[key]
if !ok {
return metadata, storage.ErrKeyNotFound{Key: key.Hex()}
return metadata, types.ErrKeyNotFound{Key: key.Hex()}
}
}
return metadata, nil
Expand Down
14 changes: 7 additions & 7 deletions libraries/shared/factories/storage/keys_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
storage_factory "github.com/makerdao/vulcanizedb/libraries/shared/factories/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/mocks"
"github.com/makerdao/vulcanizedb/libraries/shared/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/storage/types"
"github.com/makerdao/vulcanizedb/pkg/fakes"
"github.com/makerdao/vulcanizedb/test_config"
. "github.com/onsi/ginkgo"
Expand All @@ -30,7 +30,7 @@ import (

var _ = Describe("Storage keys lookup", func() {
var (
fakeMetadata = storage.GetValueMetadata("name", map[storage.Key]string{}, storage.Uint256)
fakeMetadata = types.GetValueMetadata("name", map[types.Key]string{}, types.Uint256)
lookup storage_factory.KeysLookup
loader *mocks.MockStorageKeysLoader
)
Expand All @@ -43,7 +43,7 @@ var _ = Describe("Storage keys lookup", func() {
Describe("Lookup", func() {
Describe("when key not found", func() {
It("refreshes keys", func() {
loader.StorageKeyMappings = map[common.Hash]storage.ValueMetadata{fakes.FakeHash: fakeMetadata}
loader.StorageKeyMappings = map[common.Hash]types.ValueMetadata{fakes.FakeHash: fakeMetadata}
_, err := lookup.Lookup(fakes.FakeHash)

Expect(err).NotTo(HaveOccurred())
Expand All @@ -62,7 +62,7 @@ var _ = Describe("Storage keys lookup", func() {

Describe("when key found", func() {
BeforeEach(func() {
loader.StorageKeyMappings = map[common.Hash]storage.ValueMetadata{fakes.FakeHash: fakeMetadata}
loader.StorageKeyMappings = map[common.Hash]types.ValueMetadata{fakes.FakeHash: fakeMetadata}
_, err := lookup.Lookup(fakes.FakeHash)
Expect(err).NotTo(HaveOccurred())
Expect(loader.LoadMappingsCallCount).To(Equal(1))
Expand All @@ -77,7 +77,7 @@ var _ = Describe("Storage keys lookup", func() {
})

It("returns metadata for loaded static key", func() {
loader.StorageKeyMappings = map[common.Hash]storage.ValueMetadata{fakes.FakeHash: fakeMetadata}
loader.StorageKeyMappings = map[common.Hash]types.ValueMetadata{fakes.FakeHash: fakeMetadata}

metadata, err := lookup.Lookup(fakes.FakeHash)

Expand All @@ -86,7 +86,7 @@ var _ = Describe("Storage keys lookup", func() {
})

It("returns metadata for hashed version of key (accommodates keys emitted from Geth)", func() {
loader.StorageKeyMappings = map[common.Hash]storage.ValueMetadata{fakes.FakeHash: fakeMetadata}
loader.StorageKeyMappings = map[common.Hash]types.ValueMetadata{fakes.FakeHash: fakeMetadata}

hashedKey := common.BytesToHash(crypto.Keccak256(fakes.FakeHash.Bytes()))
metadata, err := lookup.Lookup(hashedKey)
Expand All @@ -99,7 +99,7 @@ var _ = Describe("Storage keys lookup", func() {
_, err := lookup.Lookup(fakes.FakeHash)

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(storage.ErrKeyNotFound{Key: fakes.FakeHash.Hex()}))
Expect(err).To(MatchError(types.ErrKeyNotFound{Key: fakes.FakeHash.Hex()}))
})
})

Expand Down
4 changes: 2 additions & 2 deletions libraries/shared/factories/storage/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package storage

import (
"github.com/makerdao/vulcanizedb/libraries/shared/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/storage/types"
"github.com/makerdao/vulcanizedb/pkg/datastore/postgres"
)

type Repository interface {
Create(diffID, headerID int64, metadata storage.ValueMetadata, value interface{}) error
Create(diffID, headerID int64, metadata types.ValueMetadata, value interface{}) error
SetDB(db *postgres.DB)
}
3 changes: 2 additions & 1 deletion libraries/shared/factories/storage/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package storage
import (
"github.com/ethereum/go-ethereum/common"
"github.com/makerdao/vulcanizedb/libraries/shared/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/storage/types"
"github.com/makerdao/vulcanizedb/libraries/shared/transformer"
"github.com/makerdao/vulcanizedb/pkg/datastore/postgres"
)
Expand All @@ -39,7 +40,7 @@ func (transformer Transformer) KeccakContractAddress() common.Hash {
return transformer.HashedAddress
}

func (transformer Transformer) Execute(diff storage.PersistedDiff) error {
func (transformer Transformer) Execute(diff types.PersistedDiff) error {
metadata, lookupErr := transformer.StorageKeysLookup.Lookup(diff.StorageKey)
if lookupErr != nil {
return lookupErr
Expand Down
40 changes: 20 additions & 20 deletions libraries/shared/factories/storage/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"math/rand"

"github.com/ethereum/go-ethereum/common"
storage_factory "github.com/makerdao/vulcanizedb/libraries/shared/factories/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/factories/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/mocks"
"github.com/makerdao/vulcanizedb/libraries/shared/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/storage/types"
"github.com/makerdao/vulcanizedb/pkg/fakes"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -32,52 +32,52 @@ var _ = Describe("Storage transformer", func() {
var (
storageKeysLookup *mocks.MockStorageKeysLookup
repository *mocks.MockStorageRepository
t storage_factory.Transformer
t storage.Transformer
)

BeforeEach(func() {
storageKeysLookup = &mocks.MockStorageKeysLookup{}
repository = &mocks.MockStorageRepository{}
t = storage_factory.Transformer{
t = storage.Transformer{
HashedAddress: common.Hash{},
StorageKeysLookup: storageKeysLookup,
Repository: repository,
}
})

It("returns the contract address being watched", func() {
fakeAddress := storage.HexToKeccak256Hash("0x12345")
fakeAddress := types.HexToKeccak256Hash("0x12345")
t.HashedAddress = fakeAddress

Expect(t.KeccakContractAddress()).To(Equal(fakeAddress))
})

It("looks up metadata for storage key", func() {
t.Execute(storage.PersistedDiff{})
t.Execute(types.PersistedDiff{})

Expect(storageKeysLookup.LookupCalled).To(BeTrue())
})

It("returns error if lookup fails", func() {
storageKeysLookup.LookupErr = fakes.FakeError

err := t.Execute(storage.PersistedDiff{})
err := t.Execute(types.PersistedDiff{})

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})

It("creates storage row with decoded data", func() {
fakeMetadata := storage.ValueMetadata{Type: storage.Address}
fakeMetadata := types.ValueMetadata{Type: types.Address}
storageKeysLookup.Metadata = fakeMetadata
rawValue := common.HexToAddress("0x12345")
fakeHeaderID := rand.Int63()
fakeBlockNumber := rand.Int()
fakeBlockHash := fakes.RandomString(64)
fakeRow := storage.PersistedDiff{
fakeRow := types.PersistedDiff{
ID: rand.Int63(),
HeaderID: fakeHeaderID,
RawDiff: storage.RawDiff{
RawDiff: types.RawDiff{
HashedAddress: common.Hash{},
BlockHash: common.HexToHash(fakeBlockHash),
BlockHeight: fakeBlockNumber,
Expand All @@ -97,10 +97,10 @@ var _ = Describe("Storage transformer", func() {

It("returns error if creating row fails", func() {
rawValue := common.HexToAddress("0x12345")
fakeMetadata := storage.ValueMetadata{Type: storage.Address}
fakeMetadata := types.ValueMetadata{Type: types.Address}
storageKeysLookup.Metadata = fakeMetadata
repository.CreateErr = fakes.FakeError
diff := storage.PersistedDiff{RawDiff: storage.RawDiff{StorageValue: rawValue.Hash()}}
diff := types.PersistedDiff{RawDiff: types.RawDiff{StorageValue: rawValue.Hash()}}

err := t.Execute(diff)

Expand All @@ -112,26 +112,26 @@ var _ = Describe("Storage transformer", func() {
var (
rawValue = common.HexToAddress("000000000000000000000000000000000000000000000002a300000000002a30")
fakeHeaderID = rand.Int63()
packedTypes = make(map[int]storage.ValueType)
packedTypes = make(map[int]types.ValueType)
)
packedTypes[0] = storage.Uint48
packedTypes[1] = storage.Uint48
packedTypes[0] = types.Uint48
packedTypes[1] = types.Uint48

var fakeMetadata = storage.ValueMetadata{
var fakeMetadata = types.ValueMetadata{
Name: "",
Keys: nil,
Type: storage.PackedSlot,
Type: types.PackedSlot,
PackedTypes: packedTypes,
}

It("passes the decoded data items to the repository", func() {
storageKeysLookup.Metadata = fakeMetadata
fakeBlockNumber := rand.Int()
fakeBlockHash := fakes.RandomString(64)
fakeRow := storage.PersistedDiff{
fakeRow := types.PersistedDiff{
ID: rand.Int63(),
HeaderID: fakeHeaderID,
RawDiff: storage.RawDiff{
RawDiff: types.RawDiff{
HashedAddress: common.Hash{},
BlockHash: common.HexToHash(fakeBlockHash),
BlockHeight: fakeBlockNumber,
Expand All @@ -155,7 +155,7 @@ var _ = Describe("Storage transformer", func() {
It("returns error if creating a row fails", func() {
storageKeysLookup.Metadata = fakeMetadata
repository.CreateErr = fakes.FakeError
diff := storage.PersistedDiff{RawDiff: storage.RawDiff{StorageValue: rawValue.Hash()}}
diff := types.PersistedDiff{RawDiff: types.RawDiff{StorageValue: rawValue.Hash()}}

err := t.Execute(diff)

Expand Down
6 changes: 3 additions & 3 deletions libraries/shared/fetcher/fetcher_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package fetcher_test

import (
"io/ioutil"
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
"io/ioutil"
"github.com/sirupsen/logrus"
)

func TestFactories(t *testing.T) {
Expand All @@ -31,5 +31,5 @@ func TestFactories(t *testing.T) {
}

var _ = BeforeSuite(func() {
log.SetOutput(ioutil.Discard)
logrus.SetOutput(ioutil.Discard)
})
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package fakes
package mocks

import (
"github.com/makerdao/vulcanizedb/libraries/shared/storage"
"github.com/makerdao/vulcanizedb/libraries/shared/storage/types"
)

type MockStorageDiffRepository struct {
CreatePassedRawDiffs []storage.RawDiff
CreatePassedRawDiffs []types.RawDiff
CreateReturnID int64
CreateReturnError error
}

func (repository *MockStorageDiffRepository) CreateStorageDiff(rawDiff storage.RawDiff) (int64, error) {
func (repository *MockStorageDiffRepository) CreateStorageDiff(rawDiff types.RawDiff) (int64, error) {
repository.CreatePassedRawDiffs = append(repository.CreatePassedRawDiffs, rawDiff)
return repository.CreateReturnID, repository.CreateReturnError
}
Loading

0 comments on commit bc2ea7d

Please sign in to comment.