This repository has been archived by the owner on Nov 24, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
service.go
123 lines (100 loc) · 3.19 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package service
import (
"context"
"errors"
"fmt"
"time"
httpapi "github.com/ipfs/go-ipfs-http-client"
"github.com/multiformats/go-multiaddr"
"github.com/textileio/broker-core/cmd/packerd/packer"
"github.com/textileio/broker-core/cmd/packerd/store"
"github.com/textileio/broker-core/msgbroker"
mbroker "github.com/textileio/broker-core/msgbroker"
"github.com/textileio/go-libp2p-pubsub-rpc/finalizer"
golog "github.com/textileio/go-log/v2"
)
var log = golog.Logger("packer/service")
// Config defines params for Service configuration.
type Config struct {
PostgresURI string
PinnerMultiaddr string
IpfsMaddrs []multiaddr.Multiaddr
DaemonFrequency time.Duration
ExportMetricsFrequency time.Duration
TargetSectorSize int64
BatchMinSize int64
BatchMinWaiting time.Duration
BatchWaitScalingFactor int64
CARUploader packer.CARUploader
CARExportURL string
}
// Service is a gRPC service wrapper around an packer.
type Service struct {
packer *packer.Packer
finalizer *finalizer.Finalizer
}
var _ mbroker.ReadyToBatchListener = (*Service)(nil)
// New returns a new Service.
func New(mb mbroker.MsgBroker, conf Config) (*Service, error) {
if err := validateConfig(conf); err != nil {
return nil, fmt.Errorf("config is invalid: %s", err)
}
fin := finalizer.NewFinalizer()
ma, err := multiaddr.NewMultiaddr(conf.PinnerMultiaddr)
if err != nil {
return nil, fmt.Errorf("parsing ipfs client multiaddr: %s", err)
}
pinnerClient, err := httpapi.NewApi(ma)
if err != nil {
return nil, fmt.Errorf("creating ipfs client: %s", err)
}
opts := []packer.Option{
packer.WithDaemonFrequency(conf.DaemonFrequency),
packer.WithSectorSize(conf.TargetSectorSize),
packer.WithCARExportURL(conf.CARExportURL),
packer.WithBatchMinSize(conf.BatchMinSize),
packer.WithBatchMinWaiting(conf.BatchMinWaiting),
packer.WithBatchWaitScalingFactor(conf.BatchWaitScalingFactor),
packer.WithCARUploader(conf.CARUploader),
}
lib, err := packer.New(conf.PostgresURI, pinnerClient, conf.IpfsMaddrs, mb, opts...)
if err != nil {
return nil, fin.Cleanupf("creating packer: %v", err)
}
fin.Add(lib)
s := &Service{
packer: lib,
finalizer: fin,
}
if err := mbroker.RegisterHandlers(mb, s, msgbroker.WithACKDeadline(time.Minute*5)); err != nil {
return nil, fmt.Errorf("registering msgbroker handlers: %s", err)
}
return s, nil
}
// OnReadyToBatch process a message for data ready to be included in a batch.
func (s *Service) OnReadyToBatch(ctx context.Context, opID mbroker.OperationID, srs []mbroker.ReadyToBatchData) error {
err := s.packer.ReadyToBatch(ctx, opID, srs)
if errors.Is(err, store.ErrOperationIDExists) {
log.Warnf("operation-id %s already processed, acking", opID)
return nil
}
if err != nil {
return fmt.Errorf("processing ready to batch: %s", err)
}
return nil
}
// Close the service.
func (s *Service) Close() error {
log.Info("closing service")
defer log.Info("service was shutdown")
return s.finalizer.Cleanup(nil)
}
func validateConfig(conf Config) error {
if conf.PinnerMultiaddr == "" {
return fmt.Errorf("ipfs pinner multiaddr is empty")
}
if conf.PostgresURI == "" {
return fmt.Errorf("postgres uri is empty")
}
return nil
}