Skip to content

Commit

Permalink
Merge pull request #711 from jbenet/fix-hanging-notifs
Browse files Browse the repository at this point in the history
misc fixes: hanging connects + test output
  • Loading branch information
jbenet committed Feb 1, 2015
2 parents 282be4f + 3f53747 commit d0f6043
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 23 deletions.
20 changes: 9 additions & 11 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package main

import (
"bytes"
"fmt"
"os"

ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
cmds "github.com/jbenet/go-ipfs/commands"
Expand Down Expand Up @@ -51,9 +51,8 @@ the daemon.
}

func daemonFunc(req cmds.Request, res cmds.Response) {
var out bytes.Buffer
res.SetOutput(&out)
writef(&out, "Initializing daemon...\n")
// let the user know we're going.
fmt.Printf("Initializing daemon...\n")

// first, whether user has provided the initialization flag. we may be
// running in an uninitialized state.
Expand All @@ -70,7 +69,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
// `IsInitialized` where the quality of the signal can be improved over
// time, and many call-sites can benefit.
if !util.FileExists(req.Context().ConfigRoot) {
err := initWithDefaults(&out, req.Context().ConfigRoot)
err := initWithDefaults(os.Stdout, req.Context().ConfigRoot)
if err != nil {
res.SetError(debugerror.Wrap(err), cmds.ErrNormal)
return
Expand Down Expand Up @@ -155,8 +154,8 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
res.SetError(err, cmds.ErrNormal)
return
}
writef(&out, "IPFS mounted at: %s\n", fsdir)
writef(&out, "IPNS mounted at: %s\n", nsdir)
fmt.Printf("IPFS mounted at: %s\n", fsdir)
fmt.Printf("IPNS mounted at: %s\n", nsdir)
}

var rootRedirect corehttp.ServeOption
Expand All @@ -173,17 +172,16 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
writable = cfg.Gateway.Writable
}

if writable {
fmt.Printf("IPNS gateway mounted read-write\n")
}

if gatewayMaddr != nil {
go func() {
var opts = []corehttp.ServeOption{corehttp.GatewayOption(writable)}
if rootRedirect != nil {
opts = append(opts, rootRedirect)
}
fmt.Printf("Gateway server listening on %s\n", gatewayMaddr)
if writable {
fmt.Printf("Gateway server is writable\n")
}
err := corehttp.ListenAndServe(node, gatewayMaddr.String(), opts...)
if err != nil {
log.Error(err)
Expand Down
2 changes: 2 additions & 0 deletions routing/dht/notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
select {
case <-dht.Closing():
return
default:
}
dht.Update(dht.Context(), v.RemotePeer())
}
Expand All @@ -27,6 +28,7 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
select {
case <-dht.Closing():
return
default:
}
dht.routingTable.Remove(v.RemotePeer())
}
Expand Down
26 changes: 16 additions & 10 deletions test/sharness/lib/test-lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,16 @@ test_run_repeat_10_sec() {
}

test_wait_output_n_lines_60_sec() {
echo "$2" >expected_waitn
for i in 1 2 3 4 5 6 7 8 9 10
for i in 1 2 3 4 5 6
do
cat "$1" | wc -l | tr -d " " >actual_waitn
test_cmp "expected_waitn" "actual_waitn" && return
sleep 2
for i in 1 2 3 4 5 6 7 8 9 10
do
test $(cat "$1" | wc -l | tr -d " ") -ge $2 && return
sleep 1
done
done
cat "$1" | wc -l | tr -d " " >actual_waitn
test_cmp "expected_waitn" "actual_waitn"
actual=$(cat "$1" | wc -l | tr -d " ")
fsh "expected $2 lines of output. got $actual"
}

test_wait_open_tcp_port_10_sec() {
Expand Down Expand Up @@ -130,6 +131,13 @@ test_config_ipfs_gateway_writable() {

test_launch_ipfs_daemon() {

ADDR_API="/ip4/127.0.0.1/tcp/5001"
ADDR_GWAY=`ipfs config Addresses.Gateway`
NLINES="2"
if test "$ADDR_GWAY" != ""; then
NLINES="3"
fi

test_expect_success "'ipfs daemon' succeeds" '
ipfs daemon >actual_daemon 2>daemon_err &
'
Expand All @@ -138,19 +146,17 @@ test_launch_ipfs_daemon() {
# and we make sure there are no errors
test_expect_success "'ipfs daemon' is ready" '
IPFS_PID=$! &&
test_run_repeat_10_sec "cat actual_daemon | grep \"API server listening on\"" &&
test_wait_output_n_lines_60_sec actual_daemon $NLINES &&
printf "" >empty && test_cmp daemon_err empty ||
fsh cat actual_daemon || fsh cat daemon_err
'

ADDR_API="/ip4/127.0.0.1/tcp/5001"
test_expect_success "'ipfs daemon' output includes API address" '
cat actual_daemon | grep "API server listening on $ADDR_API" ||
fsh cat actual_daemon ||
fsh "cat actual_daemon | grep \"API server listening on $ADDR_API\""
'

ADDR_GWAY=`ipfs config Addresses.Gateway`
if test "$ADDR_GWAY" != ""; then
test_expect_success "'ipfs daemon' output includes Gateway address" '
cat actual_daemon | grep "Gateway server listening on $ADDR_GWAY" ||
Expand Down
26 changes: 24 additions & 2 deletions thirdparty/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package notifier

import (
"sync"

process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
ratelimit "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
)

// Notifiee is a generic interface. Clients implement
Expand All @@ -31,6 +34,18 @@ type Notifiee interface{}
type Notifier struct {
mu sync.RWMutex // guards notifiees
nots map[Notifiee]struct{}
lim *ratelimit.RateLimiter
}

// RateLimited returns a rate limited Notifier. only limit goroutines
// will be spawned. If limit is zero, no rate limiting happens. This
// is the same as `Notifier{}`.
func RateLimited(limit int) Notifier {
n := Notifier{}
if limit > 0 {
n.lim = ratelimit.NewRateLimiter(process.Background(), limit)
}
return n
}

// Notify signs up Notifiee e for notifications. This function
Expand Down Expand Up @@ -107,8 +122,15 @@ func (n *Notifier) NotifyAll(notify func(Notifiee)) {
n.mu.Lock()
if n.nots != nil { // so that zero-value is ready to be used.
for notifiee := range n.nots {
go notify(notifiee)
// TODO find a good way to rate limit this without blocking notifier.

if n.lim == nil { // no rate limit
go notify(notifiee)
} else {
notifiee := notifiee // rebind for data races
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
}
}
n.mu.Unlock()
Expand Down
84 changes: 84 additions & 0 deletions thirdparty/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sync"
"testing"
"time"
)

// test data structures
Expand Down Expand Up @@ -205,3 +206,86 @@ func TestThreadsafe(t *testing.T) {
t.Error("counts disagree")
}
}

type highwatermark struct {
mu sync.Mutex
mark int
limit int
errs chan error
}

func (m *highwatermark) incr() {
m.mu.Lock()
m.mark++
// fmt.Println("incr", m.mark)
if m.mark > m.limit {
m.errs <- fmt.Errorf("went over limit: %d/%d", m.mark, m.limit)
}
m.mu.Unlock()
}

func (m *highwatermark) decr() {
m.mu.Lock()
m.mark--
// fmt.Println("decr", m.mark)
if m.mark < 0 {
m.errs <- fmt.Errorf("went under zero: %d/%d", m.mark, m.limit)
}
m.mu.Unlock()
}

func TestLimited(t *testing.T) {
timeout := 10 * time.Second // huge timeout.
limit := 9

hwm := highwatermark{limit: limit, errs: make(chan error, 100)}
n := RateLimited(limit) // will stop after 3 rounds
n.Notify(1)
n.Notify(2)
n.Notify(3)

entr := make(chan struct{})
exit := make(chan struct{})
done := make(chan struct{})
go func() {
for i := 0; i < 10; i++ {
// fmt.Printf("round: %d\n", i)
n.NotifyAll(func(e Notifiee) {
hwm.incr()
entr <- struct{}{}
<-exit // wait
hwm.decr()
})
}
done <- struct{}{}
}()

for i := 0; i < 30; {
select {
case <-entr:
continue // let as many enter as possible
case <-time.After(1 * time.Millisecond):
}

// let one exit
select {
case <-entr:
continue // in case of timing issues.
case exit <- struct{}{}:
case <-time.After(timeout):
t.Error("got stuck")
}
i++
}

select {
case <-done: // two parts done
case <-time.After(timeout):
t.Error("did not finish")
}

close(hwm.errs)
for err := range hwm.errs {
t.Error(err)
}
}

0 comments on commit d0f6043

Please sign in to comment.