Skip to content

Commit

Permalink
Fix #382: Extract headers from IPFS API requests & apply them to hija…
Browse files Browse the repository at this point in the history
…cked ones.

This commit makes the proxy extract useful fixed headers (like CORS) from
the IPFS daemon API responses and then apply them to the responses
from hijacked endpoints like /add or /repo/stat.

It does this by caching a list of headers from the first IPFS API
response which has them. If we have not performed any proxied request or
managed to obtain the headers we're interested in, this will try triggering a
request to "/api/v0/version" to obtain them first.

This should fix the issues with using Cluster proxy with IPFS Companion and
Chrome.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
  • Loading branch information
hsanjuan committed Dec 18, 2018
1 parent 9f6a173 commit 862c1eb
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 33 deletions.
98 changes: 88 additions & 10 deletions api/ipfsproxy/ipfsproxy.go
Expand Up @@ -6,30 +6,40 @@ import (
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/ipfs/ipfs-cluster/adder/adderutils"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
"github.com/ipfs/ipfs-cluster/version"

cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net"

"github.com/ipfs/ipfs-cluster/adder/adderutils"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
)

// DNSTimeout is used when resolving DNS multiaddresses in this module
var DNSTimeout = 5 * time.Second

var logger = logging.Logger("ipfsproxy")

var ipfsHeaderList = []string{
"Server",
"Access-Control-Allow-Headers",
"Access-Control-Expose-Headers",
"Trailer",
"Vary",
}

// Server offers an IPFS API, hijacking some interesting requests
// and forwarding the rest to the ipfs daemon
// it proxies HTTP requests to the configured IPFS
Expand All @@ -48,11 +58,38 @@ type Server struct {
listener net.Listener // proxy listener
server *http.Server // proxy server

onceHeaders sync.Once
ipfsHeaders sync.Map

shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
}

// An http.Handler through which all proxied calls
// must pass (wraps the actual handler).
type proxyHandler struct {
server *Server
handler http.Handler
}

// ServeHTTP extracts interesting headers returned by IPFS responses
// and stores them in our cache.
func (ph *proxyHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
ph.handler.ServeHTTP(rw, req)

// If the "Server" header is not there, we did not do
// successful Header extraction yet. In this case
// we copy all interesting (ipfsHeaderList) headers
// from the proxyResponse.
if !ph.server.isIPFSHeadersKnown() {
srcHeaders := rw.Header()
for _, k := range ipfsHeaderList {
ph.server.ipfsHeaders.Store(k, srcHeaders[k])
}
}
}

type ipfsError struct {
Message string
}
Expand Down Expand Up @@ -118,8 +155,6 @@ func New(cfg *Config) (*Server, error) {
return nil, err
}

proxyHandler := httputil.NewSingleHostReverseProxy(proxyURL)

smux := http.NewServeMux()
s := &http.Server{
ReadTimeout: cfg.ReadTimeout,
Expand All @@ -145,6 +180,12 @@ func New(cfg *Config) (*Server, error) {
listener: l,
server: s,
}

proxyHandler := &proxyHandler{
server: proxy,
handler: httputil.NewSingleHostReverseProxy(proxyURL),
}

smux.Handle("/", proxyHandler)
smux.HandleFunc("/api/v0/pin/add", proxy.pinHandler) // add?arg=xxx
smux.HandleFunc("/api/v0/pin/add/", proxy.pinHandler) // add/xxx
Expand Down Expand Up @@ -218,13 +259,47 @@ func (proxy *Server) run() {
func ipfsErrorResponder(w http.ResponseWriter, errMsg string) {
res := ipfsError{errMsg}
resBytes, _ := json.Marshal(res)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
w.Write(resBytes)
return
}

func (proxy *Server) isIPFSHeadersKnown() bool {
_, ok := proxy.ipfsHeaders.Load(ipfsHeaderList[0])
return ok
}

// Set headers that all hijacked endpoints share.
func (proxy *Server) setHeaders(dest http.Header) {
if !proxy.isIPFSHeadersKnown() { // make a request to fetch them
req, err := http.NewRequest("POST", "/api/v0/version", nil)
if err != nil {
logger.Error(err)
} else {
// We use the Recorder() ResponseWriter to simply
// save implementing one ourselves.
// This uses our proxy handler to trigger a proxied
// request which will record the headers once completed.
proxy.server.Handler.ServeHTTP(httptest.NewRecorder(), req)
}
}

// Copy ipfs headers
proxy.ipfsHeaders.Range(func(k, v interface{}) bool {
ks := k.(string)
vs := v.([]string)
dest[ks] = vs
return true
})

// Set Cluster global headers for all hijacked requests
dest.Set("Content-Type", "application/json")
dest.Set("Server", fmt.Sprintf("ipfs-cluster/ipfsproxy/%s", version.Version))
}

func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) {
proxy.setHeaders(w.Header())

arg, ok := extractArgument(r.URL)
if !ok {
ipfsErrorResponder(w, "Error: bad argument")
Expand Down Expand Up @@ -252,7 +327,6 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ
Pins: []string{arg},
}
resBytes, _ := json.Marshal(res)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
return
Expand All @@ -267,6 +341,8 @@ func (proxy *Server) unpinHandler(w http.ResponseWriter, r *http.Request) {
}

func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
proxy.setHeaders(w.Header())

pinLs := ipfsPinLsResp{}
pinLs.Keys = make(map[string]ipfsPinType)

Expand Down Expand Up @@ -314,12 +390,13 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
}

resBytes, _ := json.Marshal(pinLs)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
}

func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
proxy.setHeaders(w.Header())

reader, err := r.MultipartReader()
if err != nil {
ipfsErrorResponder(w, "error reading request: "+err.Error())
Expand Down Expand Up @@ -394,6 +471,8 @@ func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
}

func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
proxy.setHeaders(w.Header())

peers := make([]peer.ID, 0)
err := proxy.rpcClient.Call(
"",
Expand Down Expand Up @@ -437,7 +516,6 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
}

resBytes, _ := json.Marshal(totalStats)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
return
Expand Down
33 changes: 24 additions & 9 deletions api/ipfsproxy/ipfsproxy_test.go
Expand Up @@ -6,13 +6,14 @@ import (
"io/ioutil"
"net/http"
"net/url"
"strings"
"testing"

logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"

"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"

logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
)

func init() {
Expand Down Expand Up @@ -40,12 +41,6 @@ func testIPFSProxy(t *testing.T) (*Server, *test.IpfsMock) {
return proxy, mock
}

func TestNewProxy(t *testing.T) {
ipfs, mock := testIPFSProxy(t)
defer mock.Close()
defer ipfs.Shutdown()
}

func TestIPFSProxyVersion(t *testing.T) {
proxy, mock := testIPFSProxy(t)
defer mock.Close()
Expand Down Expand Up @@ -578,3 +573,23 @@ func mustParseURL(rawurl string) *url.URL {
}
return u
}

func TestHeaderExtraction(t *testing.T) {
proxy, mock := testIPFSProxy(t)
defer mock.Close()
defer proxy.Shutdown()

res, err := http.Post(fmt.Sprintf("%s/pin/ls", proxyURL(proxy)), "", nil)
if err != nil {
t.Fatal("should forward requests to ipfs host: ", err)
}
res.Body.Close()

if res.Header.Get("Access-Control-Allow-Headers") != "test-allow-header" {
t.Error("the proxy should have extracted headers from ipfs")
}

if !strings.HasPrefix(res.Header.Get("Server"), "ipfs-cluster") {
t.Error("wrong value for Server header")
}
}
13 changes: 7 additions & 6 deletions cluster.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ipfs/ipfs-cluster/pstoremgr"
"github.com/ipfs/ipfs-cluster/rpcutil"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/version"

cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
Expand Down Expand Up @@ -104,7 +105,7 @@ func NewCluster(
listenAddrs += fmt.Sprintf(" %s/ipfs/%s\n", addr, host.ID().Pretty())
}

logger.Infof("IPFS Cluster v%s listening on:\n%s\n", Version, listenAddrs)
logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs)

// Note, we already loaded peers from peerstore into the host
// in daemon.go.
Expand Down Expand Up @@ -162,13 +163,13 @@ func NewCluster(
}

func (c *Cluster) setupRPC() error {
rpcServer := rpc.NewServer(c.host, RPCProtocol)
rpcServer := rpc.NewServer(c.host, version.RPCProtocol)
err := rpcServer.RegisterName("Cluster", &RPCAPI{c})
if err != nil {
return err
}
c.rpcServer = rpcServer
rpcClient := rpc.NewClientWithServer(c.host, RPCProtocol, rpcServer)
rpcClient := rpc.NewClientWithServer(c.host, version.RPCProtocol, rpcServer)
c.rpcClient = rpcClient
return nil
}
Expand Down Expand Up @@ -540,8 +541,8 @@ func (c *Cluster) ID() api.ID {
Addresses: addrs,
ClusterPeers: peers,
ClusterPeersAddresses: c.peerManager.PeersAddresses(peers),
Version: Version.String(),
RPCProtocolVersion: RPCProtocol,
Version: version.Version.String(),
RPCProtocolVersion: version.RPCProtocol,
IPFS: ipfsID,
Peername: c.config.Peername,
}
Expand Down Expand Up @@ -1092,7 +1093,7 @@ func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid.

// Version returns the current IPFS Cluster version.
func (c *Cluster) Version() string {
return Version.String()
return version.Version.String()
}

// Peers returns the IDs of the members of this Cluster.
Expand Down
5 changes: 3 additions & 2 deletions cluster_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
"github.com/ipfs/ipfs-cluster/version"

cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
Expand Down Expand Up @@ -230,7 +231,7 @@ func TestClusterID(t *testing.T) {
if id.ID == "" {
t.Error("expected a cluster ID")
}
if id.Version != Version.String() {
if id.Version != version.Version.String() {
t.Error("version should match current version")
}
//if id.PublicKey == nil {
Expand Down Expand Up @@ -776,7 +777,7 @@ func TestVersion(t *testing.T) {
cl, _, _, _, _ := testingCluster(t)
defer cleanRaft()
defer cl.Shutdown()
if cl.Version() != Version.String() {
if cl.Version() != version.Version.String() {
t.Error("bad Version()")
}
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/ipfs-cluster-service/main.go
Expand Up @@ -13,6 +13,7 @@ import (

ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/version"

semver "github.com/blang/semver"
logging "github.com/ipfs/go-log"
Expand Down Expand Up @@ -117,7 +118,7 @@ var (
func init() {
// Set build information.
if build, err := semver.NewBuildVersion(commit); err == nil {
ipfscluster.Version.Build = []string{"git" + build}
version.Version.Build = []string{"git" + build}
}

// We try guessing user's home from the HOME variable. This
Expand Down Expand Up @@ -164,7 +165,7 @@ func main() {
app.Usage = "IPFS Cluster node"
app.Description = Description
//app.Copyright = "© Protocol Labs, Inc."
app.Version = ipfscluster.Version.String()
app.Version = version.Version.String()
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "config, c",
Expand Down Expand Up @@ -446,7 +447,7 @@ the mth data folder (m currently defaults to 5)
Name: "version",
Usage: "Print the ipfs-cluster version",
Action: func(c *cli.Context) error {
fmt.Printf("%s\n", ipfscluster.Version)
fmt.Printf("%s\n", version.Version)
return nil
},
},
Expand Down
3 changes: 2 additions & 1 deletion ipfscluster_test.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
"github.com/ipfs/ipfs-cluster/version"

cid "github.com/ipfs/go-cid"
crypto "github.com/libp2p/go-libp2p-crypto"
Expand Down Expand Up @@ -378,7 +379,7 @@ func TestClustersVersion(t *testing.T) {
defer shutdownClusters(t, clusters, mock)
f := func(t *testing.T, c *Cluster) {
v := c.Version()
if v != Version.String() {
if v != version.Version.String() {
t.Error("Bad version")
}
}
Expand Down
2 changes: 1 addition & 1 deletion release.sh
Expand Up @@ -13,7 +13,7 @@ if [ -z $version ]; then
fi

make gx-clean
sed -i "s/Version = semver\.MustParse.*$/Version = semver.MustParse(\"$version\")/" version.go
sed -i "s/Version = semver\.MustParse.*$/Version = semver.MustParse(\"$version\")/" version/version.go
sed -i "s/const Version.*$/const Version = \"$version\"/" cmd/ipfs-cluster-ctl/main.go
git commit -S -a -m "Release $version"
lastver=`git tag -l | grep -E 'v[0-9]+\.[0-9]+\.[0-9]+$' | tail -n 1`
Expand Down

0 comments on commit 862c1eb

Please sign in to comment.