Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

routing/http!: support for IPIP-378 (delegated content and peer providing) #539

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,24 @@ The following emojis are used to highlight certain changes:
* `gateway` now sets the [`Content-Location`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Location) header for requests with non-default content format, as a result of content negotiation. This allows generic and misconfigured HTTP caches to store Deserialized, CAR and Block responses separately, under distinct cache keys.
* `gateway` now supports `car-dups`, `car-order` and `car-version` as query parameters in addition to the `application/vnd.ipld.car` parameters sent via `Accept` header. The parameters in the `Accept` header have always priority, but including them in URL simplifies HTTP caching and allows use in `Content-Location` header on CAR responses to maximize interoperability with wide array of HTTP caches.
* `bitswap/server` now allows to override the default peer ledger with `WithPeerLedger`.
* ✨ `routing/http`: delegated content and peer routing ([IPIP-378](https://github.com/ipfs/specs/pull/378)) has been implemented. This includes the following additions:
- `client`: now includes `Provide` and `ProvidePeer` methods, which can be used to provide signed records for content routing and peer routing.
- `types`: types related to the Announcement schema record have been added. A `types/iter.Filter` utility has also been added if you want to filter on top of an iterator.

### Changed

* ✨🛠 `routing/http`: delegated content and peer routing ([IPIP-378](https://github.com/ipfs/specs/pull/378)) has been implemented. This includes the following changes:
- `client`: `WithProviderInfo` now accepts a third parameter, `protocols`, whose value is used when providing the peer.
- `contentrouter`: the `Client` interface has been updated to reflect the changes made to the client, that is, replacing the `ProvideBitswap` method by the generic `Provide` method.
- `server`: the `ContentRouter` interface now includes a `Provide` and a `ProvidePeer` functions.

### Removed

- ✨🛠 `routing/http`: delegated content and peer routing ([IPIP-378](https://github.com/ipfs/specs/pull/378)) has been implemented. This includes the following removals:
- Deprecated Bitswap-schema related types and functions have been removed (e.g. `ProvideBitswap`, `BitswapRecord`, `SchemaBitswap`, `WriteProvidersRequest`, `WriteProvidersResponse`).
- `server`: `ContentRouter` no longer includes `ProvideBitswap`
- `contentrouter`: the content router adaptor no longer supports the previously deprecated Bitswap schema.

### Fixed

* `routing/http/server` now returns 404 Status Not Found when no records can be found.
Expand Down
261 changes: 182 additions & 79 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multibase"
)

var (
Expand All @@ -44,14 +45,14 @@
clock clock.Clock
accepts string

peerID peer.ID
addrs []types.Multiaddr
identity crypto.PrivKey
identity crypto.PrivKey
peerID peer.ID
addrs []types.Multiaddr
protocols []string

// Called immediately after signing a provide request. It is used
// Called immediately after signing a provide (peer) request. It is used
// for testing, e.g., testing the server with a mangled signature.
//lint:ignore SA1019 // ignore staticcheck
afterSignCallback func(req *types.WriteBitswapRecord)
afterSignCallback func(req *types.AnnouncementRecord)
}

// defaultUserAgent is used as a fallback to inform HTTP server which library
Expand All @@ -76,14 +77,6 @@

type Option func(*Client) error

func WithIdentity(identity crypto.PrivKey) Option {
return func(c *Client) error {
c.identity = identity
return nil
}
}

// WithHTTPClient sets a custom HTTP Client to be used with [Client].
func WithHTTPClient(h httpClient) Option {
return func(c *Client) error {
c.httpClient = h
Expand Down Expand Up @@ -115,9 +108,17 @@
}
}

func WithProviderInfo(peerID peer.ID, addrs []multiaddr.Multiaddr) Option {
// WithProviderInfo configures the [Client] with the given provider information.
// This is used by the methods [Client.Provide] and [Client.ProvidePeer] in order
// to create and sign announcement records.
//
// You can still use [Client.ProvideRecords] and [Client.ProvidePeerRecords]
// without this configuration. Then, you must provide already signed-records.
func WithProviderInfo(identity crypto.PrivKey, peerID peer.ID, addrs []multiaddr.Multiaddr, protocols []string) Option {
return func(c *Client) error {
c.identity = identity
c.peerID = peerID
c.protocols = protocols
for _, a := range addrs {
c.addrs = append(c.addrs, types.Multiaddr{Multiaddr: a})
}
Expand Down Expand Up @@ -254,102 +255,140 @@
return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
}

// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]:
//
// [IPIP-378]: https://github.com/ipfs/specs/pull/378
func (c *Client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) {
if c.identity == nil {
return 0, errors.New("cannot provide Bitswap records without an identity")
}
if c.peerID.Size() == 0 {
return 0, errors.New("cannot provide Bitswap records without a peer ID")
}

ks := make([]types.CID, len(keys))
for i, c := range keys {
ks[i] = types.CID{Cid: c}
// Provide publishes [types.AnnouncementRecord]s based on the given [types.AnnouncementRequests].
// This records will be signed by your provided. Therefore, the [Client] must have been configured
// with [WithProviderInfo].
func (c *Client) Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
if err := c.canProvide(); err != nil {
return nil, err
}

now := c.clock.Now()
records := make([]*types.AnnouncementRecord, len(announcements))

for i, announcement := range announcements {
record := &types.AnnouncementRecord{
Schema: types.SchemaAnnouncement,
Payload: types.AnnouncementPayload{
CID: announcement.CID,
Scope: announcement.Scope,
Timestamp: now,
TTL: announcement.TTL,
ID: &c.peerID,
Addrs: c.addrs,
Protocols: c.protocols,
},
}

req := types.WriteBitswapRecord{
Protocol: "transport-bitswap",
Schema: types.SchemaBitswap,
Payload: types.BitswapPayload{
Keys: ks,
AdvisoryTTL: &types.Duration{Duration: ttl},
Timestamp: &types.Time{Time: now},
ID: &c.peerID,
Addrs: c.addrs,
},
}
err := req.Sign(c.peerID, c.identity)
if err != nil {
return 0, err
}
if len(announcement.Metadata) != 0 {
var err error
record.Payload.Metadata, err = multibase.Encode(multibase.Base64, announcement.Metadata)
if err != nil {
return nil, fmt.Errorf("multibase-encoding metadata: %w", err)
}

Check warning on line 288 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L284-L288

Added lines #L284 - L288 were not covered by tests
}

if c.afterSignCallback != nil {
c.afterSignCallback(&req)
}
err := record.Sign(c.peerID, c.identity)
if err != nil {
return nil, err
}

Check warning on line 294 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L293-L294

Added lines #L293 - L294 were not covered by tests

advisoryTTL, err := c.provideSignedBitswapRecord(ctx, &req)
if err != nil {
return 0, err
if c.afterSignCallback != nil {
c.afterSignCallback(record)
}

records[i] = record
}

return advisoryTTL, err
url := c.baseURL + "/routing/v1/providers"
req := jsontypes.AnnounceProvidersRequest{
Providers: records,
}
return c.provide(ctx, url, req)
}

// ProvideAsync makes a provide request to a delegated router
//
//lint:ignore SA1019 // ignore staticcheck
func (c *Client) provideSignedBitswapRecord(ctx context.Context, bswp *types.WriteBitswapRecord) (time.Duration, error) {
//lint:ignore SA1019 // ignore staticcheck
req := jsontypes.WriteProvidersRequest{Providers: []types.Record{bswp}}
// ProvideRecords publishes the given [types.AnnouncementRecord]. An error will
// be returned if the records aren't signed or valid.
func (c *Client) ProvideRecords(ctx context.Context, records ...*types.AnnouncementRecord) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
providerRecords := make([]*types.AnnouncementRecord, len(records))
for i, record := range records {
if err := record.Verify(); err != nil {
return nil, err
}
providerRecords[i] = records[i]

Check warning on line 318 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L312-L318

Added lines #L312 - L318 were not covered by tests
}

url := c.baseURL + "/routing/v1/providers/"
url := c.baseURL + "/routing/v1/providers"
req := jsontypes.AnnounceProvidersRequest{
Providers: providerRecords,
}
return c.provide(ctx, url, req)

Check warning on line 325 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L321-L325

Added lines #L321 - L325 were not covered by tests
}

func (c *Client) provide(ctx context.Context, url string, req interface{}) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
b, err := drjson.MarshalJSONBytes(req)
if err != nil {
return 0, err
return nil, err

Check warning on line 331 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L331

Added line #L331 was not covered by tests
}

httpReq, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewBuffer(b))
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(b))
if err != nil {
return 0, err
return nil, err

Check warning on line 336 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L336

Added line #L336 was not covered by tests
}

resp, err := c.httpClient.Do(httpReq)
if err != nil {
return 0, fmt.Errorf("making HTTP req to provide a signed record: %w", err)
return nil, fmt.Errorf("making HTTP req to provide a signed peer record: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return 0, httpError(resp.StatusCode, resp.Body)
resp.Body.Close()
return nil, httpError(resp.StatusCode, resp.Body)
}

//lint:ignore SA1019 // ignore staticcheck
var provideResult jsontypes.WriteProvidersResponse
err = json.NewDecoder(resp.Body).Decode(&provideResult)
respContentType := resp.Header.Get("Content-Type")
mediaType, _, err := mime.ParseMediaType(respContentType)
if err != nil {
return 0, err
}
if len(provideResult.ProvideResults) != 1 {
return 0, fmt.Errorf("expected 1 result but got %d", len(provideResult.ProvideResults))
resp.Body.Close()
return nil, fmt.Errorf("parsing Content-Type: %w", err)

Check warning on line 353 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L352-L353

Added lines #L352 - L353 were not covered by tests
}

//lint:ignore SA1019 // ignore staticcheck
v, ok := provideResult.ProvideResults[0].(*types.WriteBitswapRecordResponse)
if !ok {
return 0, errors.New("expected AdvisoryTTL field")
}
var skipBodyClose bool
defer func() {
if !skipBodyClose {
resp.Body.Close()
}
}()

if v.AdvisoryTTL != nil {
return v.AdvisoryTTL.Duration, nil
var it iter.ResultIter[*types.AnnouncementResponseRecord]
switch mediaType {
case mediaTypeJSON:
parsedResp := &jsontypes.AnnouncePeersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
if err != nil {
return nil, err
}

Check warning on line 370 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L369-L370

Added lines #L369 - L370 were not covered by tests
var sliceIt iter.Iter[*types.AnnouncementResponseRecord] = iter.FromSlice(parsedResp.ProvideResults)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
skipBodyClose = true
it = ndjson.NewAnnouncementResponseRecordsIter(resp.Body)
default:
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
return nil, errors.New("unknown content type")

Check warning on line 378 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L373-L378

Added lines #L373 - L378 were not covered by tests
}

return 0, nil
return it, nil
}

func (c *Client) canProvide() error {
if c.identity == nil {
return errors.New("cannot provide without identity")
}
if c.peerID.Size() == 0 {
return errors.New("cannot provide without peer ID")
}
return nil
}

// FindPeers searches for information for the given [peer.ID].
Expand Down Expand Up @@ -413,6 +452,9 @@
case mediaTypeJSON:
parsedResp := &jsontypes.PeersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
if err != nil {
return nil, err
}

Check warning on line 457 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L456-L457

Added lines #L456 - L457 were not covered by tests
var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice(parsedResp.Peers)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
Expand All @@ -426,6 +468,67 @@
return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil
}

// ProvidePeer publishes an [types.AnnouncementRecord] with the provider
// information from your peer, configured with [WithProviderInfo].
func (c *Client) ProvidePeer(ctx context.Context, ttl time.Duration, metadata []byte) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
if err := c.canProvide(); err != nil {
return nil, err
}

record := &types.AnnouncementRecord{
Schema: types.SchemaAnnouncement,
Payload: types.AnnouncementPayload{
Timestamp: time.Now(),
TTL: ttl,
ID: &c.peerID,
Addrs: c.addrs,
Protocols: c.protocols,
},
}

if len(metadata) != 0 {
var err error
record.Payload.Metadata, err = multibase.Encode(multibase.Base64, metadata)
if err != nil {
return nil, fmt.Errorf("multibase-encoding metadata: %w", err)
}

Check warning on line 494 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L490-L494

Added lines #L490 - L494 were not covered by tests
}

err := record.Sign(c.peerID, c.identity)
if err != nil {
return nil, err
}

Check warning on line 500 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L499-L500

Added lines #L499 - L500 were not covered by tests

if c.afterSignCallback != nil {
c.afterSignCallback(record)
}

url := c.baseURL + "/routing/v1/peers"
req := jsontypes.AnnouncePeersRequest{
Peers: []*types.AnnouncementRecord{record},
}

return c.provide(ctx, url, req)
}

// ProvidePeerRecords publishes the given [types.AnnouncementRecord]. An error will
// be returned if the records aren't signed or valid.
func (c *Client) ProvidePeerRecords(ctx context.Context, records ...*types.AnnouncementRecord) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
providerRecords := make([]*types.AnnouncementRecord, len(records))
for i, record := range records {
if err := record.Verify(); err != nil {
return nil, err
}
providerRecords[i] = records[i]

Check warning on line 522 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L516-L522

Added lines #L516 - L522 were not covered by tests
}

url := c.baseURL + "/routing/v1/peers"
req := jsontypes.AnnouncePeersRequest{
Peers: providerRecords,
}
return c.provide(ctx, url, req)

Check warning on line 529 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L525-L529

Added lines #L525 - L529 were not covered by tests
}

// GetIPNS tries to retrieve the [ipns.Record] for the given [ipns.Name]. The record is
// validated against the given name. If validation fails, an error is returned, but no
// record.
Expand Down
Loading
Loading