-
Notifications
You must be signed in to change notification settings - Fork 0
/
offers_processor.go
114 lines (96 loc) · 3.17 KB
/
offers_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package processors
import (
"context"
"github.com/lantah/go/ingest"
"github.com/lantah/go/services/orbitr/internal/db2/history"
"github.com/lantah/go/support/errors"
"github.com/lantah/go/xdr"
)
// The offers processor can be configured to trim the offers table
// by removing all offer rows which were marked for deletion at least 100 ledgers ago.
// Rows deleted more recently than this window are retained so that
// ingestion can still observe recent deletions.
const compactionWindow = uint32(100)
// OffersProcessor ingests offer ledger-entry changes, compacting them
// in memory and batch-upserting the resulting rows into the history
// offers table.
type OffersProcessor struct {
	offersQ  history.QOffers         // destination queries for upserting and compacting offer rows
	sequence uint32                  // ledger sequence currently being ingested
	cache    *ingest.ChangeCompactor // collapses multiple changes to the same entry before flushing
}
// NewOffersProcessor constructs an OffersProcessor bound to the given
// offers query interface and the ledger sequence being ingested. The
// internal change compactor is initialized, so the returned processor
// is immediately ready to accept ProcessChange calls.
func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcessor {
	processor := &OffersProcessor{
		offersQ:  offersQ,
		sequence: sequence,
	}
	processor.reset()
	return processor
}
// reset discards any accumulated changes by replacing the change
// compactor with a fresh, empty one.
func (p *OffersProcessor) reset() {
	p.cache = ingest.NewChangeCompactor()
}
// ProcessChange adds an offer ledger-entry change to the in-memory
// change compactor. Changes for other ledger-entry types are ignored.
// When the compactor grows past maxBatchSize (defined elsewhere in this
// package), the accumulated changes are flushed to the offers table and
// the compactor is reset so memory use stays bounded.
//
// Returns an error if the change cannot be added to the compactor or if
// an early flush fails.
func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
	if change.Type != xdr.LedgerEntryTypeOffer {
		return nil
	}

	if err := p.cache.AddChange(change); err != nil {
		return errors.Wrap(err, "error adding to ledgerCache")
	}

	if p.cache.Size() > maxBatchSize {
		// Fixed: the previous message said "error in Commit", but the
		// failing call here is flushCache, not Commit.
		if err := p.flushCache(ctx); err != nil {
			return errors.Wrap(err, "error flushing cache")
		}
		p.reset()
	}

	return nil
}
// ledgerEntryToRow converts an offer ledger entry into a history.Offer
// row suitable for batch upserting into the offers table.
func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer {
	data := entry.Data.MustOffer()
	// Store both the exact rational price (Pricen/Priced) and its
	// float approximation for querying.
	priceRatio := float64(data.Price.N) / float64(data.Price.D)
	row := history.Offer{
		SellerID:           data.SellerId.Address(),
		OfferID:            int64(data.OfferId),
		SellingAsset:       data.Selling,
		BuyingAsset:        data.Buying,
		Amount:             int64(data.Amount),
		Pricen:             int32(data.Price.N),
		Priced:             int32(data.Price.D),
		Price:              priceRatio,
		Flags:              int32(data.Flags),
		LastModifiedLedger: uint32(entry.LastModifiedLedgerSeq),
		Sponsor:            ledgerEntrySponsorToNullString(*entry),
	}
	return row
}
// flushCache writes all compacted offer changes to the offers table in
// one batch upsert. Created/updated entries are stored as-is; removed
// entries are kept as rows marked Deleted and stamped with the current
// ledger sequence so the compaction pass in Commit can trim them later.
//
// Returns an error when a change carries neither a pre- nor a
// post-entry, or when the batch upsert fails.
func (p *OffersProcessor) flushCache(ctx context.Context) error {
	changes := p.cache.GetChanges()
	// Pre-size: every change produces exactly one row.
	batchUpsertOffers := make([]history.Offer, 0, len(changes))
	for _, change := range changes {
		switch {
		case change.Post != nil:
			// Created and updated entries carry their new state in Post.
			batchUpsertOffers = append(batchUpsertOffers, p.ledgerEntryToRow(change.Post))
		case change.Pre != nil:
			// Removed (Post == nil is implied by the previous case):
			// soft-delete the row at this ledger sequence.
			row := p.ledgerEntryToRow(change.Pre)
			row.Deleted = true
			row.LastModifiedLedger = p.sequence
			batchUpsertOffers = append(batchUpsertOffers, row)
		default:
			return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil")
		}
	}

	if len(batchUpsertOffers) > 0 {
		if err := p.offersQ.UpsertOffers(ctx, batchUpsertOffers); err != nil {
			return errors.Wrap(err, "errors in UpsertOffers")
		}
	}

	return nil
}
// Commit flushes any remaining cached changes to the offers table and
// then trims the table by removing rows that were marked deleted at
// least compactionWindow ledgers before the current sequence. The
// compaction step is skipped for the first compactionWindow ledgers,
// where the cutoff would underflow.
func (p *OffersProcessor) Commit(ctx context.Context) error {
	if err := p.flushCache(ctx); err != nil {
		return errors.Wrap(err, "error flushing cache")
	}

	if p.sequence > compactionWindow {
		// Trim offers deleted before the cutoff ledger. Restructured to
		// the early-return idiom: no else after a terminating if.
		cutoff := p.sequence - compactionWindow
		offerRowsRemoved, err := p.offersQ.CompactOffers(ctx, cutoff)
		if err != nil {
			return errors.Wrap(err, "could not compact offers")
		}
		log.WithField("offer_rows_removed", offerRowsRemoved).Info("Trimmed offers table")
	}

	return nil
}