-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
136 lines (121 loc) · 4.66 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package main
import (
"context"
"flag"
"os"
"os/signal"
"path/filepath"
"strings"
"github.com/ipfs/go-log/v2"
"github.com/ipni/depute"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// logger is the package-level logger used throughout this command; it is
// created under the name "depute/cmd".
var logger = log.Logger("depute/cmd")
const (
// libp2pUserAgent is the user agent string passed to libp2p.UserAgent when
// the libp2p host options are assembled in main.
libp2pUserAgent = "ipni/depute"
)
// arrayFlags is a flag.Value that accumulates every occurrence of a
// repeatable command-line flag, in the order given.
type arrayFlags []string

// String returns the accumulated values joined by ", ".
//
// It returns the empty string for a nil receiver: the flag.Value contract
// allows the flag package to call String on a zero-valued receiver, such as a
// nil pointer, and dereferencing it would panic.
func (a *arrayFlags) String() string {
	if a == nil {
		return ""
	}
	return strings.Join(*a, ", ")
}

// Set appends value to the accumulated values. It never returns an error.
func (a *arrayFlags) Set(value string) error {
	*a = append(*a, value)
	return nil
}
// main parses the command-line flags, builds a libp2p host and a depute
// instance from them, starts depute, and then blocks until an interrupt
// signal arrives, at which point depute is shut down.
//
// Every setup or start failure is reported through logger.Fatal/Fatalw.
func main() {
	httpListenAddr := flag.String("httpListenAddr", "", "Address to listen on for publishing advertisements over HTTP.")
	var directAnnounceURLs arrayFlags
	flag.Var(&directAnnounceURLs, "directAnnounceURL", "Indexer URL to send direct http announcement to. Multiple OK")
	var pubAddrs arrayFlags
	flag.Var(&pubAddrs, "pubAddr", "Address to tell indexer where to retrieve advertisements. Multiple OK")
	noPubsub := flag.Bool("noPubsub", false, "Disable pubsub announcements of new advertisements.")
	libp2pIdentityPath := flag.String("libp2pIdentityPath", "", "Path to the marshalled libp2p host identity. If unspecified a random identity is generated.")
	libp2pListenAddrs := flag.String("libp2pListenAddrs", "", "Comma separated libp2p host listen addrs. If unspecified the default listen addrs are used at ephemeral port.")
	retrievalAddrs := flag.String("retrievalAddrs", "", "Comma separated retrieval multiaddrs to advertise. If unspecified, libp2p host listen addrs are used.")
	grpcListenAddr := flag.String("grpcListenAddr", "0.0.0.0:40080", "The gRPC server listen address.")
	grpcTlsCertPath := flag.String("grpcTlsCertPath", "", "Path to gRPC server TLS Certificate.")
	grpcTlsKeyPath := flag.String("grpcTlsKeyPath", "", "Path to gRPC server TLS Key.")
	logLevel := flag.String("logLevel", "info", "Logging level. Only applied if GOLOG_LOG_LEVEL environment variable is unset.")
	topic := flag.String("topic", depute.DefaultTopic, "Sets the topic that pubsub messages are send on.")
	flag.Parse()

	// The environment variable takes precedence over the -logLevel flag.
	if _, set := os.LookupEnv("GOLOG_LOG_LEVEL"); !set {
		if err := log.SetLogLevel("*", *logLevel); err != nil {
			// Best effort: keep running with the current level, but do not
			// hide the fact that the requested level was rejected.
			logger.Warnw("Failed to set log level", "logLevel", *logLevel, "err", err)
		}
	}

	// Assemble the libp2p host options.
	hOpts := []libp2p.Option{
		libp2p.UserAgent(libp2pUserAgent),
	}
	if *libp2pIdentityPath != "" {
		p := filepath.Clean(*libp2pIdentityPath)
		// Use a distinct name so the package-level logger is not shadowed.
		idLogger := logger.With("path", p)
		idLogger.Info("Unmarshalling libp2p host identity")
		mid, err := os.ReadFile(p)
		if err != nil {
			idLogger.Fatalw("Failed to read libp2p host identity file", "err", err)
		}
		id, err := crypto.UnmarshalPrivateKey(mid)
		if err != nil {
			idLogger.Fatalw("Failed to unmarshal libp2p host identity file", "err", err)
		}
		hOpts = append(hOpts, libp2p.Identity(id))
	}
	if *libp2pListenAddrs != "" {
		hOpts = append(hOpts, libp2p.ListenAddrStrings(strings.Split(*libp2pListenAddrs, ",")...))
	}
	h, err := libp2p.New(hOpts...)
	if err != nil {
		logger.Fatalw("Failed to instantiate libp2p host", "err", err)
	}

	// Assemble the depute options.
	deputeOpts := []depute.Option{
		depute.WithHost(h),
		depute.WithGrpcListenAddr(*grpcListenAddr),
	}
	if *retrievalAddrs != "" {
		// Split the retrieval addrs flag itself; the listen addrs flag is a
		// separate setting and may well be empty here.
		rAddrs := strings.Split(*retrievalAddrs, ",")
		deputeOpts = append(deputeOpts, depute.WithRetrievalAddrs(rAddrs...))
	}
	deputeOpts = append(deputeOpts, depute.WithHttpListenAddr(*httpListenAddr))
	if *noPubsub {
		deputeOpts = append(deputeOpts, depute.WithNoPubsubAnnounce())
	}
	deputeOpts = append(deputeOpts, depute.WithPublishTopic(*topic))
	if len(directAnnounceURLs) != 0 {
		deputeOpts = append(deputeOpts, depute.WithDirectAnnounceURLs(directAnnounceURLs))
	}
	if len(pubAddrs) != 0 {
		deputeOpts = append(deputeOpts, depute.WithPublishAddrs(pubAddrs))
	}

	var gsOpts []grpc.ServerOption
	// TODO: expose more flags for gRPC server options.
	//
	// TLS is enabled as soon as either path is given. Emptiness is checked
	// rather than inequality of the two paths, so that passing the same file
	// for both flags does not silently start the server without TLS.
	switch {
	case *grpcTlsCertPath == "" && *grpcTlsKeyPath == "":
		// No TLS requested; no credentials option is added.
	case *grpcTlsCertPath == "" || *grpcTlsKeyPath == "":
		logger.Fatal("Both TLS Certificate and Key path must be specified.")
	default:
		creds, err := credentials.NewServerTLSFromFile(*grpcTlsCertPath, *grpcTlsKeyPath)
		if err != nil {
			logger.Fatalw("Failed to instantiate server TLS credentials", "err", err)
		}
		gsOpts = append(gsOpts, grpc.Creds(creds))
	}
	deputeOpts = append(deputeOpts, depute.WithGrpcServerOptions(gsOpts...))

	c, err := depute.New(deputeOpts...)
	if err != nil {
		logger.Fatalw("Failed to instantiate depute", "err", err)
	}
	ctx := context.Background()
	if err := c.Start(ctx); err != nil {
		logger.Fatalw("Failed to start depute", "err", err)
	}

	// Block until an interrupt is delivered. The channel is buffered because
	// signal.Notify does not block when sending to it.
	sch := make(chan os.Signal, 1)
	signal.Notify(sch, os.Interrupt)
	<-sch

	logger.Info("Terminating...")
	if err := c.Shutdown(ctx); err != nil {
		logger.Warnw("Failure occurred while shutting down server.", "err", err)
	} else {
		logger.Info("Shut down server successfully.")
	}
}