Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(bitswap) tidy up bitswap, ledger #38

Merged
merged 6 commits into from Sep 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 15 additions & 16 deletions bitswap/bitswap.go
@@ -1,17 +1,17 @@
package bitswap

import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
"time"

proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

blocks "github.com/jbenet/go-ipfs/blocks"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"

ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

"time"
)

// PartnerWantListMax is the bound for the number of keys we'll store per
Expand Down Expand Up @@ -44,7 +44,7 @@ type BitSwap struct {
// The Ledger has the peer.ID, and the peer connection works through net.
// Ledgers of known relationships (active or inactive) stored in datastore.
// Changes to the Ledger should be committed to the datastore.
partners map[u.Key]*Ledger
partners LedgerMap

// haveList is the set of keys we have values for. a map for fast lookups.
// haveList KeySet -- not needed. all values in datastore?
Expand Down Expand Up @@ -137,7 +137,7 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byt
func (bs *BitSwap) HaveBlock(blk *blocks.Block) error {
go func() {
for _, ledger := range bs.partners {
if _, ok := ledger.WantList[blk.Key()]; ok {
if ledger.WantListContains(blk.Key()) {
//send block to node
if ledger.ShouldSend() {
bs.SendBlock(ledger.Partner, blk)
Expand Down Expand Up @@ -192,14 +192,13 @@ func (bs *BitSwap) handleMessages() {
// and then if we do, check the ledger for whether or not we should send it.
func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty())
ledg := bs.GetLedger(p)
ledger := bs.getLedger(p)

dsk := ds.NewKey(want)
blk_i, err := bs.datastore.Get(dsk)
if err != nil {
if err == ds.ErrNotFound {
// TODO: this needs to be different. We need timeouts.
ledg.WantList[u.Key(want)] = struct{}{}
ledger.Wants(u.Key(want))
}
u.PErr("datastore get error: %v\n", err)
return
Expand All @@ -211,15 +210,15 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
return
}

if ledg.ShouldSend() {
if ledger.ShouldSend() {
u.DOut("Sending block to peer.\n")
bblk, err := blocks.NewBlock(blk)
if err != nil {
u.PErr("newBlock error: %v\n", err)
return
}
bs.SendBlock(p, bblk)
ledg.SentBytes(len(blk))
ledger.SentBytes(len(blk))
} else {
u.DOut("Decided not to send block.")
}
Expand All @@ -239,11 +238,11 @@ func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) {
}
bs.listener.Respond(string(blk.Key()), mes)

ledger := bs.GetLedger(p)
ledger := bs.getLedger(p)
ledger.ReceivedBytes(len(blk.Data))
}

func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger {
func (bs *BitSwap) getLedger(p *peer.Peer) *Ledger {
l, ok := bs.partners[p.Key()]
if ok {
return l
Expand Down Expand Up @@ -276,7 +275,7 @@ func (bs *BitSwap) Halt() {

func (bs *BitSwap) SetStrategy(sf StrategyFunc) {
bs.strategy = sf
for _, ledg := range bs.partners {
ledg.Strategy = sf
for _, ledger := range bs.partners {
ledger.Strategy = sf
}
}
61 changes: 47 additions & 14 deletions bitswap/ledger.go
@@ -1,32 +1,34 @@
package bitswap

import (
"sync"
"time"

peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"

"time"
)

// Ledger stores the data exchange relationship between two peers.
type Ledger struct {
lock sync.RWMutex

// Partner is the remote Peer.
Partner *peer.Peer

// Accounting tracks bytes sent and recieved.
Accounting debtRatio

// FirstExchnage is the time of the first data exchange.
FirstExchange time.Time
// firstExchnage is the time of the first data exchange.
firstExchange time.Time

// LastExchange is the time of the last data exchange.
LastExchange time.Time
// lastExchange is the time of the last data exchange.
lastExchange time.Time

// Number of exchanges with this peer
ExchangeCount uint64
// exchangeCount is the number of exchanges with this peer
exchangeCount uint64

// WantList is a (bounded, small) set of keys that Partner desires.
WantList KeySet
// wantList is a (bounded, small) set of keys that Partner desires.
wantList KeySet

Strategy StrategyFunc
}
Expand All @@ -35,17 +37,48 @@ type Ledger struct {
type LedgerMap map[u.Key]*Ledger

func (l *Ledger) ShouldSend() bool {
l.lock.Lock()
defer l.lock.Unlock()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that i think about it, this needs to be protected by some locks. its not threadsafe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. will open an issue and fix in an upcoming PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in this PR. Last amendment I promise. =)

return l.Strategy(l)
}

func (l *Ledger) SentBytes(n int) {
l.ExchangeCount++
l.LastExchange = time.Now()
l.lock.Lock()
defer l.lock.Unlock()

l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesSent += uint64(n)
}

func (l *Ledger) ReceivedBytes(n int) {
l.ExchangeCount++
l.LastExchange = time.Now()
l.lock.Lock()
defer l.lock.Unlock()

l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesRecv += uint64(n)
}

// TODO: this needs to be different. We need timeouts.
func (l *Ledger) Wants(k u.Key) {
l.lock.Lock()
defer l.lock.Unlock()

l.wantList[k] = struct{}{}
}

func (l *Ledger) WantListContains(k u.Key) bool {
l.lock.RLock()
defer l.lock.RUnlock()

_, ok := l.wantList[k]
return ok
}

func (l *Ledger) ExchangeCount() uint64 {
l.lock.RLock()
defer l.lock.RUnlock()
return l.exchangeCount
}
23 changes: 23 additions & 0 deletions bitswap/ledger_test.go
@@ -0,0 +1,23 @@
package bitswap

import (
"sync"
"testing"
)

func TestRaceConditions(t *testing.T) {
const numberOfExpectedExchanges = 10000
l := new(Ledger)
var wg sync.WaitGroup
for i := 0; i < numberOfExpectedExchanges; i++ {
wg.Add(1)
go func() {
defer wg.Done()
l.ReceivedBytes(1)
}()
}
wg.Wait()
if l.ExchangeCount() != numberOfExpectedExchanges {
t.Fail()
}
}