From d71c16683531167230c5f6c4ce60bbe2e9f2b093 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 10 Sep 2014 19:38:21 -0700 Subject: [PATCH 1/6] organize imports --- bitswap/bitswap.go | 10 +++++----- bitswap/ledger.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 22f97514cf8..0484065a6a6 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -1,17 +1,17 @@ package bitswap import ( - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + "time" + + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" + blocks "github.com/jbenet/go-ipfs/blocks" peer "github.com/jbenet/go-ipfs/peer" routing "github.com/jbenet/go-ipfs/routing" dht "github.com/jbenet/go-ipfs/routing/dht" swarm "github.com/jbenet/go-ipfs/swarm" u "github.com/jbenet/go-ipfs/util" - - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" - - "time" ) // PartnerWantListMax is the bound for the number of keys we'll store per diff --git a/bitswap/ledger.go b/bitswap/ledger.go index a0f23b8d4b5..2a197f05741 100644 --- a/bitswap/ledger.go +++ b/bitswap/ledger.go @@ -1,10 +1,10 @@ package bitswap import ( + "time" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" - - "time" ) // Ledger stores the data exchange relationship between two peers. From a910d6878d5c466b733fff6f8c749248ddc6c891 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 10 Sep 2014 19:43:23 -0700 Subject: [PATCH 2/6] privatize getLedger method --- bitswap/bitswap.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 0484065a6a6..a808954625d 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -192,7 +192,7 @@ func (bs *BitSwap) handleMessages() { // and then if we do, check the ledger for whether or not we should send it. func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty()) - ledg := bs.GetLedger(p) + ledg := bs.getLedger(p) dsk := ds.NewKey(want) blk_i, err := bs.datastore.Get(dsk) @@ -239,11 +239,11 @@ func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) { } bs.listener.Respond(string(blk.Key()), mes) - ledger := bs.GetLedger(p) + ledger := bs.getLedger(p) ledger.ReceivedBytes(len(blk.Data)) } -func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger { +func (bs *BitSwap) getLedger(p *peer.Peer) *Ledger { l, ok := bs.partners[p.Key()] if ok { return l From 182604c61f74104e713a7cab985ecc9021961641 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 10 Sep 2014 19:47:40 -0700 Subject: [PATCH 3/6] substitute with LedgerMap type --- bitswap/bitswap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index a808954625d..d118c36877b 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -44,7 +44,7 @@ type BitSwap struct { // The Ledger has the peer.ID, and the peer connection works through net. // Ledgers of known relationships (active or inactive) stored in datastore. // Changes to the Ledger should be committed to the datastore. - partners map[u.Key]*Ledger + partners LedgerMap // haveList is the set of keys we have values for. a map for fast lookups. // haveList KeySet -- not needed. all values in datastore? From 99db07c3a4fa77a367431333680fc671950105e1 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 10 Sep 2014 20:23:39 -0700 Subject: [PATCH 4/6] hide ledger wantlist manipulation --- bitswap/bitswap.go | 15 +++++++-------- bitswap/ledger.go | 10 ++++++++++ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index d118c36877b..a2012c3de84 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -137,7 +137,7 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byt func (bs *BitSwap) HaveBlock(blk *blocks.Block) error { go func() { for _, ledger := range bs.partners { - if _, ok := ledger.WantList[blk.Key()]; ok { + if ledger.WantListContains(blk.Key()) { //send block to node if ledger.ShouldSend() { bs.SendBlock(ledger.Partner, blk) @@ -192,14 +192,13 @@ func (bs *BitSwap) handleMessages() { // and then if we do, check the ledger for whether or not we should send it. func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty()) - ledg := bs.getLedger(p) + ledger := bs.getLedger(p) dsk := ds.NewKey(want) blk_i, err := bs.datastore.Get(dsk) if err != nil { if err == ds.ErrNotFound { - // TODO: this needs to be different. We need timeouts. - ledg.WantList[u.Key(want)] = struct{}{} + ledger.Wants(u.Key(want)) } u.PErr("datastore get error: %v\n", err) return @@ -211,7 +210,7 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { return } - if ledg.ShouldSend() { + if ledger.ShouldSend() { u.DOut("Sending block to peer.\n") bblk, err := blocks.NewBlock(blk) if err != nil { @@ -219,7 +218,7 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { return } bs.SendBlock(p, bblk) - ledg.SentBytes(len(blk)) + ledger.SentBytes(len(blk)) } else { u.DOut("Decided not to send block.") } @@ -276,7 +275,7 @@ func (bs *BitSwap) Halt() { func (bs *BitSwap) SetStrategy(sf StrategyFunc) { bs.strategy = sf - for _, ledg := range bs.partners { - ledg.Strategy = sf + for _, ledger := range bs.partners { + ledger.Strategy = sf } } diff --git a/bitswap/ledger.go b/bitswap/ledger.go index 2a197f05741..2e99d2ec76e 100644 --- a/bitswap/ledger.go +++ b/bitswap/ledger.go @@ -49,3 +49,13 @@ func (l *Ledger) ReceivedBytes(n int) { l.LastExchange = time.Now() l.Accounting.BytesRecv += uint64(n) } + +// TODO: this needs to be different. We need timeouts. +func (l *Ledger) Wants(k u.Key) { + l.WantList[k] = struct{}{} +} + +func (l *Ledger) WantListContains(k u.Key) bool { + _, ok := l.WantList[k] + return ok +} From 7bae74287242b64b8d7ac61f299ca2d3387e4713 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 10 Sep 2014 20:30:08 -0700 Subject: [PATCH 5/6] privatize fields in ledger --- bitswap/ledger.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/bitswap/ledger.go b/bitswap/ledger.go index 2e99d2ec76e..b8b58e5a665 100644 --- a/bitswap/ledger.go +++ b/bitswap/ledger.go @@ -16,17 +16,17 @@ type Ledger struct { // Accounting tracks bytes sent and recieved. Accounting debtRatio - // FirstExchnage is the time of the first data exchange. - FirstExchange time.Time + // firstExchnage is the time of the first data exchange. + firstExchange time.Time - // LastExchange is the time of the last data exchange. - LastExchange time.Time + // lastExchange is the time of the last data exchange. + lastExchange time.Time - // Number of exchanges with this peer - ExchangeCount uint64 + // exchangeCount is the number of exchanges with this peer + exchangeCount uint64 - // WantList is a (bounded, small) set of keys that Partner desires. - WantList KeySet + // wantList is a (bounded, small) set of keys that Partner desires. + wantList KeySet Strategy StrategyFunc } @@ -39,23 +39,23 @@ func (l *Ledger) ShouldSend() bool { } func (l *Ledger) SentBytes(n int) { - l.ExchangeCount++ - l.LastExchange = time.Now() + l.exchangeCount++ + l.lastExchange = time.Now() l.Accounting.BytesSent += uint64(n) } func (l *Ledger) ReceivedBytes(n int) { - l.ExchangeCount++ - l.LastExchange = time.Now() + l.exchangeCount++ + l.lastExchange = time.Now() l.Accounting.BytesRecv += uint64(n) } // TODO: this needs to be different. We need timeouts. func (l *Ledger) Wants(k u.Key) { - l.WantList[k] = struct{}{} + l.wantList[k] = struct{}{} } func (l *Ledger) WantListContains(k u.Key) bool { - _, ok := l.WantList[k] + _, ok := l.wantList[k] return ok } From ad30333581e9050f70c64b81cff0047671949ce4 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Thu, 11 Sep 2014 01:02:52 -0700 Subject: [PATCH 6/6] fix(bitswap:ledger) race conditions https://github.com/jbenet/go-ipfs/issues/39 --- bitswap/ledger.go | 23 +++++++++++++++++++++++ bitswap/ledger_test.go | 23 +++++++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 bitswap/ledger_test.go diff --git a/bitswap/ledger.go b/bitswap/ledger.go index b8b58e5a665..6ddc0a71107 100644 --- a/bitswap/ledger.go +++ b/bitswap/ledger.go @@ -1,6 +1,7 @@ package bitswap import ( + "sync" "time" peer "github.com/jbenet/go-ipfs/peer" @@ -9,6 +10,7 @@ import ( // Ledger stores the data exchange relationship between two peers. type Ledger struct { + lock sync.RWMutex // Partner is the remote Peer. Partner *peer.Peer @@ -35,16 +37,25 @@ type Ledger struct { type LedgerMap map[u.Key]*Ledger func (l *Ledger) ShouldSend() bool { + l.lock.Lock() + defer l.lock.Unlock() + return l.Strategy(l) } func (l *Ledger) SentBytes(n int) { + l.lock.Lock() + defer l.lock.Unlock() + l.exchangeCount++ l.lastExchange = time.Now() l.Accounting.BytesSent += uint64(n) } func (l *Ledger) ReceivedBytes(n int) { + l.lock.Lock() + defer l.lock.Unlock() + l.exchangeCount++ l.lastExchange = time.Now() l.Accounting.BytesRecv += uint64(n) @@ -52,10 +63,22 @@ func (l *Ledger) ReceivedBytes(n int) { // TODO: this needs to be different. We need timeouts. func (l *Ledger) Wants(k u.Key) { + l.lock.Lock() + defer l.lock.Unlock() + l.wantList[k] = struct{}{} } func (l *Ledger) WantListContains(k u.Key) bool { + l.lock.RLock() + defer l.lock.RUnlock() + _, ok := l.wantList[k] return ok } + +func (l *Ledger) ExchangeCount() uint64 { + l.lock.RLock() + defer l.lock.RUnlock() + return l.exchangeCount +} diff --git a/bitswap/ledger_test.go b/bitswap/ledger_test.go new file mode 100644 index 00000000000..d651d485ff7 --- /dev/null +++ b/bitswap/ledger_test.go @@ -0,0 +1,23 @@ +package bitswap + +import ( + "sync" + "testing" +) + +func TestRaceConditions(t *testing.T) { + const numberOfExpectedExchanges = 10000 + l := new(Ledger) + var wg sync.WaitGroup + for i := 0; i < numberOfExpectedExchanges; i++ { + wg.Add(1) + go func() { + defer wg.Done() + l.ReceivedBytes(1) + }() + } + wg.Wait() + if l.ExchangeCount() != numberOfExpectedExchanges { + t.Fail() + } +}