Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
Remove handling of duplicate storage diffs in watcher
Browse files Browse the repository at this point in the history
- Can push this responsibility down to the transformers
- Update docs to reflect that transformers should handle duplicates
  • Loading branch information
rmulhol committed May 20, 2019
1 parent 79765c7 commit e290979
Show file tree
Hide file tree
Showing 6 changed files with 5 additions and 30 deletions.
2 changes: 1 addition & 1 deletion cmd/composeAndExecute.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,5 @@ func composeAndExecute() {
func init() {
rootCmd.AddCommand(composeAndExecuteCmd)
composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs")
composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "interval duration for rechecking queued storage diffs (ex: 5m30s)")
}
2 changes: 1 addition & 1 deletion cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func execute() {
func init() {
rootCmd.AddCommand(executeCmd)
executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs")
executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "interval duration for rechecking queued storage diffs (ex: 5m30s)")
}

type Exporter interface {
Expand Down
1 change: 1 addition & 0 deletions libraries/shared/factories/storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type Repository interface {
A contract-specific implementation of the repository interface enables the transformer to write the decoded storage value to the appropriate table in postgres.

The `Create` function is expected to recognize and persist a given storage value by the variable's name, as indicated on the row's metadata.
Note: we advise silently discarding duplicates in `Create` - as it's possible that you may read the same diff several times, and an error will trigger the storage watcher to queue that diff for later processing.

The `SetDB` function is required for the repository to connect to the database.

Expand Down
3 changes: 0 additions & 3 deletions libraries/shared/storage/utils/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@
package utils

import (
"errors"
"fmt"
)

var ErrRowExists = errors.New("parsed row for storage diff already exists")

type ErrContractNotFound struct {
Contract string
}
Expand Down
4 changes: 2 additions & 2 deletions libraries/shared/watcher/storage_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) {
return
}
executeErr := storageTransformer.Execute(row)
if executeErr != nil && executeErr != utils.ErrRowExists {
if executeErr != nil {
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
queueErr := storageWatcher.Queue.Add(row)
if queueErr != nil {
Expand All @@ -100,7 +100,7 @@ func (storageWatcher StorageWatcher) processQueue() {
continue
}
executeErr := storageTransformer.Execute(row)
if executeErr == nil || executeErr == utils.ErrRowExists {
if executeErr == nil {
storageWatcher.deleteRow(row.Id)
}
}
Expand Down
23 changes: 0 additions & 23 deletions libraries/shared/watcher/storage_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,6 @@ var _ = Describe("Storage Watcher", func() {
close(done)
})

It("does not queue row if transformer execution fails because row already exists", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrRowExists

go storageWatcher.Execute(rows, errs, time.Hour)

Expect(<-errs).To(BeNil())
Consistently(func() bool {
return mockQueue.AddCalled
}).Should(BeFalse())
close(done)
})

It("queues row for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError

Expand Down Expand Up @@ -199,17 +187,6 @@ var _ = Describe("Storage Watcher", func() {
close(done)
})

It("deletes row from queue if transformer execution errors because row already exists", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrRowExists

go storageWatcher.Execute(rows, errs, time.Nanosecond)

Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(row.Id))
close(done)
})

It("logs error if deleting persisted row fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expand Down

0 comments on commit e290979

Please sign in to comment.