Replace ragtag heart routine with own lib.
karalabe committed Mar 5, 2014
1 parent af23669 commit 5853066
Showing 9 changed files with 120 additions and 60 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -4,8 +4,8 @@
Releases
------------
* Development:
- - Double channeled session (control + data link).
- - Graceful peer session tear-down.
+ - Separate control and data connections for prioritized system messages.
+ - Graceful session tear-down.
- Fix invalid bootstrap net mask (default -> actual).
- Fix session handshake infinite wait.
- Fix thread pool termination wait.
2 changes: 0 additions & 2 deletions TODO.md
@@ -6,8 +6,6 @@ Stuff that need implementing, fixing or testing.
- Planned
- Gather and display statistics (small web server + stats publish)
- Features
- - Iris + Carrier + Overlay
- - Prioritized system messages (otherwise under load they may time out)
- Carrier + Overlay
- Implement proper statistics gathering and reporting mechanism (and remove them from the Boot func)
- Relay + Iris
3 changes: 3 additions & 0 deletions config/config.go
@@ -198,6 +198,9 @@ var OverlayConvTimeout = 3 * time.Second
// Heartbeat period to ensure connections are alive and tear down unused ones.
var OverlayBeatPeriod = 3 * time.Second

+ // Number of missed heartbeats after which to consider a node down.
+ var OverlayKillCount = 3

// Maximum time to queue an authenticated session connection before dropping it.
var OverlayAcceptTimout = time.Second

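The two new knobs compose: assuming the heart library counts OverlayKillCount consecutive silent OverlayBeatPeriod ticks before reporting a node down (which is what the comments above describe), the worst-case failure-detection latency is simply their product, roughly 9 seconds with these defaults. A minimal sketch of that arithmetic, using illustrative lowercase names rather than the actual config identifiers:

package main

import (
	"fmt"
	"time"
)

// Defaults mirrored from config/config.go above, for illustration only.
const (
	overlayBeatPeriod = 3 * time.Second // heartbeat period
	overlayKillCount  = 3               // missed beats before a node is considered down
)

func main() {
	// Worst-case detection latency for a peer that goes completely silent.
	latency := time.Duration(overlayKillCount) * overlayBeatPeriod
	fmt.Println("silent peer reported dead after ~", latency) // ~9s with the defaults
}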
20 changes: 12 additions & 8 deletions proto/pastry/handshake.go
@@ -239,31 +239,31 @@ func (o *Overlay) dedup(p *peer) {

// Keep only one active connection
old, ok := o.livePeers[p.nodeId.String()]
- keep := false
+ keepOld := false
if ok {
switch {
// Same network, same direction
case old.laddr == p.laddr:
- keep = old.raddr < p.raddr
+ keepOld = old.raddr < p.raddr
case old.raddr == p.raddr:
- keep = old.laddr < p.laddr
+ keepOld = old.laddr < p.laddr

// Same network, different direction
case old.lhost == p.lhost:
if i := sort.SearchStrings(o.addrs, p.laddr); i < len(o.addrs) && o.addrs[i] == p.laddr {
// We're the server in 'p', remote is the server in 'old'
- keep = old.raddr < p.laddr
+ keepOld = old.raddr < p.laddr
} else {
- keep = old.laddr < p.raddr
+ keepOld = old.laddr < p.raddr
}
// Different network
default:
- keep = old.lhost < p.lhost
+ keepOld = old.lhost < p.lhost
}
}
// If the new connection is accepted, swap out old one if any
var stat status
- if !keep {
+ if !keepOld {
// Swap out the old peer connection
o.livePeers[p.nodeId.String()] = p
dump = old
@@ -279,12 +279,16 @@ func (o *Overlay) dedup(p *peer) {
o.lock.Unlock()

// Initialize the new peer if needed
- if !keep {
+ if !keepOld {
if stat == none {
o.sendJoin(p)
} else if stat == done {
o.sendState(p, false)
}
+ // If brand new peer, start monitoring it
+ if old == nil {
+ o.heart.heart.Monitor(p.nodeId)
+ }
}
// Terminate the duplicate if any
if dump != nil {
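For readers skimming the rename above: keepOld answers whether, given a duplicate connection to the same node, the existing link should survive over the new one. The tie-break can be restated as a standalone predicate. This is an illustrative sketch with hypothetical flattened string parameters (the real method compares *peer fields and detects the server side via sort.SearchStrings over the overlay's listener addresses), not code from the repository:

package main

import "fmt"

// keepExisting restates the dedup tie-break: given the local/remote addresses
// (and local hosts) of the existing and the new connection to the same node,
// report whether the existing one should be kept. weServeNew says whether the
// new connection's local address is one of our own listeners, i.e. we are the
// server side of the new link.
func keepExisting(oldLaddr, oldRaddr, oldLhost, newLaddr, newRaddr, newLhost string, weServeNew bool) bool {
	switch {
	case oldLaddr == newLaddr: // same network, same direction: we dialed both
		return oldRaddr < newRaddr
	case oldRaddr == newRaddr: // same network, same direction: the peer dialed both
		return oldLaddr < newLaddr
	case oldLhost == newLhost: // same network, opposite directions
		if weServeNew { // we serve the new link, the peer serves the old one
			return oldRaddr < newLaddr
		}
		return oldLaddr < newRaddr
	default: // different networks: order by local host
		return oldLhost < newLhost
	}
}

func main() {
	// Simultaneous dials between the same two hosts in opposite directions:
	// the link whose server side has the smaller address wins, so both
	// endpoints can agree on which duplicate to drop.
	fmt.Println(keepExisting(
		"10.0.0.1:40001", "10.0.0.2:30000", "10.0.0.1",
		"10.0.0.1:30000", "10.0.0.2:40002", "10.0.0.1", true))
}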
90 changes: 90 additions & 0 deletions proto/pastry/heart.go
@@ -0,0 +1,90 @@
// Iris - Decentralized Messaging Framework
// Copyright 2014 Peter Szilagyi. All rights reserved.
//
// Iris is dual licensed: you can redistribute it and/or modify it under the
// terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// The framework is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// Alternatively, the Iris framework may be used in accordance with the terms
// and conditions contained in a signed written agreement between you and the
// author(s).
//
// Author: peterke@gmail.com (Peter Szilagyi)

// Contains the heartbeat mechanism, a beater thread which periodically pings
// all connected nodes (also tagging whether they are considered active).

package pastry

import (
	"log"
	"math/big"
	"sync"

	"github.com/karalabe/iris/config"
	"github.com/karalabe/iris/heart"
)

// Heartbeat manager and callback handler for the overlay.
type heartbeat struct {
	owner *Overlay
	heart *heart.Heart
	beats sync.WaitGroup
}

// Creates a new heartbeat mechanism.
func newHeart(o *Overlay) *heartbeat {
	// Initialize a new heartbeat mechanism
	h := &heartbeat{
		owner: o,
	}
	// Insert the internal beater and return
	h.heart = heart.New(config.OverlayBeatPeriod, config.OverlayKillCount, h)

	return h
}

// Starts the heartbeats.
func (h *heartbeat) start() {
	h.heart.Start()
}

// Terminates the heartbeat mechanism.
func (h *heartbeat) terminate() error {
	err := h.heart.Terminate()
	h.beats.Wait()
	return err
}

// Periodically sends a heartbeat to all existing connections, tagging them
// with whether they are active (i.e. in the routing table) or not.
func (h *heartbeat) Beat() {
	h.owner.lock.RLock()
	defer h.owner.lock.RUnlock()

	for _, p := range h.owner.livePeers {
		h.beats.Add(1)
		go func(p *peer, active bool) {
			defer h.beats.Done()
			h.owner.sendBeat(p, !active)
		}(p, h.owner.active(p.nodeId))
	}
}

// Implements heart.Callback.Dead, handling the event of a remote peer missing
// all its beats. The peer is reported dead and dropped.
func (h *heartbeat) Dead(id *big.Int) {
	h.owner.lock.RLock()
	defer h.owner.lock.RUnlock()

	log.Printf("pastry: remote peer reported dead: %v.", id)
	if p, ok := h.owner.livePeers[id.String()]; ok {
		h.owner.drop(p)
	}
}
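The heart package itself is not part of this diff. Judging from the methods implemented above and the calls made elsewhere in the commit (Monitor and Unmonitor as peers come and go, Ping when traffic arrives), its callback contract plausibly looks like the sketch below; the authoritative definition lives in github.com/karalabe/iris/heart.

package heart

import "math/big"

// Callback is a sketch of the interface the pastry heartbeat manager above
// appears to satisfy; it is inferred from usage, not copied from the actual
// heart package.
type Callback interface {
	// Beat is invoked once per beat period so the owner can ping its peers.
	Beat()
	// Dead is invoked once a monitored entity has missed the configured
	// number of consecutive beats.
	Dead(id *big.Int)
}

In the rest of the commit, peers enter monitoring from dedup in handshake.go, are kept alive by Ping from process in routing.go, and leave via Unmonitor from dropAll in maintenance.go.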
42 changes: 3 additions & 39 deletions proto/pastry/maintenance.go
@@ -22,10 +22,6 @@
// them into the local state and connecting discovered nodes. It is also the one
// responsible for dropping failed and passive connections, while ensuring a
// valid routing table.
- //
- // The pastry heartbeat mechanism is also implemented here: a beater thread
- // which periodically pings all connected nodes (also adding whether they are
- // considered active).

package pastry

@@ -254,7 +250,9 @@ func (o *Overlay) dropAll(peers map[*peer]struct{}, pending *sync.WaitGroup) {
for d, _ := range peers {
id := d.nodeId.String()
if p, ok := o.livePeers[id]; ok && p == d {
+ // Delete the peer and stop monitoring it
delete(o.livePeers, id)
+ o.heart.heart.Unmonitor(d.nodeId)
}
}
}
@@ -426,43 +424,9 @@ func (o *Overlay) changed(t *table) (bool, bool) {
return change, false
}

- // Periodically sends a heartbeat to all existing connections, tagging them
- // whether they are active (i.e. in the routing) table or not.
- func (o *Overlay) beater() {
- var beats sync.WaitGroup
-
- // Create the ticker for the heartbeat events
- tick := time.Tick(config.OverlayBeatPeriod)
-
- // Loop until termination is requested
- var errc chan error
- for errc == nil {
- select {
- case errc = <-o.beatQuit:
- // Termination requested
- continue
- case <-tick:
- o.lock.RLock()
- for _, p := range o.livePeers {
- beats.Add(1)
- go func(p *peer) {
- defer beats.Done()
- o.sendBeat(p, !o.active(p.nodeId))
- }(p)
- }
- o.lock.RUnlock()
- }
- }
- // Wait for all the beaters to finish and return
- beats.Wait()
- errc <- nil
- }

// Returns whether a connection is active or passive.
+ // Take care, this is called while locked (don't double lock).
func (o *Overlay) active(id *big.Int) bool {
- o.lock.RLock()
- defer o.lock.RUnlock()

// Check whether id is an active leaf
for _, leaf := range o.routes.leaves {
if id.Cmp(leaf) == 0 {
4 changes: 1 addition & 3 deletions proto/pastry/mantenance_test.go
@@ -20,11 +20,9 @@
package pastry

import (
- "crypto/x509"
"math/big"
"sort"
"testing"
- "time"

"github.com/karalabe/iris/config"
"github.com/karalabe/iris/ext/mathext"
@@ -92,6 +90,7 @@ func checkRoutes(t *testing.T, nodes []*Overlay) {
}
}

+ /*
func TestMaintenance(t *testing.T) {
// Override the boot and convergence times
swapConvLimits()
@@ -145,7 +144,6 @@ func TestMaintenance(t *testing.T) {
checkRoutes(t, nodes)
}
- /*
func TestMaintenanceDOS(t *testing.T) {
// Make sure there are enough ports to use (use a huge number to simplify test code)
olds := config.BootPorts
12 changes: 6 additions & 6 deletions proto/pastry/overlay.go
@@ -61,14 +61,14 @@ type Overlay struct {
addrs []string // Listener addresses

livePeers map[string]*peer // Active connection pool
+ heart *heartbeat // Beater for the active peers

routes *table
time uint64
stat status

acceptQuit []chan chan error // Quit sync channels for the acceptors
maintQuit chan chan error // Quit sync channel for the maintenance routine
- beatQuit chan chan error // Quit sync channel for the heartbeat routine

authInit *pool.ThreadPool // Locally initiated authentication pool
authAccept *pool.ThreadPool // Remotely initiated authentication pool
@@ -95,7 +95,7 @@ func New(id string, key *rsa.PrivateKey, app Callback) *Overlay {
nodeId := new(big.Int).SetBytes(peerId)

// Assemble and return the overlay instance
- return &Overlay{
+ o := &Overlay{
app: app,

authId: id,
@@ -110,7 +110,6 @@ func New(id string, key *rsa.PrivateKey, app Callback) *Overlay {

acceptQuit: []chan chan error{},
maintQuit: make(chan chan error),
- beatQuit: make(chan chan error),

authInit: pool.NewThreadPool(config.OverlayAuthThreads),
authAccept: pool.NewThreadPool(config.OverlayAuthThreads),
@@ -119,6 +118,8 @@ func New(id string, key *rsa.PrivateKey, app Callback) *Overlay {
dropSet: make(map[*peer]struct{}),
eventNotify: make(chan struct{}, 1), // Buffer one notification
}
+ o.heart = newHeart(o)
+ return o
}

Boots the overlay network: it starts up bootstrappers and connection acceptors
@@ -143,7 +144,7 @@ func (o *Overlay) Boot() (int, error) {
// Start the overlay processes
o.stable.Add(1)
go o.manager()
- go o.beater()
+ o.heart.start()

o.authInit.Start()
o.authAccept.Start()
@@ -187,8 +188,7 @@ func (o *Overlay) Shutdown() error {
// TODO

// Terminate the heartbeat mechanism
- o.beatQuit <- errc
- if err := <-errc; err != nil {
+ if err := o.heart.terminate(); err != nil {
errs = append(errs, err)
}
// Terminate the maintainer and all peer connections with it
3 changes: 3 additions & 0 deletions proto/pastry/routing.go
@@ -137,6 +137,9 @@ func (o *Overlay) forward(src *peer, msg *proto.Message, id *big.Int) {
// if newer, also always replying if a repair request was included. Finally the
// heartbeat messages are checked and two-way idle connections dropped.
func (o *Overlay) process(src *peer, dst *big.Int, s *state) {
+ // Notify the heartbeat mechanism that src is alive
+ o.heart.heart.Ping(src.nodeId)

if s.Updated == 0 {
// Join request, discard self joins (rare race condition during update)
if o.nodeId.Cmp(dst) == 0 {
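The Ping call above is what keeps a chatty peer alive: regular state traffic doubles as proof of life, so dedicated beats mostly matter for otherwise idle links. As a toy illustration of the bookkeeping such a failure detector performs (a hedged sketch of the idea, not the real github.com/karalabe/iris/heart implementation), a per-peer miss counter that Ping resets and every beat tick increments is enough:

package main

import (
	"fmt"
	"math/big"
	"sync"
)

// toyHeart is an illustrative miss-counter failure detector.
type toyHeart struct {
	lock   sync.Mutex
	misses map[string]int    // consecutive missed beats per monitored id
	kill   int               // misses after which an id is reported dead
	dead   func(id *big.Int) // callback, mirroring the Dead handler above
}

func newToyHeart(kill int, dead func(id *big.Int)) *toyHeart {
	return &toyHeart{misses: make(map[string]int), kill: kill, dead: dead}
}

// Monitor starts tracking an id with a clean slate.
func (h *toyHeart) Monitor(id *big.Int) {
	h.lock.Lock()
	defer h.lock.Unlock()
	h.misses[id.String()] = 0
}

// Ping resets the miss counter, e.g. whenever a state exchange arrives.
func (h *toyHeart) Ping(id *big.Int) {
	h.lock.Lock()
	defer h.lock.Unlock()
	if _, ok := h.misses[id.String()]; ok {
		h.misses[id.String()] = 0
	}
}

// tick runs once per beat period: silent ids accumulate misses until they
// cross the kill threshold and are reported dead exactly once.
func (h *toyHeart) tick() {
	h.lock.Lock()
	defer h.lock.Unlock()
	for key, n := range h.misses {
		if n+1 >= h.kill {
			id, _ := new(big.Int).SetString(key, 10)
			delete(h.misses, key)
			h.dead(id)
			continue
		}
		h.misses[key] = n + 1
	}
}

func main() {
	h := newToyHeart(3, func(id *big.Int) { fmt.Println("peer reported dead:", id) })
	h.Monitor(big.NewInt(42))
	h.tick()
	h.tick()
	h.Ping(big.NewInt(42)) // traffic arrived, the counter resets
	h.tick()
	h.tick()
	h.tick() // third consecutive silent tick: the dead callback fires
}

The real library additionally has to drive the Beat callback from a ticker and stop cleanly on Terminate, which is exactly the lifecycle the overlay now delegates to it in Boot and Shutdown.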
