Commit 81ea1e7

Merge branch 'master' into feat/sort-responses

hsanjuan committed Sep 6, 2019
2 parents 08d7b27 + d9a5e17
Showing 36 changed files with 417 additions and 155 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -38,9 +38,9 @@ test_problem:

$(sharness):
@echo "Downloading sharness"
-@curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/8fa4b9b0465d21b7ec114ec4528fa17f5a6eb361.tar.gz
+@curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/28c7490f5cdf1e95a8ebebf8b06ed5588db13875.tar.gz
@cd sharness/lib; tar -zxf sharness.tar.gz; cd ../..
-@mv sharness/lib/sharness-8fa4b9b0465d21b7ec114ec4528fa17f5a6eb361 sharness/lib/sharness
+@mv sharness/lib/sharness-28c7490f5cdf1e95a8ebebf8b06ed5588db13875 sharness/lib/sharness
@rm sharness/lib/sharness.tar.gz

clean_sharness:
47 changes: 47 additions & 0 deletions add_test.go
@@ -58,6 +58,53 @@ func TestAdd(t *testing.T) {
})
}

func TestAddWithUserAllocations(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

waitForLeaderAndMetrics(t, clusters)

t.Run("local", func(t *testing.T) {
params := api.DefaultAddParams()
params.ReplicationFactorMin = 2
params.ReplicationFactorMax = 2
params.UserAllocations = []peer.ID{clusters[0].id, clusters[1].id}
params.Shard = false
params.Name = "testlocal"
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
ci, err := clusters[0].AddFile(r, params)
if err != nil {
t.Fatal(err)
}

pinDelay()

f := func(t *testing.T, c *Cluster) {
if c == clusters[0] || c == clusters[1] {
pin := c.StatusLocal(ctx, ci)
if pin.Error != "" {
t.Error(pin.Error)
}
if pin.Status != api.TrackerStatusPinned {
t.Error("item should be pinned and is", pin.Status)
}
} else {
pin := c.StatusLocal(ctx, ci)
if pin.Status != api.TrackerStatusRemote {
t.Error("expected tracker status remote")
}
}
}

runF(t, clusters, f)
})
}

func TestAddPeerDown(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
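For reference, a minimal sketch of how a caller outside the test above might use the new UserAllocations field. It assumes the ipfs-cluster api package and the go-libp2p-core peer package imported elsewhere in this diff; the two peer IDs are borrowed from test fixtures in this commit and are purely illustrative.

```go
package main

import (
	"fmt"

	"github.com/ipfs/ipfs-cluster/api"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

func main() {
	// Hypothetical allocations; any valid base58-encoded peer IDs work here.
	p1, _ := peer.IDB58Decode("QmP7R7gWEnruNePxmCa9GBa4VmUNexLVnb1v47R8Gyo3LP")
	p2, _ := peer.IDB58Decode("QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd")

	params := api.DefaultAddParams()
	params.ReplicationFactorMin = 2
	params.ReplicationFactorMax = 2
	params.UserAllocations = []peer.ID{p1, p2}

	// The allocations travel as a comma-separated "user-allocations"
	// query parameter (see the api/add.go changes below).
	fmt.Println(params.ToQueryString())
}
```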
16 changes: 14 additions & 2 deletions api/add.go
@@ -5,6 +5,7 @@ import (
"fmt"
"net/url"
"strconv"
"strings"

cid "github.com/ipfs/go-cid"
)
@@ -101,9 +102,14 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
params.Layout = layout

chunker := query.Get("chunker")
-params.Chunker = chunker
+if chunker != "" {
+	params.Chunker = chunker
+}

name := query.Get("name")
-params.Name = name
+if name != "" {
+	params.Name = name
+}

hashF := query.Get("hash")
if hashF != "" {
@@ -146,6 +152,11 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
return nil, err
}

allocs := query.Get("user-allocations")
if allocs != "" {
params.UserAllocations = StringsToPeers(strings.Split(allocs, ","))
}

err = parseIntParam(query, "cid-version", &params.CidVersion)
if err != nil {
return nil, err
@@ -178,6 +189,7 @@ func (p *AddParams) ToQueryString() string {
query.Set("replication-min", fmt.Sprintf("%d", p.ReplicationFactorMin))
query.Set("replication-max", fmt.Sprintf("%d", p.ReplicationFactorMax))
query.Set("name", p.Name)
query.Set("user-allocations", strings.Join(PeersToStrings(p.UserAllocations), ","))
query.Set("shard", fmt.Sprintf("%t", p.Shard))
query.Set("shard-size", fmt.Sprintf("%d", p.ShardSize))
query.Set("recursive", fmt.Sprintf("%t", p.Recursive))
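Taken together, the query handling above is a comma-separated round trip plus "only override the default when the parameter is present". A self-contained stdlib sketch of that behaviour, with peer IDs kept as plain strings instead of peer.ID for brevity:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

func main() {
	// Encode: allocations are joined with commas, as ToQueryString now does.
	peers := []string{
		"QmP7R7gWEnruNePxmCa9GBa4VmUNexLVnb1v47R8Gyo3LP",
		"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd",
	}
	q := url.Values{}
	q.Set("user-allocations", strings.Join(peers, ","))

	// Decode: split only when the parameter is non-empty, mirroring
	// AddParamsFromQuery; an absent or empty value keeps the default (nil).
	var decoded []string
	if allocs := q.Get("user-allocations"); allocs != "" {
		decoded = strings.Split(allocs, ",")
	}
	fmt.Println(decoded)
}
```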
2 changes: 1 addition & 1 deletion api/rest/client/client.go
@@ -349,7 +349,7 @@ func IsPeerAddress(addr ma.Multiaddr) bool {
if addr == nil {
return false
}
-pid, err := addr.ValueForProtocol(ma.P_IPFS)
+pid, err := addr.ValueForProtocol(ma.P_P2P)
dnsaddr, err2 := addr.ValueForProtocol(madns.DnsaddrProtocol.Code)
return (pid != "" && err == nil) || (dnsaddr != "" && err2 == nil)
}
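For context: /ipfs/… multiaddress components are now written as /p2p/…. Both names map to the same protocol code in go-multiaddr, so this is a spelling change rather than a wire change, and ValueForProtocol(ma.P_P2P) extracts the peer ID from either form. A minimal sketch using the go-multiaddr package already imported by this file; the address is one of the fixtures used in the tests below:

```go
package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	addr, err := ma.NewMultiaddr("/dns4/localhost/tcp/1234/p2p/QmP7R7gWEnruNePxmCa9GBa4VmUNexLVnb1v47R8Gyo3LP")
	if err != nil {
		panic(err)
	}

	// P_P2P is the protocol code behind both the /p2p/ and legacy /ipfs/ names.
	pid, err := addr.ValueForProtocol(ma.P_P2P)
	fmt.Println(pid, err)
}
```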
4 changes: 2 additions & 2 deletions api/rest/client/client_test.go
@@ -62,7 +62,7 @@ func apiMAddr(a *rest.API) ma.Multiaddr {
}

func peerMAddr(a *rest.API) ma.Multiaddr {
-ipfsAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(a.Host().ID())))
+ipfsAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peer.IDB58Encode(a.Host().ID())))
for _, a := range a.Host().Addrs() {
if _, err := a.ValueForProtocol(ma.P_IP4); err == nil {
return a.Encapsulate(ipfsAddr)
@@ -228,7 +228,7 @@ func TestDNSMultiaddress(t *testing.T) {
}

func TestPeerAddress(t *testing.T) {
-peerAddr, _ := ma.NewMultiaddr("/dns4/localhost/tcp/1234/ipfs/QmP7R7gWEnruNePxmCa9GBa4VmUNexLVnb1v47R8Gyo3LP")
+peerAddr, _ := ma.NewMultiaddr("/dns4/localhost/tcp/1234/p2p/QmP7R7gWEnruNePxmCa9GBa4VmUNexLVnb1v47R8Gyo3LP")
cfg := &Config{
APIAddr: peerAddr,
Host: "localhost",
10 changes: 9 additions & 1 deletion api/rest/restapi.go
@@ -263,6 +263,10 @@ func (api *API) addRoutes(router *mux.Router) {
),
)
}
router.NotFoundHandler = ochttp.WithRouteTag(
http.HandlerFunc(api.notFoundHandler),
"/notfound",
)
api.router = router
}

@@ -481,7 +485,7 @@ func (api *API) runLibp2pServer(ctx context.Context) {

listenMsg := ""
for _, a := range api.host.Addrs() {
-listenMsg += fmt.Sprintf(" %s/ipfs/%s\n", a, api.host.ID().Pretty())
+listenMsg += fmt.Sprintf(" %s/p2p/%s\n", a, api.host.ID().Pretty())
}

logger.Infof("REST API (libp2p-http): ENABLED. Listening on:\n%s\n", listenMsg)
@@ -1026,6 +1030,10 @@ func (api *API) recoverHandler(w http.ResponseWriter, r *http.Request) {
}
}

func (api *API) notFoundHandler(w http.ResponseWriter, r *http.Request) {
api.sendResponse(w, http.StatusNotFound, errors.New("not found"), nil)
}

func (api *API) parsePinPathOrError(w http.ResponseWriter, r *http.Request) *types.PinPath {
vars := mux.Vars(r)
urlpath := "/" + vars["keyType"] + "/" + strings.TrimSuffix(vars["path"], "/")
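The new NotFoundHandler plugs into gorilla/mux's hook for unmatched routes, so unknown paths get the API's usual JSON error instead of the default plain-text 404. A standalone sketch of the same idea, assuming gorilla/mux but not the cluster API's sendResponse helper or the ochttp route tagging used above:

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func main() {
	router := mux.NewRouter()
	router.HandleFunc("/id", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte(`{"id":"example"}`))
	})

	// Requests that match no registered route are handed to NotFoundHandler.
	router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusNotFound)
		json.NewEncoder(w).Encode(map[string]interface{}{
			"code":    http.StatusNotFound,
			"message": "not found",
		})
	})

	log.Fatal(http.ListenAndServe("127.0.0.1:9999", router))
}
```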
28 changes: 28 additions & 0 deletions api/rest/restapi_test.go
@@ -9,6 +9,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httputil"
"strings"
@@ -1038,6 +1039,33 @@ func TestAPIRecoverAllEndpoint(t *testing.T) {
testBothEndpoints(t, tf)
}

func TestNotFoundHandler(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
defer rest.Shutdown(ctx)

tf := func(t *testing.T, url urlF) {
bytes := make([]byte, 10)
for i := 0; i < 10; i++ {
bytes[i] = byte(65 + rand.Intn(25)) //A=65 and Z = 65+25
}

var errResp api.Error
makePost(t, rest, url(rest)+"/"+string(bytes), []byte{}, &errResp)
if errResp.Code != 404 {
t.Error("expected error not found")
}

var errResp1 api.Error
makeGet(t, rest, url(rest)+"/"+string(bytes), &errResp1)
if errResp1.Code != 404 {
t.Error("expected error not found")
}
}

testBothEndpoints(t, tf)
}

func TestCORS(t *testing.T) {
ctx := context.Background()
rest := testAPI(t)
7 changes: 6 additions & 1 deletion api/types.go
@@ -36,6 +36,9 @@ import (
var logger = logging.Logger("apitypes")

func init() {
// Use /p2p/ multiaddresses
multiaddr.SwapToP2pMultiaddrs()

// intialize trackerStatusString
stringTrackerStatus = make(map[string]TrackerStatus)
for k, v := range trackerStatusString {
@@ -528,7 +531,9 @@ func (po *PinOptions) ToQuery() string {
}
q.Set(fmt.Sprintf("%s%s", pinOptionsMetaPrefix, k), v)
}
q.Set("pin-update", po.PinUpdate.String())
if po.PinUpdate != cid.Undef {
q.Set("pin-update", po.PinUpdate.String())
}
return q.Encode()
}

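The ToQuery change only emits pin-update when the option was actually set. A small sketch of the cid.Undef guard, assuming the go-cid package (a zero-value cid.Cid compares equal to cid.Undef); the decoded CID is one of the fixtures from types_test.go below:

```go
package main

import (
	"fmt"
	"net/url"

	cid "github.com/ipfs/go-cid"
)

func main() {
	q := url.Values{}

	// Zero value: nothing is added to the query string.
	var update cid.Cid
	if update != cid.Undef {
		q.Set("pin-update", update.String())
	}
	fmt.Printf("unset: %q\n", q.Encode())

	// Set value: the parameter is included.
	update, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
	if update != cid.Undef {
		q.Set("pin-update", update.String())
	}
	fmt.Printf("set:   %q\n", q.Encode())
}
```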
2 changes: 1 addition & 1 deletion api/types_test.go
@@ -18,7 +18,7 @@ import (
var testTime = time.Date(2017, 12, 31, 15, 45, 50, 0, time.UTC)
var testMAddr, _ = ma.NewMultiaddr("/ip4/1.2.3.4")
var testMAddr2, _ = ma.NewMultiaddr("/dns4/a.b.c.d")
-var testMAddr3, _ = ma.NewMultiaddr("/ip4/127.0.0.1/tcp/8081/ws/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd")
+var testMAddr3, _ = ma.NewMultiaddr("/ip4/127.0.0.1/tcp/8081/ws/p2p/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd")
var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
var testCid2, _ = cid.Decode("QmYCLpFCj9Av8NFjkQogvtXspnTDFWaizLpVFEijHTH4eV")
var testCid3, _ = cid.Decode("QmZmdA3UZKuHuy9FrWsxJ82q21nbEh97NUnxTzF5EHxZia")
40 changes: 33 additions & 7 deletions cluster.go
@@ -25,6 +25,7 @@ import (
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
rpc "github.com/libp2p/go-libp2p-gorpc"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/p2p/discovery"
ma "github.com/multiformats/go-multiaddr"

ocgorpc "github.com/lanzafame/go-libp2p-ocgorpc"
@@ -41,6 +42,7 @@ const (
pingMetricName = "ping"
bootstrapCount = 3
reBootstrapInterval = 30 * time.Second
mdnsServiceTag = "_ipfs-cluster-discovery._udp"
)

var (
@@ -57,6 +59,7 @@ type Cluster struct {
config *Config
host host.Host
dht *dht.IpfsDHT
discovery discovery.Service
datastore ds.Datastore

rpcServer *rpc.Server
@@ -120,20 +123,31 @@ func NewCluster(

listenAddrs := ""
for _, addr := range host.Addrs() {
-listenAddrs += fmt.Sprintf(" %s/ipfs/%s\n", addr, host.ID().Pretty())
+listenAddrs += fmt.Sprintf(" %s/p2p/%s\n", addr, host.ID().Pretty())
}

logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs)

peerManager := pstoremgr.New(ctx, host, cfg.GetPeerstorePath())

var mdns discovery.Service
if cfg.MDNSInterval > 0 {
mdns, err := discovery.NewMdnsService(ctx, host, cfg.MDNSInterval, mdnsServiceTag)
if err != nil {
cancel()
return nil, err
}
mdns.RegisterNotifee(peerManager)
}

c := &Cluster{
ctx: ctx,
cancel: cancel,
id: host.ID(),
config: cfg,
host: host,
dht: dht,
discovery: mdns,
datastore: datastore,
consensus: consensus,
apis: apis,
@@ -621,6 +635,12 @@ func (c *Cluster) Shutdown(ctx context.Context) error {

logger.Info("shutting down Cluster")

// Cancel discovery service (this shutdowns announcing). Handling
// entries is cancelled along with the context below.
if c.discovery != nil {
c.discovery.Close()
}

// Try to store peerset file for all known peers whatsoever
// if we got ready (otherwise, don't overwrite anything)
if c.readyB {
@@ -935,17 +955,20 @@ func (c *Cluster) StateSync(ctx context.Context) error {
}

trackedPins := c.tracker.StatusAll(ctx)
-trackedPinsMap := make(map[string]int)
-for i, tpin := range trackedPins {
-	trackedPinsMap[tpin.Cid.String()] = i
+trackedPinsMap := make(map[string]struct{})
+for _, tpin := range trackedPins {
+	trackedPinsMap[tpin.Cid.String()] = struct{}{}
}

// Track items which are not tracked
for _, pin := range clusterPins {
_, tracked := trackedPinsMap[pin.Cid.String()]
if !tracked {
logger.Debugf("StateSync: tracking %s, part of the shared state", pin.Cid)
-c.tracker.Track(ctx, pin)
+err = c.tracker.Track(ctx, pin)
+if err != nil {
+	return err
+}
}
}

@@ -970,10 +993,13 @@ func (c *Cluster) StateSync(ctx context.Context) error {
switch {
case p.Status == api.TrackerStatusRemote && allocatedHere:
logger.Debugf("StateSync: Tracking %s locally (currently remote)", pCid)
-c.tracker.Track(ctx, currentPin)
+err = c.tracker.Track(ctx, currentPin)
case p.Status == api.TrackerStatusPinned && !allocatedHere:
logger.Debugf("StateSync: Tracking %s as remote (currently local)", pCid)
-c.tracker.Track(ctx, currentPin)
+err = c.tracker.Track(ctx, currentPin)
}
+if err != nil {
+	return err
+}
}

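StateSync now keeps the tracked CIDs in a map[string]struct{} rather than map[string]int: only membership is ever checked, so the index values of the old map were dead weight. A tiny sketch of that set idiom, with hypothetical CID strings:

```go
package main

import "fmt"

func main() {
	tracked := []string{"cid-a", "cid-b"} // hypothetical CID strings

	// The empty struct carries no payload, so the map stores membership only.
	set := make(map[string]struct{})
	for _, c := range tracked {
		set[c] = struct{}{}
	}

	_, ok := set["cid-a"]
	fmt.Println(ok) // true
}
```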
