This repository has been archived by the owner on Nov 24, 2022. It is now read-only.
/
statereporting.go
83 lines (72 loc) · 2.27 KB
/
statereporting.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package dealer
import (
"context"
"fmt"
"time"
"github.com/textileio/broker-core/broker"
"github.com/textileio/broker-core/cmd/dealerd/store"
mbroker "github.com/textileio/broker-core/msgbroker"
)
// daemonDealReporter is the main loop of the deal reporter daemon.
//
// It waits d.config.dealReportingFreq, runs daemonDealReporterTick, and
// repeats until d.daemonCtx is done. Tick errors are logged and do not stop
// the loop. d.daemonWg.Done() is called when the loop exits.
func (d *Dealer) daemonDealReporter() {
	defer d.daemonWg.Done()

	// One reusable timer instead of a fresh time.After per iteration: it can be
	// stopped on shutdown rather than lingering until it fires.
	timer := time.NewTimer(d.config.dealReportingFreq)
	defer timer.Stop()

	for {
		select {
		case <-d.daemonCtx.Done():
			log.Infof("deal reporter daemon closed")
			return
		case <-timer.C:
			if err := d.daemonDealReporterTick(); err != nil {
				log.Errorf("deal reporter tick: %s", err)
			}
			// The timer has fired and its channel was drained by this case, so
			// Reset is safe. Re-arming after the tick keeps the delay measured
			// from the end of the tick, as before.
			timer.Reset(d.config.dealReportingFreq)
		}
	}
}
// daemonDealReporterTick reports deals pending in store.StatusReportFinalized
// one at a time.
//
// It keeps going while each deal is reported successfully, and stops when the
// store has no further pending deal, when a report attempt fails (the deal is
// rescheduled), or when a store operation returns an error. Only store errors
// are returned; a failed report attempt is logged and yields nil.
func (d *Dealer) daemonDealReporterTick() error {
	for {
		more, err := d.reportNextPendingFinalizedDeal()
		if err != nil {
			return err
		}
		if !more {
			return nil
		}
	}
}

// reportNextPendingFinalizedDeal processes at most one deal pending in
// store.StatusReportFinalized, under its own 15s timeout derived from
// d.daemonCtx. It returns true only when a deal was fetched and reported
// successfully, i.e. when the caller should try the next one.
//
// Keeping the per-deal work in its own function lets the deferred cancel run
// at the end of every iteration instead of accumulating until the whole drain
// loop returns.
func (d *Dealer) reportNextPendingFinalizedDeal() (bool, error) {
	ctx, cancel := context.WithTimeout(d.daemonCtx, time.Second*15)
	defer cancel()

	aud, ok, err := d.store.GetNextPending(ctx, store.StatusReportFinalized)
	if err != nil {
		return false, fmt.Errorf("get successful deals: %w", err)
	}
	if !ok {
		// Presumably nothing is pending right now; stop draining.
		return false, nil
	}

	if err := d.reportFinalizedAuctionDeal(ctx, aud); err != nil {
		log.Errorf("reporting finalized auction deal: %s", err)
		// Push the deal's ReadyAt forward by the retry delay and keep it in the
		// same status so a later tick can retry it.
		aud.ReadyAt = time.Now().Add(d.config.dealReportingRetryDelay)
		if err := d.store.SaveAndMoveAuctionDeal(ctx, aud, store.StatusReportFinalized); err != nil {
			return false, fmt.Errorf("saving reached deadline: %w", err)
		}
		// A failed report ends the current tick without an error.
		return false, nil
	}
	return true, nil
}
// reportFinalizedAuctionDeal publishes a finalized-deal message for aud to the
// message broker and then moves aud to store.StatusFinalized.
//
// ctx is used only to load the auction data. Publishing and the final save run
// under a separate 5s timeout derived from context.Background(), so cancelling
// ctx does not interrupt them.
// NOTE(review): the detachment from ctx looks deliberate (avoids aborting
// between publish and save) — confirm.
//
// It returns an error, wrapping the underlying cause, if loading the auction
// data, publishing the message, or saving the deal fails.
func (d *Dealer) reportFinalizedAuctionDeal(ctx context.Context, aud store.AuctionDeal) error {
	ad, err := d.store.GetAuctionData(ctx, aud.AuctionDataID)
	if err != nil {
		return fmt.Errorf("get auction data: %w", err)
	}

	fad := broker.FinalizedDeal{
		BatchID:           ad.BatchID,
		ErrorCause:        aud.ErrorCause,
		DealID:            aud.DealID,
		DealExpiration:    aud.DealExpiration,
		StorageProviderID: aud.StorageProviderID,
		AuctionID:         aud.AuctionID,
		BidID:             aud.BidID,
	}

	// A distinct name makes it explicit that the caller's ctx is no longer in
	// play from here on.
	reportCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	log.Debugf("reporting finalized deal (errorcause=%s)", aud.ErrorCause)
	if err := mbroker.PublishMsgFinalizedDeal(reportCtx, d.mb, fad); err != nil {
		return fmt.Errorf("publishing finalized-deal msg to msgbroker: %w", err)
	}

	// We mark the deal as Finalized so it no longer gets picked up by daemonDealReporterTick().
	if err := d.store.SaveAndMoveAuctionDeal(reportCtx, aud, store.StatusFinalized); err != nil {
		return fmt.Errorf("finalizing deal: %w", err)
	}
	return nil
}