-
Notifications
You must be signed in to change notification settings - Fork 5
/
head.go
176 lines (153 loc) · 4.79 KB
/
head.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package head
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"path"
"sync"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/gostream"
"github.com/multiformats/go-multistream"
)
const closeTimeout = 30 * time.Second
var log = logging.Logger("dagsync/head")
type Publisher struct {
lock sync.Mutex
root cid.Cid
server *http.Server
}
// NewPublisher creates a new head publisher.
func NewPublisher() *Publisher {
p := &Publisher{
server: &http.Server{},
}
p.server.Handler = http.Handler(p)
return p
}
func protocolID(topic string) protocol.ID {
return protocol.ID(path.Join("/legs/head", topic, "0.0.1"))
}
func previousProtocolID(topic string) protocol.ID {
return protocol.ID("/legs/head/" + topic + "/0.0.1")
}
// Serve starts the server using the protocol ID derived from the topic name.
func (p *Publisher) Serve(host host.Host, topic string) error {
return p.serveProtocolID(protocolID(topic), host)
}
// ServePrevious starts the server using the previous protocol ID derived from
// the topic name. This is used for testing, or for cases where it is necessary
// to support clients that do not surrort the current protocol ID>
func (p *Publisher) ServePrevious(host host.Host, topic string) error {
return p.serveProtocolID(previousProtocolID(topic), host)
}
// serveProtocolID starts the server using the given protocol ID.
func (p *Publisher) serveProtocolID(pid protocol.ID, host host.Host) error {
l, err := gostream.Listen(host, pid)
if err != nil {
log.Errorw("Failed to listen to gostream with protocol", "host", host.ID(), "protocolID", pid)
return err
}
log.Infow("Serving gostream", "host", host.ID(), "protocolID", pid)
return p.server.Serve(l)
}
// QueryRootCid queries a server, identified by peerID, for the root (most
// recent) CID. If the server does not support the current protocol ID, then an
// attempt is made to connect to the server using the previous protocol ID.
func QueryRootCid(ctx context.Context, host host.Host, topic string, peerID peer.ID) (cid.Cid, error) {
client := http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := gostream.Dial(ctx, host, peerID, protocolID(topic))
if err != nil {
// If protocol ID is wrong, then try the previous protocol ID.
var errNoSupport multistream.ErrNotSupported[protocol.ID]
if errors.As(err, &errNoSupport) {
origErr := err
oldProtoID := previousProtocolID(topic)
conn, err = gostream.Dial(ctx, host, peerID, oldProtoID)
if err != nil {
return nil, origErr
}
log.Infow("Peer head CID server uses old protocol ID", "peer", peerID, "proto", oldProtoID)
} else {
return nil, err
}
}
return conn, err
},
},
}
// The httpclient expects there to be a host here. `.invalid` is a reserved
// TLD for this purpose. See
// https://datatracker.ietf.org/doc/html/rfc2606#section-2
resp, err := client.Get("http://unused.invalid/head")
if err != nil {
return cid.Undef, err
}
defer resp.Body.Close()
cidStr, err := io.ReadAll(resp.Body)
if err != nil {
return cid.Undef, fmt.Errorf("cannot fully read response body: %w", err)
}
if len(cidStr) == 0 {
log.Debug("No head is set; returning cid.Undef")
return cid.Undef, nil
}
cs := string(cidStr)
decode, err := cid.Decode(cs)
if err != nil {
return cid.Undef, fmt.Errorf("failed to decode CID %s: %w", cs, err)
}
log.Debugw("Sucessfully queried latest head", "head", decode)
return decode, nil
}
// ServeHTTP satisfies the http.Handler interface.
func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
base := path.Base(r.URL.Path)
if base != "head" {
log.Debug("Only head is supported; rejecting request with different base path")
http.Error(w, "", http.StatusNotFound)
return
}
p.lock.Lock()
rootCid := p.root
p.lock.Unlock()
var out []byte
if rootCid != cid.Undef {
currentHead := rootCid.String()
log.Debug("Found current head: %s", currentHead)
out = []byte(currentHead)
} else {
log.Debug("No head is set; responding with empty")
}
_, err := w.Write(out)
if err != nil {
log.Errorw("Failed to write response", "err", err)
}
}
// SetRoot sets the CID being published.
func (p *Publisher) SetRoot(c cid.Cid) {
p.lock.Lock()
p.root = c
p.lock.Unlock()
}
// Close stops the server.
func (p *Publisher) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), closeTimeout)
defer cancel()
return p.server.Shutdown(ctx)
}
// Root returns the current root being publisher.
func (p *Publisher) Root() cid.Cid {
p.lock.Lock()
defer p.lock.Unlock()
return p.root
}