Skip to content

Commit

Permalink
Merge pull request #58 from insolar/develop
Browse files Browse the repository at this point in the history
Develop: fix WLT-1091
  • Loading branch information
uhzxl committed Sep 18, 2019
2 parents bc2b271 + c449999 commit 93f937d
Show file tree
Hide file tree
Showing 20 changed files with 126 additions and 106 deletions.
12 changes: 6 additions & 6 deletions internal/beauty/beautifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,21 @@ func (b *Beautifier) process(rec *record.Material) {
b.depositKeeper.Process(rec)
}

func (b *Beautifier) dump(tx orm.DB, pub replicator.OnDumpSuccess) error {
func (b *Beautifier) dump(tx orm.DB, pub replicator.OnDumpSuccess, errors prometheus.Counter) error {
log.Infof("dump beautifier")
if err := b.memberComposer.Dump(tx, pub); err != nil {
if err := b.memberComposer.Dump(tx, pub, errors); err != nil {
return err
}
if err := b.memberBalanceUpdater.Dump(tx, pub); err != nil {
if err := b.memberBalanceUpdater.Dump(tx, pub, errors); err != nil {
return err
}
if err := b.transferComposer.Dump(tx, pub); err != nil {
if err := b.transferComposer.Dump(tx, pub, errors); err != nil {
return err
}
if err := b.depositComposer.Dump(tx, pub); err != nil {
if err := b.depositComposer.Dump(tx, pub, errors); err != nil {
return err
}
if err := b.migrationAddressComposer.Dump(tx, pub); err != nil {
if err := b.migrationAddressComposer.Dump(tx, pub, errors); err != nil {
return err
}
if err := b.migrationAddressKeeper.Dump(tx, pub); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/beauty/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ func (c *Composer) compose(b *memberBuilder) {
delete(c.builders, origin)
}

func (c *Composer) Dump(tx orm.DB, pub replicator.OnDumpSuccess) error {
func (c *Composer) Dump(tx orm.DB, pub replicator.OnDumpSuccess, errorCounter prometheus.Counter) error {
log.Infof("dump members")
c.updateStat()

for _, member := range c.cache {
if err := member.Dump(tx); err != nil {
if err := member.Dump(tx, errorCounter); err != nil {
return errors.Wrapf(err, "failed to dump members")
}
}
Expand Down
5 changes: 3 additions & 2 deletions internal/beauty/member/update_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/insolar/insolar/insolar/gen"
"github.com/insolar/insolar/insolar/record"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -73,11 +74,11 @@ func (u *BalanceUpdater) processAccountAmend(id insolar.ID, rec *record.Material
})
}

func (u *BalanceUpdater) Dump(tx orm.DB, pub replicator.OnDumpSuccess) error {
func (u *BalanceUpdater) Dump(tx orm.DB, pub replicator.OnDumpSuccess, errorCounter prometheus.Counter) error {
log.Infof("dump member balances")

for _, acc := range u.technicalAccounts {
if err := acc.Dump(tx); err != nil {
if err := acc.Dump(tx, errorCounter); err != nil {
return errors.Wrapf(err, "failed to dump internal member")
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/beauty/migration/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ func (c *Composer) Process(rec *record.Material) {
}
}

func (c *Composer) Dump(tx orm.DB, pub replicator.OnDumpSuccess) error {
func (c *Composer) Dump(tx orm.DB, pub replicator.OnDumpSuccess, errorCounter prometheus.Counter) error {
log.Infof("dump migration addresses")

c.updateStat()

log.Infof("dump %d addresses", len(c.cache))
for _, addr := range c.cache {
if err := addr.Dump(tx); err != nil {
if err := addr.Dump(tx, errorCounter); err != nil {
return errors.Wrapf(err, "failed to dump migration addresses addr=%s", addr.Addr)
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/beauty/migration/deposit/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,11 @@ func (c *Composer) compose(b *depositBuilder) {
delete(c.builders, origin)
}

func (c *Composer) Dump(tx orm.DB, pub replicator.OnDumpSuccess) error {
func (c *Composer) Dump(tx orm.DB, pub replicator.OnDumpSuccess, errorCounter prometheus.Counter) error {
log.Infof("dump deposits")

for _, dep := range c.cache {
if err := dep.Dump(tx); err != nil {
if err := dep.Dump(tx, errorCounter); err != nil {
return errors.Wrapf(err, "failed to dump deposits")
}
}
Expand Down
5 changes: 3 additions & 2 deletions internal/beauty/pulse/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/go-pg/pg/orm"
"github.com/insolar/insolar/insolar"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

"github.com/insolar/observer/internal/configuration"
Expand Down Expand Up @@ -83,10 +84,10 @@ func (k *Keeper) process(pn insolar.PulseNumber, entropy insolar.Entropy, timest
})
}

func (k *Keeper) dump(tx orm.DB, pub replicator.OnDumpSuccess) error {
func (k *Keeper) dump(tx orm.DB, pub replicator.OnDumpSuccess, errorCounter prometheus.Counter) error {
log.Infof("dump pulse keeper")
for _, p := range k.cache {
if err := p.Dump(tx); err != nil {
if err := p.Dump(tx, errorCounter); err != nil {
return errors.Wrapf(err, "failed to dump pulse")
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/beauty/transfer/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ func isTransferCall(request *dto.Request) bool {
return false
}

func (c *Composer) Dump(tx orm.DB, pub replicator.OnDumpSuccess) error {
func (c *Composer) Dump(tx orm.DB, pub replicator.OnDumpSuccess, errorCounter prometheus.Counter) error {
log.Infof("dump member transfers")

c.updateStat()

for _, transfer := range c.cache {
if err := transfer.Dump(tx); err != nil {
if err := transfer.Dump(tx, errorCounter); err != nil {
return errors.Wrapf(err, "failed to dump transfers")
}
}
Expand Down
11 changes: 6 additions & 5 deletions internal/dto/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/go-pg/pg/orm"
"github.com/insolar/insolar/insolar/record"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

"github.com/insolar/observer/internal/configuration"
Expand Down Expand Up @@ -89,25 +90,25 @@ func (d *Dumper) process(rn uint32, rec *record.Material) {
d.buildUnpacked(rec)
}

func (d *Dumper) dump(tx orm.DB, pub replicator.OnDumpSuccess) error {
func (d *Dumper) dump(tx orm.DB, pub replicator.OnDumpSuccess, errorCounter prometheus.Counter) error {
log.Infof("dump raw records")
for _, rec := range d.records {
if err := rec.Dump(tx); err != nil {
if err := rec.Dump(tx, errorCounter); err != nil {
return errors.Wrapf(err, "failed to dump raw records")
}
}
for _, req := range d.requests {
if err := req.Dump(tx); err != nil {
if err := req.Dump(tx, errorCounter); err != nil {
return errors.Wrapf(err, "failed to dump raw requests")
}
}
for _, res := range d.results {
if err := res.Dump(tx); err != nil {
if err := res.Dump(tx, errorCounter); err != nil {
return errors.Wrapf(err, "failed to dump raw results")
}
}
for _, obj := range d.objects {
if err := obj.Dump(tx); err != nil {
if err := obj.Dump(tx, errorCounter); err != nil {
return errors.Wrapf(err, "failed to dump raw objects")
}
}
Expand Down
12 changes: 10 additions & 2 deletions internal/model/beauty/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package beauty
import (
"github.com/go-pg/pg/orm"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

type Deposit struct {
Expand All @@ -34,9 +36,15 @@ type Deposit struct {
DepositState string `sql:",notnull"`
}

func (d *Deposit) Dump(tx orm.DB) error {
if err := tx.Insert(d); err != nil {
func (d *Deposit) Dump(tx orm.DB, errorCounter prometheus.Counter) error {
res, err := tx.Model(d).OnConflict("DO NOTHING").Insert(d)
if err != nil {
return errors.Wrapf(err, "failed to insert deposit")
}

if res.RowsAffected() == 0 {
errorCounter.Inc()
logrus.Errorf("Failed to insert deposit: %v", d)
}
return nil
}
36 changes: 0 additions & 36 deletions internal/model/beauty/fee.go

This file was deleted.

12 changes: 10 additions & 2 deletions internal/model/beauty/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package beauty
import (
"github.com/go-pg/pg/orm"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

type Member struct {
Expand All @@ -32,9 +34,15 @@ type Member struct {
Status string
}

func (m *Member) Dump(tx orm.DB) error {
if err := tx.Insert(m); err != nil {
func (m *Member) Dump(tx orm.DB, errorCounter prometheus.Counter) error {
res, err := tx.Model(m).OnConflict("DO NOTHING").Insert(m)
if err != nil {
return errors.Wrapf(err, "failed to insert member")
}

if res.RowsAffected() == 0 {
errorCounter.Inc()
logrus.Errorf("Failed to insert member: %v", m)
}
return nil
}
15 changes: 11 additions & 4 deletions internal/model/beauty/migration_address.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package beauty
import (
"github.com/go-pg/pg/orm"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

type MigrationAddress struct {
Expand All @@ -30,9 +31,15 @@ type MigrationAddress struct {
Wasted bool
}

func (a *MigrationAddress) Dump(tx orm.DB) error {
if err := tx.Insert(a); err != nil {
log.Error(errors.Wrapf(err, "failed to insert migration address"))
func (a *MigrationAddress) Dump(tx orm.DB, errorCounter prometheus.Counter) error {
res, err := tx.Model(a).OnConflict("DO NOTHING").Insert(a)
if err != nil {
return errors.Wrapf(err, "failed to insert migration address")
}

if res.RowsAffected() == 0 {
errorCounter.Inc()
logrus.Errorf("Failed to insert migration address: %v", a)
}
return nil
}
11 changes: 9 additions & 2 deletions internal/model/beauty/pulse.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/go-pg/pg/orm"
"github.com/insolar/insolar/insolar"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

type Pulse struct {
Expand All @@ -31,10 +33,15 @@ type Pulse struct {
RequestsCount uint32
}

func (p *Pulse) Dump(tx orm.DB) error {
err := tx.Insert(p)
func (p *Pulse) Dump(tx orm.DB, errorCounter prometheus.Counter) error {
res, err := tx.Model(p).OnConflict("DO NOTHING").Insert(p)
if err != nil {
return errors.Wrapf(err, "failed to insert pulse %v", p)
}

if res.RowsAffected() == 0 {
errorCounter.Inc()
logrus.Errorf("Failed to insert pulse: %v", p)
}
return nil
}
13 changes: 11 additions & 2 deletions internal/model/beauty/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package beauty

import (
"github.com/go-pg/pg/orm"
"github.com/prometheus/client_golang/prometheus"

"github.com/insolar/insolar/insolar"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type Transfer struct {
Expand All @@ -39,9 +42,15 @@ type Transfer struct {
EthHash string `sql:",notnull"`
}

func (m *Transfer) Dump(tx orm.DB) error {
if err := tx.Insert(m); err != nil {
func (t *Transfer) Dump(tx orm.DB, errorCounter prometheus.Counter) error {
res, err := tx.Model(t).OnConflict("DO NOTHING").Insert(t)
if err != nil {
return errors.Wrapf(err, "failed to insert transfer")
}

if res.RowsAffected() == 0 {
errorCounter.Inc()
logrus.Errorf("Failed to insert transfer: %v", t)
}
return nil
}
25 changes: 0 additions & 25 deletions internal/model/dumper.go

This file was deleted.

Loading

0 comments on commit 93f937d

Please sign in to comment.