Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/improve pin serial #601

Merged
merged 4 commits into from
Oct 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,16 @@ func (pins PinSerial) Clone() PinSerial {
return new
}

// DecodeCid retrieves just the cid from a PinSerial without
// allocating a Pin.
func (pins PinSerial) DecodeCid() cid.Cid {
c, err := cid.Decode(pins.Cid)
if err != nil {
logger.Debug(pins.Cid, err)
}
return c
}

// NodeWithMeta specifies a block of data and a set of optional metadata fields
// carrying information about the encoded ipld node
type NodeWithMeta struct {
Expand Down
42 changes: 42 additions & 0 deletions api/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,45 @@ func TestMetric(t *testing.T) {
t.Error("looks like a bad ttl")
}
}

func BenchmarkPinSerial_ToPin(b *testing.B) {
pin := Pin{
Cid: testCid1,
Type: ClusterDAGType,
Allocations: []peer.ID{testPeerID1},
Reference: testCid2,
MaxDepth: -1,
PinOptions: PinOptions{
ReplicationFactorMax: -1,
ReplicationFactorMin: -1,
Name: "A test pin",
},
}
pinS := pin.ToSerial()

b.ResetTimer()
for i := 0; i < b.N; i++ {
pinS.ToPin()
}
}

func BenchmarkPinSerial_DecodeCid(b *testing.B) {
pin := Pin{
Cid: testCid1,
Type: ClusterDAGType,
Allocations: []peer.ID{testPeerID1},
Reference: testCid2,
MaxDepth: -1,
PinOptions: PinOptions{
ReplicationFactorMax: -1,
ReplicationFactorMin: -1,
Name: "A test pin",
},
}
pinS := pin.ToSerial()

b.ResetTimer()
for i := 0; i < b.N; i++ {
pinS.DecodeCid()
}
}
18 changes: 10 additions & 8 deletions consensus/raft/log_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,33 +43,35 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
// api.PinSerial, which don't get copied when passed.
pinS := op.Cid.Clone()

pin := pinS.ToPin()

switch op.Type {
case LogOpPin:
err = state.Add(pin)
err = state.Add(pinS.ToPin())
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.consensus.rpcClient.Go("",
op.consensus.rpcClient.Go(
"",
"Cluster",
"Track",
pinS,
&struct{}{},
nil)
nil,
)
case LogOpUnpin:
err = state.Rm(pin.Cid)
err = state.Rm(pinS.DecodeCid())
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.consensus.rpcClient.Go("",
op.consensus.rpcClient.Go(
"",
"Cluster",
"Untrack",
pinS,
&struct{}{},
nil)
nil,
)
default:
logger.Error("unknown LogOp type. Ignoring")
}
Expand Down
28 changes: 14 additions & 14 deletions rpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (rpcapi *RPCAPI) Pin(ctx context.Context, in api.PinSerial, out *struct{})

// Unpin runs Cluster.Unpin().
func (rpcapi *RPCAPI) Unpin(ctx context.Context, in api.PinSerial, out *struct{}) error {
c := in.ToPin().Cid
c := in.DecodeCid()
return rpcapi.c.Unpin(c)
}

Expand Down Expand Up @@ -124,15 +124,15 @@ func (rpcapi *RPCAPI) StatusAllLocal(ctx context.Context, in struct{}, out *[]ap

// Status runs Cluster.Status().
func (rpcapi *RPCAPI) Status(ctx context.Context, in api.PinSerial, out *api.GlobalPinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.Status(c)
*out = pinfo.ToSerial()
return err
}

// StatusLocal runs Cluster.StatusLocal().
func (rpcapi *RPCAPI) StatusLocal(ctx context.Context, in api.PinSerial, out *api.PinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo := rpcapi.c.StatusLocal(c)
*out = pinfo.ToSerial()
return nil
Expand All @@ -154,15 +154,15 @@ func (rpcapi *RPCAPI) SyncAllLocal(ctx context.Context, in struct{}, out *[]api.

// Sync runs Cluster.Sync().
func (rpcapi *RPCAPI) Sync(ctx context.Context, in api.PinSerial, out *api.GlobalPinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.Sync(c)
*out = pinfo.ToSerial()
return err
}

// SyncLocal runs Cluster.SyncLocal().
func (rpcapi *RPCAPI) SyncLocal(ctx context.Context, in api.PinSerial, out *api.PinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.SyncLocal(c)
*out = pinfo.ToSerial()
return err
Expand All @@ -177,15 +177,15 @@ func (rpcapi *RPCAPI) RecoverAllLocal(ctx context.Context, in struct{}, out *[]a

// Recover runs Cluster.Recover().
func (rpcapi *RPCAPI) Recover(ctx context.Context, in api.PinSerial, out *api.GlobalPinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.Recover(c)
*out = pinfo.ToSerial()
return err
}

// RecoverLocal runs Cluster.RecoverLocal().
func (rpcapi *RPCAPI) RecoverLocal(ctx context.Context, in api.PinSerial, out *api.PinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.RecoverLocal(c)
*out = pinfo.ToSerial()
return err
Expand Down Expand Up @@ -248,7 +248,7 @@ func (rpcapi *RPCAPI) Track(ctx context.Context, in api.PinSerial, out *struct{}

// Untrack runs PinTracker.Untrack().
func (rpcapi *RPCAPI) Untrack(ctx context.Context, in api.PinSerial, out *struct{}) error {
c := in.ToPin().Cid
c := in.DecodeCid()
return rpcapi.c.tracker.Untrack(c)
}

Expand All @@ -260,7 +260,7 @@ func (rpcapi *RPCAPI) TrackerStatusAll(ctx context.Context, in struct{}, out *[]

// TrackerStatus runs PinTracker.Status().
func (rpcapi *RPCAPI) TrackerStatus(ctx context.Context, in api.PinSerial, out *api.PinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo := rpcapi.c.tracker.Status(c)
*out = pinfo.ToSerial()
return nil
Expand All @@ -275,7 +275,7 @@ func (rpcapi *RPCAPI) TrackerRecoverAll(ctx context.Context, in struct{}, out *[

// TrackerRecover runs PinTracker.Recover().
func (rpcapi *RPCAPI) TrackerRecover(ctx context.Context, in api.PinSerial, out *api.PinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.tracker.Recover(c)
*out = pinfo.ToSerial()
return err
Expand All @@ -287,20 +287,20 @@ func (rpcapi *RPCAPI) TrackerRecover(ctx context.Context, in api.PinSerial, out

// IPFSPin runs IPFSConnector.Pin().
func (rpcapi *RPCAPI) IPFSPin(ctx context.Context, in api.PinSerial, out *struct{}) error {
c := in.ToPin().Cid
c := in.DecodeCid()
depth := in.ToPin().MaxDepth
return rpcapi.c.ipfs.Pin(ctx, c, depth)
}

// IPFSUnpin runs IPFSConnector.Unpin().
func (rpcapi *RPCAPI) IPFSUnpin(ctx context.Context, in api.PinSerial, out *struct{}) error {
c := in.ToPin().Cid
c := in.DecodeCid()
return rpcapi.c.ipfs.Unpin(ctx, c)
}

// IPFSPinLsCid runs IPFSConnector.PinLsCid().
func (rpcapi *RPCAPI) IPFSPinLsCid(ctx context.Context, in api.PinSerial, out *api.IPFSPinStatus) error {
c := in.ToPin().Cid
c := in.DecodeCid()
b, err := rpcapi.c.ipfs.PinLsCid(ctx, c)
*out = b
return err
Expand Down Expand Up @@ -347,7 +347,7 @@ func (rpcapi *RPCAPI) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out

// IPFSBlockGet runs IPFSConnector.BlockGet().
func (rpcapi *RPCAPI) IPFSBlockGet(ctx context.Context, in api.PinSerial, out *[]byte) error {
c := in.ToPin().Cid
c := in.DecodeCid()
res, err := rpcapi.c.ipfs.BlockGet(c)
*out = res
return err
Expand Down