Skip to content

Commit

Permalink
Merge pull request #773 from ipfs/feat/component-rpc-apis
Browse files Browse the repository at this point in the history
RPC: Give each component a different RPC Service
  • Loading branch information
hsanjuan committed May 7, 2019
2 parents 7e700e2 + 3d49ac2 commit 8b19f81
Show file tree
Hide file tree
Showing 25 changed files with 443 additions and 320 deletions.
2 changes: 1 addition & 1 deletion adder/local/dag_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type DAGService struct {
}

// New returns a new Adder with the given rpc Client. The client is used
// to perform calls to IPFSBlockPut and Pin content on Cluster.
// to perform calls to IPFS.BlockPut and Pin content on Cluster.
func New(rpc *rpc.Client, opts api.PinOptions) *DAGService {
return &DAGService{
rpcClient: rpc,
Expand Down
39 changes: 26 additions & 13 deletions adder/local/dag_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@ import (
rpc "github.com/libp2p/go-libp2p-gorpc"
)

type testRPC struct {
type testIPFSRPC struct {
blocks sync.Map
pins sync.Map
}

func (rpcs *testRPC) IPFSBlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error {
type testClusterRPC struct {
pins sync.Map
}

func (rpcs *testIPFSRPC) BlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error {
rpcs.blocks.Store(in.Cid.String(), in)
return nil
}

func (rpcs *testRPC) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
func (rpcs *testClusterRPC) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
rpcs.pins.Store(in.Cid.String(), in)
return nil
}

func (rpcs *testRPC) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error {
func (rpcs *testClusterRPC) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error {
if in.ReplicationFactorMin > 1 {
return errors.New("we can only replicate to 1 peer")
}
Expand All @@ -43,9 +46,14 @@ func (rpcs *testRPC) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer

func TestAdd(t *testing.T) {
t.Run("balanced", func(t *testing.T) {
rpcObj := &testRPC{}
clusterRPC := &testClusterRPC{}
ipfsRPC := &testIPFSRPC{}
server := rpc.NewServer(nil, "mock")
err := server.RegisterName("Cluster", rpcObj)
err := server.RegisterName("Cluster", clusterRPC)
if err != nil {
t.Fatal(err)
}
err = server.RegisterName("IPFSConnector", ipfsRPC)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -73,22 +81,27 @@ func TestAdd(t *testing.T) {

expected := test.ShardingDirCids[:]
for _, c := range expected {
_, ok := rpcObj.blocks.Load(c)
_, ok := ipfsRPC.blocks.Load(c)
if !ok {
t.Error("no IPFSBlockPut for block", c)
t.Error("no IPFS.BlockPut for block", c)
}
}

_, ok := rpcObj.pins.Load(test.ShardingDirBalancedRootCIDWrapped)
_, ok := clusterRPC.pins.Load(test.ShardingDirBalancedRootCIDWrapped)
if !ok {
t.Error("the tree wasn't pinned")
}
})

t.Run("trickle", func(t *testing.T) {
rpcObj := &testRPC{}
clusterRPC := &testClusterRPC{}
ipfsRPC := &testIPFSRPC{}
server := rpc.NewServer(nil, "mock")
err := server.RegisterName("Cluster", rpcObj)
err := server.RegisterName("Cluster", clusterRPC)
if err != nil {
t.Fatal(err)
}
err = server.RegisterName("IPFSConnector", ipfsRPC)
if err != nil {
t.Fatal(err)
}
Expand All @@ -114,7 +127,7 @@ func TestAdd(t *testing.T) {
t.Fatal("bad root cid")
}

_, ok := rpcObj.pins.Load(test.ShardingDirTrickleRootCID)
_, ok := clusterRPC.pins.Load(test.ShardingDirTrickleRootCID)
if !ok {
t.Error("the tree wasn't pinned")
}
Expand Down
4 changes: 2 additions & 2 deletions adder/sharding/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func putDAG(ctx context.Context, rpcC *rpc.Client, nodes []ipld.Node, dests []pe
errs := rpcC.MultiCall(
ctxs,
dests,
"Cluster",
"IPFSBlockPut",
"IPFSConnector",
"BlockPut",
b,
rpcutil.RPCDiscardReplies(len(dests)),
)
Expand Down
6 changes: 5 additions & 1 deletion adder/sharding/dag_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type testRPC struct {
pins sync.Map
}

func (rpcs *testRPC) IPFSBlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error {
func (rpcs *testRPC) BlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error {
rpcs.blocks.Store(in.Cid.String(), in.Data)
return nil
}
Expand Down Expand Up @@ -70,6 +70,10 @@ func makeAdder(t *testing.T, params *api.AddParams) (*adder.Adder, *testRPC) {
if err != nil {
t.Fatal(err)
}
err = server.RegisterName("IPFSConnector", rpcObj)
if err != nil {
t.Fatal(err)
}
client := rpc.NewClientWithServer(nil, "mock", server)

out := make(chan *api.AddedOutput, 1)
Expand Down
4 changes: 2 additions & 2 deletions adder/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func PutBlock(ctx context.Context, rpc *rpc.Client, n *api.NodeWithMeta, dests [
errs := rpc.MultiCall(
ctxs,
dests,
"Cluster",
"IPFSBlockPut",
"IPFSConnector",
"BlockPut",
n,
rpcutil.RPCDiscardReplies(len(dests)),
)
Expand Down
12 changes: 6 additions & 6 deletions api/ipfsproxy/ipfsproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
err = proxy.rpcClient.CallContext(
ctx,
"",
"Cluster",
"IPFSResolve",
"IPFSConnector",
"Resolve",
pFrom.String(),
&fromCid,
)
Expand Down Expand Up @@ -591,8 +591,8 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
peers := make([]peer.ID, 0)
err := proxy.rpcClient.Call(
"",
"Cluster",
"ConsensusPeers",
"Consensus",
"Peers",
struct{}{},
&peers,
)
Expand All @@ -614,8 +614,8 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
errs := proxy.rpcClient.MultiCall(
ctxs,
peers,
"Cluster",
"IPFSRepoStat",
"IPFSConnector",
"RepoStat",
struct{}{},
repoStatsIfaces,
)
Expand Down
4 changes: 2 additions & 2 deletions api/rest/restapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,8 @@ func (api *API) metricsHandler(w http.ResponseWriter, r *http.Request) {
err := api.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"PeerMonitorLatestMetrics",
"PeerMonitor",
"LatestMetrics",
name,
&metrics,
)
Expand Down
42 changes: 20 additions & 22 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,7 @@ func NewCluster(
}

func (c *Cluster) setupRPC() error {
var rpcServer *rpc.Server
if c.config.Tracing {
sh := &ocgorpc.ServerHandler{}
rpcServer = rpc.NewServer(c.host, version.RPCProtocol, rpc.WithServerStatsHandler(sh))
} else {
rpcServer = rpc.NewServer(c.host, version.RPCProtocol)
}
err := rpcServer.RegisterName("Cluster", &RPCAPI{c})
rpcServer, err := newRPCServer(c)
if err != nil {
return err
}
Expand All @@ -173,7 +166,12 @@ func (c *Cluster) setupRPC() error {
var rpcClient *rpc.Client
if c.config.Tracing {
csh := &ocgorpc.ClientHandler{}
rpcClient = rpc.NewClientWithServer(c.host, version.RPCProtocol, rpcServer, rpc.WithClientStatsHandler(csh))
rpcClient = rpc.NewClientWithServer(
c.host,
version.RPCProtocol,
rpcServer,
rpc.WithClientStatsHandler(csh),
)
} else {
rpcClient = rpc.NewClientWithServer(c.host, version.RPCProtocol, rpcServer)
}
Expand Down Expand Up @@ -648,8 +646,8 @@ func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
err = c.rpcClient.CallContext(
ctx,
pid,
"Cluster",
"PeerMonitorLogMetric",
"PeerMonitor",
"LogMetric",
m,
&struct{}{},
)
Expand All @@ -661,8 +659,8 @@ func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
err = c.rpcClient.CallContext(
ctx,
pid,
"Cluster",
"IPFSConnectSwarms",
"IPFSConnector",
"ConnectSwarms",
struct{}{},
&struct{}{},
)
Expand Down Expand Up @@ -856,7 +854,7 @@ func (c *Cluster) StatusAll(ctx context.Context) ([]*api.GlobalPinInfo, error) {
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.globalPinInfoSlice(ctx, "TrackerStatusAll")
return c.globalPinInfoSlice(ctx, "PinTracker", "StatusAll")
}

// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer.
Expand All @@ -876,7 +874,7 @@ func (c *Cluster) Status(ctx context.Context, h cid.Cid) (*api.GlobalPinInfo, er
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.globalPinInfoCid(ctx, "TrackerStatus", h)
return c.globalPinInfoCid(ctx, "PinTracker", "Status", h)
}

// StatusLocal returns this peer's PinInfo for a given Cid.
Expand All @@ -897,7 +895,7 @@ func (c *Cluster) SyncAll(ctx context.Context) ([]*api.GlobalPinInfo, error) {
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.globalPinInfoSlice(ctx, "SyncAllLocal")
return c.globalPinInfoSlice(ctx, "Cluster", "SyncAllLocal")
}

// SyncAllLocal makes sure that the current state for all tracked items
Expand Down Expand Up @@ -927,7 +925,7 @@ func (c *Cluster) Sync(ctx context.Context, h cid.Cid) (*api.GlobalPinInfo, erro
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.globalPinInfoCid(ctx, "SyncLocal", h)
return c.globalPinInfoCid(ctx, "Cluster", "SyncLocal", h)
}

// used for RecoverLocal and SyncLocal.
Expand Down Expand Up @@ -985,7 +983,7 @@ func (c *Cluster) Recover(ctx context.Context, h cid.Cid) (*api.GlobalPinInfo, e
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.globalPinInfoCid(ctx, "TrackerRecover", h)
return c.globalPinInfoCid(ctx, "PinTracker", "Recover", h)
}

// RecoverLocal triggers a recover operation for a given Cid in this peer only.
Expand Down Expand Up @@ -1356,7 +1354,7 @@ func (c *Cluster) Peers(ctx context.Context) []*api.ID {
return peers
}

func (c *Cluster) globalPinInfoCid(ctx context.Context, method string, h cid.Cid) (*api.GlobalPinInfo, error) {
func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h cid.Cid) (*api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
defer span.End()

Expand All @@ -1379,7 +1377,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, method string, h cid.Cid
errs := c.rpcClient.MultiCall(
ctxs,
members,
"Cluster",
comp,
method,
h,
rpcutil.CopyPinInfoToIfaces(replies),
Expand Down Expand Up @@ -1409,7 +1407,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, method string, h cid.Cid
return pin, nil
}

func (c *Cluster) globalPinInfoSlice(ctx context.Context, method string) ([]*api.GlobalPinInfo, error) {
func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) ([]*api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoSlice")
defer span.End()

Expand All @@ -1431,7 +1429,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, method string) ([]*api
errs := c.rpcClient.MultiCall(
ctxs,
members,
"Cluster",
comp,
method,
struct{}{},
rpcutil.CopyPinInfoSliceToIfaces(replies),
Expand Down
4 changes: 2 additions & 2 deletions connect_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func (c *Cluster) recordIPFSLinks(cg *api.ConnectGraph, pID *api.ID) {
var swarmPeers []peer.ID
err := c.rpcClient.Call(
pID.ID,
"Cluster",
"IPFSSwarmPeers",
"IPFSConnector",
"SwarmPeers",
struct{}{},
&swarmPeers,
)
Expand Down
8 changes: 4 additions & 4 deletions consensus/crdt/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (css *Consensus) setup() {
err = css.rpcClient.CallContext(
css.ctx,
"",
"Cluster",
"PinTracker",
"Track",
pin,
&struct{}{},
Expand All @@ -194,7 +194,7 @@ func (css *Consensus) setup() {
err = css.rpcClient.CallContext(
css.ctx,
"",
"Cluster",
"PinTracker",
"Untrack",
pin,
&struct{}{},
Expand Down Expand Up @@ -295,8 +295,8 @@ func (css *Consensus) Peers(ctx context.Context) ([]peer.ID, error) {
err := css.rpcClient.CallContext(
ctx,
"",
"Cluster",
"PeerMonitorLatestMetrics",
"PeerMonitor",
"LatestMetrics",
css.config.PeersetMetric,
&metrics,
)
Expand Down

0 comments on commit 8b19f81

Please sign in to comment.