Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #382: Extract headers from IPFS API requests and apply them to hijacked ones. #623

Merged
merged 3 commits into from
Dec 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 91 additions & 10 deletions api/ipfsproxy/ipfsproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,40 @@ import (
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/ipfs/ipfs-cluster/adder/adderutils"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
"github.com/ipfs/ipfs-cluster/version"

cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net"

"github.com/ipfs/ipfs-cluster/adder/adderutils"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
)

// DNSTimeout is used when resolving DNS multiaddresses in this module
var DNSTimeout = 5 * time.Second

var logger = logging.Logger("ipfsproxy")

var ipfsHeaderList = []string{
"Server",
"Access-Control-Allow-Headers",
"Access-Control-Expose-Headers",
"Trailer",
"Vary",
}

// Server offers an IPFS API, hijacking some interesting requests
// and forwarding the rest to the ipfs daemon
// it proxies HTTP requests to the configured IPFS
Expand All @@ -48,11 +58,38 @@ type Server struct {
listener net.Listener // proxy listener
server *http.Server // proxy server

onceHeaders sync.Once
ipfsHeaders sync.Map

shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
}

// An http.Handler through which all proxied calls
// must pass (wraps the actual handler).
type proxyHandler struct {
server *Server
handler http.Handler
}

// ServeHTTP extracts interesting headers returned by IPFS responses
// and stores them in our cache.
func (ph *proxyHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
ph.handler.ServeHTTP(rw, req)

hdrs := make(http.Header)
ok := ph.server.setIPFSHeaders(hdrs)
if !ok {
// we are missing some headers we want, try
// to copy the ones coming on this proxied request.
srcHeaders := rw.Header()
for _, k := range ipfsHeaderList {
ph.server.ipfsHeaders.Store(k, srcHeaders[k])
}
}
}

type ipfsError struct {
Message string
}
Expand Down Expand Up @@ -118,8 +155,6 @@ func New(cfg *Config) (*Server, error) {
return nil, err
}

proxyHandler := httputil.NewSingleHostReverseProxy(proxyURL)

smux := http.NewServeMux()
s := &http.Server{
ReadTimeout: cfg.ReadTimeout,
Expand All @@ -145,6 +180,12 @@ func New(cfg *Config) (*Server, error) {
listener: l,
server: s,
}

proxyHandler := &proxyHandler{
server: proxy,
handler: httputil.NewSingleHostReverseProxy(proxyURL),
}

smux.Handle("/", proxyHandler)
smux.HandleFunc("/api/v0/pin/add", proxy.pinHandler) // add?arg=xxx
smux.HandleFunc("/api/v0/pin/add/", proxy.pinHandler) // add/xxx
Expand Down Expand Up @@ -218,13 +259,50 @@ func (proxy *Server) run() {
func ipfsErrorResponder(w http.ResponseWriter, errMsg string) {
res := ipfsError{errMsg}
resBytes, _ := json.Marshal(res)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
w.Write(resBytes)
return
}

// setIPFSHeaders adds the known IPFS Headers to the destination
// and returns true if we could set all the headers in the list.
func (proxy *Server) setIPFSHeaders(dest http.Header) bool {
r := true
for _, h := range ipfsHeaderList {
v, ok := proxy.ipfsHeaders.Load(h)
if !ok {
r = false
continue
}
dest[h] = v.([]string)
}
return r
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(From looking at this and references of isIPFSHeadersKnown) It assumes that if Server header is present, remaining headers are present as well. Shouldn't we check other headers? Depending we need to check for other headers or not the name can be changed to areIPFSHeadersKnown or isServerHeaderPresent.

Suggested change
}
func (proxy *Server) areIPFSHeadersKnown() bool {
for _, header := range ipfsHeaderList {
if _, ok := proxy.ipfsHeaders.Load(header); !ok {
return !ok
}
}
return true
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a Go unwritten rule saying that boolean functions start with is? @lanzafame ?

Shouldn't we check other headers?

We can assume that if we learned about Server (or any one of them) we have learned about the rest. But maybe worth checking the whole list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a Go unwritten rule saying that boolean functions start with is?

There is this:

"The Go Programming Language" by Donovan and Kernighan recommends in Chapter 6 that

Functions that merely access or modify internal values of a type..are called getters and setters. > However, when naming a getter method we usually omit the Get prefix. This Preference for brevity extends to all methods not just field accessors, and to other reductant prefixes as well, such as Fetch, Find and Lookup

But I wouldn't apply that to is. Though in this particular case, Known is enough of an indicator that this is a boolean predicate.


// Set headers that all hijacked endpoints share.
func (proxy *Server) setHeaders(dest http.Header) {
if ok := proxy.setIPFSHeaders(dest); !ok {
req, err := http.NewRequest("POST", "/api/v0/version", nil)
if err != nil {
logger.Error(err)
} else {
// We use the Recorder() ResponseWriter to simply
// save implementing one ourselves.
// This uses our proxy handler to trigger a proxied
// request which will record the headers once completed.
proxy.server.Handler.ServeHTTP(httptest.NewRecorder(), req)
proxy.setIPFSHeaders(dest)
}
}

// Set Cluster global headers for all hijacked requests
dest.Set("Content-Type", "application/json")
dest.Set("Server", fmt.Sprintf("ipfs-cluster/ipfsproxy/%s", version.Version))
}

func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) {
proxy.setHeaders(w.Header())

arg, ok := extractArgument(r.URL)
if !ok {
ipfsErrorResponder(w, "Error: bad argument")
Expand Down Expand Up @@ -252,7 +330,6 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ
Pins: []string{arg},
}
resBytes, _ := json.Marshal(res)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
return
Expand All @@ -267,6 +344,8 @@ func (proxy *Server) unpinHandler(w http.ResponseWriter, r *http.Request) {
}

func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
proxy.setHeaders(w.Header())

pinLs := ipfsPinLsResp{}
pinLs.Keys = make(map[string]ipfsPinType)

Expand Down Expand Up @@ -314,12 +393,13 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
}

resBytes, _ := json.Marshal(pinLs)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
}

func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
proxy.setHeaders(w.Header())

reader, err := r.MultipartReader()
if err != nil {
ipfsErrorResponder(w, "error reading request: "+err.Error())
Expand Down Expand Up @@ -394,6 +474,8 @@ func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
}

func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
proxy.setHeaders(w.Header())

peers := make([]peer.ID, 0)
err := proxy.rpcClient.Call(
"",
Expand Down Expand Up @@ -437,7 +519,6 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
}

resBytes, _ := json.Marshal(totalStats)
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
return
Expand Down
33 changes: 24 additions & 9 deletions api/ipfsproxy/ipfsproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"io/ioutil"
"net/http"
"net/url"
"strings"
"testing"

logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"

"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"

logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
)

func init() {
Expand Down Expand Up @@ -40,12 +41,6 @@ func testIPFSProxy(t *testing.T) (*Server, *test.IpfsMock) {
return proxy, mock
}

func TestNewProxy(t *testing.T) {
ipfs, mock := testIPFSProxy(t)
defer mock.Close()
defer ipfs.Shutdown()
}

func TestIPFSProxyVersion(t *testing.T) {
proxy, mock := testIPFSProxy(t)
defer mock.Close()
Expand Down Expand Up @@ -578,3 +573,23 @@ func mustParseURL(rawurl string) *url.URL {
}
return u
}

func TestHeaderExtraction(t *testing.T) {
proxy, mock := testIPFSProxy(t)
defer mock.Close()
defer proxy.Shutdown()

res, err := http.Post(fmt.Sprintf("%s/pin/ls", proxyURL(proxy)), "", nil)
if err != nil {
t.Fatal("should forward requests to ipfs host: ", err)
}
res.Body.Close()

if res.Header.Get("Access-Control-Allow-Headers") != "test-allow-header" {
t.Error("the proxy should have extracted headers from ipfs")
}

if !strings.HasPrefix(res.Header.Get("Server"), "ipfs-cluster") {
t.Error("wrong value for Server header")
}
}
13 changes: 7 additions & 6 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ipfs/ipfs-cluster/pstoremgr"
"github.com/ipfs/ipfs-cluster/rpcutil"
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/version"

cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
Expand Down Expand Up @@ -104,7 +105,7 @@ func NewCluster(
listenAddrs += fmt.Sprintf(" %s/ipfs/%s\n", addr, host.ID().Pretty())
}

logger.Infof("IPFS Cluster v%s listening on:\n%s\n", Version, listenAddrs)
logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs)

// Note, we already loaded peers from peerstore into the host
// in daemon.go.
Expand Down Expand Up @@ -162,13 +163,13 @@ func NewCluster(
}

func (c *Cluster) setupRPC() error {
rpcServer := rpc.NewServer(c.host, RPCProtocol)
rpcServer := rpc.NewServer(c.host, version.RPCProtocol)
err := rpcServer.RegisterName("Cluster", &RPCAPI{c})
if err != nil {
return err
}
c.rpcServer = rpcServer
rpcClient := rpc.NewClientWithServer(c.host, RPCProtocol, rpcServer)
rpcClient := rpc.NewClientWithServer(c.host, version.RPCProtocol, rpcServer)
c.rpcClient = rpcClient
return nil
}
Expand Down Expand Up @@ -540,8 +541,8 @@ func (c *Cluster) ID() api.ID {
Addresses: addrs,
ClusterPeers: peers,
ClusterPeersAddresses: c.peerManager.PeersAddresses(peers),
Version: Version.String(),
RPCProtocolVersion: RPCProtocol,
Version: version.Version.String(),
RPCProtocolVersion: version.RPCProtocol,
IPFS: ipfsID,
Peername: c.config.Peername,
}
Expand Down Expand Up @@ -1092,7 +1093,7 @@ func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid.

// Version returns the current IPFS Cluster version.
func (c *Cluster) Version() string {
return Version.String()
return version.Version.String()
}

// Peers returns the IDs of the members of this Cluster.
Expand Down
5 changes: 3 additions & 2 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
"github.com/ipfs/ipfs-cluster/version"

cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
Expand Down Expand Up @@ -230,7 +231,7 @@ func TestClusterID(t *testing.T) {
if id.ID == "" {
t.Error("expected a cluster ID")
}
if id.Version != Version.String() {
if id.Version != version.Version.String() {
t.Error("version should match current version")
}
//if id.PublicKey == nil {
Expand Down Expand Up @@ -776,7 +777,7 @@ func TestVersion(t *testing.T) {
cl, _, _, _, _ := testingCluster(t)
defer cleanRaft()
defer cl.Shutdown()
if cl.Version() != Version.String() {
if cl.Version() != version.Version.String() {
t.Error("bad Version()")
}
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/ipfs-cluster-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/version"

semver "github.com/blang/semver"
logging "github.com/ipfs/go-log"
Expand Down Expand Up @@ -117,7 +118,7 @@ var (
func init() {
// Set build information.
if build, err := semver.NewBuildVersion(commit); err == nil {
ipfscluster.Version.Build = []string{"git" + build}
version.Version.Build = []string{"git" + build}
}

// We try guessing user's home from the HOME variable. This
Expand Down Expand Up @@ -164,7 +165,7 @@ func main() {
app.Usage = "IPFS Cluster node"
app.Description = Description
//app.Copyright = "© Protocol Labs, Inc."
app.Version = ipfscluster.Version.String()
app.Version = version.Version.String()
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "config, c",
Expand Down Expand Up @@ -446,7 +447,7 @@ the mth data folder (m currently defaults to 5)
Name: "version",
Usage: "Print the ipfs-cluster version",
Action: func(c *cli.Context) error {
fmt.Printf("%s\n", ipfscluster.Version)
fmt.Printf("%s\n", version.Version)
return nil
},
},
Expand Down
3 changes: 2 additions & 1 deletion ipfscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
"github.com/ipfs/ipfs-cluster/version"

cid "github.com/ipfs/go-cid"
crypto "github.com/libp2p/go-libp2p-crypto"
Expand Down Expand Up @@ -378,7 +379,7 @@ func TestClustersVersion(t *testing.T) {
defer shutdownClusters(t, clusters, mock)
f := func(t *testing.T, c *Cluster) {
v := c.Version()
if v != Version.String() {
if v != version.Version.String() {
t.Error("Bad version")
}
}
Expand Down
Loading