Skip to content

Commit

Permalink
Removing CID Lists part one: unblocking technical obstacles (#292)
Browse files Browse the repository at this point in the history
* feat(channels): remove cid lists for sender

remove a synchronous disk based lookup of CID lists

* feat(channels): don't block on checking cid lists

* fix(channels): fix potential race in block index cache
  • Loading branch information
hannahhoward committed Jan 14, 2022
1 parent b442027 commit eddbf61
Show file tree
Hide file tree
Showing 16 changed files with 301 additions and 79 deletions.
8 changes: 8 additions & 0 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,11 @@ func (m *mockChannelState) ReceivedCidsTotal() int64 {
func (m *mockChannelState) MissingCids() []cid.Cid {
panic("implement me")
}

func (m *mockChannelState) QueuedCidsTotal() int64 {
panic("implement me")
}

func (m *mockChannelState) SentCidsTotal() int64 {
panic("implement me")
}
63 changes: 63 additions & 0 deletions channels/block_index_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package channels

import (
"sync"
"sync/atomic"

datatransfer "github.com/filecoin-project/go-data-transfer"
)

type readOriginalFn func(datatransfer.ChannelID) (int64, error)

type blockIndexKey struct {
evt datatransfer.EventCode
chid datatransfer.ChannelID
}
type blockIndexCache struct {
lk sync.RWMutex
values map[blockIndexKey]*int64
}

func newBlockIndexCache() *blockIndexCache {
return &blockIndexCache{
values: make(map[blockIndexKey]*int64),
}
}

func (bic *blockIndexCache) getValue(evt datatransfer.EventCode, chid datatransfer.ChannelID, readFromOriginal readOriginalFn) (*int64, error) {
idxKey := blockIndexKey{evt, chid}
bic.lk.RLock()
value := bic.values[idxKey]
bic.lk.RUnlock()
if value != nil {
return value, nil
}
bic.lk.Lock()
defer bic.lk.Unlock()
value = bic.values[idxKey]
if value != nil {
return value, nil
}
newValue, err := readFromOriginal(chid)
if err != nil {
return nil, err
}
bic.values[idxKey] = &newValue
return &newValue, nil
}

func (bic *blockIndexCache) updateIfGreater(evt datatransfer.EventCode, chid datatransfer.ChannelID, newIndex int64, readFromOriginal readOriginalFn) (bool, error) {
value, err := bic.getValue(evt, chid, readFromOriginal)
if err != nil {
return false, err
}
for {
currentIndex := atomic.LoadInt64(value)
if newIndex <= currentIndex {
return false, nil
}
if atomic.CompareAndSwapInt64(value, currentIndex, newIndex) {
return true, nil
}
}
}
20 changes: 20 additions & 0 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ type channelState struct {
// number of blocks that have been received, including blocks that are
// present in more than one place in the DAG
receivedBlocksTotal int64
// Number of blocks that have been queued, including blocks that are
// present in more than one place in the DAG
queuedBlocksTotal int64
// Number of blocks that have been sent, including blocks that are
// present in more than one place in the DAG
sentBlocksTotal int64
// more informative status on a channel
message string
// additional vouchers
Expand Down Expand Up @@ -128,6 +134,18 @@ func (c channelState) ReceivedCidsTotal() int64 {
return c.receivedBlocksTotal
}

// QueuedCidsTotal returns the number of (non-unique) cids queued so far
// on the channel - note that a block can exist in more than one place in the DAG
func (c channelState) QueuedCidsTotal() int64 {
return c.queuedBlocksTotal
}

// SentCidsTotal returns the number of (non-unique) cids sent so far
// on the channel - note that a block can exist in more than one place in the DAG
func (c channelState) SentCidsTotal() int64 {
return c.sentBlocksTotal
}

// Sender returns the peer id for the node that is sending data
func (c channelState) Sender() peer.ID { return c.sender }

Expand Down Expand Up @@ -230,6 +248,8 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
sent: c.Sent,
received: c.Received,
receivedBlocksTotal: c.ReceivedBlocksTotal,
queuedBlocksTotal: c.QueuedBlocksTotal,
sentBlocksTotal: c.SentBlocksTotal,
message: c.Message,
vouchers: c.Vouchers,
voucherResults: c.VoucherResults,
Expand Down
70 changes: 40 additions & 30 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Channels struct {
notifier Notifier
voucherDecoder DecoderByTypeFunc
voucherResultDecoder DecoderByTypeFunc
blockIndexCache *blockIndexCache
stateMachines fsm.Group
migrateStateMachines func(context.Context) error
seenCIDs *cidsets.CIDSetManager
Expand Down Expand Up @@ -85,6 +86,7 @@ func New(ds datastore.Batching,
voucherDecoder: voucherDecoder,
voucherResultDecoder: voucherResultDecoder,
}
c.blockIndexCache = newBlockIndexCache()
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer, cidLists)
if err != nil {
return nil, err
Expand Down Expand Up @@ -234,38 +236,48 @@ func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.CompleteCleanupOnRestart)
}

// Returns true if this is the first time the block has been sent
func (c *Channels) DataSent(chid datatransfer.ChannelID, k cid.Cid, delta uint64) (bool, error) {
return c.fireProgressEvent(chid, datatransfer.DataSent, datatransfer.DataSentProgress, k, delta)
}

// Returns true if this is the first time the block has been queued
func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint64) (bool, error) {
return c.fireProgressEvent(chid, datatransfer.DataQueued, datatransfer.DataQueuedProgress, k, delta)
func (c *Channels) getQueuedIndex(chid datatransfer.ChannelID) (int64, error) {
chst, err := c.GetByID(context.TODO(), chid)
if err != nil {
return 0, err
}
return chst.QueuedCidsTotal(), nil
}

// Returns true if this is the first time the block has been received
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64) (bool, error) {
if err := c.checkChannelExists(chid, datatransfer.DataReceived); err != nil {
return false, err
func (c *Channels) getReceivedIndex(chid datatransfer.ChannelID) (int64, error) {
chst, err := c.GetByID(context.TODO(), chid)
if err != nil {
return 0, err
}
return chst.ReceivedCidsTotal(), nil
}

// Check if the block has already been seen
sid := seenCidsSetID(chid, datatransfer.DataReceived)
seen, err := c.seenCIDs.InsertSetCID(sid, k)
func (c *Channels) getSentIndex(chid datatransfer.ChannelID) (int64, error) {
chst, err := c.GetByID(context.TODO(), chid)
if err != nil {
return false, err
return 0, err
}
return chst.SentCidsTotal(), nil
}

// If the block has not been seen before, fire the progress event
if !seen {
if err := c.stateMachines.Send(chid, datatransfer.DataReceivedProgress, delta); err != nil {
return false, err
}
}
func (c *Channels) DataSent(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error) {
return c.fireProgressEvent(chid, datatransfer.DataSent, datatransfer.DataSentProgress, k, delta, index, unique, c.getSentIndex)
}

// Fire the regular event
return !seen, c.stateMachines.Send(chid, datatransfer.DataReceived, index)
func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error) {
return c.fireProgressEvent(chid, datatransfer.DataQueued, datatransfer.DataQueuedProgress, k, delta, index, unique, c.getQueuedIndex)
}

// Returns true if this is the first time the block has been received
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64, unique bool) (bool, error) {
new, err := c.fireProgressEvent(chid, datatransfer.DataReceived, datatransfer.DataReceivedProgress, k, delta, index, unique, c.getReceivedIndex)
// TODO: remove when ReceivedCids and legacy protocol is removed
// write the seen received cids, but write async in order to avoid blocking processing
if err == nil {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
_, _ = c.seenCIDs.InsertSetCID(sid, k)
}
return new, err
}

// PauseInitiator pauses the initator of this channel
Expand Down Expand Up @@ -409,27 +421,25 @@ func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
// fire both DataSent AND DataSentProgress.
// If a block is resent, the method will fire DataSent but not DataSentProgress.
// Returns true if the block is new (both the event and a progress event were fired).
func (c *Channels) fireProgressEvent(chid datatransfer.ChannelID, evt datatransfer.EventCode, progressEvt datatransfer.EventCode, k cid.Cid, delta uint64) (bool, error) {
func (c *Channels) fireProgressEvent(chid datatransfer.ChannelID, evt datatransfer.EventCode, progressEvt datatransfer.EventCode, k cid.Cid, delta uint64, index int64, unique bool, readFromOriginal readOriginalFn) (bool, error) {
if err := c.checkChannelExists(chid, evt); err != nil {
return false, err
}

// Check if the block has already been seen
sid := seenCidsSetID(chid, evt)
seen, err := c.seenCIDs.InsertSetCID(sid, k)
isNewIndex, err := c.blockIndexCache.updateIfGreater(evt, chid, index, readFromOriginal)
if err != nil {
return false, err
}

// If the block has not been seen before, fire the progress event
if !seen {
if unique && isNewIndex {
if err := c.stateMachines.Send(chid, progressEvt, delta); err != nil {
return false, err
}
}

// Fire the regular event
return !seen, c.stateMachines.Send(chid, evt)
return unique && isNewIndex, c.stateMachines.Send(chid, evt, index)
}

func (c *Channels) send(chid datatransfer.ChannelID, code datatransfer.EventCode, args ...interface{}) error {
Expand Down
10 changes: 8 additions & 2 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.DataSent).
FromMany(transferringStates...).ToNoChange().
From(datatransfer.TransferFinished).ToNoChange().
Action(func(chst *internal.ChannelState) error {
Action(func(chst *internal.ChannelState, sentBlocksTotal int64) error {
if sentBlocksTotal > chst.SentBlocksTotal {
chst.SentBlocksTotal = sentBlocksTotal
}
chst.AddLog("")
return nil
}),
Expand All @@ -94,7 +97,10 @@ var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.DataQueued).
FromMany(transferringStates...).ToNoChange().
From(datatransfer.TransferFinished).ToNoChange().
Action(func(chst *internal.ChannelState) error {
Action(func(chst *internal.ChannelState, queuedBlocksTotal int64) error {
if queuedBlocksTotal > chst.QueuedBlocksTotal {
chst.QueuedBlocksTotal = queuedBlocksTotal
}
chst.AddLog("")
return nil
}),
Expand Down
27 changes: 18 additions & 9 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ func TestChannels(t *testing.T) {
require.Equal(t, datatransfer.TransferFinished, state.Status())

// send a data-sent event and ensure it's a no-op
_, err = channelList.DataSent(chid, cids[1], 1)
_, err = channelList.DataSent(chid, cids[1], 1, 1, true)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.DataSent)
require.Equal(t, datatransfer.TransferFinished, state.Status())

// send a data-queued event and ensure it's a no-op.
_, err = channelList.DataQueued(chid, cids[1], 1)
_, err = channelList.DataQueued(chid, cids[1], 1, 1, true)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.DataQueued)
require.Equal(t, datatransfer.TransferFinished, state.Status())
Expand All @@ -188,7 +188,7 @@ func TestChannels(t *testing.T) {
require.Equal(t, uint64(0), state.Sent())
require.Empty(t, state.ReceivedCids())

isNew, err := channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50, 1)
isNew, err := channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50, 1, true)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
require.True(t, isNew)
Expand All @@ -197,7 +197,7 @@ func TestChannels(t *testing.T) {
require.Equal(t, uint64(0), state.Sent())
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())

isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 100)
isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 100, 1, true)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataSentProgress)
require.True(t, isNew)
Expand All @@ -206,16 +206,24 @@ func TestChannels(t *testing.T) {
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())

// send block again has no effect
isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 100, 1, true)
require.NoError(t, err)
require.False(t, isNew)
state = checkEvent(ctx, t, received, datatransfer.DataSent)
require.Equal(t, uint64(50), state.Received())
require.Equal(t, uint64(100), state.Sent())

// errors if channel does not exist
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200, 2)
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200, 2, true)
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
require.False(t, isNew)
isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200)
isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200, 2, true)
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())
require.False(t, isNew)

isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 50, 2)
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 50, 2, true)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
require.True(t, isNew)
Expand All @@ -224,20 +232,21 @@ func TestChannels(t *testing.T) {
require.Equal(t, uint64(100), state.Sent())
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())

isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 25)
isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 25, 2, false)
require.NoError(t, err)
require.False(t, isNew)
state = checkEvent(ctx, t, received, datatransfer.DataSent)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())

isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50, 3)
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50, 3, false)
require.NoError(t, err)
require.False(t, isNew)
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())

require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())
})

Expand Down
7 changes: 6 additions & 1 deletion channels/internal/internalchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ type ChannelState struct {
// Number of blocks that have been received, including blocks that are
// present in more than one place in the DAG
ReceivedBlocksTotal int64

// Number of blocks that have been queued, including blocks that are
// present in more than one place in the DAG
QueuedBlocksTotal int64
// Number of blocks that have been sent, including blocks that are
// present in more than one place in the DAG
SentBlocksTotal int64
// Stages traces the execution fo a data transfer.
//
// EXPERIMENTAL; subject to change.
Expand Down
Loading

0 comments on commit eddbf61

Please sign in to comment.