Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
feat: use Kubo RPC if routing v1 not set
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Sep 4, 2023
1 parent 82d25ec commit 715bdf2
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 8 deletions.
5 changes: 3 additions & 2 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ to override the gateway address from which to retrieve IPNS Records from.
### `IPNS_RECORD_GATEWAY`

Single URL or a comma separated list of Gateway endpoints that support requests for `application/vnd.ipfs.ipns-record`.
This is used for IPNS Record routing. If not set, the value of `PROXY_GATEWAY_URL` will be
used.
This is used for IPNS Record routing.

`IPNS_RECORD_GATEWAY` also supports [Routing V1 HTTP API](https://specs.ipfs.tech/routing/http-routing-v1/)
for IPNS Record routing ([IPIP-379](https://specs.ipfs.tech/ipips/ipip-0379/)). To use it, the provided URL must end with `/routing/v1`.

If not set, the IPNS records will be fetched from `KUBO_RPC_URL`.

## Saturn Backend

### `STRN_ORCHESTRATOR_URL`
Expand Down
13 changes: 11 additions & 2 deletions handlers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"errors"
"fmt"
"math/rand"
"net/http"
Expand All @@ -12,6 +13,7 @@ import (
_ "net/http/pprof"

"github.com/ipfs/bifrost-gateway/lib"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"github.com/filecoin-saturn/caboose"
Expand Down Expand Up @@ -69,8 +71,15 @@ func withRequestLogger(next http.Handler) http.Handler {
}

func makeGatewayHandler(bs bstore.Blockstore, kuboRPC, gatewayURLs []string, port int, blockCacheSize int, cdns *cachedDNS, useGraphBackend bool) (*http.Server, error) {
// Sets up the routing system, which will proxy the IPNS routing requests to the given gateway.
routing := newProxyRouting(gatewayURLs, cdns)
// Sets up the routing system, which will proxy the IPNS routing requests to the given gateway or kubo RPC.
var routing routing.ValueStore
if len(gatewayURLs) != 0 {
routing = newProxyRouting(gatewayURLs, cdns)
} else if len(kuboRPC) != 0 {
routing = newRPCProxyRouting(kuboRPC, cdns)
} else {
return nil, errors.New("kubo rpc or gateway urls must be provided in order to delegate routing")
}

Check warning on line 82 in handlers.go

View check run for this annotation

Codecov / codecov/patch

handlers.go#L73-L82

Added lines #L73 - L82 were not covered by tests

// Sets up a cache to store blocks in
cacheBlockStore, err := lib.NewCacheBlockStore(blockCacheSize)
Expand Down
4 changes: 0 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`,
}

ipnsProxyGateway := getEnvs(EnvIPNSRecordGateway, "")
if len(ipnsProxyGateway) == 0 {
ipnsProxyGateway = proxyGateway
}

gatewaySrv, err := makeGatewayHandler(bs, kuboRPC, ipnsProxyGateway, gatewayPort, blockCacheSize, cdns, useGraphBackend)

Check warning on line 112 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L111-L112

Added lines #L111 - L112 were not covered by tests
if err != nil {
return err
Expand Down
141 changes: 141 additions & 0 deletions routing.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package main

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
Expand All @@ -14,6 +18,143 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type rpcProxyRouting struct {
kuboRPC []string
httpClient *http.Client
rand *rand.Rand
}

func newRPCProxyRouting(kuboRPC []string, cdns *cachedDNS) routing.ValueStore {

Check warning on line 27 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L27

Added line #L27 was not covered by tests
s := rand.NewSource(time.Now().Unix())
rand := rand.New(s)

return &rpcProxyRouting{

Check warning on line 31 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L31

Added line #L31 was not covered by tests
kuboRPC: kuboRPC,
httpClient: &http.Client{
Transport: otelhttp.NewTransport(&customTransport{
// Roundtripper with increased defaults than http.Transport such that retrieving
// multiple lookups concurrently is fast.
RoundTripper: &http.Transport{
MaxIdleConns: 1000,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
DialContext: cdns.dialWithCachedDNS,
ForceAttemptHTTP2: true,
},
}),
},
rand: rand,
}
}

func (ps *rpcProxyRouting) PutValue(context.Context, string, []byte, ...routing.Option) error {

Check warning on line 51 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L51

Added line #L51 was not covered by tests
return routing.ErrNotSupported
}

func (ps *rpcProxyRouting) GetValue(ctx context.Context, k string, opts ...routing.Option) ([]byte, error) {

Check warning on line 55 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L55

Added line #L55 was not covered by tests
return ps.fetch(ctx, k)
}

func (ps *rpcProxyRouting) SearchValue(ctx context.Context, k string, opts ...routing.Option) (<-chan []byte, error) {

Check warning on line 59 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L59

Added line #L59 was not covered by tests
if !strings.HasPrefix(k, "/ipns/") {
return nil, routing.ErrNotSupported
}

ch := make(chan []byte)

go func() {
v, err := ps.fetch(ctx, k)
if err != nil {
close(ch)
} else {
ch <- v
close(ch)
}
}()

return ch, nil
}

func (ps *rpcProxyRouting) fetch(ctx context.Context, key string) (rb []byte, err error) {

Check warning on line 79 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L79

Added line #L79 was not covered by tests
name, err := ipns.NameFromRoutingKey([]byte(key))
if err != nil {
return nil, err
}

key = "/ipns/" + name.String()

urlStr := fmt.Sprintf("%s/api/v0/dht/get?arg=%s", ps.getRandomKuboURL(), key)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, urlStr, nil)
if err != nil {
return nil, err
}

goLog.Debugw("routing proxy fetch", "key", key, "from", req.URL.String())
defer func() {
if err != nil {
goLog.Debugw("routing proxy fetch error", "key", key, "from", req.URL.String(), "error", err.Error())
}
}()

resp, err := ps.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

// Read at most 10 KiB (max size of IPNS record).
rb, err = io.ReadAll(io.LimitReader(resp.Body, 10240))
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("routing/get RPC returned unexpected status: %s, body: %q", resp.Status, string(rb))
}

parts := bytes.Split(bytes.TrimSpace(rb), []byte("\n"))
var b64 string

for _, part := range parts {
var evt routing.QueryEvent
err = json.Unmarshal(part, &evt)
if err != nil {
return nil, fmt.Errorf("routing/get RPC response cannot be parsed: %w", err)
}

if evt.Type == routing.Value {
b64 = evt.Extra
break
}
}

if b64 == "" {
return nil, errors.New("routing/get RPC returned no value")
}

rb, err = base64.StdEncoding.DecodeString(b64)
if err != nil {
return nil, err
}

entry, err := ipns.UnmarshalRecord(rb)
if err != nil {
return nil, err
}

err = ipns.ValidateWithName(entry, name)
if err != nil {
return nil, err
}

return rb, nil
}

func (ps *rpcProxyRouting) getRandomKuboURL() string {

Check warning on line 154 in routing.go

View check run for this annotation

Codecov / codecov/patch

routing.go#L154

Added line #L154 was not covered by tests
return ps.kuboRPC[ps.rand.Intn(len(ps.kuboRPC))]
}

type proxyRouting struct {
gatewayURLs []string
httpClient *http.Client
Expand Down

0 comments on commit 715bdf2

Please sign in to comment.