Skip to content

Commit

Permalink
Removing CID Lists Part 2: Deprecating protocols (#293)
Browse files Browse the repository at this point in the history
* refactor(cidsets): remove cidsets

* refactor(network): remove deprecated protocols

* refactor(cidlists): delete cidlists
  • Loading branch information
hannahhoward committed Jan 14, 2022
1 parent eddbf61 commit a12f639
Show file tree
Hide file tree
Showing 38 changed files with 79 additions and 3,688 deletions.
5 changes: 2 additions & 3 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package testinstance

import (
"context"
"os"
"time"

"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -165,8 +164,8 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD

linkSystem := storeutil.LinkSystemForBlockstore(bstore)
gs := gsimpl.New(ctx, gsNet, linkSystem, gsimpl.RejectAllRequestsByDefault())
transport := gstransport.NewTransport(p, gs, dtNet)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport)
transport := gstransport.NewTransport(p, gs)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), dtNet, transport)
if err != nil {
return Instance{}, err
}
Expand Down
8 changes: 0 additions & 8 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,14 +610,6 @@ func (m *mockChannelState) Stages() *datatransfer.ChannelStages {
panic("implement me")
}

func (m *mockChannelState) ReceivedCids() []cid.Cid {
panic("implement me")
}

func (m *mockChannelState) ReceivedCidsLen() int {
panic("implement me")
}

func (m *mockChannelState) ReceivedCidsTotal() int64 {
panic("implement me")
}
Expand Down
22 changes: 1 addition & 21 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type channelState struct {
voucherResults []internal.EncodedVoucherResult
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc
receivedCids ReceivedCidsReader

// stages tracks the timeline of events related to a data transfer, for
// traceability purposes.
Expand Down Expand Up @@ -110,24 +109,6 @@ func (c channelState) Voucher() datatransfer.Voucher {
return encodable.(datatransfer.Voucher)
}

// ReceivedCids returns the cids received so far on this channel
func (c channelState) ReceivedCids() []cid.Cid {
receivedCids, err := c.receivedCids.ToArray(c.ChannelID())
if err != nil {
log.Error(err)
}
return receivedCids
}

// ReceivedCids returns the number of unique cids received so far on this channel
func (c channelState) ReceivedCidsLen() int {
len, err := c.receivedCids.Len(c.ChannelID())
if err != nil {
log.Error(err)
}
return len
}

// ReceivedCidsTotal returns the number of (non-unique) cids received so far
// on the channel - note that a block can exist in more than one place in the DAG
func (c channelState) ReceivedCidsTotal() int64 {
Expand Down Expand Up @@ -233,7 +214,7 @@ func (c channelState) Stages() *datatransfer.ChannelStages {
return c.stages
}

func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc, receivedCidsReader ReceivedCidsReader) datatransfer.ChannelState {
func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
return channelState{
selfPeer: c.SelfPeer,
isPull: c.Initiator == c.Recipient,
Expand All @@ -255,7 +236,6 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
voucherResults: c.VoucherResults,
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,
receivedCids: receivedCidsReader,
stages: c.Stages,
missingCids: c.MissingCids,
}
Expand Down
84 changes: 2 additions & 82 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
Expand All @@ -21,18 +20,11 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels/internal"
"github.com/filecoin-project/go-data-transfer/channels/internal/migrations"
"github.com/filecoin-project/go-data-transfer/cidlists"
"github.com/filecoin-project/go-data-transfer/cidsets"
"github.com/filecoin-project/go-data-transfer/encoding"
)

type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)

type ReceivedCidsReader interface {
ToArray(chid datatransfer.ChannelID) ([]cid.Cid, error)
Len(chid datatransfer.ChannelID) (int, error)
}

type Notifier func(datatransfer.Event, datatransfer.ChannelState)

// ErrNotFound is returned when a channel cannot be found with a given channel ID
Expand All @@ -59,7 +51,6 @@ type Channels struct {
blockIndexCache *blockIndexCache
stateMachines fsm.Group
migrateStateMachines func(context.Context) error
seenCIDs *cidsets.CIDSetManager
}

// ChannelEnvironment -- just a proxy for DTNetwork for now
Expand All @@ -72,22 +63,19 @@ type ChannelEnvironment interface {

// New returns a new thread safe list of channels
func New(ds datastore.Batching,
cidLists cidlists.CIDLists,
notifier Notifier,
voucherDecoder DecoderByTypeFunc,
voucherResultDecoder DecoderByTypeFunc,
env ChannelEnvironment,
selfPeer peer.ID) (*Channels, error) {

seenCIDsDS := namespace.Wrap(ds, datastore.NewKey("seencids"))
c := &Channels{
seenCIDs: cidsets.NewCIDSetManager(seenCIDsDS),
notifier: notifier,
voucherDecoder: voucherDecoder,
voucherResultDecoder: voucherResultDecoder,
}
c.blockIndexCache = newBlockIndexCache()
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer, cidLists)
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -127,19 +115,6 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
}
log.Debugw("process data transfer listeners", "name", datatransfer.Events[evtCode], "transfer ID", realChannel.TransferID)
c.notifier(evt, c.fromInternalChannelState(realChannel))

// When the channel has been cleaned up, remove the caches of seen cids
if evt.Code == datatransfer.CleanupComplete {
chid := datatransfer.ChannelID{
Initiator: realChannel.Initiator,
Responder: realChannel.Responder,
ID: realChannel.TransferID,
}
err := c.removeSeenCIDCaches(chid)
if err != nil {
log.Errorf("failed to clean up channel %s: %s", err)
}
}
}

// CreateNew creates a new channel id and channel state and saves to channels.
Expand Down Expand Up @@ -271,12 +246,6 @@ func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint
// Returns true if this is the first time the block has been received
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error) {
new, err := c.fireProgressEvent(chid, datatransfer.DataReceived, datatransfer.DataReceivedProgress, k, delta, index, unique, c.getReceivedIndex)
// TODO: remove when ReceivedCids and legacy protocol is removed
// write the seen received cids, but write async in order to avoid blocking processing
if err == nil {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
_, _ = c.seenCIDs.InsertSetCID(sid, k)
}
return new, err
}

Expand Down Expand Up @@ -395,25 +364,6 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
return c.stateMachines.Has(chid)
}

// removeSeenCIDCaches cleans up the caches of "seen" blocks, ie
// blocks that have already been queued / sent / received
func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
// Clean up seen block caches
progressStates := []datatransfer.EventCode{
datatransfer.DataQueued,
datatransfer.DataSent,
datatransfer.DataReceived,
}
for _, evt := range progressStates {
sid := seenCidsSetID(chid, evt)
err := c.seenCIDs.DeleteSet(sid)
if err != nil {
return err
}
}
return nil
}

// fireProgressEvent fires
// - an event for queuing / sending / receiving blocks
// - a corresponding "progress" event if the block has not been seen before
Expand Down Expand Up @@ -463,37 +413,7 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran
return nil
}

// Get the ID of the CID set for the given channel ID and event code.
// The CID set stores a unique list of queued / sent / received CIDs.
func seenCidsSetID(chid datatransfer.ChannelID, evt datatransfer.EventCode) cidsets.SetID {
return cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
}

// Convert from the internally used channel state format to the externally exposed ChannelState
func (c *Channels) fromInternalChannelState(ch internal.ChannelState) datatransfer.ChannelState {
rcr := &receivedCidsReader{seenCIDs: c.seenCIDs}
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder, rcr)
}

// Implements the ReceivedCidsReader interface so that the internal channel
// state has access to the received CIDs.
// The interface is used (instead of passing these values directly)
// so the values can be loaded lazily. Reading all CIDs from the datastore
// is an expensive operation so we want to avoid doing it unless necessary.
// Note that the received CIDs get cleaned up when the channel completes, so
// these methods will return an empty array after that point.
type receivedCidsReader struct {
seenCIDs *cidsets.CIDSetManager
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder)
}

func (r *receivedCidsReader) ToArray(chid datatransfer.ChannelID) ([]cid.Cid, error) {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
return r.seenCIDs.SetToArray(sid)
}

func (r *receivedCidsReader) Len(chid datatransfer.ChannelID) (int, error) {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
return r.seenCIDs.SetLen(sid)
}

var _ ReceivedCidsReader = (*receivedCidsReader)(nil)

0 comments on commit a12f639

Please sign in to comment.